Athena getNetworks method (#21597)
* add iwlist scan * return using athena * add lte * add last one too * unused * add release files * more compact * typo * remove debug code * different file * array * rebase mistakepull/21599/head
parent
cbae2899df
commit
a7aa22253b
|
@ -278,6 +278,7 @@ selfdrive/hardware/eon/hardware.py
|
|||
selfdrive/hardware/tici/__init__.py
|
||||
selfdrive/hardware/tici/hardware.py
|
||||
selfdrive/hardware/tici/amplifier.py
|
||||
selfdrive/hardware/tici/iwlist.py
|
||||
selfdrive/hardware/pc/__init__.py
|
||||
selfdrive/hardware/pc/hardware.py
|
||||
|
||||
|
|
|
@ -267,6 +267,11 @@ def getNetworkType():
|
|||
return HARDWARE.get_network_type()
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def getNetworks():
|
||||
return HARDWARE.get_networks()
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def takeSnapshot():
|
||||
from selfdrive.camerad.snapshot.snapshot import snapshot, jpeg_write
|
||||
|
|
|
@ -125,3 +125,7 @@ class HardwareBase:
|
|||
@abstractmethod
|
||||
def initialize_hardware(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_networks(self):
|
||||
pass
|
||||
|
|
|
@ -391,3 +391,6 @@ class Android(HardwareBase):
|
|||
|
||||
def initialize_hardware(self):
|
||||
pass
|
||||
|
||||
def get_networks(self):
|
||||
return None
|
||||
|
|
|
@ -94,3 +94,6 @@ class Pc(HardwareBase):
|
|||
|
||||
def initialize_hardware(self):
|
||||
pass
|
||||
|
||||
def get_networks(self):
|
||||
return None
|
||||
|
|
|
@ -7,6 +7,7 @@ from pathlib import Path
|
|||
from cereal import log
|
||||
from selfdrive.hardware.base import HardwareBase, ThermalConfig
|
||||
from selfdrive.hardware.tici.amplifier import Amplifier
|
||||
from selfdrive.hardware.tici import iwlist
|
||||
|
||||
NM = 'org.freedesktop.NetworkManager'
|
||||
NM_CON_ACT = NM + '.Connection.Active'
|
||||
|
@ -281,3 +282,30 @@ class Tici(HardwareBase):
|
|||
|
||||
def initialize_hardware(self):
|
||||
self.amplifier.initialize_configuration()
|
||||
|
||||
def get_networks(self):
|
||||
r = {}
|
||||
|
||||
wlan = iwlist.scan()
|
||||
if wlan is not None:
|
||||
r['wlan'] = wlan
|
||||
|
||||
lte_info = self.get_network_info()
|
||||
if lte_info is not None:
|
||||
extra = lte_info['extra']
|
||||
|
||||
# <state>,"LTE",<is_tdd>,<mcc>,<mnc>,<cellid>,<pcid>,<earfcn>,<freq_band_ind>,
|
||||
# <ul_bandwidth>,<dl_bandwidth>,<tac>,<rsrp>,<rsrq>,<rssi>,<sinr>,<srxlev>
|
||||
if 'LTE' in extra:
|
||||
extra = extra.split(',')
|
||||
try:
|
||||
r['lte'] = [{
|
||||
"mcc": int(extra[3]),
|
||||
"mnc": int(extra[4]),
|
||||
"cid": int(extra[5], 16),
|
||||
"nmr": [{"pci": int(extra[6]), "earfcn": int(extra[7])}],
|
||||
}]
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return r
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
import subprocess
|
||||
|
||||
|
||||
def scan(interface="wlan0"):
|
||||
result = []
|
||||
try:
|
||||
r = subprocess.check_output(["iwlist", interface, "scan"], encoding='utf8')
|
||||
|
||||
mac = None
|
||||
for line in r.split('\n'):
|
||||
if "Address" in line:
|
||||
# Based on the adapter eithere a percentage or dBm is returned
|
||||
# Add previous network in case no dBm signal level was seen
|
||||
if mac is not None:
|
||||
result.append({"mac": mac})
|
||||
mac = None
|
||||
|
||||
mac = line.split(' ')[-1]
|
||||
elif "dBm" in line:
|
||||
try:
|
||||
level = line.split('Signal level=')[1]
|
||||
rss = int(level.split(' ')[0])
|
||||
result.append({"mac": mac, "rss": rss})
|
||||
mac = None
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
# Add last network if no dBm was found
|
||||
if mac is not None:
|
||||
result.append({"mac": mac})
|
||||
|
||||
return result
|
||||
|
||||
except Exception:
|
||||
return None
|
Loading…
Reference in New Issue