1
0
Fork 0
satnogs-network/network/base/tasks.py

236 lines
8.0 KiB
Python
Raw Normal View History

from datetime import timedelta, datetime
2017-09-27 11:04:18 -06:00
import json
import os
from requests.exceptions import ReadTimeout, HTTPError
import urllib
2017-09-27 11:04:18 -06:00
import urllib2
from internetarchive import upload
2017-09-27 11:04:18 -06:00
from orbit import satellite
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.utils.timezone import now
2017-09-27 11:04:18 -06:00
from network.base.models import Satellite, Tle, Mode, Transmitter, Observation, Station, DemodData
2017-09-27 11:04:18 -06:00
from network.celery import app
@app.task(ignore_result=True)
def update_all_tle():
    """Task to update all satellite TLEs.

    Iterates over every satellite except those that have ``manual_tle`` set
    and no ``norad_follow_id``.  For each one the current TLE is requested
    through ``satellite()`` and a new ``Tle`` row is created unless its
    first line equals the ``tle1`` of the satellite's latest stored TLE.
    """
    satellites = Satellite.objects.exclude(manual_tle=True,
                                           norad_follow_id__isnull=True)

    for obj in satellites:
        norad_id = obj.norad_cat_id
        # Satellites flagged with a manual TLE are looked up through the
        # id they follow instead of their own catalog id.
        if obj.manual_tle:
            norad_id = obj.norad_follow_id

        try:
            sat = satellite(norad_id)
        except IndexError:
            # The lookup produced nothing usable for this id; skip it.
            continue

        # Get latest satellite TLE and check if it changed
        tle = sat.tle()
        try:
            latest_tle = obj.latest_tle.tle1
        except AttributeError:
            # No stored TLE to compare against (``latest_tle`` has no
            # ``tle1``).  Fall through and store the fetched one, rather
            # than comparing with an unbound or stale value left over from
            # a previous loop iteration.
            pass
        else:
            if latest_tle == tle[1]:
                continue

        Tle.objects.create(tle0=tle[0], tle1=tle[1], tle2=tle[2], satellite=obj)
@app.task(ignore_result=True)
def fetch_data():
    """Task to fetch all data from DB.

    Downloads the ``modes``, ``satellites`` and ``transmitters`` listings
    from ``settings.DB_API_ENDPOINT`` and mirrors them into the local
    ``Mode``, ``Satellite`` and ``Transmitter`` models.  Rows that already
    exist (matched by ``id``, ``norad_cat_id`` and ``uuid`` respectively)
    are updated in place, the others are created.  Transmitters whose
    satellite is not known locally are skipped.

    Raises:
        Exception: when any of the three endpoints raises
            ``urllib2.URLError`` while being opened.
    """
    endpoint = settings.DB_API_ENDPOINT
    modes_url = "{0}modes".format(endpoint)
    satellites_url = "{0}satellites".format(endpoint)
    transmitters_url = "{0}transmitters".format(endpoint)

    # All three listings are downloaded before anything is written locally.
    try:
        modes_payload = urllib2.urlopen(modes_url).read()
        satellites_payload = urllib2.urlopen(satellites_url).read()
        transmitters_payload = urllib2.urlopen(transmitters_url).read()
    except urllib2.URLError:
        raise Exception('API is unreachable')

    # Fetch Modes
    for mode_fields in json.loads(modes_payload):
        mode_pk = mode_fields['id']
        try:
            known_mode = Mode.objects.get(id=mode_pk)
            # Copy the remote values straight onto the instance attributes.
            known_mode.__dict__.update(mode_fields)
            known_mode.save()
        except Mode.DoesNotExist:
            Mode.objects.create(**mode_fields)

    # Fetch Satellites
    for sat_fields in json.loads(satellites_payload):
        sat_norad_id = sat_fields['norad_cat_id']
        # 'decayed' is dropped before the values reach the local model.
        sat_fields.pop('decayed', None)
        try:
            known_sat = Satellite.objects.get(norad_cat_id=sat_norad_id)
            known_sat.__dict__.update(sat_fields)
            known_sat.save()
        except Satellite.DoesNotExist:
            Satellite.objects.create(**sat_fields)

    # Fetch Transmitters
    for tx_fields in json.loads(transmitters_payload):
        owner_norad_id = tx_fields['norad_cat_id']
        tx_uuid = tx_fields['uuid']
        tx_mode_id = tx_fields['mode_id']

        try:
            owner = Satellite.objects.get(norad_cat_id=owner_norad_id)
        except Satellite.DoesNotExist:
            continue

        # The satellite is attached as an object below, so the raw id must
        # not be passed along with the remaining values.
        del tx_fields['norad_cat_id']

        try:
            tx_mode = Mode.objects.get(id=tx_mode_id)
        except Mode.DoesNotExist:
            tx_mode = None

        try:
            known_tx = Transmitter.objects.get(uuid=tx_uuid)
            known_tx.__dict__.update(tx_fields)
            known_tx.satellite = owner
            known_tx.save()
        except Transmitter.DoesNotExist:
            created_tx = Transmitter.objects.create(**tx_fields)
            created_tx.satellite = owner
            created_tx.mode = tx_mode
            created_tx.save()
@app.task(ignore_result=True)
def archive_audio(obs_id):
    """Task to upload an observation's audio payload to the archive.

    The payload file of the observation with id ``obs_id`` is passed to
    ``upload()`` under an identifier derived from the environment and the
    observation id.  When the first upload response has status code 200 the
    observation is marked as archived, its archive URL and identifier are
    stored and the payload file field is deleted.  A ``ReadTimeout`` or
    ``HTTPError`` during the upload ends the task without changes.
    """
    obs = Observation.objects.get(id=obs_id)

    # Production identifiers carry no environment suffix.
    if settings.ENVIRONMENT == 'production':
        suffix = ''
    else:
        suffix = '-{0}'.format(settings.ENVIRONMENT)
    identifier = 'satnogs{0}-observation-{1}'.format(suffix, obs.id)

    ogg = obs.payload.path
    # Last path component of the stored payload name.
    filename = obs.payload.name.rsplit('/', 1)[-1]

    site = Site.objects.get_current()
    description = ('<p>Audio file from SatNOGS{0} <a href="{1}/observations/{2}">'
                   'Observation {3}</a>.</p>').format(suffix, site.domain,
                                                      obs.id, obs.id)
    md = {
        'collection': settings.ARCHIVE_COLLECTION,
        'title': identifier,
        'mediatype': 'audio',
        'licenseurl': 'http://creativecommons.org/licenses/by-sa/4.0/',
        'description': description,
    }

    try:
        responses = upload(identifier, files=[ogg], metadata=md,
                           access_key=settings.S3_ACCESS_KEY,
                           secret_key=settings.S3_SECRET_KEY)
    except (ReadTimeout, HTTPError):
        return

    if responses[0].status_code != 200:
        return

    obs.archived = True
    obs.archive_url = '{0}{1}/{2}'.format(settings.ARCHIVE_URL, identifier, filename)
    obs.archive_identifier = identifier
    obs.save()
    obs.payload.delete()
@app.task(ignore_result=True)
def clean_observations():
    """Task to clean up old observations.

    Considers observations that are not archived, have a non-empty payload
    and ended more than ``settings.OBSERVATION_OLD_RANGE`` days ago.  In the
    'stage' environment, observations that are not good are deleted.  Every
    other observation whose payload file exists on disk is queued for
    archiving through ``archive_audio``.
    """
    threshold = now() - timedelta(days=int(settings.OBSERVATION_OLD_RANGE))
    observations = Observation.objects.filter(end__lt=threshold, archived=False) \
                                      .exclude(payload='')
    on_stage = settings.ENVIRONMENT == 'stage'

    for obs in observations:
        if on_stage and not obs.is_good:
            obs.delete()
            # Move on to the next observation.  Returning here would abort
            # the whole cleanup after the first deletion and leave the rest
            # of the queryset unprocessed.
            continue
        if os.path.isfile(obs.payload.path):
            archive_audio.delay(obs.id)
2018-01-09 09:15:36 -07:00
@app.task
def demod_to_db(frame_id):
    """Task to send a frame from SatNOGS network to SatNOGS db.

    Loads the ``DemodData`` row with id ``frame_id``, builds the SiDS
    parameters from its observation, satellite and ground station, and
    POSTs them to the ``telemetry/`` endpoint under
    ``settings.DB_API_ENDPOINT``.  When the response code starts with '2'
    the frame is flagged as ``copied_to_db``.  Connection and HTTP errors
    end the task without marking the frame.
    """
    frame = DemodData.objects.get(id=frame_id)
    obs = frame.observation
    sat = obs.satellite
    ground_station = obs.ground_station

    # need to abstract the timestamp from the filename. hacky..
    # The third '/'-separated component of the stored name is split on '_'
    # and its third field is parsed as '%Y-%m-%dT%H-%M-%S'.
    file_datetime = frame.payload_demod.name.split('/')[2].split('_')[2]
    frame_datetime = datetime.strptime(file_datetime, '%Y-%m-%dT%H-%M-%S')
    submit_datetime = frame_datetime.strftime('%Y-%m-%dT%H:%M:%S.000Z')

    # SiDS parameters
    params = {
        'noradID': sat.norad_cat_id,
        'source': ground_station.name,
        'timestamp': submit_datetime,
        'locator': 'longLat',
        'longitude': ground_station.lng,
        'latitude': ground_station.lat,
        'frame': frame.display_payload().replace(' ', ''),
        'satnogs_network': 'True',  # NOT a part of SiDS
    }

    telemetry_url = "{0}telemetry/".format(settings.DB_API_ENDPOINT)

    try:
        res = urllib2.urlopen(telemetry_url, urllib.urlencode(params))
    except (ReadTimeout, HTTPError, urllib2.URLError):
        # The request is made with urllib2, whose failures are URLError and
        # its HTTPError subclass; the requests exceptions alone would never
        # match them.  They are still listed for backward compatibility.
        return

    if str(res.getcode()).startswith('2'):
        frame.copied_to_db = True
        frame.save()
@app.task
def sync_to_db():
    """Task to send demod data to db / SiDS.

    Selects frames not yet copied to the db whose observation ended within
    the last day and whose transmitter has ``sync_to_db`` enabled.  Each
    non-image frame whose demodulated payload file exists on disk is queued
    through ``demod_to_db``.  Any exception raised while handling a frame is
    swallowed, so one bad frame does not stop the rest.
    """
    since = now() - timedelta(days=1)
    pending = DemodData.objects.filter(observation__end__gte=since,
                                       copied_to_db=False,
                                       observation__transmitter__sync_to_db=True)

    for frame in pending:
        try:
            if frame.is_image():
                continue
            if os.path.isfile(frame.payload_demod.path):
                demod_to_db.delay(frame.id)
        except Exception:
            # Best effort: ignore this frame and carry on with the next.
            pass
@app.task(ignore_result=True)
def station_status_update():
    """Task to update Station status.

    Every station is saved with ``status`` 0 when it is offline, 1 when it
    is not offline but in testing, and 2 otherwise.
    """
    for station in Station.objects.all():
        # The offline check takes precedence over the testing flag.
        station.status = 0 if station.is_offline else (1 if station.testing else 2)
        station.save()
@app.task(ignore_result=True)
def stations_cache_rates():
    """Task to cache the success rate of every station.

    For each station, the observations that are neither ``testing`` nor of
    ``vetted_status`` "unknown" are loaded.  The rate is the integer
    percentage of those that are good or bad, and it is cached under
    ``station-<id>-rate`` for two hours.  Stations without such
    observations get no cache entry.
    """
    for station in Station.objects.all():
        # Fetch the observations once and count in Python.  ``is_good`` and
        # ``is_bad`` are evaluated on the instances anyway, so a second
        # ``id__in`` query with an unbounded id list adds nothing.
        observations = list(
            station.observations.exclude(testing=True)
                                .exclude(vetted_status="unknown"))
        if not observations:
            continue

        success = sum(1 for o in observations if o.is_good or o.is_bad)
        rate = int(100 * (float(success) / float(len(observations))))
        cache.set('station-{0}-rate'.format(station.id), rate, 60 * 60 * 2)