# nopenpilot/selfdrive/hardware/eon/hardware.py
# (file-viewer header: 424 lines, 13 KiB, Python)

import binascii
import itertools
import os
import re
import serial
import struct
import subprocess
from typing import List, Union
from cereal import log
from selfdrive.hardware.base import HardwareBase, ThermalConfig
# Params lives in a compiled part of the tree; tolerate it being unavailable.
try:
  from common.params import Params
except Exception:
  # openpilot is not built yet
  Params = None

# Short aliases for the cereal enums used throughout this module.
NetworkType = log.DeviceState.NetworkType
NetworkStrength = log.DeviceState.NetworkStrength

# Device node opened as a serial port to send AT commands to the modem.
MODEM_PATH = "/dev/smd11"
def service_call(call: List[str]) -> Union[bytes, None]:
  """Run `service call <call...>` and return the reply parcel as raw bytes.

  Args:
    call: arguments appended after `service call` (service name, transaction
      code, then typed arguments).

  Returns:
    The bytes decoded from the hex words of the reply, or None when the
    command fails, cannot be executed at all, or the output contains no
    'Parcel'.
  """
  try:
    ret = subprocess.check_output(["service", "call", *call], encoding='utf8').strip()
  except (subprocess.CalledProcessError, OSError):
    # Non-zero exit status, or the `service` binary is missing / not
    # executable: both are reported to callers as "no result".
    return None

  if 'Parcel' not in ret:
    return None

  return parse_service_call_bytes(ret)
def parse_service_call_unpack(r, fmt) -> Union[int, float, bytes, None]:
  """Unpack `r` with the struct format `fmt` and return the first field.

  Args:
    r: raw reply bytes (may be None when the service call failed).
    fmt: a `struct` format string, e.g. ">q".

  Returns:
    The first unpacked value (its type depends on `fmt`), or None when `r`
    is not a bytes-like object or does not match the format.
  """
  try:
    return struct.unpack(fmt, r)[0]
  except (struct.error, TypeError):
    # struct.error: size/format mismatch; TypeError: r is None or not bytes-like
    return None
def parse_service_call_string(r: bytes) -> Union[str, None]:
  """Decode a string out of the raw bytes of a service-call reply.

  The first 8 bytes are skipped, the remainder is decoded as UTF-16
  big-endian, every pair of adjacent characters is swapped back into order
  and all NUL characters are dropped.

  Returns:
    The decoded string, or None if anything goes wrong (including `r`
    being None).
  """
  try:
    payload = r[8:]  # Cut off length field
    decoded = payload.decode('utf_16_be')

    # All pairs of two characters seem to be swapped. Not sure why
    firsts = decoded[::2]
    seconds = decoded[1::2]
    unswapped = "".join(
      second + first
      for first, second in itertools.zip_longest(firsts, seconds, fillvalue='\x00')
    )
    return unswapped.replace('\x00', '')
  except Exception:
    return None
def parse_service_call_bytes(ret: str) -> Union[bytes, None]:
  """Collect the hex words printed by `service call` into one bytes object.

  Every run of 8 lowercase hex digits that directly follows a space or an
  opening parenthesis is converted to 4 bytes; the chunks are concatenated
  in the order they appear.

  Returns:
    The concatenated bytes (empty if nothing matched), or None on any error.
  """
  try:
    hex_words = re.findall(r'[ (]([0-9a-f]{8})', ret)
    return b"".join(binascii.unhexlify(word) for word in hex_words)
  except Exception:
    return None
def getprop(key: str) -> Union[str, None]:
  """Read a system property through the `getprop` command.

  Args:
    key: property name, e.g. "ro.serialno".

  Returns:
    The command output with surrounding whitespace stripped, or None when
    the command exits non-zero or cannot be executed at all.
  """
  try:
    return subprocess.check_output(["getprop", key], encoding='utf8').strip()
  except (subprocess.CalledProcessError, OSError):
    # OSError covers a missing / non-executable `getprop` binary, so callers
    # see the same "no value" result as for a failing command.
    return None
class Android(HardwareBase):
  """HardwareBase implementation backed by Android tools and sysfs files.

  Information is gathered through `service call`, `getprop`, `dumpsys`,
  files under /sys and /proc, and AT commands sent to the modem device.
  """

  def get_os_version(self):
    """Return the stripped contents of /VERSION."""
    with open("/VERSION") as f:
      return f.read().strip()

  def get_device_type(self):
    """Return "two" if the last peripheral panda was an uno, else "eon"."""
    try:
      if int(Params().get("LastPeripheralPandaType")) == log.PandaState.PandaType.uno:
        return "two"
    except Exception:
      # Params unavailable, value missing or not an int: fall back to "eon"
      pass
    return "eon"

  def get_sound_card_online(self):
    """Return True if /proc/asound/card0/state exists and reads 'ONLINE'."""
    state_path = '/proc/asound/card0/state'
    if not os.path.isfile(state_path):
      return False
    # Context manager so the file handle is closed deterministically.
    with open(state_path) as f:
      return f.read().strip() == 'ONLINE'

  def get_imei(self, slot):
    """Return the IMEI string for SIM slot 0 or 1 (None if unavailable).

    Raises:
      ValueError: if `slot` is neither 0 nor 1.
    """
    slot = str(slot)
    if slot not in ("0", "1"):
      raise ValueError("SIM slot must be 0 or 1")
    return parse_service_call_string(service_call(["iphonesubinfo", "3", "i32", slot]))

  def get_serial(self):
    """Return the `ro.serialno` property, or "cccccccc" if empty/unavailable."""
    ret = getprop("ro.serialno")
    # getprop returns None on failure; treat that like an empty serial
    # instead of crashing on len(None).
    if not ret:
      ret = "cccccccc"
    return ret

  def get_subscriber_info(self):
    """Return the subscriber info string, or "" if missing or shorter than 8."""
    ret = parse_service_call_string(service_call(["iphonesubinfo", "7"]))
    if ret is None or len(ret) < 8:
      return ""
    return ret

  def reboot(self, reason=None):
    """Reboot through the power service.

    Args:
      reason: optional reboot reason string, e.g. "recovery" to go into
        recovery mode. None passes a null reason.
    """
    if reason is None:
      reason_args = ["null"]
    else:
      reason_args = ["s16", reason]

    subprocess.check_output([
      "service", "call", "power", "16",  # IPowerManager.reboot
      "i32", "0",  # no confirmation,
      *reason_args,
      "i32", "1"  # wait
    ])

  def uninstall(self):
    """Write a wipe-data recovery command, then reboot into recovery."""
    with open('/cache/recovery/command', 'w') as f:
      f.write('--wipe_data\n')
    # IPowerManager.reboot(confirm=false, reason="recovery", wait=true)
    self.reboot(reason="recovery")

  def get_sim_info(self):
    """Return a dict describing the SIM and cellular data state.

    Keys: 'sim_id', 'mcc_mnc', 'network_type', 'sim_state', 'data_connected'.
    """
    # Used for athena
    # TODO: build using methods from this class
    # getprop returns None on failure; fall back to "" so .split() still
    # yields the same [''] an empty property would give.
    sim_state = (getprop("gsm.sim.state") or "").split(",")
    network_type = (getprop("gsm.network.type") or "").split(',')
    mcc_mnc = getprop("gsm.sim.operator.numeric") or None

    sim_id = parse_service_call_string(service_call(['iphonesubinfo', '11']))
    cell_data_state = parse_service_call_unpack(service_call(['phone', '46']), ">q")
    cell_data_connected = (cell_data_state == 2)

    return {
      'sim_id': sim_id,
      'mcc_mnc': mcc_mnc,
      'network_type': network_type,
      'sim_state': sim_state,
      'data_connected': cell_data_connected
    }

  def get_network_info(self):
    """Return a NetworkInfo message filled from properties and the modem.

    The modem query is best effort: on any failure `extra` / `channel`
    keep whatever was set before the failure.
    """
    msg = log.DeviceState.NetworkInfo.new_message()
    msg.state = getprop("gsm.sim.state") or ""
    msg.technology = getprop("gsm.network.type") or ""
    msg.operator = getprop("gsm.sim.operator.numeric") or ""

    modem = None
    try:
      modem = serial.Serial(MODEM_PATH, 115200, timeout=0.1)
      modem.write(b"AT$QCRSRP?\r")
      msg.extra = modem.read_until(b"OK\r\n").decode('utf-8')

      rsrp = msg.extra.split("$QCRSRP: ")[1].split("\r")[0].split(",")
      msg.channel = int(rsrp[1])
    except Exception:
      # Modem missing, busy, or reply not in the expected format.
      pass
    finally:
      # Always release the serial port; never let cleanup raise.
      if modem is not None:
        try:
          modem.close()
        except Exception:
          pass

    return msg

  def get_network_type(self):
    """Return the current NetworkType (none, wifi or a cell generation)."""
    wifi_check = parse_service_call_string(service_call(["connectivity", "2"]))
    if wifi_check is None:
      return NetworkType.none
    if 'WIFI' in wifi_check:
      return NetworkType.wifi

    cell_check = parse_service_call_unpack(service_call(['phone', '59']), ">q")
    # from TelephonyManager.java
    cell_networks = {
      0: NetworkType.none,
      1: NetworkType.cell2G,
      2: NetworkType.cell2G,
      3: NetworkType.cell3G,
      4: NetworkType.cell2G,
      5: NetworkType.cell3G,
      6: NetworkType.cell3G,
      7: NetworkType.cell3G,
      8: NetworkType.cell3G,
      9: NetworkType.cell3G,
      10: NetworkType.cell3G,
      11: NetworkType.cell2G,
      12: NetworkType.cell3G,
      13: NetworkType.cell4G,
      14: NetworkType.cell4G,
      15: NetworkType.cell3G,
      16: NetworkType.cell2G,
      17: NetworkType.cell3G,
      18: NetworkType.cell4G,
      19: NetworkType.cell4G
    }
    # Unknown codes (and a failed query, i.e. None) map to none.
    return cell_networks.get(cell_check, NetworkType.none)

  def get_network_strength(self, network_type):
    """Return a NetworkStrength for the given NetworkType.

    For wifi the level is parsed from `dumpsys connectivity`; for any cell
    type it is parsed from the mSignalStrength lines of
    `dumpsys telephony.registry`. NetworkType.none yields unknown.

    Parsing errors (unexpected dumpsys output) and a failing dumpsys
    command are not caught here and propagate to the caller.
    """
    network_strength = NetworkStrength.unknown

    # from SignalStrength.java
    def get_lte_level(rsrp, rssnr):
      INT_MAX = 2147483647  # value treated as "not available"
      if rsrp == INT_MAX:
        lvl_rsrp = NetworkStrength.unknown
      elif rsrp >= -95:
        lvl_rsrp = NetworkStrength.great
      elif rsrp >= -105:
        lvl_rsrp = NetworkStrength.good
      elif rsrp >= -115:
        lvl_rsrp = NetworkStrength.moderate
      else:
        lvl_rsrp = NetworkStrength.poor
      if rssnr == INT_MAX:
        lvl_rssnr = NetworkStrength.unknown
      elif rssnr >= 45:
        lvl_rssnr = NetworkStrength.great
      elif rssnr >= 10:
        lvl_rssnr = NetworkStrength.good
      elif rssnr >= -30:
        lvl_rssnr = NetworkStrength.moderate
      else:
        lvl_rssnr = NetworkStrength.poor
      return max(lvl_rsrp, lvl_rssnr)

    def get_tdscdma_level(tdscmadbm):
      # Values above -25 or below -110 stay unknown.
      lvl = NetworkStrength.unknown
      if tdscmadbm > -25:
        lvl = NetworkStrength.unknown
      elif tdscmadbm >= -49:
        lvl = NetworkStrength.great
      elif tdscmadbm >= -73:
        lvl = NetworkStrength.good
      elif tdscmadbm >= -97:
        lvl = NetworkStrength.moderate
      elif tdscmadbm >= -110:
        lvl = NetworkStrength.poor
      return lvl

    def get_gsm_level(asu):
      if asu <= 2 or asu == 99:
        lvl = NetworkStrength.unknown
      elif asu >= 12:
        lvl = NetworkStrength.great
      elif asu >= 8:
        lvl = NetworkStrength.good
      elif asu >= 5:
        lvl = NetworkStrength.moderate
      else:
        lvl = NetworkStrength.poor
      return lvl

    def get_evdo_level(evdodbm, evdosnr):
      lvl_evdodbm = NetworkStrength.unknown
      lvl_evdosnr = NetworkStrength.unknown
      if evdodbm >= -65:
        lvl_evdodbm = NetworkStrength.great
      elif evdodbm >= -75:
        lvl_evdodbm = NetworkStrength.good
      elif evdodbm >= -90:
        lvl_evdodbm = NetworkStrength.moderate
      elif evdodbm >= -105:
        lvl_evdodbm = NetworkStrength.poor
      if evdosnr >= 7:
        lvl_evdosnr = NetworkStrength.great
      elif evdosnr >= 5:
        lvl_evdosnr = NetworkStrength.good
      elif evdosnr >= 3:
        lvl_evdosnr = NetworkStrength.moderate
      elif evdosnr >= 1:
        lvl_evdosnr = NetworkStrength.poor
      return max(lvl_evdodbm, lvl_evdosnr)

    def get_cdma_level(cdmadbm, cdmaecio):
      lvl_cdmadbm = NetworkStrength.unknown
      lvl_cdmaecio = NetworkStrength.unknown
      if cdmadbm >= -75:
        lvl_cdmadbm = NetworkStrength.great
      elif cdmadbm >= -85:
        lvl_cdmadbm = NetworkStrength.good
      elif cdmadbm >= -95:
        lvl_cdmadbm = NetworkStrength.moderate
      elif cdmadbm >= -100:
        lvl_cdmadbm = NetworkStrength.poor
      if cdmaecio >= -90:
        lvl_cdmaecio = NetworkStrength.great
      elif cdmaecio >= -110:
        lvl_cdmaecio = NetworkStrength.good
      elif cdmaecio >= -130:
        lvl_cdmaecio = NetworkStrength.moderate
      elif cdmaecio >= -150:
        lvl_cdmaecio = NetworkStrength.poor
      return max(lvl_cdmadbm, lvl_cdmaecio)

    if network_type == NetworkType.none:
      return network_strength

    if network_type == NetworkType.wifi:
      out = subprocess.check_output('dumpsys connectivity', shell=True).decode('utf-8')
      network_strength = NetworkStrength.unknown
      signal_str = "SignalStrength: "
      for line in out.split('\n'):
        if signal_str in line:
          # The level is the text between the marker and the next ']'.
          lvl_idx_start = line.find(signal_str) + len(signal_str)
          lvl_idx_end = line.find(']', lvl_idx_start)
          lvl = int(line[lvl_idx_start:lvl_idx_end])
          if lvl >= -50:
            network_strength = NetworkStrength.great
          elif lvl >= -60:
            network_strength = NetworkStrength.good
          elif lvl >= -70:
            network_strength = NetworkStrength.moderate
          else:
            network_strength = NetworkStrength.poor
      return network_strength
    else:
      # check cell strength
      out = subprocess.check_output('dumpsys telephony.registry', shell=True).decode('utf-8')
      for line in out.split('\n'):
        if "mSignalStrength" in line:
          # Fields are addressed by their position in the space-split line.
          arr = line.split(' ')
          ns = 0
          if ("gsm" in arr[14]):
            # Try LTE first, then TD-SCDMA, then plain GSM.
            rsrp = int(arr[9])
            rssnr = int(arr[11])
            ns = get_lte_level(rsrp, rssnr)
            if ns == NetworkStrength.unknown:
              tdscmadbm = int(arr[13])
              ns = get_tdscdma_level(tdscmadbm)
              if ns == NetworkStrength.unknown:
                asu = int(arr[1])
                ns = get_gsm_level(asu)
          else:
            cdmadbm = int(arr[3])
            cdmaecio = int(arr[4])
            evdodbm = int(arr[5])
            evdosnr = int(arr[7])
            lvl_cdma = get_cdma_level(cdmadbm, cdmaecio)
            lvl_edmo = get_evdo_level(evdodbm, evdosnr)
            # Use whichever is known; if both are, take the weaker one.
            if lvl_edmo == NetworkStrength.unknown:
              ns = lvl_cdma
            elif lvl_cdma == NetworkStrength.unknown:
              ns = lvl_edmo
            else:
              ns = min(lvl_cdma, lvl_edmo)
          # Keep the best level seen across all mSignalStrength lines.
          network_strength = max(network_strength, ns)
      return network_strength

  def get_battery_capacity(self):
    """Return the battery capacity read from sysfs, parsed as int."""
    return self.read_param_file("/sys/class/power_supply/battery/capacity", int, 100)

  def get_battery_status(self):
    """Return the stripped battery status string read from sysfs."""
    # This does not correspond with actual charging or not.
    # If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled
    return self.read_param_file("/sys/class/power_supply/battery/status", lambda x: x.strip(), '')

  def get_battery_current(self):
    """Return the battery `current_now` value read from sysfs, parsed as int."""
    return self.read_param_file("/sys/class/power_supply/battery/current_now", int)

  def get_battery_voltage(self):
    """Return the battery `voltage_now` value read from sysfs, parsed as int."""
    return self.read_param_file("/sys/class/power_supply/battery/voltage_now", int)

  def get_battery_charging(self):
    """Return whether the battery `charge_type` is anything other than "N/A"."""
    # This does correspond with actually charging
    return self.read_param_file("/sys/class/power_supply/battery/charge_type", lambda x: x.strip() != "N/A", True)

  def set_battery_charging(self, on):
    """Write 1 (truthy `on`) or 0 to the battery `charging_enabled` file."""
    with open('/sys/class/power_supply/battery/charging_enabled', 'w') as f:
      f.write(f"{1 if on else 0}\n")

  def get_usb_present(self):
    """Return the usb `present` flag read from sysfs as a bool."""
    return self.read_param_file("/sys/class/power_supply/usb/present", lambda x: bool(int(x)), False)

  def get_current_power_draw(self):
    """Return None; power draw is not measured on this platform."""
    # We don't have a good direct way to measure this on android
    return None

  def shutdown(self):
    """Power the device off through `svc power shutdown`."""
    os.system('LD_LIBRARY_PATH="" svc power shutdown')

  def get_thermal_config(self):
    """Return the ThermalConfig describing this device's thermal sensors."""
    # the thermal sensors on the 820 don't have meaningful names
    return ThermalConfig(cpu=((5, 7, 10, 12), 10), gpu=((16,), 10), mem=(2, 10),
                         bat=("battery", 1000), ambient=("pa_therm0", 1), pmic=(("pm8994_tz",), 1000))

  def set_screen_brightness(self, percentage):
    """Write `percentage` scaled by 2.55 to the LCD backlight brightness file."""
    with open("/sys/class/leds/lcd-backlight/brightness", "w") as f:
      f.write(str(int(percentage * 2.55)))

  def get_screen_brightness(self):
    """Return the backlight value divided by 2.55 as int, or 0 on any error."""
    try:
      with open("/sys/class/leds/lcd-backlight/brightness") as f:
        return int(float(f.read()) / 2.55)
    except Exception:
      return 0

  def set_power_save(self, powersave_enabled):
    """No-op on this platform."""
    pass

  def get_gpu_usage_percent(self):
    """Return GPU busy percentage clamped to [0, 100], or 0 on any error."""
    try:
      # Context manager so the sysfs handle is closed deterministically.
      with open('/sys/devices/soc/b00000.qcom,kgsl-3d0/kgsl/kgsl-3d0/gpubusy') as f:
        used, total = f.read().strip().split()
      perc = 100.0 * int(used) / int(total)
      return min(max(perc, 0), 100)
    except Exception:
      # Missing file, malformed content or total == 0
      return 0

  def get_modem_version(self):
    """Return None; the modem version is not reported by this class."""
    return None

  def get_modem_temperatures(self):
    """Return an empty list; modem temperatures are not reported."""
    # Not sure if we can get this on the LeEco
    return []

  def get_nvme_temperatures(self):
    """Return an empty list; NVMe temperatures are not reported."""
    return []

  def initialize_hardware(self):
    """No-op on this platform."""
    pass

  def get_networks(self):
    """Return None; network listing is not implemented here."""
    return None