189 lines
6.5 KiB
Python
Executable File
189 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import glob
|
|
import time
|
|
|
|
import argparse
|
|
import configparser
|
|
|
|
import numpy as np
|
|
|
|
import warnings
|
|
|
|
from termcolor import colored
|
|
|
|
from stvid.fourframe import FourFrame
|
|
from stvid.fourframe import Observation
|
|
|
|
from stvid.stars import pixel_catalog
|
|
from stvid.stars import store_calibration
|
|
from stvid.stars import generate_star_catalog
|
|
from stvid.astrometry import calibrate_from_reference
|
|
from stvid.astrometry import is_calibrated
|
|
from stvid.astrometry import generate_reference_with_anet
|
|
|
|
from astropy.utils.exceptions import AstropyWarning
|
|
|
|
if __name__ == "__main__":
    # Script entry point. Polls a directory for FITS files that do not yet
    # have a matching ".png" file, calibrates each one against a shared
    # reference file, finds and identifies tracks, and writes the resulting
    # observations and diagnostic plots. The polling loop runs until the
    # user interrupts it during the wait between passes.

    # Read commandline options
    conf_parser = argparse.ArgumentParser(
        description="Process captured video frames.")
    conf_parser.add_argument(
        "-c",
        "--conf_file",
        help="Specify configuration file. If no file"
        " is specified 'configuration.ini' is used.",
        metavar="FILE")
    conf_parser.add_argument(
        "-d",
        "--directory",
        help="Specify directory of observations. If no"
        " directory is specified parent will be used.",
        metavar="DIR",
        dest="file_dir",
        default=".")
    args = conf_parser.parse_args()

    # Read configuration file
    cfg = configparser.ConfigParser(inline_comment_prefixes=("#", ":"))
    conf_file = args.conf_file if args.conf_file else "configuration.ini"
    result = cfg.read([conf_file])
    if not result:
        # cfg.read() returns the list of files it managed to parse; an
        # empty list means the configuration file could not be read.
        print("Could not read config file: %s\nExiting..." % conf_file)
        sys.exit()

    # Set warnings
    warnings.filterwarnings("ignore", category=UserWarning, append=True)
    warnings.simplefilter("ignore", AstropyWarning)

    # Star count a catalog must exceed before an astrometric solve is tried
    nstarsmin = cfg.getint("Processing", "nstarsmin")

    # Extract abbrevs for TLE files
    abbrevs, tlefiles = [], []
    for key, value in cfg.items("Elements"):
        if "tlefile" in key:
            tlefiles.append(value)
        elif "abbrev" in key:
            abbrevs.append(value)

    # Reference calibration file shared by all frames in the directory
    calfname = os.path.join(args.file_dir, "test.fits")

    # Terminal color per catalog name. Names not listed here are printed
    # without a color instead of inheriting whatever color was used last.
    obs_colors = {"catalog": "grey", "classfd": "blue", "unid": "magenta"}

    # Start processing loop
    while True:
        # Get files: every FITS file without a matching ".png" is pending
        fitsfnames = sorted(glob.glob(os.path.join(args.file_dir, "2*.fits")))
        procfnames = sorted(glob.glob(os.path.join(args.file_dir, "2*.fits.png")))
        fnames = [fname for fname in fitsfnames if f"{fname}.png" not in procfnames]

        # Loop over files
        for fname in fnames:
            # Create reference calibration file in single threaded environment
            if not os.path.exists(calfname):
                solved = False
                # Loop over files to find a suitable calibration file. A
                # dedicated loop variable is used here so that the file
                # currently being processed (fname) is not overwritten by
                # the search.
                for candidate in fnames:
                    # Was this file already tried?
                    if os.path.exists(f"{candidate}.cat"):
                        continue

                    # Generate star catalog
                    candidate_catalog = generate_star_catalog(candidate)

                    # Solve
                    if candidate_catalog.nstars > nstarsmin:
                        print(colored(f"Computing astrometric calibration for {candidate}", "yellow"))
                        solved = generate_reference_with_anet(candidate, "", calfname)

                    # Break when solved
                    if solved:
                        break
            else:
                # test.fits exists, so calibration has been solved
                solved = True

            # Exit on failure to solve; the pending files are looked at
            # again on the next pass of the polling loop
            if not solved:
                break

            # Generate star catalog, or reuse the one already stored on disk
            if not os.path.exists(f"{fname}.cat"):
                pix_catalog = generate_star_catalog(fname)
            else:
                pix_catalog = pixel_catalog(f"{fname}.cat")

            # Calibrate from reference
            calibrate_from_reference(fname, calfname, pix_catalog)

            # Store calibration
            store_calibration(pix_catalog, f"{fname}.cal")

            # Read Fourframe
            ff = FourFrame(fname)

            # Stars available and used
            nused = np.sum(pix_catalog.flag == 1)
            nstars = pix_catalog.nstars

            # Write output
            screenoutput = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % (
                os.path.basename(ff.fname), ff.crval[0], ff.crval[1], nused, nstars,
                3600.0 * ff.crres[0], 3600.0 * ff.crres[1],
                np.mean(ff.zavg), np.std(ff.zavg))

            if is_calibrated(ff):
                color = "green"
            else:
                color = "red"
            print(colored(screenoutput, color))

            # Generate predictions
            predictions = ff.generate_satellite_predictions(cfg)

            # Find tracks (only attempted on calibrated frames)
            if is_calibrated(ff):
                tracks = ff.find_tracks_by_hough3d(cfg)
            else:
                tracks = []

            # Identify tracks. satno starts at 90000 for every frame and is
            # advanced each time a track could not be identified.
            satno = 90000
            for t in tracks:
                is_identified = t.identify(predictions, satno, "22 500A", None, cfg, abbrevs, tlefiles)
                if not is_identified:
                    satno += 1

            # Format observations
            obs = [
                Observation(ff, t.tmid, t.x0, t.y0, ff.site_id,
                            t.satno, t.cospar, t.catalogname)
                for t in tracks
            ]

            # Write observations
            for o in obs:
                iod_line = o.to_iod_line()

                # Open file
                outfname = f"{ff.froot}_{o.satno:05d}_{o.catalogname}.dat"
                with open(outfname, "w") as fp:
                    fp.write(f"{iod_line}\n")

                # Unknown catalog names map to None, i.e. uncolored output
                print(colored(iod_line, obs_colors.get(o.catalogname)))

            # Generate plots: one for the whole frame, then one per track
            ff.diagnostic_plot(predictions, None, None, cfg)
            for track, o in zip(tracks, obs):
                ff.diagnostic_plot(predictions, track, o, cfg)

        # Sleep
        try:
            print("File queue empty, waiting for new files...\r", end="")
            time.sleep(10)
        except KeyboardInterrupt:
            sys.exit()
|