From 4454e3a6bb090b9b2cc65ba4ec5943f47fe266b4 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 15 Oct 2019 12:07:19 -0700 Subject: [PATCH] better CAN comm abstraction --- python/uds.py | 257 ++++++++++++++++++++++++++++---------------------- 1 file changed, 143 insertions(+), 114 deletions(-) diff --git a/python/uds.py b/python/uds.py index 5acfb45..2fdb68e 100644 --- a/python/uds.py +++ b/python/uds.py @@ -267,6 +267,117 @@ _negative_response_codes = { 0x93: 'voltage too low', } +class IsoTpMessage(): + def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False): + self.can_tx_queue = can_tx_queue + self.can_rx_queue = can_rx_queue + self.timeout = timeout + self.debug = debug + + def send(self, dat: bytes) -> None: + self.tx_dat = dat + self.tx_len = len(dat) + self.tx_idx = 0 + self.tx_done = False + + if self.debug: print(f"ISO-TP: REQUEST - {hexlify(self.tx_dat)}") + self._tx_first_frame() + + def _tx_first_frame(self) -> None: + if self.tx_len < 8: + # single frame (send all bytes) + if self.debug: print("ISO-TP: TX - single frame") + msg = (bytes([self.tx_len]) + self.tx_dat).ljust(8, b"\x00") + self.tx_done = True + else: + # first frame (send first 6 bytes) + if self.debug: print("ISO-TP: TX - first frame") + msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00") + self.can_tx_queue.put(msg) + + def recv(self) -> bytes: + self.rx_dat = b"" + self.rx_len = 0 + self.rx_idx = 0 + self.rx_done = False + + try: + while True: + self._isotp_rx_next() + if self.tx_done and self.rx_done: + return self.rx_dat + except Empty: + raise MessageTimeoutError("timeout waiting for response") + finally: + if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}") + + def _isotp_rx_next(self) -> None: + rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout) + + # single rx_frame + if rx_data[0] >> 4 == 0x0: + self.rx_len = rx_data[0] & 0xFF + self.rx_dat = rx_data[1:1+self.rx_len] + self.rx_idx = 0 + self.rx_done = True + if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}") + return + + # first rx_frame + if rx_data[0] >> 4 == 0x1: + self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1] + self.rx_dat = rx_data[2:] + self.rx_idx = 0 + self.rx_done = False + if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}") + if self.debug: print(f"ISO-TP: TX - flow control continue") + # send flow control message (send all bytes) + msg = b"\x30\x00\x00".ljust(8, b"\x00") + self.can_tx_queue.put(msg) + return + + # consecutive rx frame + if rx_data[0] >> 4 == 0x2: + assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame" + self.rx_idx += 1 + assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index" + rx_size = self.rx_len - len(self.rx_dat) + self.rx_dat += rx_data[1:1+min(rx_size, 7)] + if self.rx_len == len(self.rx_dat): + self.rx_done = True + if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}") + return + + # flow control + if rx_data[0] >> 4 == 0x3: + assert self.tx_done == False, "isotp - rx: flow control with no active frame" + assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort" + assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid" + if rx_data[0] == 0x30: + if self.debug: print("ISO-TP: RX - flow control continue") + delay_ts = rx_data[2] & 0x7F + # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 + delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. + # first frame = 6 bytes, each consecutive frame = 7 bytes + start = 6 + self.tx_idx * 7 + count = rx_data[1] + end = start + count * 7 if count > 0 else self.tx_len + for i in range(start, end, 7): + if delay_ts > 0 and i > start: + delay_s = delay_ts / delay_div + if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}") + time.sleep(delay_s) + self.tx_idx += 1 + # consecutive tx frames + msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00") + self.can_tx_queue.put(msg) + if end >= self.tx_len: + self.tx_done = True + if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}") + elif rx_data[0] == 0x31: + # wait (do nothing until next flow control message) + if self.debug: print("ISO-TP: TX - flow control wait") + class UdsClient(): def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False): self.panda = panda @@ -282,20 +393,17 @@ class UdsClient(): else: raise ValueError("invalid tx_addr: {}".format(tx_addr)) - self.tx_queue = Queue() - self.rx_queue = Queue() + self.can_tx_queue = Queue() + self.can_rx_queue = Queue() self.timeout = timeout self.debug = debug - self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,)) - self.can_reader_t.daemon = True - self.can_reader_t.start() + self.can_thread = Thread(target=self._can_thread, args=(self.debug,)) + self.can_thread.daemon = True + self.can_thread.start() - def _isotp_thread(self, debug: bool=False): + def _can_thread(self, debug: bool=False): try: - rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True} - tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True} - # allow all output self.panda.set_safety_mode(0x1337) # clear tx buffer @@ -304,96 +412,23 @@ class UdsClient(): self.panda.can_clear(0xFFFF) while True: - messages = self.panda.can_recv() - for rx_addr, rx_ts, rx_data, rx_bus in messages: + # send + while not self.can_tx_queue.empty(): + msg = self.can_tx_queue.get(block=False) + if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg))) + self.panda.can_send(self.tx_addr, msg, self.bus) + + # receive + msgs = self.panda.can_recv() + for rx_addr, rx_ts, rx_data, rx_bus in msgs: if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0: continue - - if (debug): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data))) - if rx_data[0] >> 4 == 0x0: - # single rx_frame - rx_frame["size"] = rx_data[0] & 0xFF - rx_frame["data"] = rx_data[1:1+rx_frame["size"]] - rx_frame["idx"] = 0 - rx_frame["done"] = True - self.rx_queue.put(rx_frame["data"]) - elif rx_data[0] >> 4 == 0x1: - # first rx_frame - rx_frame["size"] = ((rx_data[0] & 0x0F) << 8) + rx_data[1] - rx_frame["data"] = rx_data[2:] - rx_frame["idx"] = 0 - rx_frame["done"] = False - # send flow control message (send all bytes) - msg = b"\x30\x00\x00".ljust(8, b"\x00") - if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) - elif rx_data[0] >> 4 == 0x2: - # consecutive rx frame - assert rx_frame["done"] == False, "rx: no active frame" - # validate frame index - rx_frame["idx"] += 1 - assert rx_frame["idx"] & 0xF == rx_data[0] & 0xF, "rx: invalid consecutive frame index" - rx_size = rx_frame["size"] - len(rx_frame["data"]) - rx_frame["data"] += rx_data[1:1+min(rx_size, 7)] - if rx_frame["size"] == len(rx_frame["data"]): - rx_frame["done"] = True - self.rx_queue.put(rx_frame["data"]) - elif rx_data[0] >> 4 == 0x3: - # flow control - if tx_frame["done"] != False: - tx_frame["done"] = True - self.rx_queue.put(b"\x7F\xFF\xFFtx: no active frame") - if rx_data[0] == 0x32: - # 0x32 = overflow/abort - tx_frame["done"] = True - self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - overflow/abort") - if rx_data[0] != 0x30 and rx_data[0] != 0x31: - # 0x30 = continue - # 0x31 = wait - tx_frame["done"] = True - self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - invalid transfer state indicator") - if rx_data[0] == 0x30: - delay_ts = rx_data[2] & 0x7F - # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 - delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. - # first frame = 6 bytes, each consecutive frame = 7 bytes - start = 6 + tx_frame["idx"] * 7 - count = rx_data[1] - end = start + count * 7 if count > 0 else tx_frame["size"] - for i in range(start, end, 7): - if delay_ts > 0 and i > start: - if (debug): print("D: {}".format(delay_ts / delay_div)) - time.sleep(delay_ts / delay_div) - tx_frame["idx"] += 1 - # consecutive tx frames - msg = (bytes([0x20 | (tx_frame["idx"] & 0xF)]) + tx_frame["data"][i:i+7]).ljust(8, b"\x00") - if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) - if end >= tx_frame["size"]: - tx_frame["done"] = True - - if not self.tx_queue.empty(): - req = self.tx_queue.get(block=False) - # reset rx and tx frames - rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True} - tx_frame = {"size": len(req), "data": req, "idx": 0, "done": False} - if tx_frame["size"] < 8: - # single frame - tx_frame["done"] = True - msg = (bytes([tx_frame["size"]]) + tx_frame["data"]).ljust(8, b"\x00") - if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) - else: - # first rx_frame - tx_frame["done"] = False - msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00") - if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) + if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data))) + self.can_rx_queue.put(rx_data) else: time.sleep(0.01) finally: self.panda.close() - self.rx_queue.put(None) # generic uds request def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes: @@ -402,16 +437,12 @@ class UdsClient(): req += bytes([subfunction]) if data is not None: req += data - self.tx_queue.put(req) + # send request, wait for response + isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug) + isotp_msg.send(req) while True: - try: - resp = self.rx_queue.get(block=True, timeout=self.timeout) - except Empty: - raise MessageTimeoutError("timeout waiting for response") - if resp is None: - raise MessageTimeoutError("timeout waiting for response") - + resp = isotp_msg.recv() resp_sid = resp[0] if len(resp) > 0 else None # negative response @@ -428,24 +459,22 @@ class UdsClient(): error_desc = resp[3:] # wait for another message if response pending if error_code == 0x78: - time.sleep(0.1) continue raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) - break - # positive response - if service_type+0x40 != resp_sid: - resp_sid_hex = hex(resp_sid) if resp_sid is not None else None - raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) + # positive response + if service_type+0x40 != resp_sid: + resp_sid_hex = hex(resp_sid) if resp_sid is not None else None + raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) - if subfunction is not None: - resp_sfn = resp[1] if len(resp) > 1 else None - if subfunction != resp_sfn: - resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None - raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn))) + if subfunction is not None: + resp_sfn = resp[1] if len(resp) > 1 else None + if subfunction != resp_sfn: + resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None + raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn))) - # return data (exclude service id and sub-function id) - return resp[(1 if subfunction is None else 2):] + # return data (exclude service id and sub-function id) + return resp[(1 if subfunction is None else 2):] # services def diagnostic_session_control(self, session_type: SESSION_TYPE):