better CAN comm abstraction

Greg Hogan 2019-10-15 12:07:19 -07:00
parent 43adad3116
commit 4454e3a6bb
1 changed files with 143 additions and 114 deletions

View File

@ -267,6 +267,117 @@ _negative_response_codes = {
0x93: 'voltage too low',
class IsoTpMessage():
def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False):
self.can_tx_queue = can_tx_queue
self.can_rx_queue = can_rx_queue
self.timeout = timeout
self.debug = debug
def send(self, dat: bytes) -> None:
self.tx_dat = dat
self.tx_len = len(dat)
self.tx_idx = 0
self.tx_done = False
if self.debug: print(f"ISO-TP: REQUEST - {hexlify(self.tx_dat)}")
def _tx_first_frame(self) -> None:
if self.tx_len < 8:
# single frame (send all bytes)
if self.debug: print("ISO-TP: TX - single frame")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(8, b"\x00")
self.tx_done = True
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
def recv(self) -> bytes:
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False
while True:
if self.tx_done and self.rx_done:
return self.rx_dat
except Empty:
raise MessageTimeoutError("timeout waiting for response")
if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}")
def _isotp_rx_next(self) -> None:
rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout)
# single rx_frame
if rx_data[0] >> 4 == 0x0:
self.rx_len = rx_data[0] & 0xFF
self.rx_dat = rx_data[1:1+self.rx_len]
self.rx_idx = 0
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}")
# first rx_frame
if rx_data[0] >> 4 == 0x1:
self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
self.rx_dat = rx_data[2:]
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug: print(f"ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
# consecutive rx frame
if rx_data[0] >> 4 == 0x2:
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
self.rx_idx += 1
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
rx_size = self.rx_len - len(self.rx_dat)
self.rx_dat += rx_data[1:1+min(rx_size, 7)]
if self.rx_len == len(self.rx_dat):
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
# flow control
if rx_data[0] >> 4 == 0x3:
assert self.tx_done == False, "isotp - rx: flow control with no active frame"
assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort"
assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid"
if rx_data[0] == 0x30:
if self.debug: print("ISO-TP: RX - flow control continue")
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + self.tx_idx * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else self.tx_len
for i in range(start, end, 7):
if delay_ts > 0 and i > start:
delay_s = delay_ts / delay_div
if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}")
self.tx_idx += 1
# consecutive tx frames
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
if end >= self.tx_len:
self.tx_done = True
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
elif rx_data[0] == 0x31:
# wait (do nothing until next flow control message)
if self.debug: print("ISO-TP: TX - flow control wait")
class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False):
self.panda = panda
@ -282,20 +393,17 @@ class UdsClient():
raise ValueError("invalid tx_addr: {}".format(tx_addr))
self.tx_queue = Queue()
self.rx_queue = Queue()
self.can_tx_queue = Queue()
self.can_rx_queue = Queue()
self.timeout = timeout
self.debug = debug
self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,))
self.can_reader_t.daemon = True
self.can_thread = Thread(target=self._can_thread, args=(self.debug,))
self.can_thread.daemon = True
def _isotp_thread(self, debug: bool=False):
def _can_thread(self, debug: bool=False):
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
# allow all output
# clear tx buffer
@ -304,96 +412,23 @@ class UdsClient():
while True:
messages = self.panda.can_recv()
for rx_addr, rx_ts, rx_data, rx_bus in messages:
# send
while not self.can_tx_queue.empty():
msg = self.can_tx_queue.get(block=False)
if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
# receive
msgs = self.panda.can_recv()
for rx_addr, rx_ts, rx_data, rx_bus in msgs:
if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0:
if (debug): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data)))
if rx_data[0] >> 4 == 0x0:
# single rx_frame
rx_frame["size"] = rx_data[0] & 0xFF
rx_frame["data"] = rx_data[1:1+rx_frame["size"]]
rx_frame["idx"] = 0
rx_frame["done"] = True
elif rx_data[0] >> 4 == 0x1:
# first rx_frame
rx_frame["size"] = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
rx_frame["data"] = rx_data[2:]
rx_frame["idx"] = 0
rx_frame["done"] = False
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
elif rx_data[0] >> 4 == 0x2:
# consecutive rx frame
assert rx_frame["done"] == False, "rx: no active frame"
# validate frame index
rx_frame["idx"] += 1
assert rx_frame["idx"] & 0xF == rx_data[0] & 0xF, "rx: invalid consecutive frame index"
rx_size = rx_frame["size"] - len(rx_frame["data"])
rx_frame["data"] += rx_data[1:1+min(rx_size, 7)]
if rx_frame["size"] == len(rx_frame["data"]):
rx_frame["done"] = True
elif rx_data[0] >> 4 == 0x3:
# flow control
if tx_frame["done"] != False:
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: no active frame")
if rx_data[0] == 0x32:
# 0x32 = overflow/abort
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - overflow/abort")
if rx_data[0] != 0x30 and rx_data[0] != 0x31:
# 0x30 = continue
# 0x31 = wait
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - invalid transfer state indicator")
if rx_data[0] == 0x30:
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + tx_frame["idx"] * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else tx_frame["size"]
for i in range(start, end, 7):
if delay_ts > 0 and i > start:
if (debug): print("D: {}".format(delay_ts / delay_div))
time.sleep(delay_ts / delay_div)
tx_frame["idx"] += 1
# consecutive tx frames
msg = (bytes([0x20 | (tx_frame["idx"] & 0xF)]) + tx_frame["data"][i:i+7]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
if end >= tx_frame["size"]:
tx_frame["done"] = True
if not self.tx_queue.empty():
req = self.tx_queue.get(block=False)
# reset rx and tx frames
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
tx_frame = {"size": len(req), "data": req, "idx": 0, "done": False}
if tx_frame["size"] < 8:
# single frame
tx_frame["done"] = True
msg = (bytes([tx_frame["size"]]) + tx_frame["data"]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
# first rx_frame
tx_frame["done"] = False
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data)))
# generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
@ -402,16 +437,12 @@ class UdsClient():
req += bytes([subfunction])
if data is not None:
req += data
# send request, wait for response
isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug)
while True:
resp = self.rx_queue.get(block=True, timeout=self.timeout)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
if resp is None:
raise MessageTimeoutError("timeout waiting for response")
resp = isotp_msg.recv()
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
@ -428,24 +459,22 @@ class UdsClient():
error_desc = resp[3:]
# wait for another message if response pending
if error_code == 0x78:
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
# positive response
if service_type+0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
# positive response
if service_type+0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# services
def diagnostic_session_control(self, session_type: SESSION_TYPE):