1
0
Fork 0

Fix celery cyclical dependency

Fixes the cyclical dependency between celery.py and tasks.py when update_all_tle is called outside of celery by switching to autodiscovered tasks

Signed-off-by: Corey Shields <cshields@gmail.com>
[acinonyx@openwrt.gr: use @shared_task decorator and wrapper tasks to workaround celery bug]
Signed-off-by: Vasilis Tsiligiannis <acinonyx@openwrt.gr>
merge-requests/403/head
Corey Shields 2019-08-10 16:00:14 -04:00 committed by Vasilis Tsiligiannis
parent 6920bbab03
commit 61f9137aae
2 changed files with 32 additions and 14 deletions

View File

@ -6,7 +6,7 @@ import csv
import logging
from datetime import datetime, timedelta
from celery import Celery
from celery import shared_task
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.mail import send_mail
@ -21,16 +21,14 @@ from db.base.utils import cache_statistics, decode_data
LOGGER = logging.getLogger('db')
APP = Celery('db')
@APP.task(task_ignore_result=False)
@shared_task
def check_celery():
"""Dummy celery task to check that everything runs smoothly."""
LOGGER.info('check_celery has been triggered')
@APP.task
@shared_task
def update_satellite(norad_id, update_name=True, update_tle=True):
"""Task to update the name and/or the tle of a satellite, or create a
new satellite in the db if no satellite with given norad_id can be found"""
@ -60,7 +58,7 @@ def update_satellite(norad_id, update_name=True, update_tle=True):
print('Updated satellite {}: {}'.format(satellite.norad_cat_id, satellite.name))
@APP.task
@shared_task
def update_all_tle():
"""Task to update all satellite TLEs"""
@ -111,7 +109,7 @@ def update_all_tle():
print('Ignored {} with temporary NORAD ID {}'.format(satellite.name, norad_id))
@APP.task
@shared_task
def export_frames(norad, email, uid, period=None):
"""Task to export satellite frames in csv."""
now = datetime.utcnow()
@ -158,7 +156,7 @@ def notify_user_export(filename, norad, email):
send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [email], False)
@APP.task
@shared_task
def background_cache_statistics():
"""Task to periodically cache statistics"""
cache_statistics()
@ -166,13 +164,13 @@ def background_cache_statistics():
# decode data for a satellite, and a given time frame (if provided). If not
# provided it is expected that we want to try decoding all frames in the db.
@APP.task
@shared_task
def decode_all_data(norad):
"""Task to trigger a full decode of data for a satellite."""
decode_data(norad)
@APP.task
@shared_task
def decode_recent_data():
"""Task to trigger a partial/recent decode of data for all satellites."""
satellites = Satellite.objects.all()

View File

@ -5,7 +5,6 @@ from __future__ import absolute_import, division, print_function, \
import os
from celery import Celery
from django.conf import settings # noqa
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'db.settings')
@ -16,15 +15,36 @@ RUN_DAILY = 60 * 60 * 24
APP = Celery('db')
APP.config_from_object('django.conf:settings', namespace='CELERY')
APP.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
APP.autodiscover_tasks()
# Wrapper tasks as workaround for registering shared tasks to beat scheduler
# See https://github.com/celery/celery/issues/5059
@APP.task
def update_all_tle():
"""Wrapper task for 'update_all_tle' shared task"""
from db.base.tasks import update_all_tle as periodic_task
periodic_task()
@APP.task
def background_cache_statistics():
"""Wrapper task for 'background_cache_statistics' shared task"""
from db.base.tasks import background_cache_statistics as periodic_task
periodic_task()
@APP.task
def decode_recent_data():
"""Wrapper task for 'decocde_recent_data' shared task"""
from db.base.tasks import decode_recent_data as periodic_task
periodic_task()
# after python3, remove W0613 disable
@APP.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs): # pylint: disable=W0613
"""Initializes celery tasks that need to run on a scheduled basis"""
from db.base.tasks import update_all_tle, background_cache_statistics, decode_recent_data
sender.add_periodic_task(RUN_DAILY, update_all_tle.s(), name='update-all-tle')
sender.add_periodic_task(