348 lines
12 KiB
Python
348 lines
12 KiB
Python
"""Class and methods to decode SSTV signal"""
|
|
|
|
import numpy as np
|
|
import soundfile
|
|
from PIL import Image
|
|
from scipy.signal.windows import hann
|
|
|
|
from . import spec
|
|
from .common import log_message, progress_bar
|
|
|
|
|
|
def calc_lum(freq):
|
|
"""Converts SSTV pixel frequency range into 0-255 luminance byte"""
|
|
|
|
lum = int(round((freq - 1500) / 3.1372549))
|
|
return min(max(lum, 0), 255)
|
|
|
|
|
|
def barycentric_peak_interp(bins, x):
|
|
"""Interpolate between frequency bins to find x value of peak"""
|
|
|
|
# Takes x as the index of the largest bin and interpolates the
|
|
# x value of the peak using neighbours in the bins array
|
|
|
|
# Make sure data is in bounds
|
|
y1 = bins[x] if x <= 0 else bins[x-1]
|
|
y3 = bins[x] if x + 1 >= len(bins) else bins[x+1]
|
|
|
|
denom = y3 + bins[x] + y1
|
|
if denom == 0:
|
|
return 0 # erroneous
|
|
|
|
return (y3 - y1) / denom + x
|
|
|
|
|
|
class SSTVDecoder(object):
|
|
|
|
"""Create an SSTV decoder for decoding audio data"""
|
|
|
|
def __init__(self, audio_file):
|
|
self.mode = None
|
|
|
|
self._audio_file = audio_file
|
|
|
|
self._samples, self._sample_rate = soundfile.read(self._audio_file)
|
|
|
|
if self._samples.ndim > 1: # convert to mono if stereo
|
|
self._samples = self._samples.mean(axis=1)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, traceback):
|
|
self.close()
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
def decode(self, skip=0.0):
|
|
"""Attempts to decode the audio data as an SSTV signal
|
|
|
|
Returns a PIL image on success, and None if no SSTV signal was found
|
|
"""
|
|
|
|
if skip > 0.0:
|
|
self._samples = self._samples[round(skip * self._sample_rate):]
|
|
|
|
header_end = self._find_header()
|
|
|
|
if header_end is None:
|
|
return None
|
|
|
|
self.mode = self._decode_vis(header_end)
|
|
|
|
vis_end = header_end + round(spec.VIS_BIT_SIZE * 9 * self._sample_rate)
|
|
|
|
image_data = self._decode_image_data(vis_end)
|
|
|
|
return self._draw_image(image_data)
|
|
|
|
def close(self):
|
|
"""Closes any input files if they exist"""
|
|
|
|
if self._audio_file is not None and not self._audio_file.closed:
|
|
self._audio_file.close()
|
|
|
|
def _peak_fft_freq(self, data):
|
|
"""Finds the peak frequency from a section of audio data"""
|
|
|
|
windowed_data = data * hann(len(data))
|
|
fft = np.abs(np.fft.rfft(windowed_data))
|
|
|
|
# Get index of bin with highest magnitude
|
|
x = np.argmax(fft)
|
|
# Interpolated peak frequency
|
|
peak = barycentric_peak_interp(fft, x)
|
|
|
|
# Return frequency in hz
|
|
return peak * self._sample_rate / len(windowed_data)
|
|
|
|
def _find_header(self):
|
|
"""Finds the approx sample of the end of the calibration header"""
|
|
|
|
header_size = round(spec.HDR_SIZE * self._sample_rate)
|
|
window_size = round(spec.HDR_WINDOW_SIZE * self._sample_rate)
|
|
|
|
# Relative sample offsets of the header tones
|
|
leader_1_sample = 0
|
|
leader_1_search = leader_1_sample + window_size
|
|
|
|
break_sample = round(spec.BREAK_OFFSET * self._sample_rate)
|
|
break_search = break_sample + window_size
|
|
|
|
leader_2_sample = round(spec.LEADER_OFFSET * self._sample_rate)
|
|
leader_2_search = leader_2_sample + window_size
|
|
|
|
vis_start_sample = round(spec.VIS_START_OFFSET * self._sample_rate)
|
|
vis_start_search = vis_start_sample + window_size
|
|
|
|
jump_size = round(0.002 * self._sample_rate) # check every 2ms
|
|
|
|
# The margin of error created here will be negligible when decoding the
|
|
# vis due to each bit having a length of 30ms. We fix this error margin
|
|
# when decoding the image by aligning each sync pulse
|
|
|
|
for current_sample in range(0, len(self._samples) - header_size,
|
|
jump_size):
|
|
# Update search progress message
|
|
if current_sample % (jump_size * 256) == 0:
|
|
search_msg = "Searching for calibration header... {:.1f}s"
|
|
progress = current_sample / self._sample_rate
|
|
log_message(search_msg.format(progress), recur=True)
|
|
|
|
search_end = current_sample + header_size
|
|
search_area = self._samples[current_sample:search_end]
|
|
|
|
leader_1_area = search_area[leader_1_sample:leader_1_search]
|
|
break_area = search_area[break_sample:break_search]
|
|
leader_2_area = search_area[leader_2_sample:leader_2_search]
|
|
vis_start_area = search_area[vis_start_sample:vis_start_search]
|
|
|
|
# Check they're the correct frequencies
|
|
if (abs(self._peak_fft_freq(leader_1_area) - 1900) < 50
|
|
and abs(self._peak_fft_freq(break_area) - 1200) < 50
|
|
and abs(self._peak_fft_freq(leader_2_area) - 1900) < 50
|
|
and abs(self._peak_fft_freq(vis_start_area) - 1200) < 50):
|
|
|
|
stop_msg = "Searching for calibration header... Found!{:>4}"
|
|
log_message(stop_msg.format(' '))
|
|
return current_sample + header_size
|
|
|
|
log_message()
|
|
log_message("Couldn't find SSTV header in the given audio file",
|
|
err=True)
|
|
return None
|
|
|
|
def _decode_vis(self, vis_start):
|
|
"""Decodes the vis from the audio data and returns the SSTV mode"""
|
|
|
|
bit_size = round(spec.VIS_BIT_SIZE * self._sample_rate)
|
|
vis_bits = []
|
|
|
|
for bit_idx in range(8):
|
|
bit_offset = vis_start + bit_idx * bit_size
|
|
section = self._samples[bit_offset:bit_offset+bit_size]
|
|
freq = self._peak_fft_freq(section)
|
|
# 1100 hz = 1, 1300hz = 0
|
|
vis_bits.append(int(freq <= 1200))
|
|
|
|
# Check for even parity in last bit
|
|
parity = sum(vis_bits) % 2 == 0
|
|
if not parity:
|
|
raise ValueError("Error decoding VIS header (invalid parity bit)")
|
|
|
|
# LSB first so we must reverse and ignore the parity bit
|
|
vis_value = 0
|
|
for bit in vis_bits[-2::-1]:
|
|
vis_value = (vis_value << 1) | bit
|
|
|
|
if vis_value not in spec.VIS_MAP:
|
|
error = "SSTV mode is unsupported (VIS: {})"
|
|
raise ValueError(error.format(vis_value))
|
|
|
|
mode = spec.VIS_MAP[vis_value]
|
|
log_message("Detected SSTV mode {}".format(mode.NAME))
|
|
|
|
return mode
|
|
|
|
def _align_sync(self, align_start, start_of_sync=True):
|
|
"""Returns sample where the beginning of the sync pulse was found"""
|
|
|
|
# TODO - improve this
|
|
|
|
sync_window = round(self.mode.SYNC_PULSE * 1.4 * self._sample_rate)
|
|
align_stop = len(self._samples) - sync_window
|
|
|
|
if align_stop <= align_start:
|
|
return None # Reached end of audio
|
|
|
|
for current_sample in range(align_start, align_stop):
|
|
section_end = current_sample + sync_window
|
|
search_section = self._samples[current_sample:section_end]
|
|
|
|
if self._peak_fft_freq(search_section) > 1350:
|
|
break
|
|
|
|
end_sync = current_sample + (sync_window // 2)
|
|
|
|
if start_of_sync:
|
|
return end_sync - round(self.mode.SYNC_PULSE * self._sample_rate)
|
|
else:
|
|
return end_sync
|
|
|
|
def _decode_image_data(self, image_start):
|
|
"""Decodes image from the transmission section of an sstv signal"""
|
|
|
|
window_factor = self.mode.WINDOW_FACTOR
|
|
centre_window_time = (self.mode.PIXEL_TIME * window_factor) / 2
|
|
pixel_window = round(centre_window_time * 2 * self._sample_rate)
|
|
|
|
height = self.mode.LINE_COUNT
|
|
channels = self.mode.CHAN_COUNT
|
|
width = self.mode.LINE_WIDTH
|
|
# Use list comprehension to init list so we can return data early
|
|
image_data = [[[0 for i in range(width)]
|
|
for j in range(channels)] for k in range(height)]
|
|
|
|
seq_start = image_start
|
|
if self.mode.HAS_START_SYNC:
|
|
# Start at the end of the initial sync pulse
|
|
seq_start = self._align_sync(image_start, start_of_sync=False)
|
|
if seq_start is None:
|
|
raise EOFError("Reached end of audio before image data")
|
|
|
|
for line in range(height):
|
|
|
|
if self.mode.CHAN_SYNC > 0 and line == 0:
|
|
# Align seq_start to the beginning of the previous sync pulse
|
|
sync_offset = self.mode.CHAN_OFFSETS[self.mode.CHAN_SYNC]
|
|
seq_start -= round((sync_offset + self.mode.SCAN_TIME)
|
|
* self._sample_rate)
|
|
|
|
for chan in range(channels):
|
|
|
|
if chan == self.mode.CHAN_SYNC:
|
|
if line > 0 or chan > 0:
|
|
# Set base offset to the next line
|
|
seq_start += round(self.mode.LINE_TIME *
|
|
self._sample_rate)
|
|
|
|
# Align to start of sync pulse
|
|
seq_start = self._align_sync(seq_start)
|
|
if seq_start is None:
|
|
log_message()
|
|
log_message("Reached end of audio whilst decoding.")
|
|
return image_data
|
|
|
|
pixel_time = self.mode.PIXEL_TIME
|
|
if self.mode.HAS_HALF_SCAN:
|
|
# Robot mode has half-length second/third scans
|
|
if chan > 0:
|
|
pixel_time = self.mode.HALF_PIXEL_TIME
|
|
|
|
centre_window_time = (pixel_time * window_factor) / 2
|
|
pixel_window = round(centre_window_time * 2 *
|
|
self._sample_rate)
|
|
|
|
for px in range(width):
|
|
|
|
chan_offset = self.mode.CHAN_OFFSETS[chan]
|
|
|
|
px_pos = round(seq_start + (chan_offset + px *
|
|
pixel_time - centre_window_time) *
|
|
self._sample_rate)
|
|
px_end = px_pos + pixel_window
|
|
|
|
# If we are performing fft past audio length, stop early
|
|
if px_end >= len(self._samples):
|
|
log_message()
|
|
log_message("Reached end of audio whilst decoding.")
|
|
return image_data
|
|
|
|
pixel_area = self._samples[px_pos:px_end]
|
|
freq = self._peak_fft_freq(pixel_area)
|
|
|
|
image_data[line][chan][px] = calc_lum(freq)
|
|
|
|
progress_bar(line, height - 1, "Decoding image...")
|
|
|
|
return image_data
|
|
|
|
def _draw_image(self, image_data):
|
|
"""Renders the image from the decoded sstv signal"""
|
|
|
|
# Let PIL do YUV-RGB conversion for us
|
|
if self.mode.COLOR == spec.COL_FMT.YUV:
|
|
col_mode = "YCbCr"
|
|
else:
|
|
col_mode = "RGB"
|
|
|
|
width = self.mode.LINE_WIDTH
|
|
height = self.mode.LINE_COUNT
|
|
channels = self.mode.CHAN_COUNT
|
|
|
|
image = Image.new(col_mode, (width, height))
|
|
pixel_data = image.load()
|
|
|
|
log_message("Drawing image data...")
|
|
|
|
for y in range(height):
|
|
|
|
odd_line = y % 2
|
|
for x in range(width):
|
|
|
|
if channels == 2:
|
|
|
|
if self.mode.HAS_ALT_SCAN:
|
|
if self.mode.COLOR == spec.COL_FMT.YUV:
|
|
# R36
|
|
pixel = (image_data[y][0][x],
|
|
image_data[y-(odd_line-1)][1][x],
|
|
image_data[y-odd_line][1][x])
|
|
|
|
elif channels == 3:
|
|
|
|
if self.mode.COLOR == spec.COL_FMT.GBR:
|
|
# M1, M2, S1, S2, SDX
|
|
pixel = (image_data[y][2][x],
|
|
image_data[y][0][x],
|
|
image_data[y][1][x])
|
|
elif self.mode.COLOR == spec.COL_FMT.YUV:
|
|
# R72
|
|
pixel = (image_data[y][0][x],
|
|
image_data[y][2][x],
|
|
image_data[y][1][x])
|
|
elif self.mode.COLOR == spec.COL_FMT.RGB:
|
|
pixel = (image_data[y][0][x],
|
|
image_data[y][1][x],
|
|
image_data[y][2][x])
|
|
|
|
pixel_data[x, y] = pixel
|
|
|
|
if image.mode != "RGB":
|
|
image = image.convert("RGB")
|
|
|
|
log_message("...Done!")
|
|
return image
|