Rudimentary working version
parent
d428139a57
commit
1dcacedcce
|
@ -9,12 +9,17 @@ import matplotlib.pyplot as plt
|
|||
from matplotlib.patches import Rectangle
|
||||
from matplotlib.dates import DayLocator, HourLocator, DateFormatter, drange
|
||||
from matplotlib import colors as mcolors
|
||||
import itertools
|
||||
from satellite_tle import fetch_tles
|
||||
import os
|
||||
import glob
|
||||
import lxml.html
|
||||
|
||||
class satellite:
|
||||
"""Satellite class"""
|
||||
class twolineelement:
|
||||
"""TLE class"""
|
||||
|
||||
def __init__(self, tle0, tle1, tle2):
|
||||
"""Define a satellite"""
|
||||
"""Define a TLE"""
|
||||
|
||||
self.tle0 = tle0
|
||||
self.tle1 = tle1
|
||||
|
@ -23,7 +28,21 @@ class satellite:
|
|||
self.name = tle0[2:]
|
||||
else:
|
||||
self.name = tle0
|
||||
self.id = tle1.split(" ")[1][:5]
|
||||
self.id = int(tle1.split(" ")[1][:5])
|
||||
|
||||
|
||||
class satellite:
|
||||
"""Satellite class"""
|
||||
|
||||
def __init__(self, tle, transmitter):
|
||||
"""Define a satellite"""
|
||||
|
||||
self.tle0 = tle.tle0
|
||||
self.tle1 = tle.tle1
|
||||
self.tle2 = tle.tle2
|
||||
self.id = tle.id
|
||||
self.name = tle.name
|
||||
self.transmitter = transmitter
|
||||
|
||||
def get_scheduled_passes_from_network(ground_station, tmin, tmax):
|
||||
# Get first page
|
||||
|
@ -175,6 +194,7 @@ def find_passes(satellites, observer, tmin, tmax, minimum_altitude):
|
|||
'ts': ts.datetime(), # Set time
|
||||
'azs': azimuth_s, # Set azimuth
|
||||
'valid': valid,
|
||||
'uuid': satellite.transmitter,
|
||||
'scheduled': False}
|
||||
passes.append(satpass)
|
||||
observer.date = ephem.Date(ts).datetime() + timedelta(minutes=1)
|
||||
|
@ -215,15 +235,112 @@ def get_groundstation_info(ground_station_id):
|
|||
else:
|
||||
return {}
|
||||
|
||||
def get_active_transmitter_info(fmin, fmax):
|
||||
# Open session
|
||||
client = requests.session()
|
||||
r = client.get("https://db.satnogs.org/api/transmitters")
|
||||
|
||||
# Loop
|
||||
transmitters = []
|
||||
for o in r.json():
|
||||
if o["alive"] and o["downlink_low"]>fmin and o["downlink_low"]<=fmax:
|
||||
transmitter = {"norad_cat_id": o["norad_cat_id"],
|
||||
"uuid": o["uuid"]}
|
||||
transmitters.append(transmitter)
|
||||
|
||||
return transmitters
|
||||
|
||||
def get_last_update(fname):
|
||||
try:
|
||||
fp = open(fname, "r")
|
||||
line = fp.readline()
|
||||
fp.close()
|
||||
return datetime.strptime(line.strip(), "%Y-%m-%dT%H:%M:%S")
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
|
||||
def schedule_observation(username, password, norad_cat_id, uuid, ground_station_id, starttime, endtime):
|
||||
loginUrl = "https://network.satnogs.org/accounts/login/" #login URL
|
||||
session = requests.session()
|
||||
login = session.get(loginUrl) #Get login page for CSFR token
|
||||
login_html = lxml.html.fromstring(login.text)
|
||||
login_hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]') #Get CSFR token
|
||||
form = {x.attrib["name"]: x.attrib["value"] for x in login_hidden_inputs}
|
||||
form["login"] = username
|
||||
form["password"] = password
|
||||
session.post(loginUrl, data=form, headers={'referer':loginUrl}) #Login
|
||||
|
||||
obsURL = "https://network.satnogs.org/observations/new/" #Observation URL
|
||||
obs = session.get(obsURL) #Get the observation/new/ page to get the CSFR token
|
||||
obs_html = lxml.html.fromstring(obs.text)
|
||||
hidden_inputs = obs_html.xpath(r'//form//input[@type="hidden"]')
|
||||
form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
|
||||
form["satellite"] = norad_cat_id
|
||||
form["transmitter"] = uuid
|
||||
form["start-time"] = starttime
|
||||
form["end-time"] = endtime
|
||||
form["0-starting_time"] = starttime
|
||||
form["0-ending_time"] = endtime
|
||||
form["0-station"] = ground_station_id
|
||||
form["total"] = str(1)
|
||||
session.post(obsURL, data=form, headers={'referer':obsURL})
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Settings
|
||||
ground_station_id = 39
|
||||
tlefile = "vhf.txt"
|
||||
ground_station_id = 40
|
||||
length_hours = 2
|
||||
|
||||
data_age_hours = 24
|
||||
cache_dir = "/tmp/cache"
|
||||
username = ""
|
||||
password = ""
|
||||
schedule = False
|
||||
|
||||
# Set time range
|
||||
tnow = datetime.utcnow()
|
||||
tmin = tnow
|
||||
tmax = tnow+timedelta(hours=length_hours)
|
||||
|
||||
# Get ground station information
|
||||
ground_station = get_groundstation_info(ground_station_id)
|
||||
|
||||
# Create cache
|
||||
if not os.path.isdir(cache_dir):
|
||||
os.mkdir(cache_dir)
|
||||
|
||||
# Get last update
|
||||
tlast = get_last_update(os.path.join(cache_dir, "last_update_%d.txt"%ground_station_id))
|
||||
|
||||
# Update
|
||||
if tlast==None or (tnow-tlast).total_seconds()>data_age_hours*3600:
|
||||
# Store current time
|
||||
with open(os.path.join(cache_dir, "last_update_%d.txt"%ground_station_id), "w") as fp:
|
||||
fp.write(tnow.strftime("%Y-%m-%dT%H:%M:%S")+"\n")
|
||||
|
||||
# Get active transmitters in frequency range of each antenna
|
||||
transmitters = []
|
||||
for antenna in ground_station['antenna']:
|
||||
transmitters.append(get_active_transmitter_info(antenna["frequency"], antenna["frequency_max"]))
|
||||
transmitters = list(itertools.chain.from_iterable(transmitters))
|
||||
|
||||
# Store transmitters
|
||||
fp = open(os.path.join(cache_dir, "transmitters_%d.txt"%ground_station_id), "w")
|
||||
for transmitter in transmitters:
|
||||
fp.write("%05d %s\n"%(transmitter["norad_cat_id"], transmitter["uuid"]))
|
||||
fp.close()
|
||||
|
||||
# Get NORAD IDs
|
||||
norad_cat_ids = sorted(set([transmitter["norad_cat_id"] for transmitter in transmitters]))
|
||||
|
||||
# Get TLEs
|
||||
tles = fetch_tles(norad_cat_ids)
|
||||
|
||||
# Store TLEs
|
||||
fp = open(os.path.join(cache_dir, "tles_%d.txt"%ground_station_id), "w")
|
||||
for norad_cat_id, (source, tle) in tles.items():
|
||||
fp.write("%s\n%s\n%s\n"%(tle[0], tle[1], tle[2]))
|
||||
fp.close()
|
||||
|
||||
# Set observer
|
||||
observer = ephem.Observer()
|
||||
observer.lon = str(ground_station['lng'])
|
||||
|
@ -231,15 +348,21 @@ if __name__ == "__main__":
|
|||
observer.elevation = ground_station['altitude']
|
||||
minimum_altitude = ground_station['min_horizon']
|
||||
|
||||
tmin = datetime.utcnow()
|
||||
tmax = datetime.utcnow()+timedelta(hours=length_hours)
|
||||
|
||||
# Read satellites
|
||||
with open(tlefile, "r") as f:
|
||||
# Read tles
|
||||
with open(os.path.join(cache_dir, "tles_%d.txt"%ground_station_id), "r") as f:
|
||||
lines = f.readlines()
|
||||
satellites = [satellite(lines[i], lines[i+1], lines[i+2])
|
||||
for i in range(0, len(lines), 3)]
|
||||
tles = [twolineelement(lines[i], lines[i+1], lines[i+2])
|
||||
for i in range(0, len(lines), 3)]
|
||||
|
||||
# Read transmitters
|
||||
satellites = []
|
||||
with open(os.path.join(cache_dir, "transmitters_%d.txt"%ground_station_id), "r") as f:
|
||||
lines = f.readlines()
|
||||
for line in lines:
|
||||
norad_cat_id, uuid = int(line.split()[0]), line.split()[1]
|
||||
for tle in tles:
|
||||
if tle.id == norad_cat_id:
|
||||
satellites.append(satellite(tle, uuid))
|
||||
|
||||
# Find passes
|
||||
passes = find_passes(satellites, observer, tmin, tmax, minimum_altitude)
|
||||
|
@ -248,7 +371,6 @@ if __name__ == "__main__":
|
|||
priorities = {"40069": 1.000, "25338": 0.990, "28654": 0.990, "33591": 0.990}
|
||||
|
||||
# List of scheduled passes
|
||||
# scheduledpasses = []
|
||||
scheduledpasses = get_scheduled_passes_from_network(ground_station_id, tmin, tmax)
|
||||
print("Found %d scheduled passes between %s and %s on ground station %d\n"%(len(scheduledpasses), tmin, tmax, ground_station_id))
|
||||
|
||||
|
@ -314,11 +436,26 @@ if __name__ == "__main__":
|
|||
# for satpass in scheduledpasses:
|
||||
for satpass in sorted(scheduledpasses, key=lambda satpass: satpass['tr']):
|
||||
if satpass['scheduled']==False:
|
||||
print("%s %s %3d %3d %3d %5.2f | %s %s"%(satpass['tr'].strftime("%Y-%m-%dT%H:%M:%S"), satpass['ts'].strftime("%Y-%m-%dT%H:%M:%S"), float(satpass['azr']), float(satpass['altt']), float(satpass['azs']),satpass['priority'],satpass['id'], satpass['name'].rstrip()))
|
||||
print("%s %s %3d %3d %3d %5.2f | %s %s %s"%(satpass['tr'].strftime("%Y-%m-%dT%H:%M:%S"), satpass['ts'].strftime("%Y-%m-%dT%H:%M:%S"), float(satpass['azr']), float(satpass['altt']), float(satpass['azs']),satpass['priority'], satpass['uuid'], satpass['id'], satpass['name'].rstrip()))
|
||||
else:
|
||||
print("%s %s %3d %3d %3d %5.2f | %s %s"%(satpass['tr'].strftime("%Y-%m-%dT%H:%M:%S"), satpass['ts'].strftime("%Y-%m-%dT%H:%M:%S"), 0.0, 0.0, 0.0, 0.0, satpass['id'], ""))
|
||||
print("%s %s %3d %3d %3d %5.2f | %s %s %s"%(satpass['tr'].strftime("%Y-%m-%dT%H:%M:%S"), satpass['ts'].strftime("%Y-%m-%dT%H:%M:%S"), 0.0, 0.0, 0.0, 0.0, "", satpass['id'], ""))
|
||||
|
||||
# Print schedule commands
|
||||
# Schedule passes
|
||||
for satpass in sorted(scheduledpasses, key=lambda satpass: satpass['tr']):
|
||||
if satpass['scheduled']==False:
|
||||
print("firefox \"https://network.satnogs.org/observations/new/?norad=%s&ground_station_id=%d&start_date=%s&end_date=%s\""%(satpass['id'], ground_station_id, (satpass['tr']-timedelta(minutes=1)).strftime("%Y/%m/%d%%20%H:%M"), (satpass['ts']+timedelta(minutes=1)).strftime("%Y/%m/%d%%20%H:%M")))
|
||||
print(username,
|
||||
password,
|
||||
int(satpass['id']),
|
||||
satpass['uuid'],
|
||||
ground_station_id,
|
||||
satpass['tr'].strftime("%Y-%m-%d %H:%M:%S")+".000",
|
||||
satpass['ts'].strftime("%Y-%m-%d %H:%M:%S")+".000")
|
||||
if schedule:
|
||||
schedule_observation(username,
|
||||
password,
|
||||
int(satpass['id']),
|
||||
satpass['uuid'],
|
||||
ground_station_id,
|
||||
satpass['tr'].strftime("%Y-%m-%d %H:%M:%S")+".000",
|
||||
satpass['ts'].strftime("%Y-%m-%d %H:%M:%S")+".000")
|
||||
|
||||
|
|
Loading…
Reference in New Issue