Reformat python code with black

spacecruft
Jeff Moe 2023-01-21 10:59:08 -07:00
parent a8ba269233
commit 9c2c248aed
5 changed files with 637 additions and 276 deletions

View File

@ -16,14 +16,14 @@ parser.add_argument("-F", "--fmax", type=float, required=True)
args = parser.parse_args()
bandwidth=args.bandwidth
channels=args.channels
t_int=args.t_int
duration=args.duration
fmin=args.fmin
fmax=args.fmax
bandwidth = args.bandwidth
channels = args.channels
t_int = args.t_int
duration = args.duration
fmin = args.fmin
fmax = args.fmax
#Define GRC observation parameters
# Define GRC observation parameters
bandwidth = str(bandwidth)
channels = str(channels)
t_int = str(t_int)
@ -31,69 +31,141 @@ duration = str(duration)
fmin = float(fmin)
fmax = float(fmax)
nbins = str(int(float(t_int) * float(bandwidth)/float(channels)))
nbins = str(int(float(t_int) * float(bandwidth) / float(channels)))
#Delete pre-existing observation.dat file
for file_name in os.listdir('.'):
if file_name.endswith('.dat'):
# Delete pre-existing observation.dat file
for file_name in os.listdir("."):
if file_name.endswith(".dat"):
try:
os.remove(file_name)
except OSError:
pass
try:
os.remove('rfi_plot.png')
os.remove("rfi_plot.png")
except OSError:
pass
print('\n\033[1;33;48m+=================================================================+')
print('\033[1;33;48m| \033[1;36;48mCygnusRFI\033[1;33;48m:\033[1;32;48m An open-source Radio Frequency Interference analyzer\033[1;33;48m |')
print('\033[1;33;48m+=================================================================+')
print(
"\n\033[1;33;48m+=================================================================+"
)
print(
"\033[1;33;48m| \033[1;36;48mCygnusRFI\033[1;33;48m:\033[1;32;48m An open-source Radio Frequency Interference analyzer\033[1;33;48m |"
)
print(
"\033[1;33;48m+=================================================================+"
)
sleep(0.5)
print('\n\033[1;33;48m\033[4;33;48mRFI Measurement Parameters:\033[0;32;48m')
print("\n\033[1;33;48m\033[4;33;48mRFI Measurement Parameters:\033[0;32;48m")
sleep(0.15)
print(('\033[1;32;48mFrequency range to scan: \033[1;36;48m'+str(float(fmin)/1000000)+'-'+str(float(fmax)/1000000)+' MHz'))
print(
(
"\033[1;32;48mFrequency range to scan: \033[1;36;48m"
+ str(float(fmin) / 1000000)
+ "-"
+ str(float(fmax) / 1000000)
+ " MHz"
)
)
sleep(0.15)
print(('\033[1;32;48mBandwidth per spectrum: \033[1;36;48m'+str(float(bandwidth)/1000000)+' MHz'))
print(
(
"\033[1;32;48mBandwidth per spectrum: \033[1;36;48m"
+ str(float(bandwidth) / 1000000)
+ " MHz"
)
)
sleep(0.15)
print(('\033[1;32;48mIntegration time per spectrum: \033[1;36;48m'+duration+' sec'))
print(("\033[1;32;48mIntegration time per spectrum: \033[1;36;48m" + duration + " sec"))
sleep(0.15)
print(('\033[1;32;48mNumber of channels per spectrum (FFT Size should be a power of 2): \033[1;36;48m'+str(channels)))
print(
(
"\033[1;32;48mNumber of channels per spectrum (FFT Size should be a power of 2): \033[1;36;48m"
+ str(channels)
)
)
sleep(0.15)
print(('\033[1;32;48mIntegration time per FFT sample: \033[1;36;48m'+t_int+' sec'))
print(("\033[1;32;48mIntegration time per FFT sample: \033[1;36;48m" + t_int + " sec"))
sleep(0.5)
print(("\n\033[1;32;48mEstimated completion time: \033[1;36;48m"+str(float(duration)*float(fmax-fmin)/float(bandwidth))+" sec"))
print(
(
"\n\033[1;32;48mEstimated completion time: \033[1;36;48m"
+ str(float(duration) * float(fmax - fmin) / float(bandwidth))
+ " sec"
)
)
sleep(0.5)
#proceed = eval(input("\n\033[1;36;48mProceed to measurement? [Y/n]: \033[1;33;48m"))
proceed = 'Y'
# proceed = eval(input("\n\033[1;36;48mProceed to measurement? [Y/n]: \033[1;33;48m"))
proceed = "Y"
if proceed.lower() != 'n' and proceed.lower() != 'no':
print('\n\033[1;33;48m+=================================================================+')
print('\033[1;33;48m| [+] \033[1;32;48m Starting measurement...\033[1;33;48m |')
print('\033[1;33;48m+=================================================================+\n')
q=0
if proceed.lower() != "n" and proceed.lower() != "no":
print(
"\n\033[1;33;48m+=================================================================+"
)
print(
"\033[1;33;48m| [+] \033[1;32;48m Starting measurement...\033[1;33;48m |"
)
print(
"\033[1;33;48m+=================================================================+\n"
)
q = 0
for freq in range(int(fmin), int(fmax), int(float(bandwidth))):
print(("\033[1;33;48m\n---------------------------------------------------------------------------\n \033[1;33;48m[*] \033[1;32;48mCurrently monitoring f_center = "+str(0.000001*freq)+" +/- "+str(float(float(bandwidth)*0.000001)/2)+" MHz (iteration: "+str(q)+")...\n\033[1;33;48m---------------------------------------------------------------------------"))
#Define observation frequency
print(
(
"\033[1;33;48m\n---------------------------------------------------------------------------\n \033[1;33;48m[*] \033[1;32;48mCurrently monitoring f_center = "
+ str(0.000001 * freq)
+ " +/- "
+ str(float(float(bandwidth) * 0.000001) / 2)
+ " MHz (iteration: "
+ str(q)
+ ")...\n\033[1;33;48m---------------------------------------------------------------------------"
)
)
# Define observation frequency
f_center = str(freq)
#Execute top_block.py with parameters
print('\033[0m')
sys.argv = ['top_block.py', '--c-freq='+f_center, '--samp-rate='+bandwidth, '--nchan='+channels, '--nbin='+nbins, '--obs-time='+duration]
exec(compile(open('top_block.py', "rb").read(), 'top_block.py', 'exec'))
os.rename('observation.dat', str(q)+'.dat')
q = q+1
print('\n\033[1;33;48m+=================================================================+')
print('\033[1;33;48m| \033[1;32;48mMeasurement finished!\033[1;33;48m |')
print('\033[1;33;48m+=================================================================+\n')
# Execute top_block.py with parameters
print("\033[0m")
sys.argv = [
"top_block.py",
"--c-freq=" + f_center,
"--samp-rate=" + bandwidth,
"--nchan=" + channels,
"--nbin=" + nbins,
"--obs-time=" + duration,
]
exec(compile(open("top_block.py", "rb").read(), "top_block.py", "exec"))
os.rename("observation.dat", str(q) + ".dat")
q = q + 1
print(
"\n\033[1;33;48m+=================================================================+"
)
print(
"\033[1;33;48m| \033[1;32;48mMeasurement finished!\033[1;33;48m |"
)
print(
"\033[1;33;48m+=================================================================+\n"
)
f_center = str(fmin)
sys.argv = ['rfi_plotter.py', 'freq='+f_center, 'samp_rate='+bandwidth, 'nchan='+channels, 'nbin='+nbins, 'n='+str(q), 'dur='+duration, 'fminimum='+str(fmin), 'fmaximum='+str(fmax)]
exec(compile(open('rfi_plotter.py', "rb").read(), 'rfi_plotter.py', 'exec'))
print('\033[1;32;48mYour data has been saved as \033[1;36;48mrfi_plot.png\033[1;32;48m.')
sys.argv = [
"rfi_plotter.py",
"freq=" + f_center,
"samp_rate=" + bandwidth,
"nchan=" + channels,
"nbin=" + nbins,
"n=" + str(q),
"dur=" + duration,
"fminimum=" + str(fmin),
"fmaximum=" + str(fmax),
]
exec(compile(open("rfi_plotter.py", "rb").read(), "rfi_plotter.py", "exec"))
print(
"\033[1;32;48mYour data has been saved as \033[1;36;48mrfi_plot.png\033[1;32;48m."
)
else:
print('\n\033[1;31;48m[!] Exiting...\n')
print("\n\033[1;31;48m[!] Exiting...\n")

View File

@ -1,31 +1,33 @@
#!/usr/bin/env python
try:
import matplotlib
matplotlib.use('Agg')
matplotlib.use("Agg")
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 32})
plt.rcParams.update({"font.size": 32})
import argparse
from matplotlib.gridspec import GridSpec
parser = argparse.ArgumentParser()
parser.add_argument('freq')
parser.add_argument('samp_rate')
parser.add_argument('nchan')
parser.add_argument('nbin')
parser.add_argument('n')
parser.add_argument('dur')
parser.add_argument('fminimum')
parser.add_argument('fmaximum')
parser.add_argument("freq")
parser.add_argument("samp_rate")
parser.add_argument("nchan")
parser.add_argument("nbin")
parser.add_argument("n")
parser.add_argument("dur")
parser.add_argument("fminimum")
parser.add_argument("fmaximum")
args = parser.parse_args()
def decibel(x):
return 10.0*np.log10(x)
#return x
return 10.0 * np.log10(x)
# return x
if __name__ == "__main__":
#Observation parameters
# Observation parameters
exec(args.freq)
exec(args.samp_rate)
exec(args.nchan)
@ -34,46 +36,56 @@ try:
exec(args.dur)
exec(args.fminimum)
exec(args.fmaximum)
frequency=freq
#Load data
z=1000*(np.fromfile("0.dat", dtype="float32").reshape(-1, nchan)/nbin)
allarrays=[]
#N = amount of .dat files
frequency = freq
# Load data
z = 1000 * (np.fromfile("0.dat", dtype="float32").reshape(-1, nchan) / nbin)
allarrays = []
# N = amount of .dat files
for i in range(int(n)):
final = 1000*(np.fromfile(str(i)+".dat", dtype="float32").reshape(-1, nchan)/nbin)
final = 1000 * (
np.fromfile(str(i) + ".dat", dtype="float32").reshape(-1, nchan) / nbin
)
allarrays.append(final)
z_final = np.concatenate(allarrays,axis=1)
#Number of sub-integrations
z_final = np.concatenate(allarrays, axis=1)
# Number of sub-integrations
nsub = z_final.shape[0]
#Compute average spectrum
# Compute average spectrum
zmean = np.mean(z_final, axis=0)
#Compute time axis
tint = float(nbin*nchan)/samp_rate
t = tint*np.arange(nsub)
#Compute frequency axis (convert to MHz)
allfreq=[]
# Compute time axis
tint = float(nbin * nchan) / samp_rate
t = tint * np.arange(nsub)
# Compute frequency axis (convert to MHz)
allfreq = []
for i in range(int(n)):
freq = np.linspace((frequency+samp_rate*i)-0.5*samp_rate, (frequency+samp_rate*i)+0.5*samp_rate, nchan, endpoint=False)*1e-6
freq = (
np.linspace(
(frequency + samp_rate * i) - 0.5 * samp_rate,
(frequency + samp_rate * i) + 0.5 * samp_rate,
nchan,
endpoint=False,
)
* 1e-6
)
allfreq.append(freq)
freq = np.concatenate(allfreq)
#Initialize plot
fig = plt.figure(figsize=(5*n,20.25))
gs = GridSpec(1,1)
#Plot average spectrum
ax1 = fig.add_subplot(gs[0,0])
ax1.plot(freq, decibel(zmean), '#3182bd')
ax1.fill_between(freq, decibel(zmean), color='#deebf7')
ax1.set_xlim(np.min(freq), np.max(freq)) #np.min(freq), np.max(freq)
ax1.set_ylim(np.amin(decibel(zmean))-0.5, np.amin(decibel(zmean))+15)
# Initialize plot
fig = plt.figure(figsize=(5 * n, 20.25))
gs = GridSpec(1, 1)
# Plot average spectrum
ax1 = fig.add_subplot(gs[0, 0])
ax1.plot(freq, decibel(zmean), "#3182bd")
ax1.fill_between(freq, decibel(zmean), color="#deebf7")
ax1.set_xlim(np.min(freq), np.max(freq)) # np.min(freq), np.max(freq)
ax1.set_ylim(np.amin(decibel(zmean)) - 0.5, np.amin(decibel(zmean)) + 15)
ax1.ticklabel_format(useOffset=False)
ax1.set_xlabel("Frequency (MHz)", labelpad=25)
ax1.set_ylabel("Relative Power (dB)", labelpad=25)
@ -81,9 +93,28 @@ try:
ax1.set_title("\nAveraged RFI Spectrum", pad=25)
except:
ax1.set_title("\nAveraged RFI Spectrum")
ax1.annotate('Frequency range scanned: '+str(float(fminimum)/1000000)+'-'+str(float(fmaximum)/1000000)+' MHz ($\\Delta\\nu$ = '+str(float((fmaximum)-float(fminimum))/1000000)+' MHz)\nBandwidth per spectrum: '+str(float(samp_rate)/1000000)+' MHz\nIntegration time per spectrum: '+str(dur)+" sec\nNumber of channels per spectrum (FFT size): "+str(nchan), xy=(17, 1179), xycoords='axes points', size=32, ha='left', va='top', color='brown')
ax1.annotate(
"Frequency range scanned: "
+ str(float(fminimum) / 1000000)
+ "-"
+ str(float(fmaximum) / 1000000)
+ " MHz ($\\Delta\\nu$ = "
+ str(float((fmaximum) - float(fminimum)) / 1000000)
+ " MHz)\nBandwidth per spectrum: "
+ str(float(samp_rate) / 1000000)
+ " MHz\nIntegration time per spectrum: "
+ str(dur)
+ " sec\nNumber of channels per spectrum (FFT size): "
+ str(nchan),
xy=(17, 1179),
xycoords="axes points",
size=32,
ha="left",
va="top",
color="brown",
)
ax1.grid()
plt.tight_layout()
plt.savefig("rfi_plot.png")
except Exception as e:

View File

@ -21,8 +21,9 @@ import time
class top_block(gr.top_block):
def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000):
def __init__(
self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000
):
gr.top_block.__init__(self, "Top Block")
##################################################
@ -37,32 +38,56 @@ class top_block(gr.top_block):
##################################################
# Variables
##################################################
self.sinc_sample_locations = sinc_sample_locations = np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/nchan)
self.sinc = sinc = np.sinc(sinc_sample_locations/np.pi)
self.custom_window = custom_window = sinc*np.hamming(4*nchan)
self.sinc_sample_locations = sinc_sample_locations = np.arange(
-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / nchan
)
self.sinc = sinc = np.sinc(sinc_sample_locations / np.pi)
self.custom_window = custom_window = sinc * np.hamming(4 * nchan)
##################################################
# Blocks
##################################################
self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan]))
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan]))
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan]))
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:]))
self.fft_vxx_0 = fft.fft_vcc(
nchan, True, (window.blackmanharris(nchan)), True, 1
)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc(
(custom_window[0:nchan])
)
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc(
(custom_window[nchan : 2 * nchan])
)
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc(
(custom_window[2 * nchan : 3 * nchan])
)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc(
(custom_window[-nchan:])
)
self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate))
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True)
self.blocks_head_0 = blocks.head(
gr.sizeof_gr_complex * 1, int(obs_time * samp_rate)
)
self.blocks_file_sink_0 = blocks.file_sink(
gr.sizeof_float * nchan, "observation.dat", True
)
self.blocks_file_sink_0.set_unbuffered(False)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex * 1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 3)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan)
self.blocks_add_xx_0 = blocks.add_vcc(nchan)
self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' )
self.RTL820T = osmosdr.source(args="numchan=" + str(1) + " " + "")
self.RTL820T.set_sample_rate(samp_rate)
self.RTL820T.set_center_freq(c_freq, 0)
self.RTL820T.set_freq_corr(0, 0)
@ -72,16 +97,17 @@ class top_block(gr.top_block):
self.RTL820T.set_gain(10, 0)
self.RTL820T.set_if_gain(10, 0)
self.RTL820T.set_bb_gain(10, 0)
self.RTL820T.set_antenna('', 0)
self.RTL820T.set_antenna("", 0)
self.RTL820T.set_bandwidth(0, 0)
##################################################
# Connections
##################################################
self.connect((self.RTL820T, 0), (self.blocks_head_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.fft_vxx_0, 0))
self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0))
self.connect(
(self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0)
)
self.connect((self.blocks_delay_0, 0), (self.blocks_stream_to_vector_0_2, 0))
self.connect((self.blocks_delay_0_0, 0), (self.blocks_stream_to_vector_0_0, 0))
self.connect((self.blocks_delay_0_1, 0), (self.blocks_stream_to_vector_0_1, 0))
@ -92,12 +118,27 @@ class top_block(gr.top_block):
self.connect((self.blocks_integrate_xx_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3))
self.connect((self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_stream_to_vector_0_0, 0), (self.blocks_multiply_const_vxx_0_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_1, 0), (self.blocks_multiply_const_vxx_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0))
self.connect(
(self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2)
)
self.connect(
(self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3)
)
self.connect(
(self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0)
)
self.connect(
(self.blocks_stream_to_vector_0_0, 0),
(self.blocks_multiply_const_vxx_0_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_1, 0),
(self.blocks_multiply_const_vxx_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_2, 0),
(self.blocks_multiply_const_vxx_0_0_0_0, 0),
)
self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
def get_c_freq(self):
@ -118,29 +159,37 @@ class top_block(gr.top_block):
def set_nchan(self, nchan):
self.nchan = nchan
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan))
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc_sample_locations(
np.arange(-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / self.nchan)
)
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
self.blocks_delay_0_1.set_dly(self.nchan)
self.blocks_delay_0_0.set_dly(self.nchan*2)
self.blocks_delay_0.set_dly(self.nchan*3)
self.blocks_delay_0_0.set_dly(self.nchan * 2)
self.blocks_delay_0.set_dly(self.nchan * 3)
def get_obs_time(self):
return self.obs_time
def set_obs_time(self, obs_time):
self.obs_time = obs_time
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
self.RTL820T.set_sample_rate(self.samp_rate)
def get_sinc_sample_locations(self):
@ -148,44 +197,75 @@ class top_block(gr.top_block):
def set_sinc_sample_locations(self, sinc_sample_locations):
self.sinc_sample_locations = sinc_sample_locations
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_sinc(self):
return self.sinc
def set_sinc(self, sinc):
self.sinc = sinc
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_custom_window(self):
return self.custom_window
def set_custom_window(self, custom_window):
self.custom_window = custom_window
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
def argument_parser():
parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
parser.add_option(
"", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]")
"",
"--c-freq",
dest="c_freq",
type="eng_float",
default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]",
)
parser.add_option(
"", "--nbin", dest="nbin", type="intx", default=1000,
help="Set nbin [default=%default]")
"",
"--nbin",
dest="nbin",
type="intx",
default=1000,
help="Set nbin [default=%default]",
)
parser.add_option(
"", "--nchan", dest="nchan", type="intx", default=1024,
help="Set nchan [default=%default]")
"",
"--nchan",
dest="nchan",
type="intx",
default=1024,
help="Set nchan [default=%default]",
)
parser.add_option(
"", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]")
"",
"--obs-time",
dest="obs_time",
type="eng_float",
default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]",
)
parser.add_option(
"", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]")
"",
"--samp-rate",
dest="samp_rate",
type="eng_float",
default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]",
)
return parser
@ -193,10 +273,16 @@ def main(top_block_cls=top_block, options=None):
if options is None:
options, _ = argument_parser().parse_args()
tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate)
tb = top_block_cls(
c_freq=options.c_freq,
nbin=options.nbin,
nchan=options.nchan,
obs_time=options.obs_time,
samp_rate=options.samp_rate,
)
tb.start()
tb.wait()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@ -21,8 +21,9 @@ import time
class top_block(gr.top_block):
def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000):
def __init__(
self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000
):
gr.top_block.__init__(self, "Top Block")
##################################################
@ -37,32 +38,56 @@ class top_block(gr.top_block):
##################################################
# Variables
##################################################
self.sinc_sample_locations = sinc_sample_locations = np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/nchan)
self.sinc = sinc = np.sinc(sinc_sample_locations/np.pi)
self.custom_window = custom_window = sinc*np.hamming(4*nchan)
self.sinc_sample_locations = sinc_sample_locations = np.arange(
-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / nchan
)
self.sinc = sinc = np.sinc(sinc_sample_locations / np.pi)
self.custom_window = custom_window = sinc * np.hamming(4 * nchan)
##################################################
# Blocks
##################################################
self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan]))
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan]))
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan]))
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:]))
self.fft_vxx_0 = fft.fft_vcc(
nchan, True, (window.blackmanharris(nchan)), True, 1
)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc(
(custom_window[0:nchan])
)
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc(
(custom_window[nchan : 2 * nchan])
)
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc(
(custom_window[2 * nchan : 3 * nchan])
)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc(
(custom_window[-nchan:])
)
self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate))
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True)
self.blocks_head_0 = blocks.head(
gr.sizeof_gr_complex * 1, int(obs_time * samp_rate)
)
self.blocks_file_sink_0 = blocks.file_sink(
gr.sizeof_float * nchan, "observation.dat", True
)
self.blocks_file_sink_0.set_unbuffered(False)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex * 1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 3)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan)
self.blocks_add_xx_0 = blocks.add_vcc(nchan)
self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' )
self.RTL820T = osmosdr.source(args="numchan=" + str(1) + " " + "")
self.RTL820T.set_sample_rate(samp_rate)
self.RTL820T.set_center_freq(c_freq, 0)
self.RTL820T.set_freq_corr(0, 0)
@ -72,16 +97,17 @@ class top_block(gr.top_block):
self.RTL820T.set_gain(30, 0)
self.RTL820T.set_if_gain(30, 0)
self.RTL820T.set_bb_gain(30, 0)
self.RTL820T.set_antenna('', 0)
self.RTL820T.set_antenna("", 0)
self.RTL820T.set_bandwidth(0, 0)
##################################################
# Connections
##################################################
self.connect((self.RTL820T, 0), (self.blocks_head_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.fft_vxx_0, 0))
self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0))
self.connect(
(self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0)
)
self.connect((self.blocks_delay_0, 0), (self.blocks_stream_to_vector_0_2, 0))
self.connect((self.blocks_delay_0_0, 0), (self.blocks_stream_to_vector_0_0, 0))
self.connect((self.blocks_delay_0_1, 0), (self.blocks_stream_to_vector_0_1, 0))
@ -92,12 +118,27 @@ class top_block(gr.top_block):
self.connect((self.blocks_integrate_xx_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3))
self.connect((self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_stream_to_vector_0_0, 0), (self.blocks_multiply_const_vxx_0_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_1, 0), (self.blocks_multiply_const_vxx_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0))
self.connect(
(self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2)
)
self.connect(
(self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3)
)
self.connect(
(self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0)
)
self.connect(
(self.blocks_stream_to_vector_0_0, 0),
(self.blocks_multiply_const_vxx_0_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_1, 0),
(self.blocks_multiply_const_vxx_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_2, 0),
(self.blocks_multiply_const_vxx_0_0_0_0, 0),
)
self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
def get_c_freq(self):
@ -118,29 +159,37 @@ class top_block(gr.top_block):
def set_nchan(self, nchan):
self.nchan = nchan
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan))
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc_sample_locations(
np.arange(-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / self.nchan)
)
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
self.blocks_delay_0_1.set_dly(self.nchan)
self.blocks_delay_0_0.set_dly(self.nchan*2)
self.blocks_delay_0.set_dly(self.nchan*3)
self.blocks_delay_0_0.set_dly(self.nchan * 2)
self.blocks_delay_0.set_dly(self.nchan * 3)
def get_obs_time(self):
return self.obs_time
def set_obs_time(self, obs_time):
self.obs_time = obs_time
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
self.RTL820T.set_sample_rate(self.samp_rate)
def get_sinc_sample_locations(self):
@ -148,44 +197,75 @@ class top_block(gr.top_block):
def set_sinc_sample_locations(self, sinc_sample_locations):
self.sinc_sample_locations = sinc_sample_locations
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_sinc(self):
return self.sinc
def set_sinc(self, sinc):
self.sinc = sinc
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_custom_window(self):
return self.custom_window
def set_custom_window(self, custom_window):
self.custom_window = custom_window
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
def argument_parser():
parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
parser.add_option(
"", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]")
"",
"--c-freq",
dest="c_freq",
type="eng_float",
default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]",
)
parser.add_option(
"", "--nbin", dest="nbin", type="intx", default=1000,
help="Set nbin [default=%default]")
"",
"--nbin",
dest="nbin",
type="intx",
default=1000,
help="Set nbin [default=%default]",
)
parser.add_option(
"", "--nchan", dest="nchan", type="intx", default=1024,
help="Set nchan [default=%default]")
"",
"--nchan",
dest="nchan",
type="intx",
default=1024,
help="Set nchan [default=%default]",
)
parser.add_option(
"", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]")
"",
"--obs-time",
dest="obs_time",
type="eng_float",
default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]",
)
parser.add_option(
"", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]")
"",
"--samp-rate",
dest="samp_rate",
type="eng_float",
default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]",
)
return parser
@ -193,10 +273,16 @@ def main(top_block_cls=top_block, options=None):
if options is None:
options, _ = argument_parser().parse_args()
tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate)
tb = top_block_cls(
c_freq=options.c_freq,
nbin=options.nbin,
nchan=options.nchan,
obs_time=options.obs_time,
samp_rate=options.samp_rate,
)
tb.start()
tb.wait()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@ -21,8 +21,9 @@ import time
class top_block(gr.top_block):
def __init__(self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000):
def __init__(
self, c_freq=1420000000, nbin=1000, nchan=1024, obs_time=60, samp_rate=2400000
):
gr.top_block.__init__(self, "Top Block")
##################################################
@ -37,32 +38,56 @@ class top_block(gr.top_block):
##################################################
# Variables
##################################################
self.sinc_sample_locations = sinc_sample_locations = np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/nchan)
self.sinc = sinc = np.sinc(sinc_sample_locations/np.pi)
self.custom_window = custom_window = sinc*np.hamming(4*nchan)
self.sinc_sample_locations = sinc_sample_locations = np.arange(
-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / nchan
)
self.sinc = sinc = np.sinc(sinc_sample_locations / np.pi)
self.custom_window = custom_window = sinc * np.hamming(4 * nchan)
##################################################
# Blocks
##################################################
self.fft_vxx_0 = fft.fft_vcc(nchan, True, (window.blackmanharris(nchan)), True, 1)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, nchan)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc((custom_window[0:nchan]))
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc((custom_window[nchan:2*nchan]))
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc((custom_window[2*nchan:3*nchan]))
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((custom_window[-nchan:]))
self.fft_vxx_0 = fft.fft_vcc(
nchan, True, (window.blackmanharris(nchan)), True, 1
)
self.blocks_stream_to_vector_0_2 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_1 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_stream_to_vector_0 = blocks.stream_to_vector(
gr.sizeof_gr_complex * 1, nchan
)
self.blocks_multiply_const_vxx_0_0_0_0 = blocks.multiply_const_vcc(
(custom_window[0:nchan])
)
self.blocks_multiply_const_vxx_0_0_0 = blocks.multiply_const_vcc(
(custom_window[nchan : 2 * nchan])
)
self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_vcc(
(custom_window[2 * nchan : 3 * nchan])
)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc(
(custom_window[-nchan:])
)
self.blocks_integrate_xx_0 = blocks.integrate_ff(nbin, nchan)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(obs_time*samp_rate))
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*nchan, 'observation.dat', True)
self.blocks_head_0 = blocks.head(
gr.sizeof_gr_complex * 1, int(obs_time * samp_rate)
)
self.blocks_file_sink_0 = blocks.file_sink(
gr.sizeof_float * nchan, "observation.dat", True
)
self.blocks_file_sink_0.set_unbuffered(False)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex*1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex*1, nchan*3)
self.blocks_delay_0_1 = blocks.delay(gr.sizeof_gr_complex * 1, nchan)
self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 2)
self.blocks_delay_0 = blocks.delay(gr.sizeof_gr_complex * 1, nchan * 3)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(nchan)
self.blocks_add_xx_0 = blocks.add_vcc(nchan)
self.RTL820T = osmosdr.source( args="numchan=" + str(1) + " " + '' )
self.RTL820T = osmosdr.source(args="numchan=" + str(1) + " " + "")
self.RTL820T.set_sample_rate(samp_rate)
self.RTL820T.set_center_freq(c_freq, 0)
self.RTL820T.set_freq_corr(0, 0)
@ -72,16 +97,17 @@ class top_block(gr.top_block):
self.RTL820T.set_gain(10, 0)
self.RTL820T.set_if_gain(10, 0)
self.RTL820T.set_bb_gain(10, 0)
self.RTL820T.set_antenna('', 0)
self.RTL820T.set_antenna("", 0)
self.RTL820T.set_bandwidth(0, 0)
##################################################
# Connections
##################################################
self.connect((self.RTL820T, 0), (self.blocks_head_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.fft_vxx_0, 0))
self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0))
self.connect(
(self.blocks_complex_to_mag_squared_0, 0), (self.blocks_integrate_xx_0, 0)
)
self.connect((self.blocks_delay_0, 0), (self.blocks_stream_to_vector_0_2, 0))
self.connect((self.blocks_delay_0_0, 0), (self.blocks_stream_to_vector_0_0, 0))
self.connect((self.blocks_delay_0_1, 0), (self.blocks_stream_to_vector_0_1, 0))
@ -92,12 +118,27 @@ class top_block(gr.top_block):
self.connect((self.blocks_integrate_xx_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3))
self.connect((self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_stream_to_vector_0_0, 0), (self.blocks_multiply_const_vxx_0_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_1, 0), (self.blocks_multiply_const_vxx_0_0, 0))
self.connect((self.blocks_stream_to_vector_0_2, 0), (self.blocks_multiply_const_vxx_0_0_0_0, 0))
self.connect(
(self.blocks_multiply_const_vxx_0_0_0, 0), (self.blocks_add_xx_0, 2)
)
self.connect(
(self.blocks_multiply_const_vxx_0_0_0_0, 0), (self.blocks_add_xx_0, 3)
)
self.connect(
(self.blocks_stream_to_vector_0, 0), (self.blocks_multiply_const_vxx_0, 0)
)
self.connect(
(self.blocks_stream_to_vector_0_0, 0),
(self.blocks_multiply_const_vxx_0_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_1, 0),
(self.blocks_multiply_const_vxx_0_0, 0),
)
self.connect(
(self.blocks_stream_to_vector_0_2, 0),
(self.blocks_multiply_const_vxx_0_0_0_0, 0),
)
self.connect((self.fft_vxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
def get_c_freq(self):
@ -118,29 +159,37 @@ class top_block(gr.top_block):
def set_nchan(self, nchan):
self.nchan = nchan
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc_sample_locations(np.arange(-np.pi*4/2.0, np.pi*4/2.0, np.pi/self.nchan))
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc_sample_locations(
np.arange(-np.pi * 4 / 2.0, np.pi * 4 / 2.0, np.pi / self.nchan)
)
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
self.blocks_delay_0_1.set_dly(self.nchan)
self.blocks_delay_0_0.set_dly(self.nchan*2)
self.blocks_delay_0.set_dly(self.nchan*3)
self.blocks_delay_0_0.set_dly(self.nchan * 2)
self.blocks_delay_0.set_dly(self.nchan * 3)
def get_obs_time(self):
return self.obs_time
def set_obs_time(self, obs_time):
self.obs_time = obs_time
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.obs_time*self.samp_rate))
self.blocks_head_0.set_length(int(self.obs_time * self.samp_rate))
self.RTL820T.set_sample_rate(self.samp_rate)
def get_sinc_sample_locations(self):
@ -148,44 +197,75 @@ class top_block(gr.top_block):
def set_sinc_sample_locations(self, sinc_sample_locations):
self.sinc_sample_locations = sinc_sample_locations
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_sinc(self):
return self.sinc
def set_sinc(self, sinc):
self.sinc = sinc
self.set_custom_window(self.sinc*np.hamming(4*self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations/np.pi))
self.set_custom_window(self.sinc * np.hamming(4 * self.nchan))
self.set_sinc(np.sinc(self.sinc_sample_locations / np.pi))
def get_custom_window(self):
return self.custom_window
def set_custom_window(self, custom_window):
self.custom_window = custom_window
self.blocks_multiply_const_vxx_0_0_0_0.set_k((self.custom_window[0:self.nchan]))
self.blocks_multiply_const_vxx_0_0_0.set_k((self.custom_window[self.nchan:2*self.nchan]))
self.blocks_multiply_const_vxx_0_0.set_k((self.custom_window[2*self.nchan:3*self.nchan]))
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan:]))
self.blocks_multiply_const_vxx_0_0_0_0.set_k(
(self.custom_window[0 : self.nchan])
)
self.blocks_multiply_const_vxx_0_0_0.set_k(
(self.custom_window[self.nchan : 2 * self.nchan])
)
self.blocks_multiply_const_vxx_0_0.set_k(
(self.custom_window[2 * self.nchan : 3 * self.nchan])
)
self.blocks_multiply_const_vxx_0.set_k((self.custom_window[-self.nchan :]))
def argument_parser():
parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
parser.add_option(
"", "--c-freq", dest="c_freq", type="eng_float", default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]")
"",
"--c-freq",
dest="c_freq",
type="eng_float",
default=eng_notation.num_to_str(1420000000),
help="Set c_freq [default=%default]",
)
parser.add_option(
"", "--nbin", dest="nbin", type="intx", default=1000,
help="Set nbin [default=%default]")
"",
"--nbin",
dest="nbin",
type="intx",
default=1000,
help="Set nbin [default=%default]",
)
parser.add_option(
"", "--nchan", dest="nchan", type="intx", default=1024,
help="Set nchan [default=%default]")
"",
"--nchan",
dest="nchan",
type="intx",
default=1024,
help="Set nchan [default=%default]",
)
parser.add_option(
"", "--obs-time", dest="obs_time", type="eng_float", default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]")
"",
"--obs-time",
dest="obs_time",
type="eng_float",
default=eng_notation.num_to_str(60),
help="Set obs_time [default=%default]",
)
parser.add_option(
"", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]")
"",
"--samp-rate",
dest="samp_rate",
type="eng_float",
default=eng_notation.num_to_str(2400000),
help="Set samp_rate [default=%default]",
)
return parser
@ -193,10 +273,16 @@ def main(top_block_cls=top_block, options=None):
if options is None:
options, _ = argument_parser().parse_args()
tb = top_block_cls(c_freq=options.c_freq, nbin=options.nbin, nchan=options.nchan, obs_time=options.obs_time, samp_rate=options.samp_rate)
tb = top_block_cls(
c_freq=options.c_freq,
nbin=options.nbin,
nchan=options.nchan,
obs_time=options.obs_time,
samp_rate=options.samp_rate,
)
tb.start()
tb.wait()
if __name__ == '__main__':
if __name__ == "__main__":
main()