stvid/process_new.py

189 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import glob
import time
import argparse
import configparser
import numpy as np
import warnings
from termcolor import colored
from stvid.fourframe import FourFrame
from stvid.fourframe import Observation
from stvid.stars import pixel_catalog
from stvid.stars import store_calibration
from stvid.stars import generate_star_catalog
from stvid.astrometry import calibrate_from_reference
from stvid.astrometry import is_calibrated
from stvid.astrometry import generate_reference_with_anet
from astropy.utils.exceptions import AstropyWarning
if __name__ == "__main__":
# Read commandline options
conf_parser = argparse.ArgumentParser(description="Process captured" +
" video frames.")
conf_parser.add_argument("-c",
"--conf_file",
help="Specify configuration file. If no file" +
" is specified 'configuration.ini' is used.",
metavar="FILE")
conf_parser.add_argument("-d",
"--directory",
help="Specify directory of observations. If no" +
" directory is specified parent will be used.",
metavar="DIR",
dest="file_dir",
default=".")
args = conf_parser.parse_args()
# Read configuration file
cfg = configparser.ConfigParser(inline_comment_prefixes=("#", ":"))
conf_file = args.conf_file if args.conf_file else "configuration.ini"
result = cfg.read([conf_file])
if not result:
print("Could not read config file: %s\nExiting..." % conf_file)
sys.exit()
# Set warnings
warnings.filterwarnings("ignore", category=UserWarning, append=True)
warnings.simplefilter("ignore", AstropyWarning)
nstarsmin = cfg.getint("Processing", "nstarsmin")
# Extract abbrevs for TLE files
abbrevs, tlefiles = [], []
for key, value in cfg.items("Elements"):
if "tlefile" in key:
tlefiles.append(value)
elif "abbrev" in key:
abbrevs.append(value)
# Start processing loop
while True:
# Get files
fitsfnames = sorted(glob.glob(os.path.join(args.file_dir, "2*.fits")))
procfnames = sorted(glob.glob(os.path.join(args.file_dir, "2*.fits.png")))
fnames = [fname for fname in fitsfnames if f"{fname}.png" not in procfnames]
# Loop over files
for fname in fnames:
# Create reference calibration file in single threaded environment
calfname = os.path.join(args.file_dir, "test.fits")
if not os.path.exists(calfname):
solved = False
# Loop over files to find a suitable calibration file
for fname in fnames:
# Was this file already tried?
if not os.path.exists(f"{fname}.cat"):
# Generate star catalog
pix_catalog = generate_star_catalog(fname)
# Solve
if pix_catalog.nstars > nstarsmin:
print(colored(f"Computing astrometric calibration for {fname}", "yellow"))
solved = generate_reference_with_anet(fname, "", calfname)
# Break when solved
if solved:
break
else:
# test.fits exists, so calibration has been solved
solved = True
# Exit on failure to solve
if not solved:
break
# Generate star catalog
if not os.path.exists(f"{fname}.cat"):
pix_catalog = generate_star_catalog(fname)
else:
pix_catalog = pixel_catalog(f"{fname}.cat")
# Calibrate from reference
calibrate_from_reference(fname, calfname, pix_catalog)
# Store calibration
store_calibration(pix_catalog, f"{fname}.cal")
# Read Fourframe
ff = FourFrame(fname)
# Stars available and used
nused = np.sum(pix_catalog.flag == 1)
nstars = pix_catalog.nstars
# Write output
screenoutput = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % (
os.path.basename(ff.fname), ff.crval[0], ff.crval[1], nused, nstars,
3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean(
ff.zavg), np.std(ff.zavg))
if is_calibrated(ff):
color = "green"
else:
color = "red"
print(colored(screenoutput, color))
# Generate predictions
predictions = ff.generate_satellite_predictions(cfg)
# Find tracks
if is_calibrated(ff):
tracks = ff.find_tracks_by_hough3d(cfg)
else:
tracks = []
# Identify tracks
satno = 90000
for t in tracks:
is_identified = t.identify(predictions, satno, "22 500A", None, cfg, abbrevs, tlefiles)
if not is_identified:
satno += 1
# Format observations
obs = []
for t in tracks:
# Add to observation
obs.append(Observation(ff, t.tmid, t.x0, t.y0, ff.site_id,
t.satno, t.cospar, t.catalogname))
# Write observations
for o in obs:
iod_line = o.to_iod_line()
# Open file
outfname = f"{ff.froot}_{o.satno:05d}_{o.catalogname}.dat"
with open(outfname, "w") as fp:
fp.write(f"{iod_line}\n")
if o.catalogname == "catalog":
color = "grey"
elif o.catalogname == "classfd":
color = "blue"
elif o.catalogname == "unid":
color = "magenta"
print(colored(iod_line, color))
# Generate plots
ff.diagnostic_plot(predictions, None, None, cfg)
for track, o in zip(tracks, obs):
ff.diagnostic_plot(predictions, track, o, cfg)
# Sleep
try:
print("File queue empty, waiting for new files...\r", end = "")
time.sleep(10)
except KeyboardInterrupt:
sys.exit()