UDS: handle remote addressing

Willem Melching 2019-12-16 15:54:30 -08:00
parent 39818691dd
commit ded4b3f743
1 changed files with 33 additions and 14 deletions

View File

@ -268,17 +268,19 @@ _negative_response_codes = {
0x93: 'voltage too low',
class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, debug: bool=False):
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.sub_addr = sub_addr
self.bus = bus
self.debug = debug
def _recv_filter(self, bus, addr):
# handle functionl addresses (switch to first addr to respond)
# handle functional addresses (switch to first addr to respond)
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
@ -303,8 +305,14 @@ class CanClient():
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
# Cut off sub addr in first byte
if self.sub_addr is not None:
rx_data = rx_data[1:]
# break when non-full buffer is processed
if len(msgs) < 254:
@ -316,15 +324,23 @@ class CanClient():
if delay and not first:
if self.debug: print(f"CAN-TX: delay - {delay}")
if self.sub_addr is not None:
msg = bytes([self.sub_addr]) + msg
if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
assert len(msg) <= 8
self.tx(self.tx_addr, msg, self.bus)
first = False
class IsoTpMessage():
def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False):
def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False, max_len: int=8):
self._can_client = can_client
self.timeout = timeout
self.debug = debug
self.max_len = max_len
def send(self, dat: bytes) -> None:
# throw away any stale data
@ -347,12 +363,12 @@ class IsoTpMessage():
if self.tx_len < 8:
# single frame (send all bytes)
if self.debug: print("ISO-TP: TX - single frame")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(8, b"\x00")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00")
self.tx_done = True
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(self.max_len, b"\x00")
def recv(self) -> bytes:
@ -390,7 +406,7 @@ class IsoTpMessage():
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug: print(f"ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
msg = b"\x30\x00\x00".ljust(self.max_len, b"\x00")
@ -400,7 +416,7 @@ class IsoTpMessage():
self.rx_idx += 1
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
rx_size = self.rx_len - len(self.rx_dat)
self.rx_dat += rx_data[1:1+min(rx_size, 7)]
self.rx_dat += rx_data[1:1+rx_size]
if self.rx_len == len(self.rx_dat):
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
@ -417,15 +433,17 @@ class IsoTpMessage():
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
delay_sec = delay_ts / delay_div
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + self.tx_idx * 7
num_bytes = self.max_len - 1
start = 6 + self.tx_idx * num_bytes
count = rx_data[1]
end = start + count * 7 if count > 0 else self.tx_len
end = start + count * num_bytes if count > 0 else self.tx_len
tx_msgs = []
for i in range(start, end, 7):
for i in range(start, end, num_bytes):
self.tx_idx += 1
# consecutive tx messages
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+num_bytes]).ljust(self.max_len, b"\x00")
# send consecutive tx messages
self._can_client.send(tx_msgs, delay=delay_sec)
@ -445,13 +463,14 @@ def get_rx_addr_for_tx_addr(tx_addr):
if tx_addr < 0xFFF8:
# standard 11 bit response addr (add 8)
return tx_addr + 8
if tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
# standard 29 bit response addr (flip last two bytes)
return (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
raise ValueError("invalid tx_addr: {}".format(tx_addr))
class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False):
self.bus = bus
@ -714,7 +733,7 @@ class UdsClient():
data += bytes([dtc_severity_mask_type, dtc_status_mask_type])
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response