1
0
Fork 0

Celery task and model extension to sync demodulated frames to SatNOGS DB

environments/stage/deployments/196
Corey Shields 2018-09-02 16:05:15 -04:00
parent 4510f6e6f5
commit d999961a92
4 changed files with 90 additions and 3 deletions

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-09-02 22:17
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('base', '0047_auto_20180827_1318'),
]
operations = [
migrations.AddField(
model_name='demoddata',
name='copied_to_db',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='transmitter',
name='sync_to_db',
field=models.BooleanField(default=False),
),
]

View File

@ -374,6 +374,7 @@ class Transmitter(models.Model):
baud = models.FloatField(validators=[MinValueValidator(0)], null=True, blank=True)
satellite = models.ForeignKey(Satellite, related_name='transmitters',
on_delete=models.CASCADE, null=True, blank=True)
sync_to_db = models.BooleanField(default=False)
@property
def data_count(self):
@ -570,6 +571,7 @@ class DemodData(models.Model):
observation = models.ForeignKey(Observation, related_name='demoddata',
on_delete=models.CASCADE, blank=True, null=True)
payload_demod = models.FileField(upload_to=_name_obs_demoddata, blank=True, null=True)
copied_to_db = models.BooleanField(default=False)
def is_image(self):
with open(self.payload_demod.path) as fp:

View File

@ -1,7 +1,8 @@
from datetime import timedelta
from datetime import timedelta, datetime
import json
import os
from requests.exceptions import ReadTimeout, HTTPError
import urllib
import urllib2
from internetarchive import upload
@ -12,7 +13,7 @@ from django.contrib.sites.models import Site
from django.core.cache import cache
from django.utils.timezone import now
from network.base.models import Satellite, Tle, Mode, Transmitter, Observation, Station
from network.base.models import Satellite, Tle, Mode, Transmitter, Observation, Station, DemodData
from network.celery import app
@ -150,6 +151,60 @@ def clean_observations():
archive_audio.delay(obs.id)
@app.task
def demod_to_db(frame_id):
"""Task to send a frame from SatNOGS network to SatNOGS db"""
frame = DemodData.objects.get(id=frame_id)
obs = frame.observation
sat = obs.satellite
ground_station = obs.ground_station
# need to abstract the timestamp from the filename. hacky..
file_datetime = frame.payload_demod.name.split('/')[2].split('_')[2]
frame_datetime = datetime.strptime(file_datetime, '%Y-%m-%dT%H-%M-%S')
submit_datetime = datetime.strftime(frame_datetime,
'%Y-%m-%dT%H:%M:%S.000Z')
# SiDS parameters
params = {}
params['noradID'] = sat.norad_cat_id
params['source'] = ground_station.name
params['timestamp'] = submit_datetime
params['locator'] = ground_station.qthlocator
params['longitude'] = ground_station.lng
params['latitude'] = ground_station.lat
params['frame'] = frame.display_payload().replace(' ', '')
params['satnogs_network'] = 'True' # NOT a part of SiDS
apiurl = settings.DB_API_ENDPOINT
telemetry_url = "{0}telemetry/".format(apiurl)
try:
res = urllib2.urlopen(telemetry_url, urllib.urlencode(params))
code = str(res.getcode())
if code.startswith('2'):
frame.copied_to_db = True
frame.save()
except (ReadTimeout, HTTPError):
return
@app.task
def sync_to_db():
"""Task to send demod data to db / SiDS"""
q = now() - timedelta(days=1)
frames = DemodData.objects.filter(observation__end__gte=q,
copied_to_db=False,
observation__transmitter__sync_to_db=True)
for frame in frames:
try:
if not frame.is_image():
if os.path.isfile(frame.payload_demod.path):
demod_to_db.delay(frame.id)
except Exception:
continue
@app.task(ignore_result=True)
def station_status_update():
"""Task to update Station status."""

View File

@ -12,6 +12,7 @@ from django.conf import settings # noqa
RUN_DAILY = 60 * 60 * 24
RUN_EVERY_TWO_HOURS = 2 * 60 * 60
RUN_HOURLY = 60 * 60
RUN_EVERY_MINUTE = 60
app = Celery('network')
@ -22,7 +23,8 @@ app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
from network.base.tasks import (update_all_tle, fetch_data, clean_observations,
station_status_update, stations_cache_rates)
station_status_update, stations_cache_rates,
sync_to_db)
sender.add_periodic_task(RUN_EVERY_TWO_HOURS, update_all_tle.s(),
name='update-all-tle')
@ -38,3 +40,6 @@ def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(RUN_HOURLY, stations_cache_rates.s(),
name='stations-cache-rates')
sender.add_periodic_task(RUN_EVERY_MINUTE, sync_to_db.s(),
name='sync-to-db')