1
0
Fork 0

Publish new SiDS frames by using a ZeroMQ socket

Signed-off-by: Alfredos-Panagiotis Damkalis <fredy@fredy.gr>
spacecruft
Alfredos-Panagiotis Damkalis 2020-12-31 08:27:25 +02:00
parent 1b366d1722
commit 4b8239739a
4 changed files with 46 additions and 3 deletions

View File

@ -7,3 +7,8 @@ ignored-argument-names=args|kwargs
disable=
C0412,
R0801, # needs to remain disabled see https://github.com/PyCQA/pylint/issues/214
[TYPECHECK]
# zmq.{EAGAIN,RCVTIMEO,XPUB} is dynamically generated and so pylint
# doesn't see it, causing false positives.
generated-members=EAGAIN,RCVTIMEO,XPUB

View File

@ -17,7 +17,7 @@ from db.api.renderers import BrowserableJSONLDRenderer, JSONLDRenderer
from db.base.helpers import gridsquare
from db.base.models import SATELLITE_STATUS, SERVICE_TYPE, TRANSMITTER_STATUS, TRANSMITTER_TYPE, \
Artifact, DemodData, LatestTleSet, Mode, Satellite, Transmitter
from db.base.tasks import update_satellite
from db.base.tasks import publish_current_frame, update_satellite
ISS_EXAMPLE = OpenApiExample('25544 (ISS)', value=25544)
@ -271,8 +271,11 @@ class TelemetryViewSet( # pylint: disable=R0901
serializer = serializers.SidsSerializer(data=data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
return Response(status=status.HTTP_201_CREATED, headers=headers)
# Run task to publish the current frame via ZeroMQ
publish_current_frame.delay(norad_cat_id, timestamp, request.data.get('frame'))
return Response(status=status.HTTP_201_CREATED)
@extend_schema_view(

View File

@ -5,6 +5,7 @@ import tempfile
from datetime import datetime, timedelta
from smtplib import SMTPException
import zmq
from celery import shared_task
from django.conf import settings
from django.contrib.auth import get_user_model
@ -25,6 +26,9 @@ from db.base.utils import cache_statistics, decode_data, get_tle_sources, update
LOGGER = logging.getLogger('db')
# Initialize shared ZeroMQ context
CONTEXT = zmq.Context()
@shared_task
def check_celery():
@ -258,3 +262,28 @@ def decode_recent_data():
def decode_current_frame(norad_id, demoddata_id):
"""Task to trigger a decode of a current frame for a satellite."""
decode_data(norad_id=norad_id, demoddata_id=demoddata_id)
@shared_task
def publish_current_frame(norad_id, timestamp, frame):
"""Task to publish a current frame for a satellite."""
# Initialize ZeroMQ socket
publisher = CONTEXT.socket(zmq.XPUB)
publisher.setsockopt(zmq.RCVTIMEO, settings.ZEROMQ_SOCKET_RCVTIMEO)
publisher.connect(settings.ZEROMQ_SOCKET_URI)
try:
publisher.recv()
except zmq.ZMQError as error:
if error.errno == zmq.EAGAIN:
print('EAGAIN error - No subscription was received')
else:
raise
else:
publisher.send_multipart(
[bytes(str(norad_id), 'utf-8'),
bytes(frame, 'utf-8'),
bytes(timestamp, 'utf-8')]
)
finally:
publisher.close()

View File

@ -441,6 +441,12 @@ DATA_FETCH_DAYS = config('DATA_FETCH_DAYS', default=10, cast=int)
MAPBOX_GEOCODE_URL = 'https://api.tiles.mapbox.com/v4/geocode/mapbox.places/'
MAPBOX_TOKEN = config('MAPBOX_TOKEN', default='')
# ZEROMQ
ZEROMQ_SOCKET_URI = config('ZEROMQ_SOCKET_URI', default='tcp://0.0.0.0:5555')
ZEROMQ_SOCKET_RCVTIMEO = config(
'ZEROMQ_SOCKET_RCVTIMEO', default='100', cast=int
) # Time to wait for subscriber message in ms
# TLE Sources
TLE_SOURCES_REDISTRIBUTABLE = config('TLE_SOURCES_REDISTRIBUTABLE', default='manual', cast=Csv())
TLE_SOURCES_JSON = config('TLE_SOURCES_JSON', default='')