Improve docs, add tests

pull/2/head
Tammo Jan Dijkema 2020-03-13 17:11:59 +01:00
parent 6376249878
commit 574e7f6cef
5 changed files with 162 additions and 69 deletions

4
.gitignore vendored
View File

@ -1,6 +1,8 @@
results/* results/*
tilecache/* tilecache/*
caltables/* .idea
*.zip
data/*
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -78,6 +78,7 @@ def ground_imager(visibilities, freq, npix_p, npix_q, dims, station_pqr, height=
return img return img
def nearfield_imager(visibilities, baseline_indices, freqs, npix_p, npix_q, extent, station_pqr, height=1.5, def nearfield_imager(visibilities, baseline_indices, freqs, npix_p, npix_q, extent, station_pqr, height=1.5,
max_memory_mb=200): max_memory_mb=200):
""" """

View File

@ -19,7 +19,7 @@ from astropy.coordinates import SkyCoord, GCRS, EarthLocation, AltAz, get_sun
import astropy.units as u import astropy.units as u
from astropy.time import Time from astropy.time import Time
from typing import List, Dict, Any, Tuple from typing import List, Dict, Any, Tuple, Union
import lofarantpos import lofarantpos
from packaging import version from packaging import version
@ -29,7 +29,7 @@ from .lofarimaging import nearfield_imager, sky_imager, skycoord_to_lmn
__all__ = ["sb_from_freq", "freq_from_sb", "find_caltable", "read_caltable", __all__ = ["sb_from_freq", "freq_from_sb", "find_caltable", "read_caltable",
"rcus_in_station", "read_acm_cube", "get_station_pqr", "rcus_in_station", "read_acm_cube", "get_station_pqr", "get_station_type",
"make_sky_plot", "make_ground_plot", "make_xst_plots"] "make_sky_plot", "make_ground_plot", "make_xst_plots"]
__version__ = "1.5.0" __version__ = "1.5.0"
@ -47,7 +47,7 @@ GENERIC_REMOTE_201512 = [0, 13, 12, 4, 11, 11, 7, 8, 2, 7, 11, 2, 10, 2, 6, 3, 8
assert (version.parse(lofarantpos.__version__) >= version.parse("0.4.0")) assert (version.parse(lofarantpos.__version__) >= version.parse("0.4.0"))
def sb_from_freq(freq: float, rcu_mode='1'): def sb_from_freq(freq: float, rcu_mode: Union[int, str] = 1) -> int:
""" """
Convert subband number to central frequency Convert subband number to central frequency
@ -57,23 +57,29 @@ def sb_from_freq(freq: float, rcu_mode='1'):
Returns: Returns:
int: subband number int: subband number
Example:
>>> sb_from_freq(58007812.5, '3')
297
""" """
clock = 200e6 clock = 200e6
if rcu_mode == '6': if int(rcu_mode) == 6:
clock = 160e6 clock = 160e6
sb_bandwidth = 0.5 * clock / 512.
freq_offset = 0 freq_offset = 0
if rcu_mode == '5': if int(rcu_mode) == 5:
freq_offset = 100e6 freq_offset = 100e6
elif rcu_mode == '6': elif int(rcu_mode) == 6:
freq_offset = 160e6 freq_offset = 160e6
elif rcu_mode == '7': elif int(rcu_mode) == 7:
freq_offset = 200e6 freq_offset = 200e6
sb_bandwidth = 0.5 * clock / 512.
sb = round((freq - freq_offset) / sb_bandwidth) sb = round((freq - freq_offset) / sb_bandwidth)
return int(sb) return int(sb)
def freq_from_sb(sb: int, rcu_mode='1'): def freq_from_sb(sb: int, rcu_mode: Union[str, int] = 1):
""" """
Convert central frequency to subband number Convert central frequency to subband number
@ -83,25 +89,32 @@ def freq_from_sb(sb: int, rcu_mode='1'):
Returns: Returns:
float: frequency in Hz float: frequency in Hz
Example:
>>> freq_from_sb(297, '3')
58007812.5
""" """
clock = 200e6 clock = 200e6
if rcu_mode == '6': if int(rcu_mode) == 6:
clock = 160e6 clock = 160e6
freq_offset = 0 freq_offset = 0
if rcu_mode == '5': if int(rcu_mode) == 5:
freq_offset = 100e6 freq_offset = 100e6
elif rcu_mode == '6': elif int(rcu_mode) == 6:
freq_offset = 160e6 freq_offset = 160e6
elif rcu_mode == '7': elif int(rcu_mode) == 7:
freq_offset = 200e6 freq_offset = 200e6
sb_bandwidth = 0.5 * clock / 512. sb_bandwidth = 0.5 * clock / 512.
freq = (sb * sb_bandwidth) + freq_offset freq = (sb * sb_bandwidth) + freq_offset
return freq return freq
def find_caltable(field_name: str, rcu_mode: str, config_dir='caltables'): def find_caltable(field_name: str, rcu_mode: Union[str, int], config_dir='caltables'):
""" """
Find the file of a caltable. Find the file of a caltable.
Args: Args:
field_name: Name of the antenna field, e.g. 'DE602LBA' field_name: Name of the antenna field, e.g. 'DE602LBA'
rcu_mode: Receiver mode for which the calibration table is requested. rcu_mode: Receiver mode for which the calibration table is requested.
@ -109,38 +122,39 @@ def find_caltable(field_name: str, rcu_mode: str, config_dir='caltables'):
config_dir: Root directory under which station information is stored in config_dir: Root directory under which station information is stored in
subdirectories DE602C/etc/, RS106/etc/, ... subdirectories DE602C/etc/, RS106/etc/, ...
Returns: Returns:
str: filename if it exists, None if nothing found str: full path to caltable if it exists, None if nothing found
Example:
>>> find_caltable("DE603LBA", "3", config_dir="test/CalTables")
'test/CalTables/DE603/CalTable-603-LBA_INNER-10_90.dat'
>>> find_caltable("ES615HBA", "5") is None
True
""" """
station, field = field_name[0:5].upper(), field_name[5:].upper() station, field = field_name[0:5].upper(), field_name[5:].upper()
station_number = station[2:5] station_number = station[2:5]
# Map to the correct file depending on the RCU mode
if rcu_mode == 'outer' and 'LBA' in field_name:
filename = os.path.join(config_dir, f"CalTable-{station_number}-LBA_OUTER-10_90.dat")
elif rcu_mode == 'inner' and 'LBA' in field_name:
filename = os.path.join(config_dir, f"CalTable-{station_number}-LBA_INNER-10_90.dat")
elif rcu_mode == '5' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"CalTable-{station_number}-HBA-110_190.dat")
elif rcu_mode == '6' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"CalTable-{station_number}-HBA-170_230.dat")
elif rcu_mode == '7' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"CalTable-{station_number}-HBA-210_250.dat")
if os.path.exists(filename): filename = f"CalTable-{station_number}"
return filename
# If the original folder structure is kept
if rcu_mode == 'outer' and 'LBA' in field_name:
filename = os.path.join(config_dir, f"{station}/CalTable-{station_number}-LBA_OUTER-10_90.dat")
elif rcu_mode == 'inner' and 'LBA' in field_name:
filename = os.path.join(config_dir, f"{station}/CalTable-{station_number}-LBA_INNER-10_90.dat")
elif rcu_mode == '5' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"{station}/CalTable-{station_number}-HBA-110_190.dat")
elif rcu_mode == '6' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"{station}/CalTable-{station_number}-HBA-170_230.dat")
elif rcu_mode == '7' and 'HBA' in field_name:
filename = os.path.join(config_dir, f"{station}/CalTable-{station_number}-HBA-210_250.dat")
if os.path.exists(filename): if str(rcu_mode) in ('outer', '1', '2') and 'LBA' in field_name:
return filename filename += "-LBA_OUTER-10_90.dat"
elif str(rcu_mode) in ('inner', '3', '4') and 'LBA' in field_name:
filename += "-LBA_INNER-10_90.dat"
elif str(rcu_mode) == '5' and 'HBA' in field_name:
filename += "-HBA-110_190.dat"
elif str(rcu_mode) == '6' and 'HBA' in field_name:
filename += "-HBA-170_230.dat"
elif str(rcu_mode) == '7' and 'HBA' in field_name:
filename += "-HBA-210_250.dat"
else:
raise RuntimeError("Unexpected mode: " + str(rcu_mode) + " for field_name " + str(field_name))
if os.path.exists(os.path.join(config_dir, filename)):
# All caltables in one directory
return os.path.join(config_dir, filename)
elif os.path.exists(os.path.join(config_dir, station, filename)):
# Caltables in a directory per station
return os.path.join(config_dir, station, filename)
else: else:
return None return None
@ -190,6 +204,10 @@ def rcus_in_station(station_type: str):
Args: Args:
station_type: Kind of station that produced the correlation. One of station_type: Kind of station that produced the correlation. One of
'core', 'remote', 'intl'. 'core', 'remote', 'intl'.
Example:
>>> rcus_in_station('remote')
96
""" """
return {'core': 96, 'remote': 96, 'intl': 192}[station_type] return {'core': 96, 'remote': 96, 'intl': 192}[station_type]
@ -206,10 +224,10 @@ def read_acm_cube(filename: str, station_type: str):
Returns: Returns:
np.array: 3D cube of complex numbers, with indices [time slots, rcu, rcu]. np.array: 3D cube of complex numbers, with indices [time slots, rcu, rcu].
Examples: Example:
>>> cube = read_acm_cube('20170720_095816_xst.dat', 'intl') >>> cube = read_acm_cube('test/20170720_095816_mode_3_xst_sb297.dat', 'intl')
>>> cube.shape >>> cube.shape
(29, 192, 192) (29, 192, 192)
""" """
num_rcu = rcus_in_station(station_type) num_rcu = rcus_in_station(station_type)
data = np.fromfile(filename, dtype=np.complex128) data = np.fromfile(filename, dtype=np.complex128)
@ -217,7 +235,48 @@ def read_acm_cube(filename: str, station_type: str):
return data.reshape((time_slots, num_rcu, num_rcu)) return data.reshape((time_slots, num_rcu, num_rcu))
def get_station_pqr(station_name: str, station_type: str, array_type: str, db): def get_station_type(station_name: str) -> str:
"""
Get the station type, one of 'intl', 'core' or 'remote'
Args:
station_name: Station name, e.g. "DE603LBA" or just "DE603"
Returns:
str: station type, one of 'intl', 'core' or 'remote'
Example:
>>> get_station_type("DE603LBA")
'intl'
"""
if station_name[0] == "C":
return "core"
elif station_name[0] == "R" or station_name[:5] == "PL611":
return "remote"
else:
return "intl"
def get_station_pqr(station_name: str, array_type: str, db):
"""
Get PQR coordinates for the relevant subset of antennas in a station.
Args:
station_name: Station name, e.g. DE603LBA
array_type: Array type, one of 'inner' or 'outer' (ignored for HBA)
db: instance of LofarAntennaDatabase from lofarantpos
Example:
>>> from lofarantpos.db import LofarAntennaDatabase
>>> db = LofarAntennaDatabase()
>>> pqr = get_station_pqr("DE603LBA", "outer", db)
>>> pqr.shape
(96, 3)
>>> pqr[0, 0]
1.7434713
"""
station_type = get_station_type(station_name)
if 'LBA' in station_name: if 'LBA' in station_name:
# Get the PQR positions for an individual station # Get the PQR positions for an individual station
station_pqr = db.antenna_pqr(station_name) station_pqr = db.antenna_pqr(station_name)
@ -241,7 +300,8 @@ def get_station_pqr(station_name: str, station_type: str, array_type: str, db):
def make_ground_plot(image: np.array, background_map: np.array, extent: List[int], title: str = "Ground plot", def make_ground_plot(image: np.array, background_map: np.array, extent: List[int], title: str = "Ground plot",
subtitle: str = "", opacity: float = 0.6, fig: plt.Figure = None, **kwargs) -> Tuple[plt.Figure, np.array]: subtitle: str = "", opacity: float = 0.6, fig: plt.Figure = None, **kwargs) \
-> Tuple[plt.Figure, np.array]:
""" """
Make a ground plot of an array with data Make a ground plot of an array with data
@ -256,6 +316,12 @@ def make_ground_plot(image: np.array, background_map: np.array, extent: List[int
Returns: Returns:
Updated figure and numpy array with only the plot Updated figure and numpy array with only the plot
Example:
>>> dummy_image = np.zeros((150, 150))
>>> fig, plot_array = make_ground_plot(dummy_image, dummy_image, [-300, 300, -100, 100])
>>> plot_array.shape
(150, 150, 4)
""" """
if fig is None: if fig is None:
fig = plt.figure(figsize=(10, 10), constrained_layout=True) fig = plt.figure(figsize=(10, 10), constrained_layout=True)
@ -323,6 +389,10 @@ def make_sky_plot(image: np.array, marked_bodies_lmn: Dict[str, np.array],
Returns: Returns:
Updated figure Updated figure
Example:
>>> dummy_image = np.zeros((150, 150))
>>> fig = make_sky_plot(dummy_image, {})
""" """
if fig is None: if fig is None:
fig = plt.figure(figsize=(10, 10)) fig = plt.figure(figsize=(10, 10))
@ -364,31 +434,51 @@ def make_sky_plot(image: np.array, marked_bodies_lmn: Dict[str, np.array],
return fig return fig
def make_xst_plots(xst_filename, def make_xst_plots(xst_filename: str,
station_name, station_name: str,
caltable_dir="caltables", caltable_dir: str = "CalTables",
extent=None, extent: List[float] = None,
pixels_per_metre=0.5, pixels_per_metre: float = 0.5,
sky_vmin=None, sky_vmin: float = None,
sky_vmax=None, sky_vmax: float = None,
ground_vmin=None, ground_vmin: float = None,
ground_vmax=None, ground_vmax: float = None,
height=1.5, height: float = 1.5,
map_zoom=19, map_zoom: int = 19,
sky_only=False, sky_only: bool = False,
opacity=0.6): opacity: float = 0.6):
"""Make a sky image and a ground image from an XST file""" """
Create sky and ground plots for an XST file
Args:
xst_filename: Full path to XST file
station_name: Full station name, e.g. "DE603LBA"
caltable_dir: Caltable directory. Defaults to "CalTables".
extent: Extent (in m) for ground image. Defaults to [-150, 150, -150, 150]
pixels_per_metre: Pixels per metre. Defaults to 0.5.
height: Height (in m) for ground image. Defaults to 1.5.
map_zoom: Zoom level for map tiles. Defaults to 19.
sky_only: Make sky image only. Defaults to False.
opacity: Opacity for map overlay. Defaults to 0.6.
Returns:
Leaflet map (that renders as an interactive map in a notebook)
Example:
>>> leafletmap = make_xst_plots("test/20170720_095816_mode_3_xst_sb297.dat", \
"DE603LBA", caltable_dir="test/CalTables")
Maximum at -6m east, 70m north of station center (lat/long 50.97998, 11.71118)
>>> type(leafletmap)
<class 'folium.folium.Map'>
"""
cubename = os.path.basename(xst_filename) cubename = os.path.basename(xst_filename)
if extent is None: if extent is None:
extent = [-150, 150, -150, 150] extent = [-150, 150, -150, 150]
if station_name[0] == "C": station_type = get_station_type(station_name)
station_type = "core"
elif station_name[0] == "R" or station_name[:5] == "PL611":
station_type = "remote"
else:
station_type = "intl"
os.makedirs('results', exist_ok=True) os.makedirs('results', exist_ok=True)
@ -452,7 +542,7 @@ def make_xst_plots(xst_filename,
# Setup the database # Setup the database
db = LofarAntennaDatabase() db = LofarAntennaDatabase()
station_pqr = get_station_pqr(station_name, station_type, array_type, db) station_pqr = get_station_pqr(station_name, array_type, db)
# Rotate station_pqr to a north-oriented xyz frame, where y points North, in a plane through the station. # Rotate station_pqr to a north-oriented xyz frame, where y points North, in a plane through the station.
rotation = db.rotation_from_north(station_name) rotation = db.rotation_from_north(station_name)