Add celery support
parent
aab31e0abe
commit
a39792c2ef
|
@ -24,3 +24,6 @@ media
|
|||
/staticfiles/*
|
||||
node_modules
|
||||
yarn-error.log
|
||||
|
||||
# Celery
|
||||
celerybeat-schedule
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
|
||||
celery -A network worker -B -l INFO
|
|
@ -9,6 +9,19 @@ services:
|
|||
- MYSQL_ROOT_PASSWORD=toor
|
||||
redis:
|
||||
image: redis:3.2.8
|
||||
celery:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/dev
|
||||
volumes:
|
||||
- .:/app
|
||||
links:
|
||||
- db
|
||||
- redis
|
||||
environment:
|
||||
- DATABASE_URL=mysql://network:network@db/network
|
||||
command:
|
||||
./bin/run-celery.sh
|
||||
web:
|
||||
build:
|
||||
context: .
|
||||
|
@ -20,6 +33,7 @@ services:
|
|||
links:
|
||||
- db
|
||||
- redis
|
||||
- celery
|
||||
environment:
|
||||
- DATABASE_URL=mysql://network:network@db/network
|
||||
command:
|
||||
|
|
39
env-dist
39
env-dist
|
@ -1,27 +1,28 @@
|
|||
ENVIRONMENT=dev
|
||||
DEBUG=True
|
||||
ENVIRONMENT = 'dev'
|
||||
DEBUG = True
|
||||
|
||||
# Email
|
||||
DEFAULT_FROM_EMAIL=noreply@example.com
|
||||
ADMINS_FROM_NAME=Admins
|
||||
ADMINS_FROM_EMAIL=noreply@example.com
|
||||
DEFAULT_FROM_EMAIL = 'noreply@example.com'
|
||||
ADMINS_FROM_NAME = 'Admins'
|
||||
ADMINS_FROM_EMAIL = 'noreply@example.com'
|
||||
|
||||
# Security
|
||||
SECRET_KEY=changeme
|
||||
ALLOWED_HOSTS=*
|
||||
SITE_URL=http://localhost:8000
|
||||
SECRET_KEY = 'changeme'
|
||||
ALLOWED_HOSTS = 'localhost'
|
||||
SITE_URL = 'http://localhost:8000'
|
||||
|
||||
# Database
|
||||
DATABASE_URL=sqlite:///db.sqlite3
|
||||
|
||||
# Services
|
||||
MAPBOX_TOKEN=
|
||||
MAPBOX_MAP_ID=
|
||||
GOOGLE_ANALYTICS_KEY=
|
||||
OPBEAT_ORGID=
|
||||
OPBEAT_APPID=
|
||||
OPBEAT_SECRET=
|
||||
DATABASE_URL = 'sqlite:///db.sqlite3'
|
||||
|
||||
# Application
|
||||
DB_API_ENDPOINT=https://db.satnogs.org/api/
|
||||
STATION_HEARTBEAT_TIME=60
|
||||
DB_API_ENDPOINT = 'https://db.satnogs.org/api/'
|
||||
STATION_HEARTBEAT_TIME = 60
|
||||
|
||||
# Cache
|
||||
CACHE_BACKEND = 'django.core.cache.backends.locmem.LocMemCache'
|
||||
CACHE_LOCATION = 'unique-snowflake'
|
||||
CACHE_CLIENT_CLASS = None
|
||||
|
||||
# Celery
|
||||
CELERY_BROKER_URL = 'redis://redis:6379/0'
|
||||
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
from .celery import app as celery_app # noqa
|
||||
|
||||
|
||||
__all__ = ['celery_app']
|
|
@ -29,14 +29,13 @@ class Command(BaseCommand):
|
|||
tle = sat.tle()
|
||||
try:
|
||||
latest_tle = obj.latest_tle.tle1
|
||||
if latest_tle == tle[1]:
|
||||
self.stdout.write(('{0} - {1}: TLE already exists [defer]')
|
||||
.format(obj.name, obj.norad_cat_id))
|
||||
continue
|
||||
except:
|
||||
pass
|
||||
if latest_tle == tle[1]:
|
||||
self.stdout.write(('{0} - {1}: TLE already exists [defer]')
|
||||
.format(obj.name, obj.norad_cat_id))
|
||||
continue
|
||||
|
||||
Tle.objects.create(tle0=tle[0], tle1=tle[1], tle2=tle[2], satellite=obj)
|
||||
|
||||
self.stdout.write(('{0} - {1}: new TLE found [updated]')
|
||||
.format(obj.name, obj.norad_cat_id))
|
||||
|
|
|
@ -319,7 +319,7 @@ class Observation(models.Model):
|
|||
|
||||
@property
|
||||
def payload_exists(self):
|
||||
""" Run some checks on the payload for existence of data """
|
||||
"""Run some checks on the payload for existence of data."""
|
||||
if self.payload is None:
|
||||
return False
|
||||
if not os.path.isfile(os.path.join(settings.MEDIA_ROOT, self.payload.name)):
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
import json
|
||||
import urllib2
|
||||
|
||||
from orbit import satellite
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from network.base.models import Satellite, Tle, Mode, Transmitter
|
||||
from network.celery import app
|
||||
|
||||
|
||||
@app.task
|
||||
def update_all_tle():
|
||||
"""Task to update all satellite TLEs"""
|
||||
satellites = Satellite.objects.exclude(manual_tle=True)
|
||||
|
||||
for obj in satellites:
|
||||
try:
|
||||
sat = satellite(obj.norad_cat_id)
|
||||
except:
|
||||
continue
|
||||
|
||||
# Get latest satellite TLE and check if it changed
|
||||
tle = sat.tle()
|
||||
try:
|
||||
latest_tle = obj.latest_tle.tle1
|
||||
except:
|
||||
pass
|
||||
if latest_tle == tle[1]:
|
||||
continue
|
||||
|
||||
Tle.objects.create(tle0=tle[0], tle1=tle[1], tle2=tle[2], satellite=obj)
|
||||
|
||||
|
||||
@app.task
|
||||
def fetch_data():
|
||||
"""Task to fetch all data from DB"""
|
||||
apiurl = settings.DB_API_ENDPOINT
|
||||
modes_url = "{0}modes".format(apiurl)
|
||||
satellites_url = "{0}satellites".format(apiurl)
|
||||
transmitters_url = "{0}transmitters".format(apiurl)
|
||||
|
||||
try:
|
||||
modes = urllib2.urlopen(modes_url).read()
|
||||
satellites = urllib2.urlopen(satellites_url).read()
|
||||
transmitters = urllib2.urlopen(transmitters_url).read()
|
||||
except:
|
||||
raise Exception('API is unreachable')
|
||||
|
||||
# Fetch Modes
|
||||
for mode in json.loads(modes):
|
||||
id = mode['id']
|
||||
try:
|
||||
existing_mode = Mode.objects.get(id=id)
|
||||
existing_mode.__dict__.update(mode)
|
||||
existing_mode.save()
|
||||
except Mode.DoesNotExist:
|
||||
Mode.objects.create(**mode)
|
||||
|
||||
# Fetch Satellites
|
||||
for sat in json.loads(satellites):
|
||||
norad_cat_id = sat['norad_cat_id']
|
||||
try:
|
||||
existing_satellite = Satellite.objects.get(norad_cat_id=norad_cat_id)
|
||||
existing_satellite.__dict__.update(sat)
|
||||
existing_satellite.save()
|
||||
except Satellite.DoesNotExist:
|
||||
Satellite.objects.create(**sat)
|
||||
|
||||
# Fetch Transmitters
|
||||
for transmitter in json.loads(transmitters):
|
||||
norad_cat_id = transmitter['norad_cat_id']
|
||||
uuid = transmitter['uuid']
|
||||
mode_id = transmitter['mode_id']
|
||||
|
||||
try:
|
||||
sat = Satellite.objects.get(norad_cat_id=norad_cat_id)
|
||||
except Satellite.DoesNotExist:
|
||||
continue
|
||||
transmitter.pop('norad_cat_id')
|
||||
|
||||
try:
|
||||
mode = Mode.objects.get(id=mode_id)
|
||||
except Mode.DoesNotExist:
|
||||
mode = None
|
||||
try:
|
||||
existing_transmitter = Transmitter.objects.get(uuid=uuid)
|
||||
existing_transmitter.__dict__.update(transmitter)
|
||||
existing_transmitter.satellite = sat
|
||||
existing_transmitter.save()
|
||||
except Transmitter.DoesNotExist:
|
||||
new_transmitter = Transmitter.objects.create(**transmitter)
|
||||
new_transmitter.satellite = sat
|
||||
new_transmitter.mode = mode
|
||||
new_transmitter.save()
|
|
@ -0,0 +1,44 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
|
||||
from celery import Celery
|
||||
|
||||
import dotenv
|
||||
|
||||
dotenv.read_dotenv('.env')
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'network.settings')
|
||||
|
||||
from django.conf import settings # noqa
|
||||
|
||||
RUN_DAILY = 60 * 60 * 24
|
||||
RUN_HOURLY = 60 * 60
|
||||
|
||||
app = Celery('network')
|
||||
|
||||
app.config_from_object('django.conf:settings', namespace='CELERY')
|
||||
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
|
||||
|
||||
|
||||
@app.on_after_finalize.connect
|
||||
def setup_periodic_tasks(sender, **kwargs):
|
||||
from network.base.tasks import update_all_tle, fetch_data
|
||||
|
||||
sender.add_periodic_task(RUN_HOURLY, update_all_tle.s(),
|
||||
name='update-all-tle')
|
||||
|
||||
sender.add_periodic_task(RUN_HOURLY, fetch_data.s(),
|
||||
name='fetch-data')
|
||||
|
||||
|
||||
if settings.ENVIRONMENT == 'production':
|
||||
from opbeat.contrib.django.models import client, logger, register_handlers
|
||||
from opbeat.contrib.celery import register_signal
|
||||
|
||||
try:
|
||||
register_signal(client)
|
||||
except Exception as e:
|
||||
logger.exception('Failed installing celery hook: {0}'.format(e))
|
||||
|
||||
register_handlers()
|
|
@ -78,10 +78,10 @@ SERVER_EMAIL = DEFAULT_FROM_EMAIL
|
|||
# Cache
|
||||
CACHES = {
|
||||
'default': {
|
||||
'BACKEND': getenv('CACHE_BACKEND', 'django.core.cache.backends.locmem.LocMemCache'),
|
||||
'LOCATION': getenv('CACHE_LOCATION', 'unique-snowflake'),
|
||||
'BACKEND': getenv('CACHE_BACKEND', 'redis_cache.RedisCache'),
|
||||
'LOCATION': getenv('CACHE_LOCATION', 'unix://var/run/redis/redis.sock'),
|
||||
'OPTIONS': {
|
||||
'CLIENT_CLASS': getenv('CACHE_CLIENT_CLASS', None),
|
||||
'CLIENT_CLASS': getenv('CACHE_CLIENT_CLASS', 'django_redis.client.DefaultClient'),
|
||||
},
|
||||
'KEY_PREFIX': 'network-{0}'.format(ENVIRONMENT),
|
||||
}
|
||||
|
@ -215,6 +215,24 @@ LOGGING = {
|
|||
}
|
||||
}
|
||||
|
||||
# Celery
|
||||
CELERY_ENABLE_UTC = USE_TZ
|
||||
CELERY_TIMEZONE = TIME_ZONE
|
||||
CELERY_TASK_RESULTS_EXPIRES = 3600
|
||||
CELERY_ACCEPT_CONTENT = ['application/json']
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
CELERY_SEND_TASK_ERROR_EMAILS = True
|
||||
CELERY_TASK_ALWAYS_EAGER = False
|
||||
CELERY_DEFAULT_QUEUE = 'network-{0}-queue'.format(ENVIRONMENT)
|
||||
CELERY_BROKER_URL = getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
|
||||
CELERY_RESULT_BACKEND = getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')
|
||||
REDIS_CONNECT_RETRY = True
|
||||
CELERY_BROKER_TRANSPORT_OPTIONS = {
|
||||
'visibility_timeout': getenv('REDIS_VISIBILITY_TIMEOUT', default=604800),
|
||||
'fanout_prefix': True
|
||||
}
|
||||
|
||||
# API
|
||||
REST_FRAMEWORK = {
|
||||
'DEFAULT_PERMISSION_CLASSES': (
|
||||
|
|
|
@ -4,6 +4,9 @@ Django==1.11.5 \
|
|||
--hash=sha256:1836878162dfdf865492bacfdff0321e4ee8f1e7d51d93192546000b54982b29
|
||||
django-shortuuidfield==0.1.3 \
|
||||
--hash=sha256:a292c0fe5538abe947b131e2b914edd9ac44afcc6a40eaec71448e6231a3ef00
|
||||
celery==4.1.0 \
|
||||
--hash=sha256:81a67f0d53a688ec2bc8557bd5d6d7218f925a6f2e6df80e01560de9e28997ec \
|
||||
--hash=sha256:77ff3730198d6a17b3c1f05579ebe570b579efb35f6d7e13dba3b1368d068b35
|
||||
|
||||
# Configuration
|
||||
unicode-slugify==0.1.3 \
|
||||
|
@ -209,3 +212,15 @@ idna==2.6 \
|
|||
Unipath==1.1 \
|
||||
--hash=sha256:e6257e508d8abbfb6ddd8ec357e33589f1f48b1599127f23b017124d90b0fff7 \
|
||||
--hash=sha256:09839adcc72e8a24d4f76d63656f30b5a1f721fc40c9bcd79d8c67bdd8b47dae
|
||||
amqp==2.2.2 \
|
||||
--hash=sha256:4e28d3ea61a64ae61830000c909662cb053642efddbe96503db0e7783a6ee85b \
|
||||
--hash=sha256:cba1ace9d4ff6049b190d8b7991f9c1006b443a5238021aca96dd6ad2ac9da22
|
||||
billiard==3.5.0.3 \
|
||||
--hash=sha256:abd9ce008c9a71ccde2c816f8daa36246e92a21e6a799831b887d88277187ecd \
|
||||
--hash=sha256:1d7b22bdc47aa52841120fcd22a74ae4fc8c13e9d3935643098184f5788c3ce6
|
||||
kombu==4.1.0 \
|
||||
--hash=sha256:01f0da9fe222a2183345004243d1518c0fbe5875955f1b24842f2d9c65709ade \
|
||||
--hash=sha256:4249d9dd9dbf1fcec471d1c2def20653c9310dd1a217272d77e4844f9d5273cb
|
||||
vine==1.1.4 \
|
||||
--hash=sha256:6849544be74ec3638e84d90bc1cf2e1e9224cc10d96cd4383ec3f69e9bce077b \
|
||||
--hash=sha256:52116d59bc45392af9fdd3b75ed98ae48a93e822cee21e5fda249105c59a7a72
|
||||
|
|
Loading…
Reference in New Issue