Compare commits
2 Commits
master
...
fast-fw-fp
Author | SHA1 | Date |
---|---|---|
![]() |
9c47eb66df | |
![]() |
e483d3eb6a |
|
@ -0,0 +1,66 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
from typing import Any, Set
|
||||||
|
|
||||||
|
import cereal.messaging as messaging
|
||||||
|
from panda.python.uds import SERVICE_TYPE
|
||||||
|
from selfdrive.boardd.boardd import can_list_to_can_capnp
|
||||||
|
from selfdrive.swaglog import cloudlog
|
||||||
|
|
||||||
|
# TODO: figure out type annotation for CanData
|
||||||
|
def is_tester_present_response(msg: Any) -> bool:
|
||||||
|
# ISO-TP messages are always padded to 8 bytes
|
||||||
|
# tester present response is always a single frame
|
||||||
|
if len(msg.dat) == 8 and msg.dat[0] >= 1 and msg.dat[0] <= 7:
|
||||||
|
# success response
|
||||||
|
if msg.dat[1] == (SERVICE_TYPE.TESTER_PRESENT + 0x40):
|
||||||
|
return True
|
||||||
|
# error response
|
||||||
|
if msg.dat[1] == 0x7F and msg.dat[2] == SERVICE_TYPE.TESTER_PRESENT:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float=1, debug: bool=True) -> Set[int]:
|
||||||
|
ecu_addrs = set()
|
||||||
|
try:
|
||||||
|
addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
|
||||||
|
tester_present = bytes([0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0])
|
||||||
|
msgs = [[addr, 0, tester_present, bus] for addr in addr_list]
|
||||||
|
|
||||||
|
messaging.drain_sock(logcan)
|
||||||
|
sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
|
||||||
|
start_time = time.monotonic()
|
||||||
|
while time.monotonic() - start_time < timeout:
|
||||||
|
can_packets = messaging.drain_sock(logcan, wait_for_one=True)
|
||||||
|
for packet in can_packets:
|
||||||
|
for msg in packet.can:
|
||||||
|
if msg.src == bus and msg.address in addr_list and is_tester_present_response(msg):
|
||||||
|
if debug:
|
||||||
|
print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}")
|
||||||
|
if msg.address in ecu_addrs:
|
||||||
|
print(f"Duplicate ECU address: {hex(msg.address)}")
|
||||||
|
ecu_addrs.add(msg.address)
|
||||||
|
except Exception:
|
||||||
|
cloudlog.warning(f"ECU addr scan exception: {traceback.format_exc()}")
|
||||||
|
return ecu_addrs
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='Get addresses of all ECUs')
|
||||||
|
parser.add_argument('--debug', action='store_true')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
logcan = messaging.sub_sock('can')
|
||||||
|
sendcan = messaging.pub_sock('sendcan')
|
||||||
|
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
|
print("Getting ECU addresses ...")
|
||||||
|
ecu_addrs = get_ecu_addrs(logcan, sendcan, 1, debug=args.debug)
|
||||||
|
|
||||||
|
print()
|
||||||
|
print("Found ECUs on addresses:")
|
||||||
|
for addr in ecu_addrs:
|
||||||
|
print(f" {hex(addr)}")
|
|
@ -8,6 +8,7 @@ from tqdm import tqdm
|
||||||
|
|
||||||
import panda.python.uds as uds
|
import panda.python.uds as uds
|
||||||
from cereal import car
|
from cereal import car
|
||||||
|
from selfdrive.car.ecu_addrs import get_ecu_addrs
|
||||||
from selfdrive.car.fingerprints import FW_VERSIONS, get_attr_from_cars
|
from selfdrive.car.fingerprints import FW_VERSIONS, get_attr_from_cars
|
||||||
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
||||||
from selfdrive.car.toyota.values import CAR as TOYOTA
|
from selfdrive.car.toyota.values import CAR as TOYOTA
|
||||||
|
@ -272,6 +273,29 @@ def match_fw_to_car(fw_versions, allow_fuzzy=True):
|
||||||
|
|
||||||
return exact_match, matches
|
return exact_match, matches
|
||||||
|
|
||||||
|
def get_brand_candidates(logcan, sendcan, bus, versions):
|
||||||
|
response_offset_by_brand = dict()
|
||||||
|
for brand, _, _, response_offset in REQUESTS:
|
||||||
|
response_offset_by_brand[brand] = response_offset
|
||||||
|
|
||||||
|
response_addrs_by_brand = dict()
|
||||||
|
for brand, brand_versions in versions.items():
|
||||||
|
if brand not in response_offset_by_brand:
|
||||||
|
continue
|
||||||
|
|
||||||
|
response_addrs_by_brand[brand] = set()
|
||||||
|
for c in brand_versions.values():
|
||||||
|
for _, addr, _ in c.keys():
|
||||||
|
response_addr = addr + response_offset_by_brand[brand]
|
||||||
|
response_addrs_by_brand[brand].add(response_addr)
|
||||||
|
|
||||||
|
ecu_response_addrs = get_ecu_addrs(logcan, sendcan, bus)
|
||||||
|
|
||||||
|
brand_candidates = list()
|
||||||
|
for brand, brand_addrs in response_addrs_by_brand.items():
|
||||||
|
if len(brand_addrs.intersection(ecu_response_addrs)) >= 4:
|
||||||
|
brand_candidates.append(brand)
|
||||||
|
return brand_candidates
|
||||||
|
|
||||||
def get_fw_versions(logcan, sendcan, bus, extra=None, timeout=0.1, debug=False, progress=False):
|
def get_fw_versions(logcan, sendcan, bus, extra=None, timeout=0.1, debug=False, progress=False):
|
||||||
ecu_types = {}
|
ecu_types = {}
|
||||||
|
@ -285,7 +309,10 @@ def get_fw_versions(logcan, sendcan, bus, extra=None, timeout=0.1, debug=False,
|
||||||
if extra is not None:
|
if extra is not None:
|
||||||
versions.update(extra)
|
versions.update(extra)
|
||||||
|
|
||||||
|
brand_candidates = get_brand_candidates(logcan, sendcan, bus, versions)
|
||||||
for brand, brand_versions in versions.items():
|
for brand, brand_versions in versions.items():
|
||||||
|
if brand_candidates and brand not in brand_candidates:
|
||||||
|
continue
|
||||||
for c in brand_versions.values():
|
for c in brand_versions.values():
|
||||||
for ecu_type, addr, sub_addr in c.keys():
|
for ecu_type, addr, sub_addr in c.keys():
|
||||||
a = (brand, addr, sub_addr)
|
a = (brand, addr, sub_addr)
|
||||||
|
|
Loading…
Reference in New Issue