bug fixes

master
Greg Hogan 2018-12-05 01:19:19 -08:00
parent 8ee89a091d
commit 33a5167d93
1 changed files with 43 additions and 37 deletions

View File

@ -193,26 +193,32 @@ def _uds_request(address, service_type, subfunction=None, data=None):
if data is not None:
req += data
tx_queue.put(req)
try:
resp = rx_queue.get(block=True, timeout=10)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
if resp_sid == 0x7F:
service_id = 0
while True:
try:
service_id = resp[1]
service_desc = SERVICE_TYPE(service_id).name
except:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else '0x??'
try:
error_desc = _negative_response_codes[error_code]
except:
error_desc = 'unknown error'
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
resp = rx_queue.get(block=True, timeout=10)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
if resp_sid == 0x7F:
service_id = resp[1] if len(resp) > 1 else -1
try:
service_desc = SERVICE_TYPE(service_id).name
except Exception:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else -1
try:
error_desc = _negative_response_codes[error_code]
except Exception:
error_desc = 'unknown error'
# wait for another message if response pending
if error_code == 0x78:
time.sleep(0.1)
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
break
# positive response
if service_type+0x40 != resp_sid:
@ -249,7 +255,7 @@ def ecu_reset(address, reset_type):
resp = _uds_request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = ord(resp[0])
power_down_time = resp[0]
return power_down_time
class ACCESS_TYPE(IntEnum):
@ -257,7 +263,7 @@ class ACCESS_TYPE(IntEnum):
SEND_KEY = 2
def security_access(address, access_type, security_key=None):
request_seed = access_type % 2 == 0
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
@ -339,13 +345,13 @@ def response_on_event(address, response_event_type, store_event, window_time, ev
if response_event_type == REPORT_ACTIVATED_EVENTS:
return {
"num_of_activated_events": ord(resp[0]),
"num_of_activated_events": resp[0],
"data": resp[1:], # TODO: parse the reset of response
}
return {
"num_of_identified_events": ord(resp[0]),
"event_window_time": ord(resp[1]),
"num_of_identified_events": resp[0],
"event_window_time": resp[1],
"data": resp[2:], # TODO: parse the reset of response
}
@ -423,7 +429,7 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
@ -464,14 +470,14 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
@ -497,7 +503,7 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
@ -619,8 +625,8 @@ class ROUTINE_IDENTIFIER_TYPE(IntEnum):
def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
_uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
resp = _uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
@ -632,7 +638,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
@ -642,7 +648,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _uds_request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
@ -657,7 +663,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
@ -667,7 +673,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _uds_request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
@ -676,8 +682,9 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(address, block_sequence_count, data=''):
data = chr(block_sequence_count)+data
resp = _uds_request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = ord(resp[0]) if len(resp) > 0 else None
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
@ -687,7 +694,6 @@ def request_transfer_exit(address):
if __name__ == "__main__":
from python import Panda
from string import printable
panda = Panda()
bus = 0
tx_addr = 0x18da30f1 # EPS
@ -702,7 +708,7 @@ if __name__ == "__main__":
tester_present(tx_addr)
print("extended diagnostic session ...")
diagnostic_session_control(tx_addr, SESSION_TYPE.EXTENDED_DIAGNOSTIC)
print("application software id ...")
print("read data by id: application software id ...")
app_id = read_data_by_identifier(tx_addr, DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
print(app_id)
# for i in range(0xF100, 0xF2FF):
@ -713,7 +719,7 @@ if __name__ == "__main__":
# desc = " [" + DATA_IDENTIFIER_TYPE(i).name + "]"
# except ValueError:
# pass
# print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), dat.decode(errors="ignore")))
# print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), "")) #, dat.decode(errors="ignore")))
# except NegativeResponseError as e:
# if e.error_code != 0x31:
# print("{}: {}".format(hex(i), e))