From 4f288586d59e3e308852e4ec0f988a6529b53894 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Sat, 12 Oct 2019 18:52:17 -0700 Subject: [PATCH] updates for python3 --- python/uds.py | 52 ++++++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/python/uds.py b/python/uds.py index 9f0a8ae..3c96477 100644 --- a/python/uds.py +++ b/python/uds.py @@ -1,9 +1,9 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import time import struct from enum import IntEnum -from Queue import Queue, Empty -import threading +from queue import Queue, Empty +from threading import Thread from binascii import hexlify class SERVICE_TYPE(IntEnum): @@ -267,8 +267,10 @@ class UdsClient(): self.tx_addr = tx_addr if rx_addr == None: if tx_addr < 0xFFF8: + # standard 11 bit response addr (add 8) self.rx_addr = tx_addr+8 elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF: + # standard 19 bit response addr (flip last two bytes) self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF) else: raise ValueError("invalid tx_addr: {}".format(tx_addr)) @@ -278,7 +280,7 @@ class UdsClient(): self.timeout = timeout self.debug = debug - self.can_reader_t = threading.Thread(target=self._isotp_thread, args=(self.debug,)) + self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,)) self.can_reader_t.daemon = True self.can_reader_t.start() @@ -315,7 +317,7 @@ class UdsClient(): rx_frame["idx"] = 0 rx_frame["done"] = False # send flow control message (send all bytes) - msg = "\x30\x00\x00".ljust(8, "\x00") + msg = b"\x30\x00\x00".ljust(8, b"\x00") if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) self.panda.can_send(self.tx_addr, msg, self.bus) elif rx_data[0] >> 4 == 0x2: @@ -344,7 +346,7 @@ class UdsClient(): for i in range(start, end, 7): tx_frame["idx"] += 1 # consecutive tx frames - msg = (chr(0x20 | (tx_frame["idx"] & 0xF)) + tx_frame["data"][i:i+7]).ljust(8, "\x00") + msg = (chr(0x20 | (tx_frame["idx"] & 0xF)).encode("utf8") + tx_frame["data"][i:i+7]).ljust(8, b"\x00") if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) self.panda.can_send(self.tx_addr, msg, self.bus) if delay_ts > 0: @@ -360,13 +362,13 @@ class UdsClient(): if tx_frame["size"] < 8: # single frame tx_frame["done"] = True - msg = (chr(tx_frame["size"]) + tx_frame["data"]).ljust(8, "\x00") + msg = (chr(tx_frame["size"]).encode("utf8") + tx_frame["data"]).ljust(8, b"\x00") if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) self.panda.can_send(self.tx_addr, msg, self.bus) else: # first rx_frame tx_frame["done"] = False - msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, "\x00") + msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00") if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) self.panda.can_send(self.tx_addr, msg, self.bus) else: @@ -377,9 +379,9 @@ class UdsClient(): # generic uds request def _uds_request(self, service_type, subfunction=None, data=None): - req = chr(service_type) + req = chr(service_type).encode("utf8") if subfunction is not None: - req += chr(subfunction) + req += chr(subfunction).encode("utf8") if data is not None: req += data self.tx_queue.put(req) @@ -450,7 +452,7 @@ class UdsClient(): return security_seed def communication_control(self, control_type, message_type): - data = chr(message_type) + data = chr(message_type).encode("utf8") self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data) def tester_present(self, ): @@ -503,7 +505,7 @@ class UdsClient(): def link_control(self, link_control_type, baud_rate_type=None): if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: # baud_rate_type = BAUD_RATE_TYPE - data = chr(baud_rate_type) + data = chr(baud_rate_type).encode("utf8") elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE: # baud_rate_type = custom value (3 bytes big-endian) data = struct.pack('!I', baud_rate_type)[1:] @@ -525,7 +527,7 @@ class UdsClient(): raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = chr(memory_size_bytes<<4 | memory_address_bytes) + data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8") if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -547,7 +549,7 @@ class UdsClient(): def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier): # TODO: support list of identifiers - data = chr(transmission_mode_type) + chr(periodic_data_identifier) + data = chr(transmission_mode_type).encode("utf8") + chr(periodic_data_identifier).encode("utf8") self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1): @@ -559,9 +561,9 @@ class UdsClient(): data = struct.pack('!H', dynamic_data_identifier) if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: for s in source_definitions: - data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"]) + data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]).encode("utf8") + chr(s["memory_size"]).encode("utf8") elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: - data += chr(memory_size_bytes<<4 | memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8") for s in source_definitions: if s["memory_address"] >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) @@ -587,7 +589,7 @@ class UdsClient(): raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = chr(memory_size_bytes<<4 | memory_address_bytes) + data = chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8") if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -612,7 +614,7 @@ class UdsClient(): dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \ dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: - data += chr(dtc_status_mask_type) + data += chr(dtc_status_mask_type).encode("utf8") # dtc_mask_record if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ @@ -628,11 +630,11 @@ class UdsClient(): # dtc_extended_record_num if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: - data += chr(dtc_extended_record_num) + data += chr(dtc_extended_record_num).encode("utf8") # dtc_severity_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \ dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: - data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type) + data += chr(dtc_severity_mask_type).encode("utf8") + chr(dtc_status_mask_type).encode("utf8") resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data) @@ -656,13 +658,13 @@ class UdsClient(): return resp[2:] def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): - data = chr(data_format) + data = chr(data_format).encode("utf8") if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += chr(memory_size_bytes<<4 | memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8") if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -681,13 +683,13 @@ class UdsClient(): return max_num_bytes # max number of bytes per transfer data request def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): - data = chr(data_format) + data = chr(data_format).encode("utf8") if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += chr(memory_size_bytes<<4 | memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes).encode("utf8") if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -706,7 +708,7 @@ class UdsClient(): return max_num_bytes # max number of bytes per transfer data request def transfer_data(self, block_sequence_count, data=''): - data = chr(block_sequence_count)+data + data = chr(block_sequence_count).encode("utf8") + data resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = resp[0] if len(resp) > 0 else None if resp_id != block_sequence_count: