"""Miscellaneous functions for SatNOGS DB"""
from __future__ import absolute_import, division, print_function, \
unicode_literals
import binascii
import logging
from datetime import datetime, timedelta
from django.conf import settings
from django.core.cache import cache
from django.db.models import Count, Max
from django.utils.timezone import make_aware
from influxdb import InfluxDBClient
from satnogsdecoders import __version__ as decoders_version
from satnogsdecoders import decoder
from db.base.models import DemodData, Mode, Satellite, Telemetry, Transmitter

LOGGER = logging.getLogger('db')


def calculate_statistics():
"""Calculates statistics about the data we have in DB
:returns: a dictionary of statistics
"""
2017-11-13 03:27:11 -07:00
satellites = Satellite.objects.all()
transmitters = Transmitter.objects.all()
modes = Mode.objects.all()
total_satellites = satellites.count()
total_transmitters = transmitters.count()
total_data = DemodData.objects.all().count()
alive_transmitters = transmitters.filter(status='active').count()
if alive_transmitters > 0 and total_transmitters > 0:
try:
alive_transmitters_percentage = '{0}%'.format(
round((float(alive_transmitters) / float(total_transmitters)) * 100, 2)
)
except ZeroDivisionError as error:
LOGGER.error(error, exc_info=True)
alive_transmitters_percentage = '0%'
else:
alive_transmitters_percentage = '0%'
mode_label = []
mode_data = []
for mode in modes:
filtered_transmitters = transmitters.filter(mode=mode).count()
mode_label.append(mode.name)
        mode_data.append(filtered_transmitters)

    # Fall back to placeholder values so the charts still render in a fresh
    # environment with no modes in the DB (needed to pass testing)
    if not mode_label:
        mode_label = ['FM']
    if not mode_data:
        mode_data = [0]

    band_label = []
band_data = []
bands = [
# <30.000.000 - HF
{
'lower_limit': 0,
'upper_limit': 30000000,
'label': 'HF'
},
# 30.000.000 ~ 300.000.000 - VHF
{
'lower_limit': 30000000,
'upper_limit': 300000000,
'label': 'VHF'
},
# 300.000.000 ~ 1.000.000.000 - UHF
{
'lower_limit': 300000000,
'upper_limit': 1000000000,
'label': 'UHF',
},
# 1G ~ 2G - L
{
'lower_limit': 1000000000,
'upper_limit': 2000000000,
'label': 'L',
},
# 2G ~ 4G - S
{
'lower_limit': 2000000000,
'upper_limit': 4000000000,
'label': 'S',
},
# 4G ~ 8G - C
{
'lower_limit': 4000000000,
'upper_limit': 8000000000,
'label': 'C',
},
# 8G ~ 12G - X
{
'lower_limit': 8000000000,
'upper_limit': 12000000000,
'label': 'X',
},
# 12G ~ 18G - Ku
{
'lower_limit': 12000000000,
'upper_limit': 18000000000,
'label': 'Ku',
},
# 18G ~ 27G - K
{
'lower_limit': 18000000000,
'upper_limit': 27000000000,
'label': 'K',
},
# 27G ~ 40G - Ka
{
'lower_limit': 27000000000,
'upper_limit': 40000000000,
'label': 'Ka',
},
]
for band in bands:
filtered = transmitters.filter(
downlink_low__gte=band['lower_limit'], downlink_low__lt=band['upper_limit']
).count()
band_label.append(band['label'])
        band_data.append(filtered)

    # Sort each chart series by descending count, keeping every label paired
    # with its count
    mode_data_sorted, mode_label_sorted = \
        list(zip(*sorted(zip(mode_data, mode_label), reverse=True)))
    band_data_sorted, band_label_sorted = \
        list(zip(*sorted(zip(band_data, band_label), reverse=True)))

    statistics = {
'total_satellites': total_satellites,
'total_data': total_data,
'transmitters': total_transmitters,
'transmitters_alive': alive_transmitters_percentage,
'mode_label': mode_label_sorted,
'mode_data': mode_data_sorted,
'band_label': band_label_sorted,
'band_data': band_data_sorted
}
return statistics
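# A rough sketch of the dictionary returned above, with made-up illustrative
# values (the mode and band series are the sorted tuples built in the
# function; real numbers depend on the contents of the DB):
#
#     {'total_satellites': 300, 'total_data': 1500000, 'transmitters': 900,
#      'transmitters_alive': '72.5%',
#      'mode_label': ('FM', 'CW', 'BPSK'), 'mode_data': (400, 250, 120),
#      'band_label': ('UHF', 'VHF', 'HF'), 'band_data': (600, 250, 50)}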
def create_point(fields, satellite, telemetry, demoddata, version):
"""Create a decoded data point in JSON format that is influxdb compatible
:returns: a JSON formatted time series data point
"""
point = [
{
'time': demoddata.timestamp.strftime('%Y-%m-%dT%H:%M:%SZ'),
'measurement': satellite.norad_cat_id,
'tags': {
'satellite': satellite.name,
'decoder': telemetry.decoder,
'station': demoddata.station,
'observer': demoddata.observer,
'source': demoddata.app_source,
'version': version
},
'fields': fields
}
]
return point
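# A rough sketch of the structure create_point returns, with made-up
# illustrative values ('measurement' is the satellite's NORAD ID and
# 'fields' is the dictionary of decoded telemetry values passed in):
#
#     [{'time': '2019-01-01T00:00:00Z',
#       'measurement': 12345,
#       'tags': {'satellite': 'ExampleSat-1', 'decoder': 'example',
#                'station': 'Example Ground Station', 'observer': 'observer',
#                'source': 'network', 'version': '1.2.3'},
#       'fields': {'battery_voltage': 3.9}}]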
def write_influx(json_obj):
"""Take a json object and send to influxdb."""
client = InfluxDBClient(
settings.INFLUX_HOST,
settings.INFLUX_PORT,
settings.INFLUX_USER,
settings.INFLUX_PASS,
settings.INFLUX_DB,
ssl=settings.INFLUX_SSL
)
    client.write_points(json_obj)


def decode_data(norad, period=None):
    """Decode data for a satellite, with an option to limit the scope.

    :param norad: the NORAD ID of the satellite to decode data for
    :param period: if set, only attempt to decode frames from the last
                   4 hours, otherwise attempt to decode everything
    """
sat = Satellite.objects.get(norad_cat_id=norad)
if not sat.has_telemetry_decoders:
return
now = datetime.utcnow()
if period:
time_period = now - timedelta(hours=4)
time_period = make_aware(time_period)
data = DemodData.objects.filter(satellite__norad_cat_id=norad, timestamp__gte=time_period)
else:
data = DemodData.objects.filter(satellite=sat)
telemetry_decoders = Telemetry.objects.filter(satellite=sat)
# iterate over DemodData objects
for obj in data:
# iterate over Telemetry decoders
for tlmdecoder in telemetry_decoders:
try:
decoder_class = getattr(decoder, tlmdecoder.decoder.capitalize())
except AttributeError:
continue
try:
with open(obj.payload_frame.path) as frame_file:
# we get data frames in hex but kaitai requires binary
hexdata = frame_file.read()
bindata = binascii.unhexlify(hexdata)
# if we are set to use InfluxDB, send the decoded data
# there, otherwise we store it in the local DB.
if settings.USE_INFLUX:
try:
frame = decoder_class.from_bytes(bindata)
json_obj = create_point(
decoder.get_fields(frame), sat, tlmdecoder, obj, decoders_version
)
write_influx(json_obj)
obj.payload_decoded = 'influxdb'
obj.is_decoded = True
obj.save()
break
except Exception: # pylint: disable=W0703
obj.is_decoded = False
obj.save()
continue
else: # store in the local db instead of influx
try:
frame = decoder_class.from_bytes(bindata)
except Exception: # pylint: disable=W0703
obj.payload_decoded = ''
obj.is_decoded = False
obj.save()
continue
else:
json_obj = create_point(
decoder.get_fields(frame), sat, tlmdecoder, obj, decoders_version
)
obj.payload_decoded = json_obj
obj.is_decoded = True
obj.save()
break
except IOError:
continue
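# A minimal usage sketch (the NORAD ID is hypothetical; any truthy `period`
# value restricts decoding to frames from the last 4 hours):
#
#     decode_data(12345)            # try to decode every stored frame
#     decode_data(12345, period=1)  # only frames from the last 4 hours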
# Caches stats about satellites and data
def cache_statistics():
"""Populate a django cache with statistics from data in DB
.. seealso:: calculate_statistics
"""
statistics = calculate_statistics()
cache.set('stats_transmitters', statistics, 60 * 60 * 2)
satellites = Satellite.objects \
.values('name', 'norad_cat_id') \
.annotate(count=Count('telemetry_data'),
latest_payload=Max('telemetry_data__timestamp')) \
.order_by('-count')
cache.set('stats_satellites', satellites, 60 * 60 * 2)
observers = DemodData.objects \
.values('observer') \
.annotate(count=Count('observer'),
latest_payload=Max('timestamp')) \
.order_by('-count')
cache.set('stats_observers', observers, 60 * 60 * 2)
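# A hedged sketch of how a consumer could read the cached values set above;
# `cache.get` returns None when an entry has expired or was never populated:
#
#     from django.core.cache import cache
#
#     stats = cache.get('stats_transmitters')
#     if stats is None:
#         cache_statistics()
#         stats = cache.get('stats_transmitters')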