1
0
Fork 0

pylint C0103

Linting for Pylint C0103

Signed-off-by: Corey Shields <cshields@gmail.com>
merge-requests/393/head
Corey Shields 2019-07-11 19:58:30 -04:00 committed by Vasilis Tsiligiannis
parent 853381a5dc
commit b7ec119853
13 changed files with 63 additions and 62 deletions

View File

@ -4,7 +4,6 @@ load-plugins=pylint_django
[MESSAGES CONTROL]
disable=
C0103,
C0111,
C0122,
C0411,

View File

@ -1,7 +1,7 @@
from __future__ import absolute_import
from ._version import get_versions
from .celery import app as celery_app # noqa
from .celery import APP as celery_app # noqa
__all__ = ['celery_app']

View File

@ -5,11 +5,11 @@ from rest_framework import routers
from db.api import views
router = routers.DefaultRouter()
ROUTER = routers.DefaultRouter()
router.register(r'modes', views.ModeView)
router.register(r'satellites', views.SatelliteView)
router.register(r'transmitters', views.TransmitterView)
router.register(r'telemetry', views.TelemetryView)
ROUTER.register(r'modes', views.ModeView)
ROUTER.register(r'satellites', views.SatelliteView)
ROUTER.register(r'transmitters', views.TransmitterView)
ROUTER.register(r'telemetry', views.TelemetryView)
api_urlpatterns = router.urls
API_URLPATTERNS = ROUTER.urls

View File

@ -1,7 +1,6 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
import logging
from datetime import datetime
from socket import error as socket_error
@ -15,8 +14,6 @@ from db.base.models import DemodData, Mode, Satellite, Telemetry, \
Transmitter, TransmitterEntry, TransmitterSuggestion
from db.base.tasks import check_celery, decode_all_data
logger = logging.getLogger('db')
@admin.register(Mode)
class ModeAdmin(admin.ModelAdmin):
@ -44,14 +41,14 @@ class SatelliteAdmin(admin.ModelAdmin):
def check_celery(self, request):
try:
investigator = check_celery.delay()
except socket_error as e:
messages.error(request, 'Cannot connect to broker: %s' % e)
except socket_error as error:
messages.error(request, 'Cannot connect to broker: %s' % error)
return HttpResponseRedirect(reverse('admin:index'))
try:
investigator.get(timeout=5)
except investigator.TimeoutError as e:
messages.error(request, 'Worker timeout: %s' % e)
except investigator.TimeoutError as error:
messages.error(request, 'Worker timeout: %s' % error)
else:
messages.success(request, 'Celery is OK')
finally:

View File

@ -58,12 +58,12 @@ def cache_get_key(*args, **kwargs):
def cache_for(time):
def decorator(fn):
def decorator(func):
def wrapper(*args, **kwargs):
key = cache_get_key(fn.__name__, *args, **kwargs)
key = cache_get_key(func.__name__, *args, **kwargs)
result = cache.get(key)
if not result:
result = fn(*args, **kwargs)
result = func(*args, **kwargs)
cache.set(key, result, time)
return result

View File

@ -26,8 +26,8 @@ class Command(BaseCommand):
decoder_module = 'db.base.decoders.{0}'.format(option.decoder)
decoder = __import__(decoder_module, fromlist='.')
with open(obj.payload_frame.path) as fp:
frame = fp.read()
with open(obj.payload_frame.path) as data_file:
frame = data_file.read()
try:
payload_decoded = decoder.decode_payload(

View File

@ -19,7 +19,7 @@ from shortuuidfield import ShortUUIDField
from db.base.helpers import gridsquare
logger = logging.getLogger('db')
LOGGER = logging.getLogger('db')
DATA_SOURCES = ['manual', 'network', 'sids']
SATELLITE_STATUS = ['alive', 'dead', 're-entered']
@ -159,7 +159,7 @@ class TransmitterEntry(models.Model):
return self.description
def save(self, *args, **kwargs):
self.id = None
self.id = None # pylint: disable=C0103
super(TransmitterEntry, self).save()
@ -252,10 +252,10 @@ class DemodData(models.Model):
def display_frame(self):
try:
with open(self.payload_frame.path) as (fp):
return fp.read()
with open(self.payload_frame.path) as frame_file:
return frame_file.read()
except IOError as err:
logger.error(
LOGGER.error(
err, exc_info=True, extra={
'payload frame path': self.payload_frame.path,
}

View File

@ -16,18 +16,18 @@ from sgp4.io import twoline2rv
from db.base.models import DemodData, Satellite
from db.base.utils import cache_statistics, decode_data
from db.celery import app
from db.celery import APP
logger = logging.getLogger('db')
LOGGER = logging.getLogger('db')
@app.task(task_ignore_result=False)
@APP.task(task_ignore_result=False)
def check_celery():
"""Dummy celery task to check that everything runs smoothly."""
pass
@app.task
@APP.task
def update_satellite(norad_id, update_name=True, update_tle=True):
"""Task to update the name and/or the tle of a satellite, or create a
new satellite in the db if no satellite with given norad_id can be found"""
@ -57,7 +57,7 @@ def update_satellite(norad_id, update_name=True, update_tle=True):
print('Updated satellite {}: {}'.format(satellite.norad_cat_id, satellite.name))
@app.task
@APP.task
def update_all_tle():
"""Task to update all satellite TLEs"""
@ -89,7 +89,7 @@ def update_all_tle():
# Epoch of new TLE is larger then the TLE already in the db
continue
except ValueError:
logger.error('ERROR: TLE malformed for ' + norad_id)
LOGGER.error('ERROR: TLE malformed for ' + norad_id)
continue
satellite.tle_source = source
@ -108,26 +108,28 @@ def update_all_tle():
print('Ignored {} with temporary NORAD ID {}'.format(satellite.name, norad_id))
@app.task
@APP.task
def export_frames(norad, email, uid, period=None):
"""Task to export satellite frames in csv."""
now = datetime.utcnow()
if period:
if period == '1':
q = now - timedelta(days=7)
time_period = now - timedelta(days=7)
suffix = 'week'
else:
q = now - timedelta(days=30)
time_period = now - timedelta(days=30)
suffix = 'month'
q = make_aware(q)
frames = DemodData.objects.filter(satellite__norad_cat_id=norad, timestamp__gte=q)
time_period = make_aware(time_period)
frames = DemodData.objects.filter(
satellite__norad_cat_id=norad, timestamp__gte=time_period
)
else:
frames = DemodData.objects.filter(satellite__norad_cat_id=norad)
suffix = 'all'
filename = '{0}-{1}-{2}-{3}.csv'.format(norad, uid, now.strftime('%Y%m%dT%H%M%SZ'), suffix)
filepath = '{0}/download/{1}'.format(settings.MEDIA_ROOT, filename)
with open(filepath, 'w') as f:
writer = csv.writer(f, delimiter='|')
with open(filepath, 'w') as output_file:
writer = csv.writer(output_file, delimiter='|')
for obj in frames:
frame = obj.display_frame()
if frame is not None:
@ -145,7 +147,7 @@ def export_frames(norad, email, uid, period=None):
send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [email], False)
@app.task
@APP.task
def background_cache_statistics():
"""Task to periodically cache statistics"""
cache_statistics()
@ -153,13 +155,13 @@ def background_cache_statistics():
# decode data for a satellite, and a given time frame (if provided). If not
# provided it is expected that we want to try decoding all frames in the db.
@app.task
@APP.task
def decode_all_data(norad):
"""Task to trigger a full decode of data for a satellite."""
decode_data(norad)
@app.task
@APP.task
def decode_recent_data():
"""Task to trigger a partial/recent decode of data for all satellites."""
satellites = Satellite.objects.all()

View File

@ -5,7 +5,7 @@ from django.conf.urls import url
from db.base import views
base_urlpatterns = (
BASE_URLPATTERNS = (
[
url(r'^$', views.home, name='home'),
url(r'^about/$', views.about, name='about'),

View File

@ -15,7 +15,7 @@ from satnogsdecoders import decoder
from db.base.models import DemodData, Mode, Satellite, Telemetry, Transmitter
logger = logging.getLogger('db')
LOGGER = logging.getLogger('db')
def calculate_statistics():
@ -34,7 +34,7 @@ def calculate_statistics():
round((float(alive_transmitters) / float(total_transmitters)) * 100, 2)
)
except ZeroDivisionError as error:
logger.error(error, exc_info=True)
LOGGER.error(error, exc_info=True)
alive_transmitters_percentage = '0%'
else:
alive_transmitters_percentage = '0%'
@ -42,9 +42,9 @@ def calculate_statistics():
mode_label = []
mode_data = []
for mode in modes:
tr = transmitters.filter(mode=mode).count()
filtered_transmitters = transmitters.filter(mode=mode).count()
mode_label.append(mode.name)
mode_data.append(tr)
mode_data.append(filtered_transmitters)
# needed to pass testing in a fresh environment with no modes in db
if not mode_label:
@ -179,9 +179,11 @@ def decode_data(norad, period=None):
if sat.has_telemetry_decoders:
now = datetime.utcnow()
if period:
q = now - timedelta(hours=4)
q = make_aware(q)
data = DemodData.objects.filter(satellite__norad_cat_id=norad, timestamp__gte=q)
time_period = now - timedelta(hours=4)
time_period = make_aware(time_period)
data = DemodData.objects.filter(
satellite__norad_cat_id=norad, timestamp__gte=time_period
)
else:
data = DemodData.objects.filter(satellite=sat)
telemetry_decoders = Telemetry.objects.filter(satellite=sat)
@ -195,9 +197,9 @@ def decode_data(norad, period=None):
except AttributeError:
continue
try:
with open(obj.payload_frame.path) as fp:
with open(obj.payload_frame.path) as frame_file:
# we get data frames in hex but kaitai requires binary
hexdata = fp.read()
hexdata = frame_file.read()
bindata = binascii.unhexlify(hexdata)
# if we are set to use InfluxDB, send the decoded data

View File

@ -25,7 +25,7 @@ from db.base.models import SERVICE_TYPE, TRANSMITTER_STATUS, \
from db.base.tasks import export_frames
from db.base.utils import cache_statistics
logger = logging.getLogger('db')
LOGGER = logging.getLogger('db')
def home(request):
@ -154,7 +154,7 @@ def transmitter_suggestion(request):
try:
user.email_user(subject, message, from_email=settings.DEFAULT_FROM_EMAIL)
except Exception:
logger.error('Could not send email to user', exc_info=True)
LOGGER.error('Could not send email to user', exc_info=True)
messages.success(
request,
@ -163,7 +163,7 @@ def transmitter_suggestion(request):
)
return redirect(reverse('satellite', kwargs={'norad': transmitter.satellite.norad_cat_id}))
else:
logger.error(
LOGGER.error(
'Suggestion form was not valid {0}'.format(transmitter_form.errors),
exc_info=True,
extra={

View File

@ -12,13 +12,13 @@ RUN_EVERY_15 = 60 * 15
RUN_HOURLY = 60 * 60
RUN_DAILY = 60 * 60 * 24
app = Celery('db')
APP = Celery('db')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
APP.config_from_object('django.conf:settings', namespace='CELERY')
APP.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.on_after_finalize.connect
@APP.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
from db.base.tasks import update_all_tle, background_cache_statistics, decode_recent_data

View File

@ -7,21 +7,22 @@ from django.conf.urls import include, url
from django.contrib import admin
from django.views.static import serve
from db.api.urls import api_urlpatterns
from db.base.urls import base_urlpatterns
from db.api.urls import API_URLPATTERNS
from db.base.urls import BASE_URLPATTERNS
# pylint: disable=C0103
handler404 = 'db.base.views.custom_404'
handler500 = 'db.base.views.custom_500'
urlpatterns = [
# Base
url(r'^', include(base_urlpatterns)),
url(r'^', include(BASE_URLPATTERNS)),
# Accounts
url(r'^accounts/', include(allauth_urls)),
# API
url(r'^api/', include(api_urlpatterns)),
url(r'^api/', include(API_URLPATTERNS)),
# Admin
url(r'^admin/', admin.site.urls),