From ec2706053d314e895b662d42d9b4efbc1d4030f9 Mon Sep 17 00:00:00 2001 From: Jeff Moe Date: Thu, 19 Jan 2023 10:36:17 -0700 Subject: [PATCH] kludge old py2 top_block with gain --- top_block-gain-10.py | 202 ++++++++++++++++++++++ top_block.py.orig => top_block-gain-30.py | 0 top_block.py | 122 ++++++------- 3 files changed, 253 insertions(+), 71 deletions(-) create mode 100644 top_block-gain-10.py rename top_block.py.orig => top_block-gain-30.py (100%) mode change 100755 => 100644 top_block.py diff --git a/top_block-gain-10.py b/top_block-gain-10.py new file mode 100644 index 0000000..650a936 --- /dev/null +++ b/top_block-gain-10.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +################################################## +# GNU Radio Python Flow Graph +# Title: Top Block +# Generated: Mon Oct 28 01:00:35 2019 +################################################## + + +from gnuradio import blocks +from gnuradio import eng_notation +from gnuradio import fft +from gnuradio import gr +from gnuradio.eng_option import eng_option +from gnuradio.fft import window +from gnuradio.filter import firdes +from optparse import OptionParser +import numpy as np +import osmosdr +import time + + +class top_block(gr.top_block): + + def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000): + gr.top_block.__init__(self, "Top Block") + + ################################################## + # Parameters + ################################################## + self.c_freq = c_freq + self.nbin = nbin + self.nchan = nchan + self.obs_time = obs_time + self.samp_rate = samp_rate + + ################################################## + # Variables + ################################################## + self.sinc_sample_locations = sinc_sample_locations = np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/nchan) + self.sinc = sinc = np.sinc(sinc_sample_locations/np.pi) + self.custom_window = custom_window = sinc*np.hamming(4*nchan) + + ################################################## + # Blocks + ################################################## + self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1) + self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) + self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) + self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) + self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) + self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan])) + self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan])) + self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan])) + self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:])) + self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan) + self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate)) + self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True) + self.blocks_file_sink_0.set_unbuffered(False) + self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan) + self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2) + self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3) + self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan) + self.blocks_add_xx_0 = blocks.add_vcc(nchan) + self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' ) + self.RTL820T.set_sample_rate(samp_rate) + self.RTL820T.set_center_freq(c_freq, 0) + self.RTL820T.set_freq_corr(0, 0) + self.RTL820T.set_dc_offset_mode(0, 0) + self.RTL820T.set_iq_balance_mode(0, 0) + self.RTL820T.set_gain_mode(False, 0) + self.RTL820T.set_gain(10, 0) + self.RTL820T.set_if_gain(10, 0) + self.RTL820T.set_bb_gain(10, 0) + self.RTL820T.set_antenna('', 0) + self.RTL820T.set_bandwidth(0, 0) + + + ################################################## + # Connections + ################################################## + self.connect((self.RTL820T, 0), (self.blocks_head_0, 0)) + self.connect((self.blocks_add_xx_0, 0), (self.fft_vxx_0, 0)) + self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0)) + self.connect((self.blocks_delay_0, 0), (self.blocks_stream_to_vector_0_2, 0)) + self.connect((self.blocks_delay_0_0, 0), (self.blocks_stream_to_vector_0_0, 0)) + self.connect((self.blocks_delay_0_1, 0), (self.blocks_stream_to_vector_0_1, 0)) + self.connect((self.blocks_head_0, 0), (self.blocks_delay_0, 0)) + self.connect((self.blocks_head_0, 0), (self.blocks_delay_0_0, 0)) + self.connect((self.blocks_head_0, 0), (self.blocks_delay_0_1, 0)) + self.connect((self.blocks_head_0, 0), (self.blocks_stream_to_vector_0, 0)) + self.connect((self.blocks_integrate_xx_0, 0), (self.blocks_file_sink_0, 0)) + self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_xx_0, 0)) + self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_add_xx_0, 1)) + self.connect((self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2)) + self.connect((self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3)) + self.connect((self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0)) + self.connect((self.blocks_stream_to_vector_0_0, 0), (self.blocks_multiply_const_vxx_0_0_0, 0)) + self.connect((self.blocks_stream_to_vector_0_1, 0), (self.blocks_multiply_const_vxx_0_0, 0)) + self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0)) + self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0)) + + def get_c_freq(self): + return self.c_freq + + def set_c_freq(self, c_freq): + self.c_freq = c_freq + self.RTL820T.set_center_freq(self.c_freq, 0) + + def get_nbin(self): + return self.nbin + + def set_nbin(self, nbin): + self.nbin = nbin + + def get_nchan(self): + return self.nchan + + def set_nchan(self, nchan): + self.nchan = nchan + self.set_custom_window(self.sinc*np.hamming(4*self.nchan)) + self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan)) + self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan])) + self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan])) + self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan])) + self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:])) + self.blocks_delay_0_1.set_dly(self.nchan) + self.blocks_delay_0_0.set_dly(self.nchan*2) + self.blocks_delay_0.set_dly(self.nchan*3) + + def get_obs_time(self): + return self.obs_time + + def set_obs_time(self, obs_time): + self.obs_time = obs_time + self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate)) + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate)) + self.RTL820T.set_sample_rate(self.samp_rate) + + def get_sinc_sample_locations(self): + return self.sinc_sample_locations + + def set_sinc_sample_locations(self, sinc_sample_locations): + self.sinc_sample_locations = sinc_sample_locations + self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi)) + + def get_sinc(self): + return self.sinc + + def set_sinc(self, sinc): + self.sinc = sinc + self.set_custom_window(self.sinc*np.hamming(4*self.nchan)) + self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi)) + + def get_custom_window(self): + return self.custom_window + + def set_custom_window(self, custom_window): + self.custom_window = custom_window + self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan])) + self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan])) + self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan])) + self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:])) + + +def argument_parser(): + parser = OptionParser(usage="%prog: [options]", option_class=eng_option) + parser.add_option( + "", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000), + help="Set c_freq [default=%default]") + parser.add_option( + "", "--nbin", dest="nbin", type="intx", default=1000, + help="Set nbin [default=%default]") + parser.add_option( + "", "--nchan", dest="nchan", type="intx", default=1024, + help="Set nchan [default=%default]") + parser.add_option( + "", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60), + help="Set obs_time [default=%default]") + parser.add_option( + "", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000), + help="Set samp_rate [default=%default]") + return parser + + +def main(top_block_cls=top_block, options=None): + if options is None: + options, _ = argument_parser().parse_args() + + tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate) + tb.start() + tb.wait() + + +if __name__ == '__main__': + main() diff --git a/top_block.py.orig b/top_block-gain-30.py similarity index 100% rename from top_block.py.orig rename to top_block-gain-30.py diff --git a/top_block.py b/top_block.py old mode 100755 new mode 100644 index 365cbbe..650a936 --- a/top_block.py +++ b/top_block.py @@ -1,30 +1,25 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python2 # -*- coding: utf-8 -*- - -# -# SPDX-License-Identifier: GPL-3.0 -# +################################################## # GNU Radio Python Flow Graph # Title: Top Block -# GNU Radio version: 3.10.5.0 +# Generated: Mon Oct 28 01:00:35 2019 +################################################## + from gnuradio import blocks -from gnuradio import fft -from gnuradio.fft import window -from gnuradio import gr -from gnuradio.filter import firdes -import sys -import signal -from argparse import ArgumentParser -from gnuradio.eng_arg import eng_float, intx from gnuradio import eng_notation +from gnuradio import fft +from gnuradio import gr +from gnuradio.eng_option import eng_option +from gnuradio.fft import window +from gnuradio.filter import firdes +from optparse import OptionParser import numpy as np import osmosdr import time - - class top_block(gr.top_block): def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000): @@ -49,28 +44,25 @@ class top_block(gr.top_block): ################################################## # Blocks ################################################## - self.fft_vxx_0 = fft.fft_vcc(nchan, True, window.blackmanharris(nchan), True, 1) + self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1) self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan) - self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc(custom_window[0:nchan]) - self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc(custom_window[nchan:2*nchan]) - self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc(custom_window[2*nchan:3*nchan]) - self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc(custom_window[-nchan:]) + self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan])) + self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan])) + self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan])) + self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:])) self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan) - self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, (int(obs_time*samp_rate))) + self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate)) self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True) self.blocks_file_sink_0.set_unbuffered(False) self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan) - self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, (nchan*2)) - self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, (nchan*3)) + self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2) + self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3) self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan) self.blocks_add_xx_0 = blocks.add_vcc(nchan) - self.RTL820T = osmosdr.source( - args="numchan=" + str(1) + " " + '' - ) - self.RTL820T.set_time_unknown_pps(osmosdr.time_spec_t()) + self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' ) self.RTL820T.set_sample_rate(samp_rate) self.RTL820T.set_center_freq(c_freq, 0) self.RTL820T.set_freq_corr(0, 0) @@ -108,7 +100,6 @@ class top_block(gr.top_block): self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0)) self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0)) - def get_c_freq(self): return self.c_freq @@ -129,28 +120,28 @@ class top_block(gr.top_block): self.nchan = nchan self.set_custom_window(self.sinc*np.hamming(4*self.nchan)) self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan)) - self.blocks_delay_0.set_dly(int((self.nchan*3))) - self.blocks_delay_0_0.set_dly(int((self.nchan*2))) - self.blocks_delay_0_1.set_dly(int(self.nchan)) - self.blocks_multiply_const_vxx_0.set_k(self.custom_window[-self.nchan:]) - self.blocks_multiply_const_vxx_0_0.set_k(self.custom_window[2*self.nchan:3*self.nchan]) - self.blocks_multiply_const_vxx_0_0_0.set_k(self.custom_window[self.nchan:2*self.nchan]) - self.blocks_multiply_const_vxx_0_0_0_0.set_k(self.custom_window[0:self.nchan]) + self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan])) + self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan])) + self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan])) + self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:])) + self.blocks_delay_0_1.set_dly(self.nchan) + self.blocks_delay_0_0.set_dly(self.nchan*2) + self.blocks_delay_0.set_dly(self.nchan*3) def get_obs_time(self): return self.obs_time def set_obs_time(self, obs_time): self.obs_time = obs_time - self.blocks_head_0.set_length((int(self.obs_time*self.samp_rate))) + self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate)) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate + self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate)) self.RTL820T.set_sample_rate(self.samp_rate) - self.blocks_head_0.set_length((int(self.obs_time*self.samp_rate))) def get_sinc_sample_locations(self): return self.sinc_sample_locations @@ -172,49 +163,38 @@ class top_block(gr.top_block): def set_custom_window(self, custom_window): self.custom_window = custom_window - self.blocks_multiply_const_vxx_0.set_k(self.custom_window[-self.nchan:]) - self.blocks_multiply_const_vxx_0_0.set_k(self.custom_window[2*self.nchan:3*self.nchan]) - self.blocks_multiply_const_vxx_0_0_0.set_k(self.custom_window[self.nchan:2*self.nchan]) - self.blocks_multiply_const_vxx_0_0_0_0.set_k(self.custom_window[0:self.nchan]) - + self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan])) + self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan])) + self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan])) + self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:])) def argument_parser(): - parser = ArgumentParser() - parser.add_argument( - "--c-freq", dest="c_freq", type=eng_float, default=eng_notation.num_to_str(float(1420000000)), - help="Set c_freq [default=%(default)r]") - parser.add_argument( - "--nbin", dest="nbin", type=intx, default=1000, - help="Set nbin [default=%(default)r]") - parser.add_argument( - "--nchan", dest="nchan", type=intx, default=1024, - help="Set nchan [default=%(default)r]") - parser.add_argument( - "--obs-time", dest="obs_time", type=eng_float, default=eng_notation.num_to_str(float(60)), - help="Set obs_time [default=%(default)r]") - parser.add_argument( - "--samp-rate", dest="samp_rate", type=eng_float, default=eng_notation.num_to_str(float(2400000)), - help="Set samp_rate [default=%(default)r]") + parser = OptionParser(usage="%prog: [options]", option_class=eng_option) + parser.add_option( + "", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000), + help="Set c_freq [default=%default]") + parser.add_option( + "", "--nbin", dest="nbin", type="intx", default=1000, + help="Set nbin [default=%default]") + parser.add_option( + "", "--nchan", dest="nchan", type="intx", default=1024, + help="Set nchan [default=%default]") + parser.add_option( + "", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60), + help="Set obs_time [default=%default]") + parser.add_option( + "", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000), + help="Set samp_rate [default=%default]") return parser def main(top_block_cls=top_block, options=None): if options is None: - options = argument_parser().parse_args() + options, _ = argument_parser().parse_args() + tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate) - - def sig_handler(sig=None, frame=None): - tb.stop() - tb.wait() - - sys.exit(0) - - signal.signal(signal.SIGINT, sig_handler) - signal.signal(signal.SIGTERM, sig_handler) - tb.start() - tb.wait()