nopenpilot/selfdrive/pandad.py

127 lines
3.8 KiB
Python
Executable File

#!/usr/bin/env python3
# simple boardd wrapper that updates the panda first
import os
import usb1
import time
import subprocess
from typing import List, NoReturn
from functools import cmp_to_key
from panda import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, MCU_TYPE_H7, Panda, PandaDFU
from common.basedir import BASEDIR
from common.params import Params
from selfdrive.swaglog import cloudlog
def get_expected_signature(panda : Panda) -> bytes:
fn = DEFAULT_H7_FW_FN if (panda.get_mcu_type() == MCU_TYPE_H7) else DEFAULT_FW_FN
try:
return Panda.get_signature_from_firmware(fn)
except Exception:
cloudlog.exception("Error computing expected signature")
return b""
def flash_panda(panda_serial : str) -> Panda:
panda = Panda(panda_serial)
fw_signature = get_expected_signature(panda)
panda_version = "bootstub" if panda.bootstub else panda.get_version()
panda_signature = b"" if panda.bootstub else panda.get_signature()
cloudlog.warning("Panda {} connected, version: {}, signature {}, expected {}".format(
panda_serial,
panda_version,
panda_signature.hex()[:16],
fw_signature.hex()[:16],
))
if panda.bootstub or panda_signature != fw_signature:
cloudlog.info("Panda firmware out of date, update required")
panda.flash()
cloudlog.info("Done flashing")
if panda.bootstub:
bootstub_version = panda.get_version()
cloudlog.info(f"Flashed firmware not booting, flashing development bootloader. Bootstub version: {bootstub_version}")
panda.recover()
cloudlog.info("Done flashing bootloader")
if panda.bootstub:
cloudlog.info("Panda still not booting, exiting")
raise AssertionError
panda_signature = panda.get_signature()
if panda_signature != fw_signature:
cloudlog.info("Version mismatch after flashing, exiting")
raise AssertionError
return panda
def panda_sort_cmp(a : Panda, b : Panda):
a_type = a.get_type()
b_type = b.get_type()
# make sure the internal one is always first
if a.is_internal() and not b.is_internal():
return -1
if not a.is_internal() and b.is_internal():
return 1
# sort by hardware type
if a_type != b_type:
return a_type < b_type
# last resort: sort by serial number
return a.get_usb_serial() < b.get_usb_serial()
def main() -> NoReturn:
while True:
try:
# Flash all Pandas in DFU mode
for p in PandaDFU.list():
cloudlog.info(f"Panda in DFU mode found, flashing recovery {p}")
PandaDFU(p).recover()
time.sleep(1)
panda_serials = Panda.list()
if len(panda_serials) == 0:
continue
cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")
# Flash pandas
pandas = []
for serial in panda_serials:
pandas.append(flash_panda(serial))
# check health for lost heartbeat
for panda in pandas:
health = panda.health()
if health["heartbeat_lost"]:
Params().put_bool("PandaHeartbeatLost", True)
cloudlog.event("heartbeat lost", deviceState=health, serial=panda.get_usb_serial())
cloudlog.info(f"Resetting panda {panda.get_usb_serial()}")
panda.reset()
# sort pandas to have deterministic order
pandas.sort(key=cmp_to_key(panda_sort_cmp))
panda_serials = list(map(lambda p: p.get_usb_serial(), pandas))
# close all pandas
for p in pandas:
p.close()
except (usb1.USBErrorNoDevice, usb1.USBErrorPipe):
# a panda was disconnected while setting everything up. let's try again
cloudlog.exception("Panda USB exception while setting up")
continue
# run boardd with all connected serials as arguments
os.chdir(os.path.join(BASEDIR, "selfdrive/boardd"))
subprocess.run(["./boardd", *panda_serials], check=True)
if __name__ == "__main__":
main()