lofarimaging/lofarimaging/lofarimaging.py

213 lines
8.0 KiB
Python

"""Functions for working with LOFAR single station data"""
from typing import Dict, List
import numpy as np
from numpy.linalg import norm, lstsq
import numexpr as ne
import numba
from astropy.coordinates import SkyCoord, SkyOffsetFrame, CartesianRepresentation
__all__ = ["nearfield_imager", "sky_imager", "ground_imager", "skycoord_to_lmn", "calibrate", "simulate_sky_source",
"subtract_sources"]
__version__ = "1.5.0"
SPEED_OF_LIGHT = 299792458.0
def skycoord_to_lmn(pos: SkyCoord, phasecentre: SkyCoord):
"""
Convert astropy sky coordinates into the l,m,n coordinate system
relative to a phase centre.
The l,m,n is a RHS coordinate system with
* its origin on the sky sphere
* m,n and the celestial north on the same plane
* l,m a tangential plane of the sky sphere
Note that this means that l increases east-wards
This function was taken from https://github.com/SKA-ScienceDataProcessor/algorithm-reference-library
"""
# Determine relative sky position
todc = pos.transform_to(SkyOffsetFrame(origin=phasecentre))
dc = todc.represent_as(CartesianRepresentation)
dc /= dc.norm()
# Do coordinate transformation - astropy's relative coordinates do
# not quite follow imaging conventions
return dc.y.value, dc.z.value, dc.x.value - 1
@numba.jit(parallel=True)
def sky_imager(visibilities, baselines, freq, npix_l, npix_m):
"""
Sky imager
Args:
visibilities: Numpy array with visibilities, shape [num_antennas x num_antennas]
baselines: Numpy array with distances between antennas, shape [num_antennas, num_antennas, 3]
freq: frequency
npix_l: Number of pixels in l-direction
npix_m: Number of pixels in m-direction
Returns:
np.array(float): Real valued array of shape [npix_l, npix_m]
"""
img = np.zeros((npix_m, npix_l), dtype=np.complex128)
for m_ix in range(npix_m):
m = -1 + m_ix * 2 / npix_m
for l_ix in range(npix_l):
l = 1 - l_ix * 2 / npix_l
img[m_ix, l_ix] = np.mean(visibilities * np.exp(-2j * np.pi * freq *
(baselines[:, :, 0] * l + baselines[:, :, 1] * m) /
SPEED_OF_LIGHT))
return np.real(img)
def ground_imager(visibilities, freq, npix_p, npix_q, dims, station_pqr, height=1.5):
"""Do a Fourier transform for ground imaging"""
img = np.zeros([npix_q, npix_p], dtype=np.complex128)
for q_ix, q in enumerate(np.linspace(dims[2], dims[3], npix_q)):
for p_ix, p in enumerate(np.linspace(dims[0], dims[1], npix_p)):
r = height
pqr = np.array([p, q, r], dtype=np.float32)
antdist = np.linalg.norm(station_pqr - pqr[np.newaxis, :], axis=1)
groundbase = antdist[:, np.newaxis] - antdist[np.newaxis, :]
img[q_ix, p_ix] = np.mean(visibilities * np.exp(-2j * np.pi * freq * (-groundbase) / SPEED_OF_LIGHT))
return img
def nearfield_imager(visibilities, baseline_indices, freqs, npix_p, npix_q, extent, station_pqr, height=1.5,
max_memory_mb=200):
"""
Nearfield imager
Args:
visibilities: Numpy array with visibilities, shape [num_visibilities x num_frequencies]
baseline_indices: List with tuples of antenna numbers in visibilities, shape [2 x num_visibilities]
freqs: List of frequencies
npix_p: Number of pixels in p-direction
npix_q: Number of pixels in q-direction
extent: Extent (in m) that the image should span
station_pqr: PQR coordinates of stations
height: Height of image in metre
max_memory_mb: Maximum amount of memory to use for the biggest array. Higher may improve performance.
Returns:
np.array(complex): Complex valued array of shape [npix_p, npix_q]
"""
z = height
x = np.linspace(extent[0], extent[1], npix_p)
y = np.linspace(extent[2], extent[3], npix_q)
posx, posy = np.meshgrid(x, y)
posxyz = np.transpose(np.array([posx, posy, z * np.ones_like(posx)]), [1, 2, 0])
diff_vectors = (station_pqr[:, None, None, :] - posxyz[None, :, :, :])
distances = np.linalg.norm(diff_vectors, axis=3)
vis_chunksize = max_memory_mb * 1024 * 1024 // (8 * npix_p * npix_q)
bl_diff = np.zeros((vis_chunksize, npix_q, npix_p), dtype=np.float64)
img = np.zeros((npix_q, npix_p), dtype=np.complex128)
for vis_chunkstart in range(0, len(baseline_indices), vis_chunksize):
vis_chunkend = min(vis_chunkstart + vis_chunksize, baseline_indices.shape[0])
# For the last chunk, bl_diff_chunk is a bit smaller than bl_diff
bl_diff_chunk = bl_diff[:vis_chunkend - vis_chunkstart, :]
np.add(distances[baseline_indices[vis_chunkstart:vis_chunkend, 0]],
-distances[baseline_indices[vis_chunkstart:vis_chunkend, 1]], out=bl_diff_chunk)
j2pi = 1j * 2 * np.pi
for ifreq, freq in enumerate(freqs):
v = visibilities[vis_chunkstart:vis_chunkend, ifreq][:, None, None]
lamb = SPEED_OF_LIGHT / freq
# v[:,np.newaxis,np.newaxis]*np.exp(-2j*np.pi*freq/c*groundbase_pixels[:,:,:]/c)
# groundbase_pixels=nvis x npix x npix
np.add(img, np.sum(ne.evaluate("v * exp(j2pi * bl_diff_chunk / lamb)"), axis=0), out=img)
img /= len(freqs) * len(baseline_indices)
return img
def calibrate(vis, modelvis, maxiter=30, amplitudeonly=True):
"""
Calibrate and subtract some sources
Args:
vis: visibility matrix, shape [n_st, n_st]
modelvis: model visibility matrices, shape [n_dir, n_st, n_st]
maxiter: max iterations (default 30)
amplitudeonly: fit only amplitudes (default True)
Returns:
residual: visibilities with calibrated directions subtracted, shape [n_st, n_st]
gains: gains, shape [n_dir, n_st]
"""
nst = vis.shape[1]
ndir = np.array(modelvis).shape[0]
gains = np.ones([ndir, nst], dtype=np.complex)
if ndir == 0:
return vis, gains
else:
gains *= np.sqrt(norm(vis) / norm(modelvis))
iteration = 0
while iteration < maxiter:
iteration += 1
gains_prev = gains.copy()
for k in range(nst):
z = np.conj(gains_prev) * np.array(modelvis)[:, :, k]
gains[:, k] = lstsq(z.T, vis[:, k], rcond=None)[0]
if amplitudeonly:
gains = np.abs(gains).astype(np.complex)
if iteration % 2 == 0 and iteration > 0:
dg = norm(gains - gains_prev)
residual = vis.copy()
for d in range(ndir):
residual -= np.diag(np.conj(gains[d])) @ modelvis[d] @ np.diag(gains[d])
gains = 0.5 * gains + 0.5 * gains_prev
return residual, gains
def simulate_sky_source(lmn_coord: np.array, baselines: np.array, freq: float):
"""
Simulate visibilities for a sky source
Args:
lmn_coord (np.array): l, m, n coordinate
baselines (np.array): baseline distances in metres, shape (n_ant, n_ant)
freq (float): Frequency in Hz
"""
return np.exp(2j * np.pi * freq * baselines.dot(np.array(lmn_coord)) / SPEED_OF_LIGHT)
def subtract_sources(vis: np.array, baselines: np.array, freq: float, lmn_dict: Dict[str, np.array],
sources=["Cas A", "Cyg A", "Sun"]):
"""
Subtract sky sources from visibilities
Args:
vis (np.array): visibility matrix, shape [n_ant, n_ant]
lmn_dict (Dict[str, np.array]): dictionary with lmn coordinates
baselines (np.array): baseline distances in metres, shape (n_ant, n_ant)
freq (float): Frequency in Hz
sources (List[str]): list with source names to subtract (should all be in lmn_dict).
Default ["Cas A", "Sun"]
Returns:
vis (np.array): visibility matrix with sources subtracted
"""
modelvis = [simulate_sky_source(lmn_dict[srcname], baselines, freq) for srcname in lmn_dict
if srcname in sources]
residual, _ = calibrate(vis, modelvis)
return residual