Add OPCUA demo
parent
b661ecbdc9
commit
3114dbb601
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,52 @@
|
|||
import logging
|
||||
|
||||
from opcua import ua, uamethod, Server, Client
|
||||
import numpy as np
|
||||
|
||||
DEFAULT_URI = "http://lofar.eu"
|
||||
NUM_RCU = 96
|
||||
|
||||
|
||||
def download_xst(subband: int, integration_time_s: int, url: str = 'localhost', port: int = 50000):
|
||||
"""
|
||||
Download cross correlation statistics
|
||||
|
||||
Args:
|
||||
subband (int): Subband number
|
||||
integration_time_s (int): Integration time in seconds
|
||||
url (str): URL to connect to, defaults to 'localhost'
|
||||
port (str): Port to connect to, defaults to 50000
|
||||
|
||||
Returns:
|
||||
Tuple[datetime.datetime, np.ndarray, int]: UTC time, visibilities (shape n_ant x n_ant),
|
||||
RCU mode
|
||||
|
||||
Raises:
|
||||
RuntimeError: if in mixed RCU mode
|
||||
"""
|
||||
client = Client("opc.tcp://{}:{}/".format(url, port), timeout=1000)
|
||||
client.connect()
|
||||
client.load_type_definitions()
|
||||
objects = client.get_objects_node()
|
||||
idx = client.get_namespace_index(DEFAULT_URI)
|
||||
obj = client.get_root_node().get_child(["0:Objects",
|
||||
"{}:StationMetrics".format(idx),
|
||||
"{}:RCU".format(idx)])
|
||||
|
||||
obstime, visibilities_opc, rcu_modes = obj.call_method("{}:record_cross".format(idx), subband, integration_time_s)
|
||||
|
||||
client.close_session()
|
||||
client.close_secure_channel()
|
||||
|
||||
rcu_modes_on = set([mode for mode in rcu_modes if mode != '0'])
|
||||
if len(rcu_modes_on) == 1:
|
||||
rcu_mode = int(rcu_modes_on.pop())
|
||||
elif len(rcu_modes_on) == 0:
|
||||
rcu_mode = 0
|
||||
else:
|
||||
raise RuntimeError("Multiple nonzero RCU modes are used, that's not supported yet")
|
||||
|
||||
assert(len(visibilities_opc) == 2) # Real and complex part
|
||||
visibilities = np.array(visibilities_opc)[0] + 1j * np.array(visibilities_opc[1])
|
||||
|
||||
return obstime, visibilities, rcu_mode
|
|
@ -117,7 +117,7 @@ def find_caltable(field_name: str, rcu_mode: Union[str, int], caltable_dir='calt
|
|||
Find the file of a caltable.
|
||||
|
||||
Args:
|
||||
field_name: Name of the antenna field, e.g. 'DE602LBA'
|
||||
field_name: Name of the antenna field, e.g. 'DE602LBA' or 'DE602'
|
||||
rcu_mode: Receiver mode for which the calibration table is requested.
|
||||
Probably should be 'inner' or 'outer'
|
||||
caltable_dir: Root directory under which station information is stored in
|
||||
|
@ -137,15 +137,15 @@ def find_caltable(field_name: str, rcu_mode: Union[str, int], caltable_dir='calt
|
|||
|
||||
filename = f"CalTable-{station_number}"
|
||||
|
||||
if str(rcu_mode) in ('outer', '1', '2') and 'LBA' in field_name:
|
||||
if str(rcu_mode) in ('outer', '1', '2'):
|
||||
filename += "-LBA_OUTER-10_90.dat"
|
||||
elif str(rcu_mode) in ('inner', '3', '4') and 'LBA' in field_name:
|
||||
elif str(rcu_mode) in ('inner', '3', '4'):
|
||||
filename += "-LBA_INNER-10_90.dat"
|
||||
elif str(rcu_mode) == '5' and 'HBA' in field_name:
|
||||
elif str(rcu_mode) == '5':
|
||||
filename += "-HBA-110_190.dat"
|
||||
elif str(rcu_mode) == '6' and 'HBA' in field_name:
|
||||
elif str(rcu_mode) == '6':
|
||||
filename += "-HBA-170_230.dat"
|
||||
elif str(rcu_mode) == '7' and 'HBA' in field_name:
|
||||
elif str(rcu_mode) == '7':
|
||||
filename += "-HBA-210_250.dat"
|
||||
else:
|
||||
raise RuntimeError("Unexpected mode: " + str(rcu_mode) + " for field_name " + str(field_name))
|
||||
|
|
Loading…
Reference in New Issue