diff --git a/python/uds.py b/python/uds.py index ed0404a..6580bd3 100644 --- a/python/uds.py +++ b/python/uds.py @@ -193,26 +193,32 @@ def _uds_request(address, service_type, subfunction=None, data=None): if data is not None: req += data tx_queue.put(req) - try: - resp = rx_queue.get(block=True, timeout=10) - except Empty: - raise MessageTimeoutError("timeout waiting for response") - resp_sid = resp[0] if len(resp) > 0 else None - # negative response - if resp_sid == 0x7F: - service_id = 0 + while True: try: - service_id = resp[1] - service_desc = SERVICE_TYPE(service_id).name - except: - service_desc = 'NON_STANDARD_SERVICE' - error_code = resp[2] if len(resp) > 2 else '0x??' - try: - error_desc = _negative_response_codes[error_code] - except: - error_desc = 'unknown error' - raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) + resp = rx_queue.get(block=True, timeout=10) + except Empty: + raise MessageTimeoutError("timeout waiting for response") + resp_sid = resp[0] if len(resp) > 0 else None + + # negative response + if resp_sid == 0x7F: + service_id = resp[1] if len(resp) > 1 else -1 + try: + service_desc = SERVICE_TYPE(service_id).name + except Exception: + service_desc = 'NON_STANDARD_SERVICE' + error_code = resp[2] if len(resp) > 2 else -1 + try: + error_desc = _negative_response_codes[error_code] + except Exception: + error_desc = 'unknown error' + # wait for another message if response pending + if error_code == 0x78: + time.sleep(0.1) + continue + raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) + break # positive response if service_type+0x40 != resp_sid: @@ -249,7 +255,7 @@ def ecu_reset(address, reset_type): resp = _uds_request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type) power_down_time = None if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN: - power_down_time = ord(resp[0]) + power_down_time = resp[0] return power_down_time class ACCESS_TYPE(IntEnum): @@ -257,7 +263,7 @@ class ACCESS_TYPE(IntEnum): SEND_KEY = 2 def security_access(address, access_type, security_key=None): - request_seed = access_type % 2 == 0 + request_seed = access_type % 2 != 0 if request_seed and security_key is not None: raise ValueError('security_key not allowed') if not request_seed and security_key is None: @@ -339,13 +345,13 @@ def response_on_event(address, response_event_type, store_event, window_time, ev if response_event_type == REPORT_ACTIVATED_EVENTS: return { - "num_of_activated_events": ord(resp[0]), + "num_of_activated_events": resp[0], "data": resp[1:], # TODO: parse the reset of response } return { - "num_of_identified_events": ord(resp[0]), - "event_window_time": ord(resp[1]), + "num_of_identified_events": resp[0], + "event_window_time": resp[1], "data": resp[2:], # TODO: parse the reset of response } @@ -423,7 +429,7 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_ raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data = chr(memory_size_bytes<<4 | memory_address_bytes) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -464,14 +470,14 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data = chr(memory_size_bytes<<4 | memory_address_bytes) data = struct.pack('!H', dynamic_data_identifier) if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: for s in source_definitions: data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"]) elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: - data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes) for s in source_definitions: if s["memory_address"] >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) @@ -497,7 +503,7 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data = chr(memory_size_bytes<<4 | memory_address_bytes) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -619,8 +625,8 @@ class ROUTINE_IDENTIFIER_TYPE(IntEnum): def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''): data = struct.pack('!H', routine_identifier_type) + routine_option_record - _uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) - resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + resp = _uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) + resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != routine_identifier_type: raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) return resp[2:] @@ -632,7 +638,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes= raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -642,7 +648,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes= data += struct.pack('!I', memory_size)[4-memory_size_bytes:] resp = _uds_request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) - max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None + max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] else: @@ -657,7 +663,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4, raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + data += chr(memory_size_bytes<<4 | memory_address_bytes) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -667,7 +673,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4, data += struct.pack('!I', memory_size)[4-memory_size_bytes:] resp = _uds_request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) - max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None + max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] else: @@ -676,8 +682,9 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4, return max_num_bytes # max number of bytes per transfer data request def transfer_data(address, block_sequence_count, data=''): + data = chr(block_sequence_count)+data resp = _uds_request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) - resp_id = ord(resp[0]) if len(resp) > 0 else None + resp_id = resp[0] if len(resp) > 0 else None if resp_id != block_sequence_count: raise ValueError('invalid block_sequence_count: {}'.format(resp_id)) return resp[1:] @@ -687,7 +694,6 @@ def request_transfer_exit(address): if __name__ == "__main__": from python import Panda - from string import printable panda = Panda() bus = 0 tx_addr = 0x18da30f1 # EPS @@ -702,7 +708,7 @@ if __name__ == "__main__": tester_present(tx_addr) print("extended diagnostic session ...") diagnostic_session_control(tx_addr, SESSION_TYPE.EXTENDED_DIAGNOSTIC) - print("application software id ...") + print("read data by id: application software id ...") app_id = read_data_by_identifier(tx_addr, DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION) print(app_id) # for i in range(0xF100, 0xF2FF): @@ -713,7 +719,7 @@ if __name__ == "__main__": # desc = " [" + DATA_IDENTIFIER_TYPE(i).name + "]" # except ValueError: # pass - # print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), dat.decode(errors="ignore"))) + # print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), "")) #, dat.decode(errors="ignore"))) # except NegativeResponseError as e: # if e.error_code != 0x31: # print("{}: {}".format(hex(i), e))