157 lines
4.6 KiB
Python
Executable File
157 lines
4.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import logging
|
|
|
|
import matplotlib
|
|
import numpy as np
|
|
|
|
matplotlib.use('Agg')
|
|
|
|
import matplotlib.pyplot as plt # isort:skip # noqa: E402 # pylint: disable=C0411,C0412,C0413
|
|
|
|
# Change this to your input file name
|
|
datafile_path='receiving_waterfall_.dat'
|
|
|
|
# This is the output PNG filename:
|
|
figure_path='waterfall.png'
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
OFFSET_IN_STDS = -2.0
|
|
SCALE_IN_STDS = 8.0
|
|
|
|
|
|
class EmptyArrayError(Exception):
|
|
"""
|
|
Empty data array exception
|
|
"""
|
|
|
|
|
|
def _read_waterfall(datafile_path):
|
|
"""
|
|
Read waterfall data file
|
|
|
|
:param datafile_path: Path to data file
|
|
:type datafile_path: str
|
|
:return: Waterfall data
|
|
:rtype: dict
|
|
"""
|
|
LOGGER.info('Reading waterfall file')
|
|
|
|
datafile = open(datafile_path, mode='rb')
|
|
|
|
waterfall = {
|
|
'timestamp': np.fromfile(datafile, dtype='|S32', count=1)[0],
|
|
'nchan': np.fromfile(datafile, dtype='>i4', count=1)[0],
|
|
'samp_rate': np.fromfile(datafile, dtype='>i4', count=1)[0],
|
|
'nfft_per_row': np.fromfile(datafile, dtype='>i4', count=1)[0],
|
|
'center_freq': np.fromfile(datafile, dtype='>f4', count=1)[0],
|
|
'endianess': np.fromfile(datafile, dtype='>i4', count=1)[0]
|
|
}
|
|
data_dtypes = np.dtype([('tabs', 'int64'), ('spec', 'float32', (waterfall['nchan'], ))])
|
|
waterfall['data'] = np.fromfile(datafile, dtype=data_dtypes)
|
|
if waterfall['data'].size == 0:
|
|
raise EmptyArrayError
|
|
|
|
datafile.close()
|
|
|
|
return waterfall
|
|
|
|
|
|
def _compress_waterfall(waterfall):
|
|
"""
|
|
Compress spectra of waterfall
|
|
|
|
:param waterfall: Watefall data
|
|
:type waterfall: dict
|
|
:return: Compressed spectra
|
|
:rtype: dict
|
|
"""
|
|
|
|
spec = waterfall['data']['spec']
|
|
std = np.std(spec, axis=0)
|
|
offset = np.mean(spec, axis=0) + OFFSET_IN_STDS * std
|
|
scale = SCALE_IN_STDS * std / 255.0
|
|
values = np.clip((spec - offset) / scale, 0.0, 255.0).astype('uint8')
|
|
|
|
return {'offset': offset, 'scale': scale, 'values': values}
|
|
|
|
|
|
def _get_waterfall(datafile_path):
|
|
"""
|
|
Get waterfall data
|
|
|
|
:param datafile_path: Path to data file
|
|
:type datafile_path: str_array
|
|
:return: Waterfall data including compressed data
|
|
:rtype: dict
|
|
"""
|
|
waterfall = _read_waterfall(datafile_path)
|
|
|
|
nint = waterfall['data']['spec'].shape[0]
|
|
waterfall['trel'] = np.arange(nint) * waterfall['nfft_per_row'] * waterfall['nchan'] / float(
|
|
waterfall['samp_rate'])
|
|
waterfall['freq'] = np.linspace(-0.5 * waterfall['samp_rate'],
|
|
0.5 * waterfall['samp_rate'],
|
|
waterfall['nchan'],
|
|
endpoint=False)
|
|
waterfall['compressed'] = _compress_waterfall(waterfall)
|
|
|
|
return waterfall
|
|
|
|
|
|
class Waterfall(): # pylint: disable=R0903
|
|
"""
|
|
Parse waterfall data file
|
|
|
|
:param datafile_path: Path to data file
|
|
:type datafile_path: str_array
|
|
"""
|
|
def __init__(self, datafile_path):
|
|
"""
|
|
Class constructor
|
|
"""
|
|
self.data = _get_waterfall(datafile_path)
|
|
|
|
def plot(self, figure_path, vmin=None, vmax=None):
|
|
"""
|
|
Plot waterfall into a figure
|
|
|
|
:param figure_path: Path of figure file to save
|
|
:type figure_path: str
|
|
:param value_range: Minimum and maximum value range
|
|
:type value_range: tuple
|
|
"""
|
|
tmin = np.min(self.data['data']['tabs'] / 1000000.0)
|
|
tmax = np.max(self.data['data']['tabs'] / 1000000.0)
|
|
fmin = np.min(self.data['freq'] / 1000.0)
|
|
fmax = np.max(self.data['freq'] / 1000.0)
|
|
if vmin is None or vmax is None:
|
|
vmin = -100
|
|
vmax = -50
|
|
c_idx = self.data['data']['spec'] > -200.0
|
|
if np.sum(c_idx) > 100:
|
|
data_mean = np.mean(self.data['data']['spec'][c_idx])
|
|
data_std = np.std(self.data['data']['spec'][c_idx])
|
|
vmin = data_mean - 2.0 * data_std
|
|
vmax = data_mean + 6.0 * data_std
|
|
plt.figure(figsize=(10, 20))
|
|
plt.imshow(self.data['data']['spec'],
|
|
origin='lower',
|
|
aspect='auto',
|
|
interpolation='None',
|
|
extent=[fmin, fmax, tmin, tmax],
|
|
vmin=vmin,
|
|
vmax=vmax,
|
|
cmap='viridis')
|
|
plt.xlabel('Frequency (kHz)')
|
|
plt.ylabel('Time (seconds)')
|
|
fig = plt.colorbar(aspect=50)
|
|
fig.set_label('Power (dB)')
|
|
plt.savefig(figure_path, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
Waterfall.plot(Waterfall(datafile_path), figure_path=figure_path)
|
|
|
|
print("Wrote file:", figure_path)
|