updates for python3
parent
932745f62b
commit
4f288586d5
|
@ -1,9 +1,9 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python3
|
||||||
import time
|
import time
|
||||||
import struct
|
import struct
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from Queue import Queue, Empty
|
from queue import Queue, Empty
|
||||||
import threading
|
from threading import Thread
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
|
|
||||||
class SERVICE_TYPE(IntEnum):
|
class SERVICE_TYPE(IntEnum):
|
||||||
|
@ -267,8 +267,10 @@ class UdsClient():
|
||||||
self.tx_addr = tx_addr
|
self.tx_addr = tx_addr
|
||||||
if rx_addr == None:
|
if rx_addr == None:
|
||||||
if tx_addr < 0xFFF8:
|
if tx_addr < 0xFFF8:
|
||||||
|
# standard 11 bit response addr (add 8)
|
||||||
self.rx_addr = tx_addr+8
|
self.rx_addr = tx_addr+8
|
||||||
elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
|
elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
|
||||||
|
# standard 19 bit response addr (flip last two bytes)
|
||||||
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
|
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
|
||||||
else:
|
else:
|
||||||
raise ValueError("invalid tx_addr: {}".format(tx_addr))
|
raise ValueError("invalid tx_addr: {}".format(tx_addr))
|
||||||
|
@ -278,7 +280,7 @@ class UdsClient():
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
|
|
||||||
self.can_reader_t = threading.Thread(target=self._isotp_thread, args=(self.debug,))
|
self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,))
|
||||||
self.can_reader_t.daemon = True
|
self.can_reader_t.daemon = True
|
||||||
self.can_reader_t.start()
|
self.can_reader_t.start()
|
||||||
|
|
||||||
|
@ -315,7 +317,7 @@ class UdsClient():
|
||||||
rx_frame["idx"] = 0
|
rx_frame["idx"] = 0
|
||||||
rx_frame["done"] = False
|
rx_frame["done"] = False
|
||||||
# send flow control message (send all bytes)
|
# send flow control message (send all bytes)
|
||||||
msg = "\x30\x00\x00".ljust(8, "\x00")
|
msg = b"\x30\x00\x00".ljust(8, b"\x00")
|
||||||
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
||||||
self.panda.can_send(self.tx_addr, msg, self.bus)
|
self.panda.can_send(self.tx_addr, msg, self.bus)
|
||||||
elif rx_data[0] >> 4 == 0x2:
|
elif rx_data[0] >> 4 == 0x2:
|
||||||
|
@ -344,7 +346,7 @@ class UdsClient():
|
||||||
for i in range(start, end, 7):
|
for i in range(start, end, 7):
|
||||||
tx_frame["idx"] += 1
|
tx_frame["idx"] += 1
|
||||||
# consecutive tx frames
|
# consecutive tx frames
|
||||||
msg = (chr(0x20 | (tx_frame["idx"] & 0xF)) + tx_frame["data"][i:i+7]).ljust(8, "\x00")
|
msg = (chr(0x20 | (tx_frame["idx"] & 0xF)).encode("utf8") + tx_frame["data"][i:i+7]).ljust(8, b"\x00")
|
||||||
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
||||||
self.panda.can_send(self.tx_addr, msg, self.bus)
|
self.panda.can_send(self.tx_addr, msg, self.bus)
|
||||||
if delay_ts > 0:
|
if delay_ts > 0:
|
||||||
|
@ -360,13 +362,13 @@ class UdsClient():
|
||||||
if tx_frame["size"] < 8:
|
if tx_frame["size"] < 8:
|
||||||
# single frame
|
# single frame
|
||||||
tx_frame["done"] = True
|
tx_frame["done"] = True
|
||||||
msg = (chr(tx_frame["size"]) + tx_frame["data"]).ljust(8, "\x00")
|
msg = (chr(tx_frame["size"]).encode("utf8") + tx_frame["data"]).ljust(8, b"\x00")
|
||||||
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
||||||
self.panda.can_send(self.tx_addr, msg, self.bus)
|
self.panda.can_send(self.tx_addr, msg, self.bus)
|
||||||
else:
|
else:
|
||||||
# first rx_frame
|
# first rx_frame
|
||||||
tx_frame["done"] = False
|
tx_frame["done"] = False
|
||||||
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, "\x00")
|
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00")
|
||||||
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
|
||||||
self.panda.can_send(self.tx_addr, msg, self.bus)
|
self.panda.can_send(self.tx_addr, msg, self.bus)
|
||||||
else:
|
else:
|
||||||
|
@ -377,9 +379,9 @@ class UdsClient():
|
||||||
|
|
||||||
# generic uds request
|
# generic uds request
|
||||||
def _uds_request(self, service_type, subfunction=None, data=None):
|
def _uds_request(self, service_type, subfunction=None, data=None):
|
||||||
req = chr(service_type)
|
req = chr(service_type).encode("utf8")
|
||||||
if subfunction is not None:
|
if subfunction is not None:
|
||||||
req += chr(subfunction)
|
req += chr(subfunction).encode("utf8")
|
||||||
if data is not None:
|
if data is not None:
|
||||||
req += data
|
req += data
|
||||||
self.tx_queue.put(req)
|
self.tx_queue.put(req)
|
||||||
|
@ -450,7 +452,7 @@ class UdsClient():
|
||||||
return security_seed
|
return security_seed
|
||||||
|
|
||||||
def communication_control(self, control_type, message_type):
|
def communication_control(self, control_type, message_type):
|
||||||
data = chr(message_type)
|
data = chr(message_type).encode("utf8")
|
||||||
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
|
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
|
||||||
|
|
||||||
def tester_present(self, ):
|
def tester_present(self, ):
|
||||||
|
@ -503,7 +505,7 @@ class UdsClient():
|
||||||
def link_control(self, link_control_type, baud_rate_type=None):
|
def link_control(self, link_control_type, baud_rate_type=None):
|
||||||
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
|
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
|
||||||
# baud_rate_type = BAUD_RATE_TYPE
|
# baud_rate_type = BAUD_RATE_TYPE
|
||||||
data = chr(baud_rate_type)
|
data = chr(baud_rate_type).encode("utf8")
|
||||||
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
|
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
|
||||||
# baud_rate_type = custom value (3 bytes big-endian)
|
# baud_rate_type = custom value (3 bytes big-endian)
|
||||||
data = struct.pack('!I', baud_rate_type)[1:]
|
data = struct.pack('!I', baud_rate_type)[1:]
|
||||||
|
@ -525,7 +527,7 @@ class UdsClient():
|
||||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||||
data = chr(memory_size_bytes<<4 | memory_address_bytes)
|
data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
|
||||||
|
|
||||||
if memory_address >= 1<<(memory_address_bytes*8):
|
if memory_address >= 1<<(memory_address_bytes*8):
|
||||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||||
|
@ -547,7 +549,7 @@ class UdsClient():
|
||||||
|
|
||||||
def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
|
def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
|
||||||
# TODO: support list of identifiers
|
# TODO: support list of identifiers
|
||||||
data = chr(transmission_mode_type) + chr(periodic_data_identifier)
|
data = chr(transmission_mode_type).encode("utf8") + chr(periodic_data_identifier).encode("utf8")
|
||||||
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
|
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
|
||||||
|
|
||||||
def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
|
def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
|
||||||
|
@ -559,9 +561,9 @@ class UdsClient():
|
||||||
data = struct.pack('!H', dynamic_data_identifier)
|
data = struct.pack('!H', dynamic_data_identifier)
|
||||||
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
|
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
|
||||||
for s in source_definitions:
|
for s in source_definitions:
|
||||||
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"])
|
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]).encode("utf8") + chr(s["memory_size"]).encode("utf8")
|
||||||
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
|
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
|
||||||
data += chr(memory_size_bytes<<4 | memory_address_bytes)
|
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
|
||||||
for s in source_definitions:
|
for s in source_definitions:
|
||||||
if s["memory_address"] >= 1<<(memory_address_bytes*8):
|
if s["memory_address"] >= 1<<(memory_address_bytes*8):
|
||||||
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
|
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
|
||||||
|
@ -587,7 +589,7 @@ class UdsClient():
|
||||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||||
data = chr(memory_size_bytes<<4 | memory_address_bytes)
|
data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
|
||||||
|
|
||||||
if memory_address >= 1<<(memory_address_bytes*8):
|
if memory_address >= 1<<(memory_address_bytes*8):
|
||||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||||
|
@ -612,7 +614,7 @@ class UdsClient():
|
||||||
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
|
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
|
||||||
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
|
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
|
||||||
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
|
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
|
||||||
data += chr(dtc_status_mask_type)
|
data += chr(dtc_status_mask_type).encode("utf8")
|
||||||
# dtc_mask_record
|
# dtc_mask_record
|
||||||
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
|
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
|
||||||
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
|
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
|
||||||
|
@ -628,11 +630,11 @@ class UdsClient():
|
||||||
# dtc_extended_record_num
|
# dtc_extended_record_num
|
||||||
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
|
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
|
||||||
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
|
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
|
||||||
data += chr(dtc_extended_record_num)
|
data += chr(dtc_extended_record_num).encode("utf8")
|
||||||
# dtc_severity_mask_type
|
# dtc_severity_mask_type
|
||||||
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
|
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
|
||||||
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
|
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
|
||||||
data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type)
|
data += chr(dtc_severity_mask_type).encode("utf8") + chr(dtc_status_mask_type).encode("utf8")
|
||||||
|
|
||||||
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
|
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
|
||||||
|
|
||||||
|
@ -656,13 +658,13 @@ class UdsClient():
|
||||||
return resp[2:]
|
return resp[2:]
|
||||||
|
|
||||||
def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
|
def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
|
||||||
data = chr(data_format)
|
data = chr(data_format).encode("utf8")
|
||||||
|
|
||||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||||
data += chr(memory_size_bytes<<4 | memory_address_bytes)
|
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
|
||||||
|
|
||||||
if memory_address >= 1<<(memory_address_bytes*8):
|
if memory_address >= 1<<(memory_address_bytes*8):
|
||||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||||
|
@ -681,13 +683,13 @@ class UdsClient():
|
||||||
return max_num_bytes # max number of bytes per transfer data request
|
return max_num_bytes # max number of bytes per transfer data request
|
||||||
|
|
||||||
def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
|
def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
|
||||||
data = chr(data_format)
|
data = chr(data_format).encode("utf8")
|
||||||
|
|
||||||
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
if memory_address_bytes < 1 or memory_address_bytes > 4:
|
||||||
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
|
||||||
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
if memory_size_bytes < 1 or memory_size_bytes > 4:
|
||||||
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
|
||||||
data += chr(memory_size_bytes<<4 | memory_address_bytes)
|
data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8")
|
||||||
|
|
||||||
if memory_address >= 1<<(memory_address_bytes*8):
|
if memory_address >= 1<<(memory_address_bytes*8):
|
||||||
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
raise ValueError('invalid memory_address: {}'.format(memory_address))
|
||||||
|
@ -706,7 +708,7 @@ class UdsClient():
|
||||||
return max_num_bytes # max number of bytes per transfer data request
|
return max_num_bytes # max number of bytes per transfer data request
|
||||||
|
|
||||||
def transfer_data(self, block_sequence_count, data=''):
|
def transfer_data(self, block_sequence_count, data=''):
|
||||||
data = chr(block_sequence_count)+data
|
data = chr(block_sequence_count).encode("utf8") + data
|
||||||
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
|
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
|
||||||
resp_id = resp[0] if len(resp) > 0 else None
|
resp_id = resp[0] if len(resp) > 0 else None
|
||||||
if resp_id != block_sequence_count:
|
if resp_id != block_sequence_count:
|
||||||
|
|
Loading…
Reference in New Issue