#!/usr/bin/env python3 from __future__ import print_function import glob import numpy as np from stvid.stio import FourFrame from stvid.stars import generate_star_catalog from stvid.stars import store_calibration from stvid.stars import pixel_catalog from stvid.astrometry import calibrate_from_reference from stvid.astrometry import is_calibrated from stvid.astrometry import generate_reference_with_anet from stvid.satellite import generate_satellite_predictions from stvid.satellite import find_hough3d_lines from stvid.extract import extract_tracks import astropy.units as u from astropy.utils.exceptions import AstropyWarning from astropy.coordinates import EarthLocation from astropy.time import Time # For getting IERS table in single-thread session import multiprocessing as mp import warnings import configparser import argparse import os from termcolor import colored import time import shutil import sys """ process.py - Utility to process stvid/acquire.py FITS images to detect and extract satellite positions and create IODs. Terminal output Color Codes: GREEN: Calibrated file RED: Not calibrated file YELLOW: Computing astrometric calibration BLUE: Classified satellite GREY: Catalog satellite MAGENTA: UNID satellite """ def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] def process_loop(fname): """ Thread to process satobs FourFrame FITS files in a multi-thread compatible manner """ # Generate star catalog if not os.path.exists(fname + ".cat"): pix_catalog = generate_star_catalog(fname) else: pix_catalog = pixel_catalog(fname + ".cat") # Calibrate from reference calibrate_from_reference(fname, "test.fits", pix_catalog) # Store calibration store_calibration(pix_catalog, fname + ".cal") # Generate satellite predictions generate_satellite_predictions(fname) # Detect lines with 3D Hough transform ids = find_hough3d_lines(fname, nhoughmin, houghrmin) # Get properties ff = FourFrame(fname) # Extract tracks if is_calibrated(ff): screenoutput_idents = extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, root_dir, results_dir, tle_dir) else: screenoutput_idents = None # Stars available and used nused = np.sum(pix_catalog.flag == 1) nstars = pix_catalog.nstars # Write output screenoutput = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % ( ff.fname, ff.crval[0], ff.crval[1], nused, nstars, 3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean( ff.zavg), np.std(ff.zavg)) if is_calibrated(ff): screenoutput = colored(screenoutput, "green") else: screenoutput = colored(screenoutput, "red") imgstat_output = ("%s,%.8lf,%.6f,%.6f,%.3f,%.3f,%.3f," + "%.3f,%d,%d\n") % ( (ff.fname, ff.mjd, ff.crval[0], ff.crval[1], 3600 * ff.crres[0], 3600 * ff.crres[1], np.mean( ff.zavg), np.std(ff.zavg), nstars, nused)) # Move processed files shutil.move(fname, os.path.join(processed_dir, fname)) shutil.move(fname + ".png", os.path.join(processed_dir, fname + ".png")) shutil.move(fname + ".id", os.path.join(processed_dir, fname + ".id")) shutil.move(fname + ".cat", os.path.join(processed_dir, fname + ".cat")) shutil.move(fname + ".cal", os.path.join(processed_dir, fname + ".cal")) return (screenoutput, imgstat_output, screenoutput_idents) if __name__ == "__main__": # Read commandline options conf_parser = argparse.ArgumentParser(description='Process captured' + ' video frames.') conf_parser.add_argument("-c", "--conf_file", help="Specify configuration file. If no file" + " is specified 'configuration.ini' is used.", metavar="FILE") conf_parser.add_argument("-d", "--directory", help="Specify directory of observations. If no" + " directory is specified parent will be used.", metavar='DIR', dest='file_dir', default=".") args = conf_parser.parse_args() # Process commandline options and parse configuration cfg = configparser.ConfigParser(inline_comment_prefixes=('#', ';')) conf_file = args.conf_file if args.conf_file else "configuration.ini" result = cfg.read([conf_file]) if not result: print("Could not read config file: %s\nExiting..." % conf_file) sys.exit() # Set warnings warnings.filterwarnings("ignore", category=UserWarning, append=True) warnings.simplefilter("ignore", AstropyWarning) # Set location loc = EarthLocation(lat=cfg.getfloat('Common', 'observer_lat') * u.deg, lon=cfg.getfloat('Common', 'observer_lon') * u.deg, height=cfg.getfloat('Common', 'observer_height') * u.m) # Extract settings drdtmin = cfg.getfloat('Processing', 'drdtmin') drdtmax = cfg.getfloat('Processing', 'drdtmax') trksig = cfg.getfloat('Processing', 'trksig') trkrmin = cfg.getfloat('Processing', 'trkrmin') ntrkmin = cfg.getint('Processing', 'ntrkmin') houghrmin = cfg.getfloat('Processing', 'houghrmin') nhoughmin = cfg.getint('Processing', 'nhoughmin') nstarsmin = cfg.getint('Processing', 'nstarsmin') # Move to processing directory os.chdir(args.file_dir) # Force single-threaded IERS table update, if needed t = Time.now() _ = t.ut1 # Statistics file fstat = open("imgstat.csv", "w") fstat.write("fname,mjd,ra,de,rmsx,rmsy,mean,std,nstars,nused\n") # Directory logic file_dir = os.path.abspath(args.file_dir.rstrip("/")) root_dir = os.path.split(file_dir)[0] tle_dir = cfg.get('Common', 'tle_path') if cfg.has_option('Common', 'results_path'): results_dir = os.path.join(cfg.get('Common', 'results_path'), os.path.split(root_dir)[-1]) else: results_dir = root_dir processed_dir = os.path.join(file_dir, "processed") # Create output dirs if not os.path.exists(os.path.join(results_dir, "classfd")): os.makedirs(os.path.join(results_dir, "classfd")) if not os.path.exists(os.path.join(results_dir, "catalog")): os.makedirs(os.path.join(results_dir, "catalog")) if not os.path.exists(os.path.join(results_dir, "unid")): os.makedirs(os.path.join(results_dir, "unid")) if not os.path.exists(os.path.join(results_dir, "not_seen")): os.makedirs(os.path.join(results_dir, "not_seen")) if not os.path.exists(processed_dir): os.makedirs(processed_dir) cpu_count = os.cpu_count() if (cpu_count > 1): print("Processing with {} threads".format(cpu_count)) # Forever loop while True: # Get files fnames = sorted(glob.glob("2*.fits")) # Create reference calibration file in single threaded environment if not os.path.exists("test.fits"): solved = False # Loop over files to find a suitable calibration file for fname in fnames: # Was this file already tried? if not os.path.exists(fname+".cat"): # Generate star catalog pix_catalog = generate_star_catalog(fname) # Solve if pix_catalog.nstars > nstarsmin: print(colored("Computing astrometric calibration for %s" % fname, "yellow")) solved = generate_reference_with_anet(fname, "") # Break when solved if solved: break else: # test.fits exists, so calibration has been solved solved = True # Only attempt processing if we have a calibration file if solved: p = mp.Pool(processes=cpu_count) try: # Loop over files in parallel, print output every batch size of cpu_count satobs_chunks = chunks(fnames,cpu_count) for chk in satobs_chunks: for result in p.map(process_loop, chk): (screenoutput, fileoutput, screenoutput_idents) = result fstat.write(fileoutput) print(screenoutput) if screenoutput_idents is not None: for [outfilename, iod_line, color] in screenoutput_idents: print(colored(iod_line,color)) # Write iodline with open(outfilename, "a") as fp: fp.write("%s\n" % iod_line) p.close() p.join() except KeyboardInterrupt: fstat.close() p.close() sys.exit() # Sleep try: print("File queue empty, waiting for new files...\r", end = '') time.sleep(1) except KeyboardInterrupt: fstat.close() sys.exit() # Close files fstat.close()