kludge old py2 top_block with gain

spacecruft
Jeff Moe 2023-01-19 10:36:17 -07:00
parent fdc594b64a
commit ec2706053d
3 changed files with 253 additions and 71 deletions

View File

@ -0,0 +1,202 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: Top Block
# Generated: Mon Oct 28 01:00:35 2019
##################################################
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import fft
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.fft import window
from gnuradio.filter import firdes
from optparse import OptionParser
import numpy as np
import osmosdr
import time
class top_block(gr.top_block):
def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000):
gr.top_block.__init__(self, "Top Block")
##################################################
# Parameters
##################################################
self.c_freq = c_freq
self.nbin = nbin
self.nchan = nchan
self.obs_time = obs_time
self.samp_rate = samp_rate
##################################################
# Variables
##################################################
self.sinc_sample_locations = sinc_sample_locations = np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/nchan)
self.sinc = sinc = np.sinc(sinc_sample_locations/np.pi)
self.custom_window = custom_window = sinc*np.hamming(4*nchan)
##################################################
# Blocks
##################################################
self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan]))
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan]))
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan]))
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:]))
self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate))
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True)
self.blocks_file_sink_0.set_unbuffered(False)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan)
self.blocks_add_xx_0 = blocks.add_vcc(nchan)
self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' )
self.RTL820T.set_sample_rate(samp_rate)
self.RTL820T.set_center_freq(c_freq, 0)
self.RTL820T.set_freq_corr(0, 0)
self.RTL820T.set_dc_offset_mode(0, 0)
self.RTL820T.set_iq_balance_mode(0, 0)
self.RTL820T.set_gain_mode(False, 0)
self.RTL820T.set_gain(10, 0)
self.RTL820T.set_if_gain(10, 0)
self.RTL820T.set_bb_gain(10, 0)
self.RTL820T.set_antenna('', 0)
self.RTL820T.set_bandwidth(0, 0)
##################################################
# Connections
##################################################
self.connect((self.RTL820T, 0), (self.blocks_head_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.fft_vxx_0, 0))
self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0))
self.connect((self.blocks_delay_0, 0), (self.blocks_stream_to_vector_0_2, 0))
self.connect((self.blocks_delay_0_0, 0), (self.blocks_stream_to_vector_0_0, 0))
self.connect((self.blocks_delay_0_1, 0), (self.blocks_stream_to_vector_0_1, 0))
self.connect((self.blocks_head_0, 0), (self.blocks_delay_0, 0))
self.connect((self.blocks_head_0, 0), (self.blocks_delay_0_0, 0))
self.connect((self.blocks_head_0, 0), (self.blocks_delay_0_1, 0))
self.connect((self.blocks_head_0, 0), (self.blocks_stream_to_vector_0, 0))
self.connect((self.blocks_integrate_xx_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3))
self.connect((self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_stream_to_vector_0_0, 0), (self.blocks_multiply_const_vxx_0_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_1, 0), (self.blocks_multiply_const_vxx_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0))
self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
def get_c_freq(self):
return self.c_freq
def set_c_freq(self, c_freq):
self.c_freq = c_freq
self.RTL820T.set_center_freq(self.c_freq, 0)
def get_nbin(self):
return self.nbin
def set_nbin(self, nbin):
self.nbin = nbin
def get_nchan(self):
return self.nchan
def set_nchan(self, nchan):
self.nchan = nchan
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan))
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.blocks_delay_0_1.set_dly(self.nchan)
self.blocks_delay_0_0.set_dly(self.nchan*2)
self.blocks_delay_0.set_dly(self.nchan*3)
def get_obs_time(self):
return self.obs_time
def set_obs_time(self, obs_time):
self.obs_time = obs_time
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.RTL820T.set_sample_rate(self.samp_rate)
def get_sinc_sample_locations(self):
return self.sinc_sample_locations
def set_sinc_sample_locations(self, sinc_sample_locations):
self.sinc_sample_locations = sinc_sample_locations
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
def get_sinc(self):
return self.sinc
def set_sinc(self, sinc):
self.sinc = sinc
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
def get_custom_window(self):
return self.custom_window
def set_custom_window(self, custom_window):
self.custom_window = custom_window
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
def argument_parser():
parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
parser.add_option(
"", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]")
parser.add_option(
"", "--nbin", dest="nbin", type="intx", default=1000,
help="Set nbin [default=%default]")
parser.add_option(
"", "--nchan", dest="nchan", type="intx", default=1024,
help="Set nchan [default=%default]")
parser.add_option(
"", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]")
parser.add_option(
"", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]")
return parser
def main(top_block_cls=top_block, options=None):
if options is None:
options, _ = argument_parser().parse_args()
tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate)
tb.start()
tb.wait()
if __name__ == '__main__':
main()

122
top_block.py 100755 → 100644
View File

@ -1,30 +1,25 @@
#!/usr/bin/env python3
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
##################################################
# GNU Radio Python Flow Graph
# Title: Top Block
# GNU Radio version: 3.10.5.0
# Generated: Mon Oct 28 01:00:35 2019
##################################################
from gnuradio import blocks
from gnuradio import fft
from gnuradio.fft import window
from gnuradio import gr
from gnuradio.filter import firdes
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio import fft
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.fft import window
from gnuradio.filter import firdes
from optparse import OptionParser
import numpy as np
import osmosdr
import time
class top_block(gr.top_block):
def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000):
@ -49,28 +44,25 @@ class top_block(gr.top_block):
##################################################
# Blocks
##################################################
self.fft_vxx_0 = fft.fft_vcc(nchan, True, window.blackmanharris(nchan), True, 1)
self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc(custom_window[0:nchan])
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc(custom_window[nchan:2*nchan])
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc(custom_window[2*nchan:3*nchan])
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc(custom_window[-nchan:])
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan]))
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan]))
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan]))
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:]))
self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, (int(obs_time*samp_rate)))
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate))
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True)
self.blocks_file_sink_0.set_unbuffered(False)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, (nchan*2))
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, (nchan*3))
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan)
self.blocks_add_xx_0 = blocks.add_vcc(nchan)
self.RTL820T = osmosdr.source(
args="numchan=" + str(1) + " " + ''
)
self.RTL820T.set_time_unknown_pps(osmosdr.time_spec_t())
self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' )
self.RTL820T.set_sample_rate(samp_rate)
self.RTL820T.set_center_freq(c_freq, 0)
self.RTL820T.set_freq_corr(0, 0)
@ -108,7 +100,6 @@ class top_block(gr.top_block):
self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0))
self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
def get_c_freq(self):
return self.c_freq
@ -129,28 +120,28 @@ class top_block(gr.top_block):
self.nchan = nchan
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan))
self.blocks_delay_0.set_dly(int((self.nchan*3)))
self.blocks_delay_0_0.set_dly(int((self.nchan*2)))
self.blocks_delay_0_1.set_dly(int(self.nchan))
self.blocks_multiply_const_vxx_0.set_k(self.custom_window[-self.nchan:])
self.blocks_multiply_const_vxx_0_0.set_k(self.custom_window[2*self.nchan:3*self.nchan])
self.blocks_multiply_const_vxx_0_0_0.set_k(self.custom_window[self.nchan:2*self.nchan])
self.blocks_multiply_const_vxx_0_0_0_0.set_k(self.custom_window[0:self.nchan])
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.blocks_delay_0_1.set_dly(self.nchan)
self.blocks_delay_0_0.set_dly(self.nchan*2)
self.blocks_delay_0.set_dly(self.nchan*3)
def get_obs_time(self):
return self.obs_time
def set_obs_time(self, obs_time):
self.obs_time = obs_time
self.blocks_head_0.set_length((int(self.obs_time*self.samp_rate)))
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.RTL820T.set_sample_rate(self.samp_rate)
self.blocks_head_0.set_length((int(self.obs_time*self.samp_rate)))
def get_sinc_sample_locations(self):
return self.sinc_sample_locations
@ -172,49 +163,38 @@ class top_block(gr.top_block):
def set_custom_window(self, custom_window):
self.custom_window = custom_window
self.blocks_multiply_const_vxx_0.set_k(self.custom_window[-self.nchan:])
self.blocks_multiply_const_vxx_0_0.set_k(self.custom_window[2*self.nchan:3*self.nchan])
self.blocks_multiply_const_vxx_0_0_0.set_k(self.custom_window[self.nchan:2*self.nchan])
self.blocks_multiply_const_vxx_0_0_0_0.set_k(self.custom_window[0:self.nchan])
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
def argument_parser():
parser = ArgumentParser()
parser.add_argument(
"--c-freq", dest="c_freq", type=eng_float, default=eng_notation.num_to_str(float(1420000000)),
help="Set c_freq [default=%(default)r]")
parser.add_argument(
"--nbin", dest="nbin", type=intx, default=1000,
help="Set nbin [default=%(default)r]")
parser.add_argument(
"--nchan", dest="nchan", type=intx, default=1024,
help="Set nchan [default=%(default)r]")
parser.add_argument(
"--obs-time", dest="obs_time", type=eng_float, default=eng_notation.num_to_str(float(60)),
help="Set obs_time [default=%(default)r]")
parser.add_argument(
"--samp-rate", dest="samp_rate", type=eng_float, default=eng_notation.num_to_str(float(2400000)),
help="Set samp_rate [default=%(default)r]")
parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
parser.add_option(
"", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]")
parser.add_option(
"", "--nbin", dest="nbin", type="intx", default=1000,
help="Set nbin [default=%default]")
parser.add_option(
"", "--nchan", dest="nchan", type="intx", default=1024,
help="Set nchan [default=%default]")
parser.add_option(
"", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]")
parser.add_option(
"", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]")
return parser
def main(top_block_cls=top_block, options=None):
if options is None:
options = argument_parser().parse_args()
options, _ = argument_parser().parse_args()
tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate)
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
tb.start()
tb.wait()