Restructure util file and add settings
parent
dde72b7f45
commit
e743364933
|
@ -0,0 +1,4 @@
|
|||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
|
@ -1,25 +1,21 @@
|
|||
#!/usr/bin/env python
|
||||
from __future__ import division
|
||||
import json
|
||||
import requests
|
||||
import ephem
|
||||
import math
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
from satellite_tle import fetch_tles
|
||||
import os
|
||||
import glob
|
||||
import lxml.html
|
||||
import argparse
|
||||
import logging
|
||||
from utils import get_active_transmitter_info, \
|
||||
get_transmitter_stats, \
|
||||
DB_BASE_URL, \
|
||||
NETWORK_BASE_URL
|
||||
|
||||
from utils import get_active_transmitter_info, get_transmitter_stats, \
|
||||
get_groundstation_info, get_last_update, get_scheduled_passes_from_network, ordered_scheduler, \
|
||||
efficiency, find_passes, schedule_observation
|
||||
import settings
|
||||
|
||||
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
|
||||
|
||||
|
||||
class twolineelement:
|
||||
"""TLE class"""
|
||||
|
||||
|
@ -56,248 +52,6 @@ class satellite:
|
|||
self.data_count = data_count
|
||||
|
||||
|
||||
def get_scheduled_passes_from_network(ground_station, tmin, tmax):
|
||||
# Get first page
|
||||
client = requests.session()
|
||||
|
||||
# Loop
|
||||
start = True
|
||||
scheduledpasses = []
|
||||
|
||||
logging.info("Requesting scheduled passes for ground station %d" % ground_station)
|
||||
while True:
|
||||
if start:
|
||||
r = client.get('{}/observations/?ground_station={:d}'.format(
|
||||
NETWORK_BASE_URL,
|
||||
ground_station))
|
||||
start = False
|
||||
else:
|
||||
nextpage = r.links.get("next")
|
||||
r = client.get(nextpage["url"])
|
||||
|
||||
# r.json() is a list of dicts
|
||||
for o in r.json():
|
||||
satpass = {
|
||||
"id": o['norad_cat_id'],
|
||||
"tr": datetime.strptime(
|
||||
o['start'].replace(
|
||||
"Z",
|
||||
""),
|
||||
"%Y-%m-%dT%H:%M:%S"),
|
||||
"ts": datetime.strptime(
|
||||
o['end'].replace(
|
||||
"Z",
|
||||
""),
|
||||
"%Y-%m-%dT%H:%M:%S"),
|
||||
"scheduled": True}
|
||||
|
||||
if satpass['ts'] > tmin and satpass['tr'] < tmax:
|
||||
scheduledpasses.append(satpass)
|
||||
if satpass['ts'] < tmin:
|
||||
break
|
||||
|
||||
logging.info("Scheduled passes for ground station %d retrieved!" % ground_station)
|
||||
return scheduledpasses
|
||||
|
||||
|
||||
def overlap(satpass, scheduledpasses):
|
||||
# No overlap
|
||||
overlap = False
|
||||
# Loop over scheduled passes
|
||||
for scheduledpass in scheduledpasses:
|
||||
# Test pass falls within scheduled pass
|
||||
if satpass['tr'] >= scheduledpass['tr'] and satpass['ts'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
# Scheduled pass falls within test pass
|
||||
elif scheduledpass['tr'] >= satpass['tr'] and scheduledpass['ts'] < satpass['ts']:
|
||||
overlap = True
|
||||
# Pass start falls within pass
|
||||
elif satpass['tr'] >= scheduledpass['tr'] and satpass['tr'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
# Pass end falls within end
|
||||
elif satpass['ts'] >= scheduledpass['tr'] and satpass['ts'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
if overlap:
|
||||
break
|
||||
|
||||
return overlap
|
||||
|
||||
|
||||
def ordered_scheduler(passes, scheduledpasses):
|
||||
"""Loop through a list of ordered passes and schedule each next one that fits"""
|
||||
# Loop over passes
|
||||
for satpass in passes:
|
||||
# Schedule if there is no overlap with already scheduled passes
|
||||
if not overlap(satpass, scheduledpasses):
|
||||
scheduledpasses.append(satpass)
|
||||
|
||||
return scheduledpasses
|
||||
|
||||
|
||||
def random_scheduler(passes, scheduledpasses):
|
||||
"""Schedule passes based on random ordering"""
|
||||
# Shuffle passes
|
||||
random.shuffle(passes)
|
||||
|
||||
return ordered_scheduler(passes, scheduledpasses)
|
||||
|
||||
|
||||
def efficiency(passes):
|
||||
|
||||
# Loop over passes
|
||||
start = False
|
||||
for satpass in passes:
|
||||
if not start:
|
||||
dt = satpass['ts'] - satpass['tr']
|
||||
tmin = satpass['tr']
|
||||
tmax = satpass['ts']
|
||||
start = True
|
||||
else:
|
||||
dt += satpass['ts'] - satpass['tr']
|
||||
if satpass['tr'] < tmin:
|
||||
tmin = satpass['tr']
|
||||
if satpass['ts'] > tmax:
|
||||
tmax = satpass['ts']
|
||||
# Total time covered
|
||||
dttot = tmax - tmin
|
||||
|
||||
return dt.total_seconds(), dttot.total_seconds(
|
||||
), dt.total_seconds() / dttot.total_seconds()
|
||||
|
||||
|
||||
def find_passes(satellites, observer, tmin, tmax, minimum_altitude):
|
||||
# Loop over satellites
|
||||
passes = []
|
||||
passid = 0
|
||||
for satellite in satellites:
|
||||
# Set start time
|
||||
observer.date = ephem.date(tmin)
|
||||
|
||||
# Load TLE
|
||||
try:
|
||||
sat_ephem = ephem.readtle(str(satellite.tle0),
|
||||
str(satellite.tle1),
|
||||
str(satellite.tle2))
|
||||
except (ValueError, AttributeError):
|
||||
continue
|
||||
|
||||
# Loop over passes
|
||||
keep_digging = True
|
||||
while keep_digging:
|
||||
try:
|
||||
tr, azr, tt, altt, ts, azs = observer.next_pass(sat_ephem)
|
||||
except ValueError:
|
||||
break # there will be sats in our list that fall below horizon, skip
|
||||
except TypeError:
|
||||
break # if there happens to be a non-EarthSatellite object in the list
|
||||
except Exception:
|
||||
break
|
||||
|
||||
if tr is None:
|
||||
break
|
||||
|
||||
# using the angles module convert the sexagesimal degree into
|
||||
# something more easily read by a human
|
||||
try:
|
||||
elevation = format(math.degrees(altt), '.0f')
|
||||
azimuth_r = format(math.degrees(azr), '.0f')
|
||||
azimuth_s = format(math.degrees(azs), '.0f')
|
||||
except TypeError:
|
||||
break
|
||||
passid += 1
|
||||
|
||||
# show only if >= configured horizon and in next 6 hours,
|
||||
# and not directly overhead (tr < ts see issue 199)
|
||||
if tr < ephem.date(tmax):
|
||||
if (float(elevation) >= minimum_altitude and tr < ts):
|
||||
valid = True
|
||||
if tr < ephem.Date(datetime.now() +
|
||||
timedelta(minutes=5)):
|
||||
valid = False
|
||||
satpass = {'passid': passid,
|
||||
'mytime': str(observer.date),
|
||||
'name': str(satellite.name),
|
||||
'id': str(satellite.id),
|
||||
'tle1': str(satellite.tle1),
|
||||
'tle2': str(satellite.tle2),
|
||||
'tr': tr.datetime(), # Rise time
|
||||
'azr': azimuth_r, # Rise Azimuth
|
||||
'tt': tt.datetime(), # Max altitude time
|
||||
'altt': elevation, # Max altitude
|
||||
'ts': ts.datetime(), # Set time
|
||||
'azs': azimuth_s, # Set azimuth
|
||||
'valid': valid,
|
||||
'uuid': satellite.transmitter,
|
||||
'success_rate': satellite.success_rate,
|
||||
'good_count': satellite.good_count,
|
||||
'data_count': satellite.data_count,
|
||||
'scheduled': False}
|
||||
passes.append(satpass)
|
||||
observer.date = ephem.Date(
|
||||
ts).datetime() + timedelta(minutes=1)
|
||||
else:
|
||||
keep_digging = False
|
||||
|
||||
return passes
|
||||
|
||||
|
||||
def get_groundstation_info(ground_station_id):
|
||||
|
||||
logging.info("Requesting information for ground station %d" % ground_station_id)
|
||||
client = requests.session()
|
||||
|
||||
# Loop
|
||||
found = False
|
||||
r = client.get("{}/stations/?id={:d}".format(
|
||||
NETWORK_BASE_URL,
|
||||
ground_station_id))
|
||||
for o in r.json():
|
||||
if o['id'] == ground_station_id:
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
logging.info('Ground station infromation retrieved!')
|
||||
return o
|
||||
else:
|
||||
logging.info('No ground station information found!')
|
||||
return {}
|
||||
|
||||
|
||||
def get_last_update(fname):
|
||||
try:
|
||||
fp = open(fname, "r")
|
||||
line = fp.readline()
|
||||
fp.close()
|
||||
return datetime.strptime(line.strip(), "%Y-%m-%dT%H:%M:%S")
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
|
||||
def schedule_observation(
|
||||
session,
|
||||
norad_cat_id,
|
||||
uuid,
|
||||
ground_station_id,
|
||||
starttime,
|
||||
endtime):
|
||||
|
||||
obsURL = "https://network.satnogs.org/observations/new/" # Observation URL
|
||||
# Get the observation/new/ page to get the CSFR token
|
||||
obs = session.get(obsURL)
|
||||
obs_html = lxml.html.fromstring(obs.text)
|
||||
hidden_inputs = obs_html.xpath(r'//form//input[@type="hidden"]')
|
||||
form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
|
||||
form["satellite"] = norad_cat_id
|
||||
form["transmitter"] = uuid
|
||||
form["start-time"] = starttime
|
||||
form["end-time"] = endtime
|
||||
form["0-starting_time"] = starttime
|
||||
form["0-ending_time"] = endtime
|
||||
form["0-station"] = ground_station_id
|
||||
form["total"] = str(1)
|
||||
session.post(obsURL, data=form, headers={'referer': obsURL})
|
||||
|
||||
|
||||
def _log_level_string_to_int(log_level_string):
|
||||
if log_level_string not in _LOG_LEVEL_STRINGS:
|
||||
message = 'invalid choice: {0} (choose from {1})'.format(log_level_string,
|
||||
|
@ -345,9 +99,9 @@ if __name__ == "__main__":
|
|||
# Setting logging level
|
||||
numeric_level = args.log_level
|
||||
if not isinstance(numeric_level, int):
|
||||
raise ValueError('Invalid log level: %s' % loglevel)
|
||||
raise ValueError("Invalid log level")
|
||||
logging.basicConfig(level=numeric_level,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
|
||||
# Settings
|
||||
ground_station_id = args.station
|
||||
|
@ -492,7 +246,7 @@ if __name__ == "__main__":
|
|||
scheduledpasses = get_scheduled_passes_from_network(
|
||||
ground_station_id, tmin, tmax)
|
||||
logging.info(
|
||||
"Found %d scheduled passes between %s and %s on ground station %d\n" %
|
||||
"Found %d scheduled passes between %s and %s on ground station %d" %
|
||||
(len(scheduledpasses), tmin, tmax, ground_station_id))
|
||||
|
||||
# Get passes of priority objects
|
||||
|
@ -545,7 +299,7 @@ if __name__ == "__main__":
|
|||
satpass['name'].rstrip()))
|
||||
|
||||
# Login
|
||||
loginUrl = "https://network.satnogs.org/accounts/login/" # login URL
|
||||
loginUrl = '{}/accounts/login/'.format(settings.NETWORK_BASE_URL) # login URL
|
||||
session = requests.session()
|
||||
login = session.get(loginUrl) # Get login page for CSFR token
|
||||
login_html = lxml.html.fromstring(login.text)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
DB_BASE_URL = 'https://db.satnogs.org/api'
|
||||
NETWORK_BASE_URL = 'https://network.satnogs.org/api'
|
255
utils.py
255
utils.py
|
@ -1,14 +1,17 @@
|
|||
import requests
|
||||
import logging
|
||||
|
||||
DB_BASE_URL = 'https://db.satnogs.org/api'
|
||||
NETWORK_BASE_URL = 'https://network.satnogs.org/api'
|
||||
import math
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
import ephem
|
||||
import lxml
|
||||
import settings
|
||||
|
||||
|
||||
def get_active_transmitter_info(fmin, fmax):
|
||||
# Open session
|
||||
logging.info("Fetching transmitter information from DB.")
|
||||
r = requests.get('{}/transmitters'.format(DB_BASE_URL))
|
||||
r = requests.get('{}/transmitters'.format(settings.DB_BASE_URL))
|
||||
logging.info("Transmitters received!")
|
||||
|
||||
# Loop
|
||||
|
@ -25,5 +28,247 @@ def get_active_transmitter_info(fmin, fmax):
|
|||
|
||||
def get_transmitter_stats():
|
||||
logging.debug("Requesting transmitter success rates for all satellite")
|
||||
transmitters = requests.get('{}/transmitters/'.format(NETWORK_BASE_URL))
|
||||
transmitters = requests.get('{}/transmitters/'.format(settings.NETWORK_BASE_URL))
|
||||
return transmitters.json()
|
||||
|
||||
|
||||
def get_scheduled_passes_from_network(ground_station, tmin, tmax):
|
||||
# Get first page
|
||||
client = requests.session()
|
||||
|
||||
# Loop
|
||||
start = True
|
||||
scheduledpasses = []
|
||||
|
||||
logging.info("Requesting scheduled passes for ground station %d" % ground_station)
|
||||
while True:
|
||||
if start:
|
||||
r = client.get('{}/observations/?ground_station={:d}'.format(
|
||||
settings.NETWORK_BASE_URL,
|
||||
ground_station))
|
||||
start = False
|
||||
else:
|
||||
nextpage = r.links.get("next")
|
||||
r = client.get(nextpage["url"])
|
||||
|
||||
# r.json() is a list of dicts
|
||||
for o in r.json():
|
||||
satpass = {
|
||||
"id": o['norad_cat_id'],
|
||||
"tr": datetime.strptime(
|
||||
o['start'].replace(
|
||||
"Z",
|
||||
""),
|
||||
"%Y-%m-%dT%H:%M:%S"),
|
||||
"ts": datetime.strptime(
|
||||
o['end'].replace(
|
||||
"Z",
|
||||
""),
|
||||
"%Y-%m-%dT%H:%M:%S"),
|
||||
"scheduled": True}
|
||||
|
||||
if satpass['ts'] > tmin and satpass['tr'] < tmax:
|
||||
scheduledpasses.append(satpass)
|
||||
if satpass['ts'] < tmin:
|
||||
break
|
||||
|
||||
logging.info("Scheduled passes for ground station %d retrieved!" % ground_station)
|
||||
return scheduledpasses
|
||||
|
||||
|
||||
def overlap(satpass, scheduledpasses):
|
||||
# No overlap
|
||||
overlap = False
|
||||
# Loop over scheduled passes
|
||||
for scheduledpass in scheduledpasses:
|
||||
# Test pass falls within scheduled pass
|
||||
if satpass['tr'] >= scheduledpass['tr'] and satpass['ts'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
# Scheduled pass falls within test pass
|
||||
elif scheduledpass['tr'] >= satpass['tr'] and scheduledpass['ts'] < satpass['ts']:
|
||||
overlap = True
|
||||
# Pass start falls within pass
|
||||
elif satpass['tr'] >= scheduledpass['tr'] and satpass['tr'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
# Pass end falls within end
|
||||
elif satpass['ts'] >= scheduledpass['tr'] and satpass['ts'] < scheduledpass['ts']:
|
||||
overlap = True
|
||||
if overlap:
|
||||
break
|
||||
|
||||
return overlap
|
||||
|
||||
|
||||
def ordered_scheduler(passes, scheduledpasses):
|
||||
"""Loop through a list of ordered passes and schedule each next one that fits"""
|
||||
# Loop over passes
|
||||
for satpass in passes:
|
||||
# Schedule if there is no overlap with already scheduled passes
|
||||
if not overlap(satpass, scheduledpasses):
|
||||
scheduledpasses.append(satpass)
|
||||
|
||||
return scheduledpasses
|
||||
|
||||
|
||||
def random_scheduler(passes, scheduledpasses):
|
||||
"""Schedule passes based on random ordering"""
|
||||
# Shuffle passes
|
||||
random.shuffle(passes)
|
||||
|
||||
return ordered_scheduler(passes, scheduledpasses)
|
||||
|
||||
|
||||
def efficiency(passes):
|
||||
|
||||
# Loop over passes
|
||||
start = False
|
||||
for satpass in passes:
|
||||
if not start:
|
||||
dt = satpass['ts'] - satpass['tr']
|
||||
tmin = satpass['tr']
|
||||
tmax = satpass['ts']
|
||||
start = True
|
||||
else:
|
||||
dt += satpass['ts'] - satpass['tr']
|
||||
if satpass['tr'] < tmin:
|
||||
tmin = satpass['tr']
|
||||
if satpass['ts'] > tmax:
|
||||
tmax = satpass['ts']
|
||||
# Total time covered
|
||||
dttot = tmax - tmin
|
||||
|
||||
return dt.total_seconds(), dttot.total_seconds(
|
||||
), dt.total_seconds() / dttot.total_seconds()
|
||||
|
||||
|
||||
def find_passes(satellites, observer, tmin, tmax, minimum_altitude):
|
||||
# Loop over satellites
|
||||
passes = []
|
||||
passid = 0
|
||||
for satellite in satellites:
|
||||
# Set start time
|
||||
observer.date = ephem.date(tmin)
|
||||
|
||||
# Load TLE
|
||||
try:
|
||||
sat_ephem = ephem.readtle(str(satellite.tle0),
|
||||
str(satellite.tle1),
|
||||
str(satellite.tle2))
|
||||
except (ValueError, AttributeError):
|
||||
continue
|
||||
|
||||
# Loop over passes
|
||||
keep_digging = True
|
||||
while keep_digging:
|
||||
try:
|
||||
tr, azr, tt, altt, ts, azs = observer.next_pass(sat_ephem)
|
||||
except ValueError:
|
||||
break # there will be sats in our list that fall below horizon, skip
|
||||
except TypeError:
|
||||
break # if there happens to be a non-EarthSatellite object in the list
|
||||
except Exception:
|
||||
break
|
||||
|
||||
if tr is None:
|
||||
break
|
||||
|
||||
# using the angles module convert the sexagesimal degree into
|
||||
# something more easily read by a human
|
||||
try:
|
||||
elevation = format(math.degrees(altt), '.0f')
|
||||
azimuth_r = format(math.degrees(azr), '.0f')
|
||||
azimuth_s = format(math.degrees(azs), '.0f')
|
||||
except TypeError:
|
||||
break
|
||||
passid += 1
|
||||
|
||||
# show only if >= configured horizon and in next 6 hours,
|
||||
# and not directly overhead (tr < ts see issue 199)
|
||||
if tr < ephem.date(tmax):
|
||||
if (float(elevation) >= minimum_altitude and tr < ts):
|
||||
valid = True
|
||||
if tr < ephem.Date(datetime.now() +
|
||||
timedelta(minutes=5)):
|
||||
valid = False
|
||||
satpass = {'passid': passid,
|
||||
'mytime': str(observer.date),
|
||||
'name': str(satellite.name),
|
||||
'id': str(satellite.id),
|
||||
'tle1': str(satellite.tle1),
|
||||
'tle2': str(satellite.tle2),
|
||||
'tr': tr.datetime(), # Rise time
|
||||
'azr': azimuth_r, # Rise Azimuth
|
||||
'tt': tt.datetime(), # Max altitude time
|
||||
'altt': elevation, # Max altitude
|
||||
'ts': ts.datetime(), # Set time
|
||||
'azs': azimuth_s, # Set azimuth
|
||||
'valid': valid,
|
||||
'uuid': satellite.transmitter,
|
||||
'success_rate': satellite.success_rate,
|
||||
'good_count': satellite.good_count,
|
||||
'data_count': satellite.data_count,
|
||||
'scheduled': False}
|
||||
passes.append(satpass)
|
||||
observer.date = ephem.Date(
|
||||
ts).datetime() + timedelta(minutes=1)
|
||||
else:
|
||||
keep_digging = False
|
||||
|
||||
return passes
|
||||
|
||||
|
||||
def get_groundstation_info(ground_station_id):
|
||||
|
||||
logging.info("Requesting information for ground station %d" % ground_station_id)
|
||||
client = requests.session()
|
||||
|
||||
# Loop
|
||||
found = False
|
||||
r = client.get("{}/stations/?id={:d}".format(
|
||||
settings.NETWORK_BASE_URL,
|
||||
ground_station_id))
|
||||
for o in r.json():
|
||||
if o['id'] == ground_station_id:
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
logging.info('Ground station infromation retrieved!')
|
||||
return o
|
||||
else:
|
||||
logging.info('No ground station information found!')
|
||||
return {}
|
||||
|
||||
|
||||
def get_last_update(fname):
|
||||
try:
|
||||
fp = open(fname, "r")
|
||||
line = fp.readline()
|
||||
fp.close()
|
||||
return datetime.strptime(line.strip(), "%Y-%m-%dT%H:%M:%S")
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
|
||||
def schedule_observation(
|
||||
session,
|
||||
norad_cat_id,
|
||||
uuid,
|
||||
ground_station_id,
|
||||
starttime,
|
||||
endtime):
|
||||
|
||||
obsURL = '{}/observations/new/'.format(settings.NETWORK_BASE_URL) # Observation URL
|
||||
# Get the observation/new/ page to get the CSFR token
|
||||
obs = session.get(obsURL)
|
||||
obs_html = lxml.html.fromstring(obs.text)
|
||||
hidden_inputs = obs_html.xpath(r'//form//input[@type="hidden"]')
|
||||
form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
|
||||
form["satellite"] = norad_cat_id
|
||||
form["transmitter"] = uuid
|
||||
form["start-time"] = starttime
|
||||
form["end-time"] = endtime
|
||||
form["0-starting_time"] = starttime
|
||||
form["0-ending_time"] = endtime
|
||||
form["0-station"] = ground_station_id
|
||||
form["total"] = str(1)
|
||||
session.post(obsURL, data=form, headers={'referer': obsURL})
|
||||
|
|
Loading…
Reference in New Issue