Factor out apply_calibration

pull/2/head
Tammo Jan Dijkema 2020-03-17 21:13:48 +01:00
parent 70719e9215
commit b661ecbdc9
2 changed files with 105 additions and 152 deletions

File diff suppressed because one or more lines are too long

View File

@ -31,7 +31,7 @@ from .lofarimaging import nearfield_imager, sky_imager, skycoord_to_lmn
__all__ = ["sb_from_freq", "freq_from_sb", "find_caltable", "read_caltable",
"rcus_in_station", "read_acm_cube", "get_station_pqr", "get_station_type",
"make_sky_plot", "make_ground_plot", "make_xst_plots"]
"make_sky_plot", "make_ground_plot", "make_xst_plots", "apply_calibration"]
__version__ = "1.5.0"
@ -112,7 +112,7 @@ def freq_from_sb(sb: int, rcu_mode: Union[str, int] = 1):
return freq
def find_caltable(field_name: str, rcu_mode: Union[str, int], config_dir='caltables'):
def find_caltable(field_name: str, rcu_mode: Union[str, int], caltable_dir='caltables'):
"""
Find the file of a caltable.
@ -120,13 +120,13 @@ def find_caltable(field_name: str, rcu_mode: Union[str, int], config_dir='caltab
field_name: Name of the antenna field, e.g. 'DE602LBA'
rcu_mode: Receiver mode for which the calibration table is requested.
Probably should be 'inner' or 'outer'
config_dir: Root directory under which station information is stored in
caltable_dir: Root directory under which station information is stored in
subdirectories DE602C/etc/, RS106/etc/, ...
Returns:
str: full path to caltable if it exists, None if nothing found
Example:
>>> find_caltable("DE603LBA", "3", config_dir="test/CalTables")
>>> find_caltable("DE603LBA", "3", caltable_dir="test/CalTables")
'test/CalTables/DE603/CalTable-603-LBA_INNER-10_90.dat'
>>> find_caltable("ES615HBA", "5") is None
@ -150,12 +150,12 @@ def find_caltable(field_name: str, rcu_mode: Union[str, int], config_dir='caltab
else:
raise RuntimeError("Unexpected mode: " + str(rcu_mode) + " for field_name " + str(field_name))
if os.path.exists(os.path.join(config_dir, filename)):
if os.path.exists(os.path.join(caltable_dir, filename)):
# All caltables in one directory
return os.path.join(config_dir, filename)
elif os.path.exists(os.path.join(config_dir, station, filename)):
return os.path.join(caltable_dir, filename)
elif os.path.exists(os.path.join(caltable_dir, station, filename)):
# Caltables in a directory per station
return os.path.join(config_dir, station, filename)
return os.path.join(caltable_dir, station, filename)
else:
return None
@ -198,6 +198,46 @@ def read_caltable(filename: str, num_subbands=512) -> Tuple[Dict[str, str], np.n
return header_dict, caldata.reshape((num_subbands, num_rcus))
def apply_calibration(visibilities: np.ndarray, station_name: str, rcu_mode: Union[str, int],
subband: int, caltable_dir: str = "CalTables"):
"""
Apply calibration to visibilities
Args:
visibilities (np.ndarray): Visibility cube
station_name (str): Station name, e.g. "DE603"
rcu_mode (Union[str, int]): RCU mode, e.g. 5
subband (int): Subband
caltable_dir (str, optional): Directory with calibration tables. Defaults to "CalTables".
Returns:
Tuple[np.ndarray, Dict[str, str]]: modified visibilities and dictionary with calibration info
"""
caltable_filename = find_caltable(station_name, rcu_mode=rcu_mode,
caltable_dir=caltable_dir)
cal_header = {}
if caltable_filename is None:
print('No calibration table found... cube remains uncalibrated!')
else:
cal_header, cal_data = read_caltable(caltable_filename)
rcu_gains = cal_data[subband, :]
rcu_gains = np.array(rcu_gains, dtype=np.complex64)
gain_matrix = rcu_gains[np.newaxis, :] * np.conj(rcu_gains[:, np.newaxis])
visibilities = visibilities / gain_matrix
calibration_info = {}
if "CalTableHeader.Observation.Date" in cal_header:
calibration_info["calibration_obsdate"] = cal_header["CalTableHeader.Observation.Date"]
if "CalTableHeader.Calibration.Date" in cal_header:
calibration_info["calibration_date"] = cal_header["CalTableHeader.Calibration.Date"]
if "CalTableHeader.Comment" in cal_header:
calibration_info["calibration_comment"] = cal_header["CalTableHeader.Comment"]
if caltable_filename is not None:
calibration_info["calibration_filename"] = caltable_filename
return visibilities, calibration_info
def rcus_in_station(station_type: str):
"""
Give the number of RCUs in a station, given its type.
@ -517,21 +557,7 @@ def make_xst_plots(xst_filename: str,
cube = read_acm_cube(xst_filename, station_type)
# Apply calibration
caltable_filename = find_caltable(station_name, rcu_mode=rcu_mode,
config_dir=caltable_dir)
cal_header = None
if caltable_filename is None:
print('No calibration table found... cube remains uncalibrated!')
else:
cal_header, cal_data = read_caltable(caltable_filename)
rcu_gains = cal_data[subband, :]
rcu_gains = np.array(rcu_gains, dtype=np.complex64)
gain_matrix = rcu_gains[np.newaxis, :] * np.conj(rcu_gains[:, np.newaxis])
cube = cube / gain_matrix
cube, calibration_info = apply_calibration(cube, station_name, rcu_mode, subband, caltable_dir=caltable_dir)
# Split into the XX and YY polarisations (RCUs)
# This needs to be modified in future for LBA sparse
@ -648,20 +674,13 @@ def make_xst_plots(xst_filename: str,
tags = {"datafile": xst_filename,
"generated_with": f"lofarimaging v{__version__}",
"caltable": caltable_filename,
"subband": subband,
"frequency": freq,
"extent_xyz": extent,
"height": height,
"station": station_name,
"pixels_per_metre": pixels_per_metre}
if caltable_filename is not None:
if "CalTableHeader.Observation.Date" in cal_header:
tags["calibration_obsdate"] = cal_header["CalTableHeader.Observation.Date"]
if "CalTableHeader.Calibration.Date" in cal_header:
tags["calibration_date"] = cal_header["CalTableHeader.Calibration.Date"]
if "CalTableHeader.Comment" in cal_header:
tags["calibration_comment"] = cal_header["CalTableHeader.Comment"]
tags.update(calibration_info)
lofargeotiff.write_geotiff(img, os.path.join("results", f"{fname}_nearfield_calibrated.tiff"),
(pmin, qmin), (pmax, qmax), stationname=station_name,
obsdate=obstime, tags=tags)