# File: sstvdec/sstv/decode.py (Python, 348 lines, 12 KiB)

"""Class and methods to decode SSTV signal"""
import numpy as np
import soundfile
from PIL import Image
from scipy.signal.windows import hann
from . import spec
from .common import log_message, progress_bar
def calc_lum(freq):
    """Map an SSTV pixel frequency (Hz) onto a luminance byte.

    The frequency is offset by 1500 and scaled by 1 / 3.1372549, so
    1500 maps to 0 and 2300 maps to 255. Results outside that span are
    clamped to the 0-255 range.
    """
    level = int(round((freq - 1500) / 3.1372549))

    # Clamp with explicit guards rather than nested min/max
    if level < 0:
        return 0
    if level > 255:
        return 255
    return level
def barycentric_peak_interp(bins, x):
    """Estimate the fractional position of a peak around bin index x.

    bins is an indexable sequence of bin magnitudes and x is the index
    of the largest bin. The neighbouring bins on either side are used to
    shift x towards the heavier neighbour. At either edge of the array
    the missing neighbour is replaced by the value of bin x itself.

    Returns 0 when the three magnitudes sum to zero, since no
    interpolation is possible in that case.
    """
    centre = bins[x]

    # Fall back to the centre bin when a neighbour would be out of bounds
    left = bins[x - 1] if x > 0 else centre
    right = bins[x + 1] if x + 1 < len(bins) else centre

    total = right + centre + left
    if total == 0:
        return 0  # erroneous

    return (right - left) / total + x
class SSTVDecoder(object):
    """Create an SSTV decoder for decoding audio data

    The whole audio input is read into memory with soundfile.read when
    the decoder is constructed. Multi-channel audio is averaged down to a
    single channel. The decoder can be used as a context manager, in
    which case the input is closed on exit.
    """

    def __init__(self, audio_file):
        """Read all samples from audio_file.

        audio_file is handed straight to soundfile.read, so it may be
        anything that function accepts.
        """
        self.mode = None

        self._audio_file = audio_file

        self._samples, self._sample_rate = soundfile.read(self._audio_file)

        if self._samples.ndim > 1:  # convert to mono if stereo
            self._samples = self._samples.mean(axis=1)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        self.close()

    def __del__(self):
        self.close()

    def decode(self, skip=0.0):
        """Attempts to decode the audio data as an SSTV signal

        skip is a number of seconds of audio to discard from the start
        before searching. Note that this permanently trims the samples
        held by the decoder.

        Returns a PIL image on success, and None if no SSTV signal was found
        """

        if skip > 0.0:
            self._samples = self._samples[round(skip * self._sample_rate):]

        header_end = self._find_header()

        if header_end is None:
            return None

        self.mode = self._decode_vis(header_end)

        # Image data is taken to start 9 VIS bit lengths after the header
        vis_end = header_end + round(spec.VIS_BIT_SIZE * 9 * self._sample_rate)

        image_data = self._decode_image_data(vis_end)

        return self._draw_image(image_data)

    def close(self):
        """Closes any input files if they exist

        Safe to call repeatedly, on a partially constructed instance, and
        when the input is not a file object (for example a path string),
        in which case there is nothing to close.
        """

        # getattr: __del__ may run before __init__ has set the attribute
        audio_file = getattr(self, "_audio_file", None)
        if audio_file is None:
            return

        # Paths and descriptors have no close(); only file objects do
        close_fn = getattr(audio_file, "close", None)
        if close_fn is None or getattr(audio_file, "closed", False):
            return

        close_fn()

    def _peak_fft_freq(self, data):
        """Finds the peak frequency from a section of audio data

        The section is Hann-windowed, transformed with a real FFT and the
        strongest bin is refined with barycentric_peak_interp. Returns
        the result in Hz.
        """

        windowed_data = data * hann(len(data))
        fft = np.abs(np.fft.rfft(windowed_data))

        # Get index of bin with highest magnitude
        x = np.argmax(fft)
        # Interpolated peak frequency
        peak = barycentric_peak_interp(fft, x)

        # Return frequency in hz
        return peak * self._sample_rate / len(windowed_data)

    def _find_header(self):
        """Finds the approx sample of the end of the calibration header

        Slides over the audio in 2ms steps and, at each position, checks
        four windows for the tone sequence 1900, 1200, 1900, 1200 Hz
        (each within 50 Hz). Returns the sample index just past the
        header, or None (after logging an error) if no position matches.
        """

        header_size = round(spec.HDR_SIZE * self._sample_rate)
        window_size = round(spec.HDR_WINDOW_SIZE * self._sample_rate)

        # Relative sample offsets of the header tones
        leader_1_sample = 0
        leader_1_search = leader_1_sample + window_size

        break_sample = round(spec.BREAK_OFFSET * self._sample_rate)
        break_search = break_sample + window_size

        leader_2_sample = round(spec.LEADER_OFFSET * self._sample_rate)
        leader_2_search = leader_2_sample + window_size

        vis_start_sample = round(spec.VIS_START_OFFSET * self._sample_rate)
        vis_start_search = vis_start_sample + window_size

        jump_size = round(0.002 * self._sample_rate)  # check every 2ms

        # The margin of error created here will be negligible when decoding the
        # vis due to each bit having a length of 30ms. We fix this error margin
        # when decoding the image by aligning each sync pulse

        for current_sample in range(0, len(self._samples) - header_size,
                                    jump_size):
            # Update search progress message
            if current_sample % (jump_size * 256) == 0:
                search_msg = "Searching for calibration header... {:.1f}s"
                progress = current_sample / self._sample_rate
                log_message(search_msg.format(progress), recur=True)

            search_end = current_sample + header_size
            search_area = self._samples[current_sample:search_end]

            leader_1_area = search_area[leader_1_sample:leader_1_search]
            break_area = search_area[break_sample:break_search]
            leader_2_area = search_area[leader_2_sample:leader_2_search]
            vis_start_area = search_area[vis_start_sample:vis_start_search]

            # Check they're the correct frequencies
            if (abs(self._peak_fft_freq(leader_1_area) - 1900) < 50
               and abs(self._peak_fft_freq(break_area) - 1200) < 50
               and abs(self._peak_fft_freq(leader_2_area) - 1900) < 50
               and abs(self._peak_fft_freq(vis_start_area) - 1200) < 50):

                stop_msg = "Searching for calibration header... Found!{:>4}"
                log_message(stop_msg.format(' '))
                return current_sample + header_size

        log_message()
        log_message("Couldn't find SSTV header in the given audio file",
                    err=True)
        return None

    def _decode_vis(self, vis_start):
        """Decodes the vis from the audio data and returns the SSTV mode

        Reads 8 consecutive bits starting at sample vis_start; the last
        one is an even-parity bit.

        Raises ValueError if the parity check fails or if the decoded
        value is not a key of spec.VIS_MAP.
        """

        bit_size = round(spec.VIS_BIT_SIZE * self._sample_rate)
        vis_bits = []

        for bit_idx in range(8):
            bit_offset = vis_start + bit_idx * bit_size
            section = self._samples[bit_offset:bit_offset+bit_size]
            freq = self._peak_fft_freq(section)
            # 1100 hz = 1, 1300hz = 0
            vis_bits.append(int(freq <= 1200))

        # Check for even parity in last bit
        parity = sum(vis_bits) % 2 == 0
        if not parity:
            raise ValueError("Error decoding VIS header (invalid parity bit)")

        # LSB first so we must reverse and ignore the parity bit
        vis_value = 0
        for bit in vis_bits[-2::-1]:
            vis_value = (vis_value << 1) | bit

        if vis_value not in spec.VIS_MAP:
            error = "SSTV mode is unsupported (VIS: {})"
            raise ValueError(error.format(vis_value))

        mode = spec.VIS_MAP[vis_value]
        log_message("Detected SSTV mode {}".format(mode.NAME))

        return mode

    def _align_sync(self, align_start, start_of_sync=True):
        """Returns sample where the beginning of the sync pulse was found

        A window 1.4 sync pulses long is advanced one sample at a time
        from align_start until its peak frequency rises above 1350 Hz.
        The middle of that window is taken as the end of the sync pulse.
        If no window qualifies, the last position examined is used.

        With start_of_sync=False the end of the pulse is returned instead
        of its start. Returns None if there is not enough audio left to
        fit a single window.
        """

        # TODO - improve this

        sync_window = round(self.mode.SYNC_PULSE * 1.4 * self._sample_rate)
        align_stop = len(self._samples) - sync_window

        if align_stop <= align_start:
            return None  # Reached end of audio

        for current_sample in range(align_start, align_stop):
            section_end = current_sample + sync_window
            search_section = self._samples[current_sample:section_end]

            if self._peak_fft_freq(search_section) > 1350:
                break

        end_sync = current_sample + (sync_window // 2)

        if start_of_sync:
            return end_sync - round(self.mode.SYNC_PULSE * self._sample_rate)
        return end_sync

    def _decode_image_data(self, image_start):
        """Decodes image from the transmission section of an sstv signal

        Returns a nested list indexed as [line][channel][pixel] holding
        luminance bytes. If the audio ends part-way through, the data
        decoded so far is returned with the remaining entries left at 0.

        Raises EOFError if the mode has a start sync and the audio ends
        before it can be located.
        """

        mode = self.mode
        sample_rate = self._sample_rate
        window_factor = mode.WINDOW_FACTOR

        height = mode.LINE_COUNT
        channels = mode.CHAN_COUNT
        width = mode.LINE_WIDTH
        # Use list comprehension to init list so we can return data early
        image_data = [[[0 for i in range(width)]
                       for j in range(channels)] for k in range(height)]

        seq_start = image_start
        if mode.HAS_START_SYNC:
            # Start at the end of the initial sync pulse
            seq_start = self._align_sync(image_start, start_of_sync=False)
            if seq_start is None:
                raise EOFError("Reached end of audio before image data")

        for line in range(height):

            if mode.CHAN_SYNC > 0 and line == 0:
                # Align seq_start to the beginning of the previous sync pulse
                sync_offset = mode.CHAN_OFFSETS[mode.CHAN_SYNC]
                seq_start -= round((sync_offset + mode.SCAN_TIME)
                                   * sample_rate)

            for chan in range(channels):

                if chan == mode.CHAN_SYNC:
                    if line > 0 or chan > 0:
                        # Set base offset to the next line
                        seq_start += round(mode.LINE_TIME * sample_rate)

                    # Align to start of sync pulse
                    seq_start = self._align_sync(seq_start)
                    if seq_start is None:
                        log_message()
                        log_message("Reached end of audio whilst decoding.")
                        return image_data

                pixel_time = mode.PIXEL_TIME
                if mode.HAS_HALF_SCAN and chan > 0:
                    # Robot mode has half-length second/third scans
                    pixel_time = mode.HALF_PIXEL_TIME

                # The analysis window depends on this channel's pixel time
                centre_window_time = (pixel_time * window_factor) / 2
                pixel_window = round(centre_window_time * 2 * sample_rate)

                chan_offset = mode.CHAN_OFFSETS[chan]

                for px in range(width):

                    px_pos = round(seq_start + (chan_offset + px *
                                   pixel_time - centre_window_time) *
                                   sample_rate)
                    px_end = px_pos + pixel_window

                    # If we are performing fft past audio length, stop early
                    if px_end >= len(self._samples):
                        log_message()
                        log_message("Reached end of audio whilst decoding.")
                        return image_data

                    pixel_area = self._samples[px_pos:px_end]
                    freq = self._peak_fft_freq(pixel_area)

                    image_data[line][chan][px] = calc_lum(freq)

            progress_bar(line, height - 1, "Decoding image...")

        return image_data

    def _draw_image(self, image_data):
        """Renders the image from the decoded sstv signal

        image_data is indexed as [line][channel][pixel]. Returns a PIL
        image in RGB mode.

        Raises ValueError if the mode's channel count / colour format
        combination is not one this method knows how to lay out.
        """

        mode = self.mode
        colour = mode.COLOR

        # Let PIL do YUV-RGB conversion for us
        if colour == spec.COL_FMT.YUV:
            col_mode = "YCbCr"
        else:
            col_mode = "RGB"

        width = mode.LINE_WIDTH
        height = mode.LINE_COUNT
        channels = mode.CHAN_COUNT

        # Work out the channel layout once instead of once per pixel.
        # order lists which decoded channel feeds each output component.
        alt_scan = False
        order = None
        if channels == 2:
            # R36
            alt_scan = mode.HAS_ALT_SCAN and colour == spec.COL_FMT.YUV
        elif channels == 3:
            if colour == spec.COL_FMT.GBR:
                # M1, M2, S1, S2, SDX
                order = (2, 0, 1)
            elif colour == spec.COL_FMT.YUV:
                # R72
                order = (0, 2, 1)
            elif colour == spec.COL_FMT.RGB:
                order = (0, 1, 2)

        if not alt_scan and order is None:
            # Previously this fell through to an unassigned pixel variable
            error = "Cannot draw image with {} channels and colour format {}"
            raise ValueError(error.format(channels, colour))

        image = Image.new(col_mode, (width, height))
        pixel_data = image.load()

        log_message("Drawing image data...")

        for y in range(height):

            odd_line = y % 2
            for x in range(width):

                if alt_scan:
                    # The second channel alternates between the two chroma
                    # components, so each is taken from this line or its
                    # neighbour depending on the line's parity
                    pixel = (image_data[y][0][x],
                             image_data[y-(odd_line-1)][1][x],
                             image_data[y-odd_line][1][x])
                else:
                    pixel = (image_data[y][order[0]][x],
                             image_data[y][order[1]][x],
                             image_data[y][order[2]][x])

                pixel_data[x, y] = pixel

        if image.mode != "RGB":
            image = image.convert("RGB")

        log_message("...Done!")
        return image