1
0
Fork 0

kaitai decoding and influxdb support

Introduces the following:

- decoding of DemodData through decoders that are compiled from kaitai structs and stored in base/decoders/*.py

- storage of the decoded data will either be in the payload_decoded field (default) or into influxdb if USE_INFLUX=True and the proper settings are met

- A scheduled task is set */15 to look for the last 4 hours of submitted frames for anything not decoded, and then decodes it. (we will want to tweak these numbers as we get a good feel for any race conditions that may occur)

- 2 new commands in the admin console, one that will reset all decoded data for a satellite, and another that will trigger decoding of all frames in the db for a satellite. These could be useful when making fixes or improvements to a decoder, or when a new decoder is initially added to the db.
environments/stage/deployments/27
Corey Shields 2018-08-18 20:43:02 -04:00
parent 63b69d7746
commit 8527564089
12 changed files with 489 additions and 4 deletions

View File

@ -8,9 +8,10 @@ from django.template.loader import render_to_string
from django.conf import settings
from django.contrib.sites.shortcuts import get_current_site
from django.http import HttpResponseRedirect
from django.shortcuts import redirect
from db.base.models import Mode, Satellite, Transmitter, Suggestion, DemodData, Telemetry
from db.base.tasks import check_celery
from db.base.tasks import check_celery, reset_decoded_data, decode_all_data
logger = logging.getLogger('db')
@ -29,6 +30,10 @@ class SatelliteAdmin(admin.ModelAdmin):
urls = super(SatelliteAdmin, self).get_urls()
my_urls = [
url(r'^check_celery/$', self.check_celery, name='check_celery'),
url(r'^reset_data/(?P<norad>[0-9]+)/$', self.reset_data,
name='reset_data'),
url(r'^decode_all_data/(?P<norad>[0-9]+)/$', self.decode_all_data,
name='decode_all_data'),
]
return my_urls + urls
@ -48,6 +53,21 @@ class SatelliteAdmin(admin.ModelAdmin):
finally:
return HttpResponseRedirect(reverse('admin:index'))
# resets all decoded data and changes the is_decoded flag back to False
# THIS IS VERY DISTRUCTIVE, but the expectation is that a decode_all_data
# would follow.
def reset_data(self, request, norad):
reset_decoded_data.delay(norad)
messages.success(request, 'Data reset task was triggered successfully!')
return redirect(reverse('admin:index'))
# force a decode of all data for a norad ID. This could be very resource
# intensive but necessary when catching a satellite up with a new decoder
def decode_all_data(self, request, norad):
decode_all_data.delay(norad)
messages.success(request, 'Decode task was triggered successfully!')
return redirect(reverse('admin:index'))
@admin.register(Transmitter)
class TransmitterAdmin(admin.ModelAdmin):

View File

@ -0,0 +1,118 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
class Cas4(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.ax25header = self._io.read_bytes(16)
self.syncpacket = self._io.ensure_fixed_contents(b"\xEB\x90")
_on = (self.framecounter % 4)
if _on == 0:
self._raw_telemetry = self._io.read_bytes(4)
io = KaitaiStream(BytesIO(self._raw_telemetry))
self.telemetry = self._root.Frame1(io, self, self._root)
elif _on == 1:
self._raw_telemetry = self._io.read_bytes(4)
io = KaitaiStream(BytesIO(self._raw_telemetry))
self.telemetry = self._root.Frame2(io, self, self._root)
elif _on == 3:
self._raw_telemetry = self._io.read_bytes(4)
io = KaitaiStream(BytesIO(self._raw_telemetry))
self.telemetry = self._root.Frame4(io, self, self._root)
elif _on == 2:
self._raw_telemetry = self._io.read_bytes(4)
io = KaitaiStream(BytesIO(self._raw_telemetry))
self.telemetry = self._root.Frame3(io, self, self._root)
else:
self.telemetry = self._io.read_bytes(4)
self.reserved1 = self._io.read_bytes(9)
self.framecounterplaceholder = self._io.read_u1()
self.reserved2 = self._io.read_bytes(112)
class Frame1(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.u1_pwr_volt = self._io.read_u1()
self.pwr_cur = self._io.read_u1()
self.convert_volt = self._io.read_u1()
self.convert_cur = self._io.read_u1()
class Frame2(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.obc_temp = self._io.read_u1()
self.rf_amp_temp = self._io.read_u1()
self.rcv_agc_volt = self._io.read_u1()
self.rf_fwd = self._io.read_u1()
class Frame3(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.rf_ref = self._io.read_u1()
self.obc_volt = self._io.read_u1()
self.obc_reset = self._io.read_u1()
self.pkt_cnt = self._io.read_bits_int(4)
self.sat_num = self._io.read_bits_int(4)
class Frame4(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.op_mode = self._io.read_bits_int(4)
self.pwr_on_op_mode = self._io.read_bits_int(4)
self.i2c_watchdog = self._io.read_bits_int(1) != 0
self.i2c_recon_cnt = self._io.read_bits_int(3)
self.tc_watchdog = self._io.read_bits_int(1) != 0
self.tc_reset_cnt = self._io.read_bits_int(3)
self.adc_watchdog = self._io.read_bits_int(1) != 0
self.adc_reset_cnt = self._io.read_bits_int(3)
self.spi_watchdog = self._io.read_bits_int(1) != 0
self.spi_init_cnt = self._io.read_bits_int(3)
self.cpu_watchdog = self._io.read_bits_int(1) != 0
self.cpu_reset_cnt = self._io.read_bits_int(3)
@property
def framecounter(self):
if hasattr(self, '_m_framecounter'):
return self._m_framecounter if hasattr(self, '_m_framecounter') else None
_pos = self._io.pos()
self._io.seek(31)
self._m_framecounter = self._io.read_u1()
self._io.seek(_pos)
return self._m_framecounter if hasattr(self, '_m_framecounter') else None

View File

@ -0,0 +1,73 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
class Siriussat(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.ax25header = self._io.read_bytes(16)
self.u_panel1 = self._io.read_u2le()
self.u_panel2 = self._io.read_u2le()
self.u_panel3 = self._io.read_u2le()
self.i_panel1 = self._io.read_u2le()
self.i_panel2 = self._io.read_u2le()
self.i_panel3 = self._io.read_u2le()
self.i_bat = self._io.read_s2le()
self.i_ch1 = self._io.read_u2le()
self.i_ch2 = self._io.read_u2le()
self.i_ch3 = self._io.read_u2le()
self.i_ch4 = self._io.read_u2le()
self.t1_pw = self._io.read_s2le()
self.t2_pw = self._io.read_s2le()
self.t3_pw = self._io.read_s2le()
self.t4_pw = self._io.read_s2le()
self.u_bat_crit = self._io.read_bits_int(1) != 0
self.u_bat_min = self._io.read_bits_int(1) != 0
self.heater2_manual = self._io.read_bits_int(1) != 0
self.heater1_manual = self._io.read_bits_int(1) != 0
self.heater2_on = self._io.read_bits_int(1) != 0
self.heater1_on = self._io.read_bits_int(1) != 0
self.t_bat_max = self._io.read_bits_int(1) != 0
self.t_bat_min = self._io.read_bits_int(1) != 0
self.channel_on4 = self._io.read_bits_int(1) != 0
self.channel_on3 = self._io.read_bits_int(1) != 0
self.channel_on2 = self._io.read_bits_int(1) != 0
self.channel_on1 = self._io.read_bits_int(1) != 0
self.i_ch_limit4 = self._io.read_bits_int(1) != 0
self.i_ch_limit3 = self._io.read_bits_int(1) != 0
self.i_ch_limit2 = self._io.read_bits_int(1) != 0
self.i_ch_limit1 = self._io.read_bits_int(1) != 0
self.reserved0 = self._io.read_bits_int(7)
self.charger = self._io.read_bits_int(1) != 0
self._io.align_to_byte()
self.reserved1 = self._io.read_u1()
self.u_bat = self._io.read_s2le()
self.reg_tel_id = self._io.read_u4le()
self.pss_time = self._io.read_s4le()
self.pss_n_reset = self._io.read_u1()
self.pss_flags = self._io.read_u1()
self.t_amp = self._io.read_s1()
self.t_uhf = self._io.read_s1()
self.rssi_rx = self._io.read_s1()
self.rssi_idle = self._io.read_s1()
self.power_forward = self._io.read_s1()
self.power_reflected = self._io.read_s1()
self.uhf_n_reset = self._io.read_u1()
self.uhf_flags = self._io.read_u1()
self.uhf_time = self._io.read_s4le()
self.uptime = self._io.read_u4le()
self.uhf_current = self._io.read_u2le()
self.uhf_voltage = self._io.read_s2le()

View File

@ -0,0 +1,59 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
from pkg_resources import parse_version
from kaitaistruct import __version__ as ks_version, KaitaiStruct, KaitaiStream, BytesIO
if parse_version(ks_version) < parse_version('0.7'):
raise Exception("Incompatible Kaitai Struct Python API: 0.7 or later is required, but you have %s" % (ks_version))
class Us6(KaitaiStruct):
"""
.. seealso::
Source - https://www.gaussteam.com/radio-amateur-information-for-unisat-6/
"""
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.header = self._io.read_bytes(16)
self.syncpacket = self._io.ensure_fixed_contents(b"\x55\x53\x36")
self.packetindex = self._io.read_u2le()
self.groundindexack = self._io.read_u2le()
self.packettype = self._io.read_u1()
self.payloadsize = self._io.read_u1()
self.rebootcounter = self._io.read_u2le()
self.uptime = self._io.read_u4le()
self.unixtime = self._io.read_u4le()
self.tempmcu = self._io.read_s1()
self.tempfpga = self._io.read_s1()
self.magnetometerx = self._io.read_s2le()
self.magnetometery = self._io.read_s2le()
self.magnetometerz = self._io.read_s2le()
self.gyroscopex = self._io.read_s2le()
self.gyroscopey = self._io.read_s2le()
self.gyroscopez = self._io.read_s2le()
self.cpucurrent = self._io.read_u2le()
self.tempradio = self._io.read_s1()
self.payloadreserved1 = self._io.read_u1()
self.payloadreserved2 = self._io.read_u1()
self.tempbottom = self._io.read_u1()
self.tempupper = self._io.read_u1()
self.payloadreserved3 = self._io.read_u1()
self.epsvbat = self._io.read_u2le()
self.epscurrent_sun = self._io.read_u2le()
self.epscurrent_out = self._io.read_u2le()
self.epsvpanel01 = self._io.read_u2le()
self.epsvpanel02 = self._io.read_u2le()
self.epsvpanel03 = self._io.read_u2le()
self.epscurrent01 = self._io.read_u2le()
self.epscurrent02 = self._io.read_u2le()
self.epscurrent03 = self._io.read_u2le()
self.epsbatttemp = self._io.read_u2le()
self.payloadreserved4 = self._io.read_u1()
self.saterrorflags = self._io.read_u2le()
self.satoperationstatus = self._io.read_u1()
self.crc = self._io.read_u1()

View File

@ -10,10 +10,12 @@ from django.core.mail import send_mail
from django.contrib.sites.models import Site
from django.template.loader import render_to_string
from django.utils.timezone import make_aware
from influxdb import InfluxDBClient
from db.base.models import Satellite, DemodData
from db.base.utils import calculate_statistics
from db.celery import app
from db.base.utils import decode_data
@app.task(task_ignore_result=False)
@ -95,3 +97,42 @@ def cache_statistics():
latest_payload=Max('timestamp')) \
.order_by('-count')
cache.set('stats_observers', observers, 60 * 60 * 2)
# resets all decoded data and changes the is_decoded flag back to False
# THIS IS VERY DISTRUCTIVE, but the expectation is that a decode_all_data would
# follow.
@app.task
def reset_decoded_data(norad):
"""DESTRUCTIVE: deletes decoded data from db and/or influxdb"""
frames = DemodData.objects.filter(satellite__norad_cat_id=norad)
for frame in frames:
frame.payload_decoded = ''
frame.is_decoded = False
frame.save()
if settings.USE_INFLUX:
client = InfluxDBClient(settings.INFLUX_HOST, settings.INFLUX_PORT,
settings.INFLUX_USER, settings.INFLUX_PASS,
settings.INFLUX_DB)
client.query('DROP SERIES FROM /.*/ WHERE \"norad\" = \'{0}\''
.format(norad))
# decode data for a satellite, and a given time frame (if provided). If not
# provided it is expected that we want to try decoding all frames in the db.
@app.task
def decode_all_data(norad):
"""Task to trigger a full decode of data for a satellite."""
decode_data(norad)
@app.task
def decode_recent_data():
"""Task to trigger a partial/recent decode of data for all satellites."""
satellites = Satellite.objects.all()
for obj in satellites:
try:
decode_data(obj.norad_cat_id, period=1)
except Exception:
continue

View File

@ -1,4 +1,10 @@
from db.base.models import Satellite, Transmitter, Mode, DemodData
import json
import binascii
from datetime import datetime, timedelta
from db.base.models import Satellite, Transmitter, Mode, DemodData, Telemetry
from django.conf import settings
from django.utils.timezone import make_aware
from influxdb import InfluxDBClient
def calculate_statistics():
@ -97,3 +103,128 @@ def calculate_statistics():
'band_data': band_data_sorted
}
return statistics
# kaitai does not give us a good way to export attributes when we don't know
# what those attributes are, and here we are dealing with a lot of various
# decoders with different attributes. This is a hacky way of getting them
# to json. We also need to sanitize this for any binary data left over as
# it won't export to json.
def kaitai_to_json(struct):
"""Take a kaitai decode object and send to db as a json object"""
structdict = struct.__dict__
tojson = {}
for key, value in structdict.iteritems():
if key != '_root' and \
key != '_parent' and \
key != '_io': # kaitai objects we want to skip
if isinstance(value, basestring): # skip binary values
try:
value.decode('utf-8')
tojson[key] = value
except UnicodeError:
continue
else:
tojson[key] = value
return json.dumps(tojson)
# send the data from kaitai to influxdb, expects a kaitai object, the satellite
# telemetry and demoddata for that observation to tag metadata.
def kaitai_to_influx(struct, satellite, telemetry, demoddata):
"""Take a kaitai decode object and send to influxdb."""
client = InfluxDBClient(settings.INFLUX_HOST, settings.INFLUX_PORT,
settings.INFLUX_USER, settings.INFLUX_PASS,
settings.INFLUX_DB)
structdict = struct.__dict__
for key, value in structdict.iteritems():
if key != '_root' and \
key != '_parent' and \
key != '_io': # kaitai objects we want to skip
influx_tlm = []
data = {
'time': demoddata.timestamp.strftime('%Y-%m-%dT%H:%M:%SZ'),
'measurement': key,
'tags': {
'satellite': satellite.name,
'norad': satellite.norad_cat_id,
'decoder': telemetry.decoder,
'station': demoddata.station,
'observer': demoddata.observer,
'source': demoddata.source
}
}
if isinstance(value, basestring): # skip binary values
try:
value.decode('utf-8')
data.update({'fields': {'value': value}})
except UnicodeError:
continue
else:
data.update({'fields': {'value': value}})
if 'value' in data['fields']:
influx_tlm.append(data)
client.write_points(influx_tlm)
def decode_data(norad, period=None):
"""Decode data for a satellite, with an option to limit the scope."""
sat = Satellite.objects.get(norad_cat_id=norad)
if sat.has_telemetry_decoders:
now = datetime.utcnow()
if period:
q = now - timedelta(hours=4)
q = make_aware(q)
data = DemodData.objects.filter(satellite__norad_cat_id=norad,
timestamp__gte=q) \
.filter(is_decoded=False)
else:
data = DemodData.objects.filter(satellite=sat) \
.filter(is_decoded=False)
telemetry_decoders = Telemetry.objects.filter(satellite=sat)
# iterate over DemodData objects
for obj in data:
# iterate over Telemetry decoders
for tlmdecoder in telemetry_decoders:
decoder_module = 'db.base.decoders.{0}' \
.format(tlmdecoder.decoder)
decoder = __import__(decoder_module, fromlist='.')
decoder_class = getattr(decoder,
tlmdecoder.decoder.capitalize())
with open(obj.payload_frame.path) as fp:
# we get data frames in hex but kaitai requires binary
hexdata = fp.read()
bindata = binascii.unhexlify(hexdata)
# if we are set to use InfluxDB, send the decoded data there,
# otherwise we store it in the local DB.
if settings.USE_INFLUX:
try:
frame = decoder_class.from_bytes(bindata)
# find kaitai_to_influx in utils.py
kaitai_to_influx(frame, sat, tlmdecoder, obj)
obj.payload_decoded = 'influxdb'
obj.is_decoded = True
obj.save()
break
except Exception:
obj.is_decoded = False
obj.save()
continue
else: # store in the local db instead of influx
try:
frame = decoder_class.from_bytes(bindata)
except Exception:
obj.payload_decoded = ''
obj.is_decoded = False
obj.save()
continue
else:
# find kaitai_to_json in utils.py
obj.payload_decoded = kaitai_to_json(frame)
obj.is_decoded = True
obj.save()
break

View File

@ -8,6 +8,7 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'db.settings')
from django.conf import settings # noqa
RUN_EVERY_15 = 60 * 15
RUN_HOURLY = 60 * 60
RUN_DAILY = 60 * 60 * 24
@ -19,10 +20,13 @@ app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
from db.base.tasks import update_all_tle, cache_statistics
from db.base.tasks import update_all_tle, cache_statistics, decode_recent_data
sender.add_periodic_task(RUN_DAILY, update_all_tle.s(),
name='update-all-tle')
sender.add_periodic_task(RUN_HOURLY, cache_statistics.s(),
name='cache-statistics')
sender.add_periodic_task(RUN_EVERY_15, decode_recent_data.s(),
name='decode-recent-data')

View File

@ -261,6 +261,14 @@ DATA_FETCH_DAYS = config('DATA_FETCH_DAYS', default=10, cast=int)
MAPBOX_GEOCODE_URL = 'https://api.tiles.mapbox.com/v4/geocode/mapbox.places/'
MAPBOX_TOKEN = config('MAPBOX_TOKEN', default='')
# Influx DB for decoded data_id
USE_INFLUX = config('USE_INFLUX', default=False, cast=bool)
INFLUX_HOST = config('INFLUX_HOST', default='localhost')
INFLUX_PORT = config('INFLUX_PORT', default='8086')
INFLUX_USER = config('INFLUX_USER', default='db')
INFLUX_PASS = config('INFLUX_PASS', default='db')
INFLUX_DB = config('INFLUX_DB', default='db')
if ENVIRONMENT == 'dev':
# Disable template caching
for backend in TEMPLATES:

View File

@ -0,0 +1,15 @@
{% extends "admin/change_form.html" %}
{% block object-tools-items %}
<li>
<a href="{% url 'admin:reset_data' norad=original.norad_cat_id %}" class="resetdatalink">
RESET Demod Data
</a>
</li>
<li>
<a href="{% url 'admin:decode_all_data' norad=original.norad_cat_id %}" class="decodedatalink">
Decode All Data
</a>
</li>
{{ block.super }}
{% endblock %}

View File

@ -8,3 +8,11 @@ DATABASE_URL='sqlite:///db.sqlite3'
CACHE_BACKEND='django.core.cache.backends.locmem.LocMemCache'
CACHE_LOCATION='unique-snowflake'
CACHE_CLIENT_CLASS=None
# InfluxDB
USE_INFLUX = False
INFLUX_HOST = 'localhost'
INFLUX_PORT = '8086'
INFLUX_USER = 'db'
INFLUX_PASS = 'db'
INFLUX_DB = 'db'

View File

@ -210,3 +210,11 @@ chardet==3.0.4 \
idna==2.6 \
--hash=sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4 \
--hash=sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f
influxdb==5.2.0 \
--hash=sha256:9ae603939b6c3bd0eeb72fdd1d4fbc0a9c7c66a91260486f0bb0effa7d197727
pytool==3.10.0 \
--hash=sha256:b2eba54725b8c8339bee99889fdedb087362748d9e8815c9b71bce561c86e42a
simplejson==3.16.0 \
--hash=sha256:b1f329139ba647a9548aa05fb95d046b4a677643070dc2afc05fa2e975d09ca5
kaitaistruct==0.8 \
--hash=sha256:d1d17c7f6839b3d28fc22b21295f787974786c2201e8788975e72e2a1d109ff5

View File

@ -2,4 +2,4 @@
max-complexity = 23
max-line-length = 99
ignore = F403
exclude = *migrations/*,docs
exclude = *migrations/*,docs,*decoders/*