lofarimaging/lofarimaging/singlestationutil.py

724 lines
27 KiB
Python
Raw Normal View History

2020-02-28 05:54:45 -07:00
"""Functions for working with LOFAR single station data"""
import numpy as np
import os
import datetime
import lofargeotiff
from lofarantpos.db import LofarAntennaDatabase
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
from matplotlib import cm
2020-03-14 15:55:33 -06:00
from matplotlib.figure import Figure
2020-02-28 05:54:45 -07:00
from matplotlib.colors import ListedColormap, Normalize
from mpl_toolkits.axes_grid1 import make_axes_locatable
2020-03-02 13:05:50 -07:00
from matplotlib.patches import Circle
2020-02-28 05:54:45 -07:00
import matplotlib.axes as maxes
from astropy.coordinates import SkyCoord, GCRS, EarthLocation, AltAz, get_sun
import astropy.units as u
from astropy.time import Time
2020-03-13 10:11:59 -06:00
from typing import List, Dict, Any, Tuple, Union
2020-02-28 05:54:45 -07:00
import lofarantpos
from packaging import version
from .maputil import get_map, make_leaflet_map
from .lofarimaging import nearfield_imager, sky_imager, skycoord_to_lmn
__all__ = ["sb_from_freq", "freq_from_sb", "find_caltable", "read_caltable",
2020-03-13 10:11:59 -06:00
"rcus_in_station", "read_acm_cube", "get_station_pqr", "get_station_type",
2020-03-18 04:50:33 -06:00
"make_sky_plot", "make_ground_plot", "make_xst_plots", "apply_calibration",
"get_full_station_name"]
2020-02-28 05:54:45 -07:00
__version__ = "1.5.0"
# Configurations for HBA observations with a single dipole activated per tile.
GENERIC_INT_201512 = [0, 5, 3, 1, 8, 3, 12, 15, 10, 13, 11, 5, 12, 12, 5, 2, 10, 8, 0, 3, 5, 1, 4, 0, 11, 6, 2, 4, 9,
14, 15, 3, 7, 5, 13, 15, 5, 6, 5, 12, 15, 7, 1, 1, 14, 9, 4, 9, 3, 9, 3, 13, 7, 14, 7, 14, 2, 8,
8, 0, 1, 4, 2, 2, 12, 15, 5, 7, 6, 10, 12, 3, 3, 12, 7, 4, 6, 0, 5, 9, 1, 10, 10, 11, 5, 11, 7, 9,
7, 6, 4, 4, 15, 4, 1, 15]
GENERIC_CORE_201512 = [0, 10, 4, 3, 14, 0, 5, 5, 3, 13, 10, 3, 12, 2, 7, 15, 6, 14, 7, 5, 7, 9, 0, 15, 0, 10, 4, 3, 14,
0, 5, 5, 3, 13, 10, 3, 12, 2, 7, 15, 6, 14, 7, 5, 7, 9, 0, 15]
GENERIC_REMOTE_201512 = [0, 13, 12, 4, 11, 11, 7, 8, 2, 7, 11, 2, 10, 2, 6, 3, 8, 3, 1, 7, 1, 15, 13, 1, 11, 1, 12, 7,
10, 15, 8, 2, 12, 13, 9, 13, 4, 5, 5, 12, 5, 5, 9, 11, 15, 12, 2, 15]
assert (version.parse(lofarantpos.__version__) >= version.parse("0.4.0"))
2020-03-13 10:11:59 -06:00
def sb_from_freq(freq: float, rcu_mode: Union[int, str] = 1) -> int:
2020-02-28 05:54:45 -07:00
"""
Convert subband number to central frequency
Args:
rcu_mode: rcu mode
freq: frequency in Hz
Returns:
int: subband number
2020-03-13 10:11:59 -06:00
Example:
>>> sb_from_freq(58007812.5, '3')
297
2020-02-28 05:54:45 -07:00
"""
clock = 200e6
2020-03-13 10:11:59 -06:00
if int(rcu_mode) == 6:
2020-02-28 05:54:45 -07:00
clock = 160e6
2020-03-13 10:11:59 -06:00
2020-02-28 05:54:45 -07:00
freq_offset = 0
2020-03-13 10:11:59 -06:00
if int(rcu_mode) == 5:
2020-02-28 05:54:45 -07:00
freq_offset = 100e6
2020-03-13 10:11:59 -06:00
elif int(rcu_mode) == 6:
2020-02-28 05:54:45 -07:00
freq_offset = 160e6
2020-03-13 10:11:59 -06:00
elif int(rcu_mode) == 7:
2020-02-28 05:54:45 -07:00
freq_offset = 200e6
2020-03-13 10:11:59 -06:00
sb_bandwidth = 0.5 * clock / 512.
2020-02-28 05:54:45 -07:00
sb = round((freq - freq_offset) / sb_bandwidth)
return int(sb)
2020-03-13 10:11:59 -06:00
def freq_from_sb(sb: int, rcu_mode: Union[str, int] = 1):
2020-02-28 05:54:45 -07:00
"""
Convert central frequency to subband number
Args:
rcu_mode: rcu mode
sb: subband number
Returns:
float: frequency in Hz
2020-03-13 10:11:59 -06:00
Example:
>>> freq_from_sb(297, '3')
58007812.5
2020-02-28 05:54:45 -07:00
"""
clock = 200e6
2020-03-13 10:11:59 -06:00
if int(rcu_mode) == 6:
2020-02-28 05:54:45 -07:00
clock = 160e6
2020-03-13 10:11:59 -06:00
2020-02-28 05:54:45 -07:00
freq_offset = 0
2020-03-13 10:11:59 -06:00
if int(rcu_mode) == 5:
2020-02-28 05:54:45 -07:00
freq_offset = 100e6
2020-03-13 10:11:59 -06:00
elif int(rcu_mode) == 6:
2020-02-28 05:54:45 -07:00
freq_offset = 160e6
2020-03-13 10:11:59 -06:00
elif int(rcu_mode) == 7:
2020-02-28 05:54:45 -07:00
freq_offset = 200e6
2020-03-13 10:11:59 -06:00
2020-02-28 05:54:45 -07:00
sb_bandwidth = 0.5 * clock / 512.
freq = (sb * sb_bandwidth) + freq_offset
return freq
2020-03-17 14:13:48 -06:00
def find_caltable(field_name: str, rcu_mode: Union[str, int], caltable_dir='caltables'):
2020-02-28 05:54:45 -07:00
"""
Find the file of a caltable.
2020-03-13 10:11:59 -06:00
2020-02-28 05:54:45 -07:00
Args:
field_name: Name of the antenna field, e.g. 'DE602LBA'
rcu_mode: Receiver mode for which the calibration table is requested.
2020-03-17 14:13:48 -06:00
caltable_dir: Root directory under which station information is stored in
2020-02-28 05:54:45 -07:00
subdirectories DE602C/etc/, RS106/etc/, ...
Returns:
2020-03-13 10:11:59 -06:00
str: full path to caltable if it exists, None if nothing found
Example:
2020-03-17 14:13:48 -06:00
>>> find_caltable("DE603LBA", "3", caltable_dir="test/CalTables")
2020-03-13 10:11:59 -06:00
'test/CalTables/DE603/CalTable-603-LBA_INNER-10_90.dat'
>>> find_caltable("ES615HBA", "5") is None
True
2020-02-28 05:54:45 -07:00
"""
station, field = field_name[0:5].upper(), field_name[5:].upper()
station_number = station[2:5]
2020-03-13 10:11:59 -06:00
filename = f"CalTable-{station_number}"
if str(rcu_mode) in ('outer', '1', '2'):
2020-03-13 10:11:59 -06:00
filename += "-LBA_OUTER-10_90.dat"
elif str(rcu_mode) in ('inner', '3', '4'):
2020-03-13 10:11:59 -06:00
filename += "-LBA_INNER-10_90.dat"
elif str(rcu_mode) == '5':
2020-03-13 10:11:59 -06:00
filename += "-HBA-110_190.dat"
elif str(rcu_mode) == '6':
2020-03-13 10:11:59 -06:00
filename += "-HBA-170_230.dat"
elif str(rcu_mode) == '7':
2020-03-13 10:11:59 -06:00
filename += "-HBA-210_250.dat"
else:
raise RuntimeError("Unexpected mode: " + str(rcu_mode) + " for field_name " + str(field_name))
2020-03-17 14:13:48 -06:00
if os.path.exists(os.path.join(caltable_dir, filename)):
2020-03-13 10:11:59 -06:00
# All caltables in one directory
2020-03-17 14:13:48 -06:00
return os.path.join(caltable_dir, filename)
elif os.path.exists(os.path.join(caltable_dir, station, filename)):
2020-03-13 10:11:59 -06:00
# Caltables in a directory per station
2020-03-17 14:13:48 -06:00
return os.path.join(caltable_dir, station, filename)
2020-02-28 05:54:45 -07:00
else:
return None
2020-03-14 15:55:33 -06:00
def read_caltable(filename: str, num_subbands=512) -> Tuple[Dict[str, str], np.ndarray]:
2020-02-28 05:54:45 -07:00
"""
Read a station's calibration table.
Args:
filename: Filename with the caltable
num_subbands: Number of subbands
Returns:
2020-03-14 15:55:33 -06:00
Tuple[Dict[str, str], np.ndarray]: A tuple containing a dict with
2020-02-28 05:54:45 -07:00
the header lines, and a 2D numpy.array of complex numbers
representing the station gain coefficients.
"""
infile = open(filename, 'rb')
header_lines = []
try:
while True:
header_lines.append(infile.readline().decode('utf8').strip())
if 'HeaderStop' in header_lines[-1]:
break
except UnicodeDecodeError:
# No header; close and open again
infile.close()
infile = open(filename, 'rb')
caldata = np.fromfile(infile, dtype=np.complex128)
num_rcus = len(caldata) // num_subbands
infile.close()
header_dict = {key: val for key, val in [line.split(" = ")
for line in header_lines[1:-1]]}
return header_dict, caldata.reshape((num_subbands, num_rcus))
2020-03-17 14:13:48 -06:00
def apply_calibration(visibilities: np.ndarray, station_name: str, rcu_mode: Union[str, int],
subband: int, caltable_dir: str = "CalTables"):
"""
Apply calibration to visibilities
Args:
visibilities (np.ndarray): Visibility cube
station_name (str): Station name, e.g. "DE603"
rcu_mode (Union[str, int]): RCU mode, e.g. 5
subband (int): Subband
caltable_dir (str, optional): Directory with calibration tables. Defaults to "CalTables".
Returns:
Tuple[np.ndarray, Dict[str, str]]: modified visibilities and dictionary with calibration info
"""
caltable_filename = find_caltable(station_name, rcu_mode=rcu_mode,
caltable_dir=caltable_dir)
cal_header = {}
if caltable_filename is None:
print('No calibration table found... cube remains uncalibrated!')
else:
cal_header, cal_data = read_caltable(caltable_filename)
rcu_gains = cal_data[subband, :]
rcu_gains = np.array(rcu_gains, dtype=np.complex64)
gain_matrix = rcu_gains[np.newaxis, :] * np.conj(rcu_gains[:, np.newaxis])
visibilities = visibilities / gain_matrix
calibration_info = {}
if "CalTableHeader.Observation.Date" in cal_header:
calibration_info["calibration_obsdate"] = cal_header["CalTableHeader.Observation.Date"]
if "CalTableHeader.Calibration.Date" in cal_header:
calibration_info["calibration_date"] = cal_header["CalTableHeader.Calibration.Date"]
if "CalTableHeader.Comment" in cal_header:
calibration_info["calibration_comment"] = cal_header["CalTableHeader.Comment"]
if caltable_filename is not None:
calibration_info["calibration_filename"] = caltable_filename
return visibilities, calibration_info
2020-02-28 05:54:45 -07:00
def rcus_in_station(station_type: str):
"""
Give the number of RCUs in a station, given its type.
Args:
station_type: Kind of station that produced the correlation. One of
'core', 'remote', 'intl'.
2020-03-13 10:11:59 -06:00
Example:
>>> rcus_in_station('remote')
96
2020-02-28 05:54:45 -07:00
"""
return {'core': 96, 'remote': 96, 'intl': 192}[station_type]
def read_acm_cube(filename: str, station_type: str):
"""
Read an ACM binary data cube (function from Michiel)
Args:
filename: File containing the array correlation matrix.
station_type: Kind of station that produced the correlation. One of
'core', 'remote', 'intl'.
Returns:
np.array: 3D cube of complex numbers, with indices [time slots, rcu, rcu].
2020-03-13 10:11:59 -06:00
Example:
>>> cube = read_acm_cube('test/20170720_095816_mode_3_xst_sb297.dat', 'intl')
>>> cube.shape
(29, 192, 192)
2020-02-28 05:54:45 -07:00
"""
num_rcu = rcus_in_station(station_type)
data = np.fromfile(filename, dtype=np.complex128)
time_slots = int(len(data) / num_rcu / num_rcu)
return data.reshape((time_slots, num_rcu, num_rcu))
2020-03-13 10:11:59 -06:00
def get_station_type(station_name: str) -> str:
"""
Get the station type, one of 'intl', 'core' or 'remote'
Args:
station_name: Station name, e.g. "DE603LBA" or just "DE603"
Returns:
str: station type, one of 'intl', 'core' or 'remote'
Example:
>>> get_station_type("DE603")
2020-03-13 10:11:59 -06:00
'intl'
"""
if station_name[0] == "C":
return "core"
elif station_name[0] == "R" or station_name[:5] == "PL611":
return "remote"
else:
return "intl"
2020-03-17 13:37:54 -06:00
def get_station_pqr(station_name: str, rcu_mode: Union[str, int], db):
2020-03-13 10:11:59 -06:00
"""
Get PQR coordinates for the relevant subset of antennas in a station.
Args:
station_name: Station name, e.g. 'DE603LBA' or 'DE603'
2020-03-17 13:37:54 -06:00
rcu_mode: RCU mode (0 - 6, can be string)
2020-03-13 10:11:59 -06:00
db: instance of LofarAntennaDatabase from lofarantpos
Example:
>>> from lofarantpos.db import LofarAntennaDatabase
>>> db = LofarAntennaDatabase()
>>> pqr = get_station_pqr("DE603", "outer", db)
2020-03-13 10:11:59 -06:00
>>> pqr.shape
(96, 3)
>>> pqr[0, 0]
1.7434713
>>> pqr = get_station_pqr("LV614", "5", db)
>>> pqr.shape
(96, 3)
2020-03-13 10:11:59 -06:00
"""
full_station_name = get_full_station_name(station_name, rcu_mode)
station_type = get_station_type(full_station_name)
2020-03-13 10:11:59 -06:00
if 'LBA' in station_name or str(rcu_mode) in ('1', '2', '3', '4', 'inner', 'outer'):
2020-02-28 05:54:45 -07:00
# Get the PQR positions for an individual station
station_pqr = db.antenna_pqr(full_station_name)
2020-02-28 05:54:45 -07:00
# Exception: for Dutch stations (sparse not yet accommodated)
2020-03-17 13:37:54 -06:00
if (station_type == 'core' or station_type == 'remote') and int(rcu_mode) in (3, 4):
2020-02-28 05:54:45 -07:00
station_pqr = station_pqr[0:48, :]
2020-03-17 13:37:54 -06:00
elif (station_type == 'core' or station_type == 'remote') and int(rcu_mode) in (1,2):
2020-02-28 05:54:45 -07:00
station_pqr = station_pqr[48:, :]
elif 'HBA' in station_name or str(rcu_mode) in ('5', '6', '7', '8'):
2020-02-28 05:54:45 -07:00
selected_dipole_config = {
'intl': GENERIC_INT_201512, 'remote': GENERIC_REMOTE_201512, 'core': GENERIC_CORE_201512
}
selected_dipoles = selected_dipole_config[station_type] + \
np.arange(len(selected_dipole_config[station_type])) * 16
station_pqr = db.hba_dipole_pqr(full_station_name)[selected_dipoles]
2020-02-28 05:54:45 -07:00
else:
raise RuntimeError("Station name did not contain LBA or HBA, could not load antenna positions")
return station_pqr.astype('float32')
2020-03-14 15:55:33 -06:00
def make_ground_plot(image: np.ndarray, background_map: np.ndarray, extent: List[float], title: str = "Ground plot",
subtitle: str = "", opacity: float = 0.6, fig: Figure = None, **kwargs) \
-> Tuple[Figure, np.ndarray]:
2020-03-04 13:47:11 -07:00
"""
Make a ground plot of an array with data
Args:
image: numpy array (two dimensions with data)
background_map: background map
title: Title for the plot
subtitle: Subtitle for the plot
opacity: maximum opacity of the plot
fig: exisiting figure object to be reused
**kwargs: other options to be passed to plt.imshow (e.g. vmin)
Returns:
Updated figure and numpy array with only the plot
2020-03-13 10:11:59 -06:00
Example:
>>> dummy_image = np.random.rand(150, 150)
2020-03-13 10:11:59 -06:00
>>> fig, plot_array = make_ground_plot(dummy_image, dummy_image, [-300, 300, -100, 100])
>>> plot_array.shape
(150, 150, 4)
2020-03-04 13:47:11 -07:00
"""
if fig is None:
fig = plt.figure(figsize=(10, 10), constrained_layout=True)
# Make colors semi-transparent in the lower 3/4 of the scale
cmap = cm.Spectral_r
cmap_with_alpha = cmap(np.arange(cmap.N))
cmap_with_alpha[:, -1] = np.clip(np.linspace(0, 1.5, cmap.N), 0., 1.)
cmap_with_alpha = ListedColormap(cmap_with_alpha)
# Plot the resulting image
ax = fig.add_subplot(111, ymargin=-0.4)
ax.imshow(background_map, extent=extent)
cimg = ax.imshow(image, origin='lower', cmap=cmap_with_alpha, extent=extent,
alpha=opacity, **kwargs)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.2, axes_class=maxes.Axes)
cbar = fig.colorbar(cimg, cax=cax, orientation="vertical", format="%.1e")
cbar.set_alpha(1.0)
cbar.draw_all()
# cbar.set_ticks([])
ax.set_xlabel('$W-E$ (metres)', fontsize=14)
ax.set_ylabel('$S-N$ (metres)', fontsize=14)
ax.text(0.5, 1.05, title, fontsize=17, ha='center', va='bottom', transform=ax.transAxes)
ax.text(0.5, 1.02, subtitle, fontsize=12, ha='center', va='bottom', transform=ax.transAxes)
# Change limits to match the original specified extent in the localnorth frame
ax.set_xlim(extent[0], extent[1])
ax.set_ylim(extent[2], extent[3])
ax.tick_params(axis='both', which='both', length=0)
# Place the NSEW coordinate directions
ax.text(0.95, 0.5, 'E', color='w', fontsize=18, transform=ax.transAxes, ha='center', va='center')
ax.text(0.05, 0.5, 'W', color='w', fontsize=18, transform=ax.transAxes, ha='center', va='center')
ax.text(0.5, 0.95, 'N', color='w', fontsize=18, transform=ax.transAxes, ha='center', va='center')
ax.text(0.5, 0.05, 'S', color='w', fontsize=18, transform=ax.transAxes, ha='center', va='center')
ground_vmin_img, ground_vmax_img = cimg.get_clim()
ax.contour(image, np.linspace(ground_vmin_img, ground_vmax_img, 15), origin='lower', cmap=cm.Greys,
extent=extent, linewidths=0.5, alpha=opacity)
ax.grid(True, alpha=0.3)
vmin, vmax = cimg.get_clim()
raw_plotdata = cmap_with_alpha(Normalize(vmin=vmin, vmax=vmax)(image))[::-1, :]
return fig, raw_plotdata
2020-03-14 15:55:33 -06:00
def make_sky_plot(image: np.ndarray, marked_bodies_lmn: Dict[str, Tuple[float, float, float]],
title: str = "Sky plot", subtitle: str = "", fig: Figure = None,
**kwargs) -> Figure:
2020-03-02 13:05:50 -07:00
"""
2020-03-04 13:47:11 -07:00
Make a sky plot out of an array with data
2020-03-02 13:05:50 -07:00
Args:
image: numpy array (two dimensions with data)
marked_bodies_lmn: dict with objects to annotate (values should be lmn coordinates)
title: Title for the plot
subtitle: Subtitle for the plot
fig: existing figure object to be reused
**kwargs: other options to be passed to plt.imshow (e.g. vmin)
Returns:
2020-03-04 13:47:11 -07:00
Updated figure
2020-03-13 10:11:59 -06:00
Example:
>>> dummy_image = np.zeros((150, 150))
>>> fig = make_sky_plot(dummy_image, {})
2020-03-02 13:05:50 -07:00
"""
if fig is None:
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(1, 1, 1)
circle1 = Circle((0, 0), 1.0, edgecolor='k', fill=False, facecolor='none', alpha=0.3)
ax.add_artist(circle1)
cimg = ax.imshow(image, origin='lower', cmap=cm.Spectral_r, extent=(1, -1, -1, 1),
clip_path=circle1, clip_on=True, **kwargs)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.2, axes_class=maxes.Axes)
fig.colorbar(cimg, cax=cax, orientation="vertical", format="%.1e")
ax.set_xlim(1, -1)
ax.set_xticks(np.arange(-1, 1.1, 0.5))
ax.xaxis.set_major_formatter(FormatStrFormatter('%.1f'))
ax.set_yticks(np.arange(-1, 1.1, 0.5))
ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
# Labels
ax.set_xlabel('$â„“$', fontsize=14)
ax.set_ylabel('$m$', fontsize=14)
ax.text(0.5, 1.05, title, fontsize=17, ha='center', va='bottom', transform=ax.transAxes)
ax.text(0.5, 1.02, subtitle, fontsize=12, ha='center', va='bottom', transform=ax.transAxes)
for body_name, lmn in marked_bodies_lmn.items():
ax.plot([lmn[0]], [lmn[1]], marker='x', color='black', mew=0.5)
ax.annotate(body_name, (lmn[0], lmn[1]))
# Plot the compass directions
ax.text(0.9, 0, 'E', horizontalalignment='center', verticalalignment='center', color='w', fontsize=17)
ax.text(-0.9, 0, 'W', horizontalalignment='center', verticalalignment='center', color='w', fontsize=17)
ax.text(0, 0.9, 'N', horizontalalignment='center', verticalalignment='center', color='w', fontsize=17)
ax.text(0, -0.9, 'S', horizontalalignment='center', verticalalignment='center', color='w', fontsize=17)
return fig
def get_full_station_name(station_name: str, rcu_mode: Union[str, int]) -> str:
"""
Get full station name with the field appended, e.g. DE603LBA
Args:
station_name (str): Short station name, e.g. 'DE603'
rcu_mode (Union[str, int]): RCU mode
Returns:
str: Full station name, e.g. DE603LBA
Example:
>>> get_full_station_name("DE603", '3')
'DE603LBA'
>>> get_full_station_name("LV614", 5)
'LV614HBA'
>>> get_full_station_name("CS013LBA", 1)
'CS013LBA'
>>> get_full_station_name("CS002", 1)
'CS002LBA'
"""
if len(station_name) > 5:
return station_name
if str(rcu_mode) in ('1', '2', 'outer'):
if len(station_name) == 5:
station_name += "LBA"
elif str(rcu_mode) in ('3', '4', 'inner'):
if len(station_name) == 5:
station_name += "LBA"
elif str(rcu_mode) in ('5', '6', '7'):
if len(station_name) == 5:
station_name += "HBA"
else:
raise Exception("Unexpected rcu_mode: ", rcu_mode)
return station_name
2020-03-13 10:11:59 -06:00
def make_xst_plots(xst_filename: str,
station_name: str,
caltable_dir: str = "CalTables",
extent: List[float] = None,
pixels_per_metre: float = 0.5,
sky_vmin: float = None,
sky_vmax: float = None,
ground_vmin: float = None,
ground_vmax: float = None,
height: float = 1.5,
map_zoom: int = 19,
sky_only: bool = False,
opacity: float = 0.6):
"""
Create sky and ground plots for an XST file
Args:
xst_filename: Full path to XST file
station_name: Full station name, e.g. "DE603LBA"
caltable_dir: Caltable directory. Defaults to "CalTables".
extent: Extent (in m) for ground image. Defaults to [-150, 150, -150, 150]
pixels_per_metre: Pixels per metre. Defaults to 0.5.
height: Height (in m) for ground image. Defaults to 1.5.
map_zoom: Zoom level for map tiles. Defaults to 19.
sky_only: Make sky image only. Defaults to False.
opacity: Opacity for map overlay. Defaults to 0.6.
Returns:
Leaflet map (that renders as an interactive map in a notebook)
Example:
>>> leafletmap = make_xst_plots("test/20170720_095816_mode_3_xst_sb297.dat", \
"DE603LBA", caltable_dir="test/CalTables")
Maximum at -6m east, 70m north of station center (lat/long 50.97998, 11.71118)
>>> type(leafletmap)
2020-03-13 10:21:44 -06:00
<class 'folium.folium.Map'>
2020-03-13 10:11:59 -06:00
"""
2020-02-28 05:54:45 -07:00
cubename = os.path.basename(xst_filename)
if extent is None:
extent = [-150, 150, -150, 150]
2020-03-13 10:11:59 -06:00
station_type = get_station_type(station_name)
2020-02-28 05:54:45 -07:00
os.makedirs('results', exist_ok=True)
2020-02-28 05:54:45 -07:00
# Distill metadata from filename
obsdatestr, obstimestr, _, rcu_mode, _, subbandname = cubename.rstrip(".dat").split("_")
subband = int(subbandname[2:])
# Get the data
fname = f"{obsdatestr}_{obstimestr}_{station_name}_SB{subband}"
npix_l, npix_m = 131, 131
freq = freq_from_sb(subband, rcu_mode=rcu_mode)
# Which slice in time to visualise
timestep = 0
# For ground imaging
ground_resolution = pixels_per_metre # pixels per metre for ground_imaging, default is 0.5 pixel/metre
obstime = datetime.datetime.strptime(obsdatestr + ":" + obstimestr, '%Y%m%d:%H%M%S')
cube = read_acm_cube(xst_filename, station_type)
2020-03-17 14:13:48 -06:00
cube, calibration_info = apply_calibration(cube, station_name, rcu_mode, subband, caltable_dir=caltable_dir)
2020-02-28 05:54:45 -07:00
# Split into the XX and YY polarisations (RCUs)
# This needs to be modified in future for LBA sparse
cube_xx = cube[:, 0::2, 0::2]
cube_yy = cube[:, 1::2, 1::2]
visibilities_all = cube_xx + cube_yy
# Stokes I for specified timestep
visibilities = visibilities_all[timestep]
# Setup the database
db = LofarAntennaDatabase()
2020-03-17 13:37:54 -06:00
station_pqr = get_station_pqr(station_name, rcu_mode, db)
2020-02-28 05:54:45 -07:00
# Rotate station_pqr to a north-oriented xyz frame, where y points North, in a plane through the station.
rotation = db.rotation_from_north(station_name)
pqr_to_xyz = np.array([[np.cos(-rotation), -np.sin(-rotation), 0],
[np.sin(-rotation), np.cos(-rotation), 0],
[0, 0, 1]])
station_name = get_full_station_name(station_name, rcu_mode)
2020-02-28 05:54:45 -07:00
station_xyz = (pqr_to_xyz @ station_pqr.T).T
baselines = station_xyz[:, np.newaxis, :] - station_xyz[np.newaxis, :, :]
2020-03-02 13:05:50 -07:00
# Fourier transform
2020-02-28 05:54:45 -07:00
# visibilities = cube_xx[2,:,:]
img = sky_imager(visibilities, baselines, freq, npix_l, npix_m)
obstime_astropy = Time(obstime)
# Determine positions of Cas A and Cyg A
station_earthlocation = EarthLocation.from_geocentric(*(db.phase_centres[station_name] * u.m))
zenith = AltAz(az=0 * u.deg, alt=90 * u.deg, obstime=obstime_astropy,
location=station_earthlocation).transform_to(GCRS)
marked_bodies = {
'Cas A': SkyCoord(ra=350.85 * u.deg, dec=58.815 * u.deg),
'Cyg A': SkyCoord(ra=299.868 * u.deg, dec=40.734 * u.deg),
# 'Per A': SkyCoord.from_name("Perseus A"),
# 'Her A': SkyCoord.from_name("Hercules A"),
# 'Cen A': SkyCoord.from_name("Centaurus A"),
# '?': SkyCoord.from_name("J101415.9+105106"),
# '3C295': SkyCoord.from_name("3C295"),
# 'Moon': get_moon(obstime_astropy, location=station_earthlocation).transform_to(GCRS),
'Sun': get_sun(obstime_astropy)
# '3C196': SkyCoord.from_name("3C196")
}
marked_bodies_lmn = {}
for body_name, body_coord in marked_bodies.items():
# print(body_name, body_coord.separation(zenith), body_coord.separation(zenith))
if body_coord.transform_to(AltAz(location=station_earthlocation, obstime=obstime_astropy)).alt > 0:
marked_bodies_lmn[body_name] = skycoord_to_lmn(marked_bodies[body_name], zenith)
# Plot the resulting sky image
2020-03-02 13:05:50 -07:00
fig = plt.figure(figsize=(10, 10))
2020-02-28 05:54:45 -07:00
2020-03-02 13:05:50 -07:00
make_sky_plot(img, marked_bodies_lmn, title=f"Sky image for {station_name}",
2020-03-04 14:05:56 -07:00
subtitle=f"SB {subband} ({freq / 1e6:.1f} MHz), {str(obstime)[:16]}", fig=fig,
vmin=sky_vmin, vmax=sky_vmax)
2020-02-28 05:54:45 -07:00
2020-03-02 13:05:50 -07:00
fig.savefig(os.path.join('results', f'{fname}_sky_calibrated.png'), bbox_inches='tight', dpi=200)
2020-02-28 05:54:45 -07:00
plt.close(fig)
if sky_only:
return img
npix_x, npix_y = int(ground_resolution * (extent[1] - extent[0])), int(ground_resolution * (extent[3] - extent[2]))
os.environ["NUMEXPR_NUM_THREADS"] = "3"
# Select a subset of visibilities, only the lower triangular part
baseline_indices = np.tril_indices(visibilities.shape[0])
visibilities_selection = visibilities[baseline_indices]
img = nearfield_imager(visibilities_selection.flatten()[:, np.newaxis],
np.array(baseline_indices).T,
[freq], npix_x, npix_y, extent, station_xyz, height=height)
# Correct for taking only lower triangular part
img = np.real(2 * img)
# Convert bottom left and upper right to PQR just for lofargeo
pmin, qmin, _ = pqr_to_xyz.T @ (np.array([extent[0], extent[2], 0]))
pmax, qmax, _ = pqr_to_xyz.T @ (np.array([extent[1], extent[3], 0]))
lon_center, lat_center, _ = lofargeotiff.pqr_to_longlatheight([0, 0, 0], station_name)
lon_min, lat_min, _ = lofargeotiff.pqr_to_longlatheight([pmin, qmin, 0], station_name)
lon_max, lat_max, _ = lofargeotiff.pqr_to_longlatheight([pmax, qmax, 0], station_name)
background_map = get_map(lon_min, lon_max, lat_min, lat_max, zoom=map_zoom)
2020-03-04 13:47:11 -07:00
fig, folium_overlay = make_ground_plot(img, background_map, extent,
title=f"Near field image for {station_name}",
subtitle=f"SB {subband} ({freq / 1e6:.1f} MHz), {str(obstime)[:16]}",
2020-03-04 14:05:56 -07:00
opacity=opacity, vmin=ground_vmin, vmax=ground_vmax)
2020-02-28 05:54:45 -07:00
2020-03-02 13:05:50 -07:00
fig.savefig(os.path.join("results", f"{fname}_nearfield_calibrated.png"), bbox_inches='tight', dpi=200)
2020-02-28 05:54:45 -07:00
plt.close(fig)
maxpixel_ypix, maxpixel_xpix = np.unravel_index(np.argmax(img), img.shape)
maxpixel_x = np.interp(maxpixel_xpix, [0, npix_x], [extent[0], extent[1]])
maxpixel_y = np.interp(maxpixel_ypix, [0, npix_y], [extent[2], extent[3]])
[maxpixel_p, maxpixel_q, _] = pqr_to_xyz.T @ np.array([maxpixel_x, maxpixel_y, height])
maxpixel_lon, maxpixel_lat, _ = lofargeotiff.pqr_to_longlatheight([maxpixel_p, maxpixel_q], station_name)
# Show location of maximum if not at the image border
if 2 < maxpixel_xpix < npix_x - 2 and 2 < maxpixel_ypix < npix_y - 2:
print(f"Maximum at {maxpixel_x:.0f}m east, {maxpixel_y:.0f}m north of station center " +
f"(lat/long {maxpixel_lat:.5f}, {maxpixel_lon:.5f})")
obstime = datetime.datetime.strptime(obsdatestr + ":" + obstimestr, '%Y%m%d:%H%M%S')
tags = {"datafile": xst_filename,
"generated_with": f"lofarimaging v{__version__}",
"subband": subband,
"frequency": freq,
"extent_xyz": extent,
"height": height,
"station": station_name,
"pixels_per_metre": pixels_per_metre}
2020-03-17 14:13:48 -06:00
tags.update(calibration_info)
2020-02-28 08:05:36 -07:00
lofargeotiff.write_geotiff(img, os.path.join("results", f"{fname}_nearfield_calibrated.tiff"),
2020-02-28 05:54:45 -07:00
(pmin, qmin), (pmax, qmax), stationname=station_name,
obsdate=obstime, tags=tags)
return make_leaflet_map(folium_overlay, lon_center, lat_center, lon_min, lat_min, lon_max, lat_max)