#!/usr/bin/env python3 from __future__ import print_function import os import glob import numpy as np import subprocess import shutil import astropy.units as u from astropy.io import fits from astropy import wcs from astropy.coordinates import SkyCoord, FK5, ICRS from astropy.time import Time from scipy import optimize from stvid.stars import pixel_catalog # Class for the Tycho 2 catalog class tycho2_catalog: """Tycho2 catalog""" def __init__(self, maxmag=9.0): hdu = fits.open(os.path.join(os.getenv("ST_DATADIR"), "data/tyc2.fits")) ra = hdu[1].data.field('RA')*u.deg dec = hdu[1].data.field('DEC')*u.deg mag = hdu[1].data.field('MAG_VT') c = mag < maxmag self.ra = ra[c] self.dec = dec[c] self.mag = mag[c] # Estimate the WCS from a reference file def estimate_wcs_from_reference(ref, fname): # Read header of reference hdu = fits.open(ref) hdu[0].header["NAXIS"] = 2 w = wcs.WCS(hdu[0].header) # Get time and position from reference tref = Time(hdu[0].header["MJD-OBS"], format="mjd", scale="utc") pref = SkyCoord(ra=w.wcs.crval[0], dec=w.wcs.crval[1], unit="deg", frame="icrs").transform_to(FK5(equinox=tref)) # Read time from target hdu = fits.open(fname) t = Time(hdu[0].header["MJD-OBS"], format="mjd", scale="utc") # Correct wcs dra = (t.sidereal_time("mean", "greenwich") - tref.sidereal_time("mean", "greenwich")) p = FK5(ra=pref.ra+dra, dec=pref.dec, equinox=t).transform_to(ICRS) w.wcs.crval = np.array([p.ra.degree, p.dec.degree]) return w # Match the astrometry and pixel catalog def match_catalogs(ast_catalog, pix_catalog, w, rmin): # Select stars towards pointing center ra, dec = w.wcs.crval*u.deg d = np.arccos(np.sin(dec) * np.sin(ast_catalog.dec) + np.cos(dec) * np.cos(ast_catalog.dec) * np.cos(ra-ast_catalog.ra)) c = (d < 30.0*u.deg) ra, dec, mag = ast_catalog.ra[c], ast_catalog.dec[c], ast_catalog.mag[c] # Convert RA/Dec to pixels pix = w.wcs_world2pix(np.stack((ra, dec), axis=-1), 0) xs, ys = pix[:, 0], pix[:, 1] # Loop over stars nmatch = 0 for i in range(len(pix_catalog.x)): dx = xs-pix_catalog.x[i] dy = ys-pix_catalog.y[i] r = np.sqrt(dx*dx+dy*dy) if np.min(r) < rmin: j = np.argmin(r) pix_catalog.ra[i] = ra[j].value pix_catalog.dec[i] = dec[j].value pix_catalog.imag[i] = mag[j] pix_catalog.flag[i] = 1 nmatch += 1 return nmatch # Residual function def residual(a, x, y, z): return z-(a[0]+a[1]*x+a[2]*y) # Fit transformation def fit_wcs(w, pix_catalog): x0, y0 = w.wcs.crpix ra0, dec0 = w.wcs.crval dx, dy = pix_catalog.x-x0, pix_catalog.y-y0 # Iterate to remove outliers nstars = np.sum(pix_catalog.flag == 1) for j in range(10): w = wcs.WCS(naxis=2) w.wcs.crpix = np.array([0.0, 0.0]) w.wcs.cd = np.array([[1.0, 0.0], [0.0, 1.0]]) w.wcs.ctype = ["RA---TAN", "DEC--TAN"] w.wcs.set_pv([(2, 1, 45.0)]) c = pix_catalog.flag == 1 # Iterate to move crval to crpix location for i in range(5): w.wcs.crval = np.array([ra0, dec0]) world = np.stack((pix_catalog.ra, pix_catalog.dec), axis=-1) pix = w.wcs_world2pix(world, 1) rx, ry = pix[:, 0], pix[:, 1] ax, cov_q, infodict, mesg, ierr = optimize.leastsq(residual, [0.0, 0.0, 0.0], args=(dx[c], dy[c], rx[c]), full_output=1) ay, cov_q, infodict, mesg, ierr = optimize.leastsq(residual, [0.0, 0.0, 0.0], args=(dx[c], dy[c], ry[c]), full_output=1) ra0, dec0 = w.wcs_pix2world([[ax[0], ay[0]]], 1)[0] # Compute residuals drx = ax[0]+ax[1]*dx+ax[2]*dy-rx dry = ay[0]+ay[1]*dx+ay[2]*dy-ry dr = np.sqrt(drx*drx+dry*dry) rms = np.sqrt(np.sum(dr[c]**2)/len(dr[c])) dr[~c] = 1.0 c = (dr < 2.0*rms) pix_catalog.flag[~c] = 0 # Break if converged if np.sum(c) == nstars: break nstars = np.sum(c) # Compute residuals rmsx = np.sqrt(np.sum(drx[c]**2)/len(drx[c])) rmsy = np.sqrt(np.sum(dry[c]**2)/len(dry[c])) # Store header w = wcs.WCS(naxis=2) w.wcs.crpix = np.array([x0, y0]) w.wcs.crval = np.array([ra0, dec0]) w.wcs.cd = np.array([[ax[1], ax[2]], [ay[1], ay[2]]]) w.wcs.ctype = ["RA---TAN", "DEC--TAN"] w.wcs.set_pv([(2, 1, 45.0)]) return w, rmsx, rmsy, rms def add_wcs(fname, w, rmsx, rmsy): # Read fits hdu = fits.open(fname) whdr = {"CRPIX1": w.wcs.crpix[0], "CRPIX2": w.wcs.crpix[1], "CRVAL1": w.wcs.crval[0], "CRVAL2": w.wcs.crval[1], "CD1_1": w.wcs.cd[0, 0], "CD1_2": w.wcs.cd[0, 1], "CD2_1": w.wcs.cd[1, 0], "CD2_2": w.wcs.cd[1, 1], "CTYPE1": "RA---TAN", "CTYPE2": "DEC--TAN", "CUNIT1": "DEG", "CUNIT2": "DEG", "CRRES1": rmsx, "CRRES2": rmsy} # Add keywords hdr = hdu[0].header for k, v in whdr.items(): hdr[k] = v hdu = fits.PrimaryHDU(header=hdr, data=hdu[0].data) hdu.writeto(fname, overwrite=True, output_verify="ignore") return def calibrate_from_reference(fname, ref, pix_catalog): # Estimated WCS w = estimate_wcs_from_reference(ref, fname) # Default rms values rmsx = 0.0 rmsy = 0.0 # Read catalogs if (pix_catalog.nstars > 4): ast_catalog = tycho2_catalog(10.0) # Match catalogs nmatch = match_catalogs(ast_catalog, pix_catalog, w, 10.0) # Fit transformation if nmatch > 4: w, rmsx, rmsy, rms = fit_wcs(w, pix_catalog) # Add wcs add_wcs(fname, w, rmsx, rmsy) return w, rmsx, rmsy def is_calibrated(ff): if (3600.0*ff.crres[0] < 1e-3) | \ (3600.0*ff.crres[1] < 1e-3) | \ (ff.crres[0]/ff.sx > 2.0) | \ (ff.crres[1]/ff.sy > 2.0): return False else: return True def generate_reference_with_anet(fname, cmd_args, reffname="test.fits", tempfroot="cal"): # Copy file to generic name shutil.copy2(fname, tempfroot + ".fits") # Get center hdu = fits.open(fname) ny, nx = hdu[0].data[0].shape # Generate command command = "solve-field %s -l 20 -O -N %s --crpix-x %d --crpix-y %d -t 1 %s.fits" % (cmd_args, reffname, nx//2, ny//2, tempfroot) # Run command try: subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) # Did the astrometry succeed? solved = os.path.exists("%s.solved" % tempfroot) except: solved = False # Remove temporary files for fname in glob.glob("%s*" % tempfroot): try: os.remove(fname) except: pass return solved