satnogs-auto-scheduler/cache.py

139 lines
5.0 KiB
Python

import os
import logging
from datetime import datetime
from satellite_tle import fetch_tles
from satnogs_client import get_active_transmitter_info, \
get_satellite_info, \
get_transmitter_stats
class CacheManager:
def __init__(self,
ground_station_id,
ground_station_antennas,
cache_dir,
cache_age,
max_norad_cat_id):
self.ground_station_id = ground_station_id
self.ground_station_antennas = ground_station_antennas
self.cache_dir = cache_dir
self.cache_age = cache_age
self.max_norad_cat_id = max_norad_cat_id
self.transmitters_file = os.path.join(self.cache_dir, "transmitters_%d.txt" % self.ground_station_id)
self.tles_file = os.path.join(self.cache_dir, "tles_%d.txt" % self.ground_station_id)
self.last_update_file = os.path.join(self.cache_dir, "last_update_%d.txt" % ground_station_id)
# Create cache
if not os.path.isdir(self.cache_dir):
os.mkdir(self.cache_dir)
def last_update(self):
try:
with open(self.last_update_file, "r") as f:
line = f.readline()
return datetime.strptime(line.strip(), "%Y-%m-%dT%H:%M:%S")
except IOError:
return None
def update_needed(self):
tnow = datetime.now()
# Get last update
tlast = self.last_update()
if tlast is None or (tnow - tlast).total_seconds() > self.cache_age * 3600:
return True
if not os.path.isfile(self.transmitters_file):
return True
if not os.path.isfile(self.tles_file):
return True
return False
def update(self, force=False):
if not force and not self.update_needed():
# Cache is valid, skip the update
return
logging.info('Updating transmitters and TLEs for station')
tnow = datetime.now()
# Store current time
with open(self.last_update_file, "w") as fp:
fp.write(tnow.strftime("%Y-%m-%dT%H:%M:%S") + "\n")
# Get active transmitters in frequency range of each antenna
transmitters = {}
for antenna in self.ground_station_antennas:
for transmitter in get_active_transmitter_info(antenna["frequency"],
antenna["frequency_max"]):
transmitters[transmitter['uuid']] = transmitter
# Get satellites which are alive
alive_norad_cat_ids = get_satellite_info()
# Extract NORAD IDs from transmitters
norad_cat_ids = sorted(
set([
transmitter["norad_cat_id"] for transmitter in transmitters.values()
if transmitter["norad_cat_id"] < self.max_norad_cat_id and
transmitter["norad_cat_id"] in alive_norad_cat_ids
]))
# Store transmitters
fp = open(self.transmitters_file, "w")
logging.info("Requesting transmitter success rates.")
transmitters_stats = get_transmitter_stats()
for transmitter in transmitters_stats:
uuid = transmitter["uuid"]
# Skip absent transmitters
if uuid not in transmitters.keys():
continue
# Skip dead satellites
if transmitters[uuid]["norad_cat_id"] not in alive_norad_cat_ids:
continue
fp.write(
"%05d %s %d %d %d %s\n" %
(transmitters[uuid]["norad_cat_id"], uuid, transmitter["stats"]["success_rate"],
transmitter["stats"]["good_count"], transmitter["stats"]["total_count"], transmitters[uuid]["mode"]))
logging.info("Transmitter success rates received!")
fp.close()
# Get TLEs
tles = fetch_tles(norad_cat_ids)
# Store TLEs
with open(self.tles_file, "w") as f:
for norad_cat_id, (source, tle) in tles.items():
f.write("%s\n%s\n%s\n" % (tle[0], tle[1], tle[2]))
def read_tles(self):
with open(self.tles_file, "r") as f:
lines = f.readlines()
for i in range(0, len(lines), 3):
tle0 = lines[i]
tle1 = lines[i + 1]
tle2 = lines[i + 2]
if tle1.split(" ")[1] == "":
norad_cat_id = int(tle1.split(" ")[2][:4])
else:
norad_cat_id = int(tle1.split(" ")[1][:5])
yield {'norad_cat_id': norad_cat_id,
'lines': [tle0, tle1, tle2]}
def read_transmitters(self):
with open(self.transmitters_file, "r") as f:
for line in f.readlines():
item = line.split()
yield {"norad_cat_id": int(item[0]),
"uuid": item[1],
"success_rate": float(item[2]) / 100.0,
"good_count": int(item[3]),
"data_count": int(item[4]),
"mode": item[5]}