diff --git a/python/uds.py b/python/uds.py index 9e931d3..b596b4e 100644 --- a/python/uds.py +++ b/python/uds.py @@ -75,6 +75,13 @@ _negative_response_codes = { '\x93': 'voltage too low', } +class NegativeResponseError(Exception): + pass +class InvalidServiceIdError(Exception): + pass +class InvalidSubFunctioneError(Exception): + pass + # generic uds request def _request(address, service, subfunction, data=None): # TODO: send request @@ -84,24 +91,24 @@ def _request(address, service, subfunction, data=None): if resp[0] == '\x7f' error_code = resp[2] error_desc = _negative_response_codes[error_code] - raise Exception('[{}] {}'.format(hex(ord(error_code)), error_desc)) + raise NegativeResponseError('[{}] {}'.format(hex(ord(error_code)), error_desc)) resp_sid = ord(resp[0]) if len(resp) > 0 else None if service != resp_sid + 0x40: resp_sid_hex = hex(resp_sid) if resp_sid is not None else None - raise Exception('invalid response service id: {}'.format(resp_sid_hex)) + raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) if subfunction is None: resp_subf = ord(resp[1]) if len(resp) > 1 else None if subfunction != resp_subf: resp_subf_hex = hex(resp_subf) if resp_subf is not None else None - raise Exception('invalid response subfunction: {}'.format(hex(resp_subf))) + raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_subf))) # return data (exclude service id and sub-function id) return resp[(1 if subfunction is None else 2):] # services -class DIAGNOSTIC_SESSION_CONTROL_TYPE(Enum): +class SESSION_TYPE(Enum): DEFAULT = 1 PROGRAMMING = 2 EXTENDED_DIAGNOSTIC = 3 @@ -110,7 +117,7 @@ class DIAGNOSTIC_SESSION_CONTROL_TYPE(Enum): def diagnostic_session_control(address, session_type): _request(address, service=0x10, subfunction=session_type) -class ECU_RESET_TYPE(Enum): +class RESET_TYPE(Enum): HARD = 1 KEY_OFF_ON = 2 SOFT = 3 @@ -122,9 +129,9 @@ def ecu_reset(address, reset_type): power_down_time = None if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN power_down_time = ord(resp[0]) - return {"power_down_time": power_down_time} + return power_down_time -class SECURITY_ACCESS_TYPE(Enum): +class ACCESS_TYPE(Enum): REQUEST_SEED = 1 SEND_KEY = 2 @@ -136,15 +143,16 @@ def security_access(address, access_type, security_key=None): raise ValueError('security_key is missing') resp = _request(address, service=0x27, subfunction=access_type, data=security_key) if request_seed: - return {"security_seed": resp} + security_seed = resp + return security_seed -class COMMUNICATION_CONTROL_TYPE(Enum): +class CONTROL_TYPE(Enum): ENABLE_RX_ENABLE_TX = 0 ENABLE_RX_DISABLE_TX = 1 DISABLE_RX_ENABLE_TX = 2 DISABLE_RX_DISABLE_TX = 3 -class COMMUNICATION_CONTROL_MESSAGE_TYPE(Enum): +class MESSAGE_TYPE(Enum): NORMAL = 1 NETWORK_MANAGEMENT = 2 NORMAL_AND_NETWORK_MANAGEMENT = 3 @@ -156,41 +164,42 @@ def communication_control(address, control_type, message_type): def tester_present(address): _request(address, service=0x3E, subfunction=0x00) -class ACCESS_TIMING_PARAMETER_TYPE(Enum): +class TIMING_PARAMETER_TYPE(Enum): READ_EXTENDED_SET = 1 SET_TO_DEFAULT_VALUES = 2 READ_CURRENTLY_ACTIVE = 3 SET_TO_GIVEN_VALUES = 4 -def access_timing_parameter(address, parameter_type, parameter_values): - write_custom_values = parameter_type == ACCESS_TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES +def access_timing_parameter(address, timing_parameter_type, parameter_values): + write_custom_values = timing_parameter_type == ACCESS_TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES read_values = ( - parameter_type == ACCESS_TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or - parameter_type == ACCESS_TIMING_PARAMETER_TYPE.READ_EXTENDED_SET + timing_parameter_type == ACCESS_TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or + timing_parameter_type == ACCESS_TIMING_PARAMETER_TYPE.READ_EXTENDED_SET ) if not write_custom_values and parameter_values is not None: raise ValueError('parameter_values not allowed') if write_custom_values and parameter_values is None: raise ValueError('parameter_values is missing') - resp = _request(address, service=0x83, subfunction=parameter_type, data=parameter_values) + resp = _request(address, service=0x83, subfunction=timing_parameter_type, data=parameter_values) if read_values: # TODO: parse response into values? - return {"parameter_values": resp} + parameter_values = resp + return parameter_values def secured_data_transmission(address, data): # TODO: split data into multiple input parameters? resp = _request(address, service=0x84, subfunction=None, data=data) # TODO: parse response into multiple output values? - return {"data"=resp} + return resp -class CONTROL_DTC_SETTING_TYPE(Enum): +class DTC_SETTING_TYPE(Enum): ON = 1 OFF = 2 -def control_dtc_setting(address, setting_type): - _request(address, service=0x85, subfunction=setting_type) +def control_dtc_setting(address, dtc_setting_type): + _request(address, service=0x85, subfunction=dtc_setting_type) -class RESPONSE_ON_EVENT_TYPE(Enum): +class RESPONSE_EVENT_TYPE(Enum): STOP_RESPONSE_ON_EVENT = 0 ON_DTC_STATUS_CHANGE = 1 ON_TIMER_INTERRUPT = 2 @@ -200,20 +209,31 @@ class RESPONSE_ON_EVENT_TYPE(Enum): CLEAR_RESPONSE_ON_EVENT = 6 ON_COMPARISON_OF_VALUES = 7 -def response_on_event(address, event_type, store_event, window_time, event_type_record, service_response_record): +def response_on_event(address, response_event_type, store_event, window_time, event_type_record, service_response_record): if store_event: - event_type |= 0x20 + response_event_type |= 0x20 # TODO: split record parameters into arrays data = char(window_time) + event_type_record + service_response_record - resp = _request(address, service=0x86, subfunction=event_type, data=data) - # TODO: parse the reset of response + resp = _request(address, service=0x86, subfunction=response_event_type, data=data) + + if response_event_type == REPORT_ACTIVATED_EVENTS: + return { + "num_of_activated_events": ord(resp[0]), + "data": resp[1:], # TODO: parse the reset of response + } + + return { + "num_of_identified_events": ord(resp[0]), + "event_window_time": ord(resp[1]), + "data": resp[2:], # TODO: parse the reset of response + } class LINK_CONTROL_TYPE(Enum): VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1 VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2 TRANSITION_BAUDRATE = 3 -class LINK_CONTROL_BAUD_RATE(Enum): +class BAUD_RATE_TYPE(Enum): PC9600 = 1 PC19200 = 2 PC38400 = 3 @@ -224,18 +244,18 @@ class LINK_CONTROL_BAUD_RATE(Enum): CAN500000 = 18 CAN1000000 = 19 -def link_control(address, control_type, baud_rate=None): +def link_control(address, link_control_type, baud_rate_type=None): if LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: - # baud_rate = LINK_CONTROL_BAUD_RATE - data = chr(baud_rate) + # baud_rate_type = BAUD_RATE_TYPE + data = chr(baud_rate_type) elif LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE: - # baud_rate = custom value (3 bytes big-endian) - data = struct.pack('!I', baud_rate)[1:] + # baud_rate_type = custom value (3 bytes big-endian) + data = struct.pack('!I', baud_rate_type)[1:] else: data = None - _request(address, service=0x87, subfunction=control_type, data=data) + _request(address, service=0x87, subfunction=link_control_type, data=data) -class DATA_IDENTIFIER(Enum): +class DATA_IDENTIFIER_TYPE(Enum): BOOT_SOFTWARE_IDENTIFICATION = 0XF180 APPLICATION_SOFTWARE_IDENTIFICATION = 0XF181 APPLICATION_DATA_IDENTIFICATION = 0XF182 @@ -268,16 +288,16 @@ class DATA_IDENTIFIER(Enum): ODX_FILE = 0XF19E ENTITY = 0XF19F -def read_data_by_identifier(address, data_identifier): +def read_data_by_identifier(address, data_identifier_type): # TODO: support list of identifiers - data = struct.pack('!H', data_id) + data = struct.pack('!H', data_identifier_type) resp = _request(address, service=0x22, subfunction=None, data=data) - resp_id = struct.unpack('!H', data[0:2])[0] if len(data) >= 2 else None - if resp_id != data_id: + resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] -def read_memory_by_address(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4): +def read_memory_by_address(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -288,74 +308,261 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_ raise ValueError('invalid memory_address: {}'.format(memory_address)) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] if memory_size >= 1<<(memory_size_bytes*8) - raise ValueError('invalid memory_size: {}'.format(memory_address)) + raise ValueError('invalid memory_size: {}'.format(memory_size)) data += struct.pack('!I', memory_size)[4-memory_size_bytes:] resp = _request(address, service=0x23, subfunction=None, data=data) return resp -def read_scaling_data_by_identifier(address): - raise NotImplementedError() - _request(address, service=0x24, subfunction=0x00) +def read_scaling_data_by_identifier(address, data_identifier_type): + data = struct.pack('!H', data_identifier_type) + resp = _request(address, service=0x24, subfunction=None, data=data) + resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + if resp_id != data_identifier_type: + raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + return resp[2:] # TODO: parse the response -def read_data_by_periodic_identifier(address): - raise NotImplementedError() - _request(address, service=0x2A, subfunction=0x00) +class TRANSMISSION_MODE_TYPE(Enum): + SEND_AT_SLOW_RATE = 1 + SEND_AT_MEDIUM_RATE = 2 + SEND_AT_FAST_RATE = 3 + STOP_SENDING = 4 -def dynamically_define_data_identifier(address): - raise NotImplementedError() - _request(address, service=0x2C, subfunction=0x00) +def read_data_by_periodic_identifier(address, transmission_mode_type, periodic_data_identifier): + # TODO: support list of identifiers + data = chr(transmission_mode_type) + chr(periodic_data_identifier) + _request(address, service=0x2A, subfunction=None, data=data) -def write_data_by_identifier(address): - raise NotImplementedError() - _request(address, service=0x2E, subfunction=0x00) +class DYNAMIC_DEFINITION_TYPE(Enum): + DEFINE_BY_IDENTIFIER = 1 + DEFINE_BY_MEMORY_ADDRESS = 2 + CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3 -def write_memory_by_address(address): - raise NotImplementedError() - _request(address, service=0x3D, subfunction=0x00) +def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1): + if memory_address_bytes < 1 or memory_address_bytes > 4: + raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + if memory_size_bytes < 1 or memory_size_bytes > 4: + raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) -def clear_diagnostic_information(address): - raise NotImplementedError() - _request(address, service=0x14, subfunction=0x00) + data = struct.pack('!H', dynamic_data_identifier) + if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: + for s in source_definitions: + data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"]) + elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: + data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + for s in source_definitions: + if s["memory_address"] >= 1<<(memory_address_bytes*8) + raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) + data += struct.pack('!I', memory_address)[4-memory_address_bytes:] + if s["memory_size"] >= 1<<(memory_size_bytes*8) + raise ValueError('invalid memory_size: {}'.format(s["memory_size"])) + data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:] + elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: + pass + else: + raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type))) + _request(address, service=0x2C, subfunction=dynamic_definition_type, data=data) -def read_dtc_information(address): - raise NotImplementedError() - _request(address, service=0x19, subfunction=0x00) +def write_data_by_identifier(address, data_identifier_type, data_record): + data = struct.pack('!H', data_identifier_type) + data_record + resp = _request(address, service=0x2E, subfunction=None, data=data) + resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + if resp_id != data_identifier_type: + raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) -class INPUT_OUTPUT_CONTROL_PARAMETER(Enum): +def write_memory_by_address(address, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1): + if memory_address_bytes < 1 or memory_address_bytes > 4: + raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + if memory_size_bytes < 1 or memory_size_bytes > 4: + raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) + + if memory_address >= 1<<(memory_address_bytes*8) + raise ValueError('invalid memory_address: {}'.format(memory_address)) + data += struct.pack('!I', memory_address)[4-memory_address_bytes:] + if memory_size >= 1<<(memory_size_bytes*8) + raise ValueError('invalid memory_size: {}'.format(memory_size)) + data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + + data += data_record + _request(address, service=0x3D, subfunction=0x00, data=data) + +class DTC_GROUP_TYPE(Enum): + EMISSIONS = 0x000000 + ALL = 0xFFFFFF + +def clear_diagnostic_information(address, dtc_group_type): + data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes + _request(address, service=0x14, subfunction=None, data=data) + +class DTC_REPORT_TYPE(Enum): + NUMBER_OF_DTC_BY_STATUS_MASK = 0x01 + DTC_BY_STATUS_MASK = 0x02 + DTC_SNAPSHOT_IDENTIFICATION = 0x03 + DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04 + DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05 + DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06 + NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07 + DTC_BY_SEVERITY_MASK_RECORD = 0x08 + SEVERITY_INFORMATION_OF_DTC = 0x09 + SUPPORTED_DTC = 0x0A + FIRST_TEST_FAILED_DTC = 0x0B + FIRST_CONFIRMED_DTC = 0x0C + MOST_RECENT_TEST_FAILED_DTC = 0x0D + MOST_RECENT_CONFIRMED_DTC = 0x0E + MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F + MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10 + NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11 + NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12 + EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13 + DTC_FAULT_DETECTION_COUNTER = 0x14 + DTC_WITH_PERMANENT_STATUS = 0x15 + +class DTC_STATUS_MASK_TYPE(Enum): + TEST_FAILED = 0x01 + TEST_FAILED_THIS_OPERATION_CYCLE = 0x02 + PENDING_DTC = 0x04 + CONFIRMED_DTC = 0x08 + TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10 + TEST_FAILED_SINCE_LAST_CLEAR = 0x20 + TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40 + WARNING_INDICATOR_REQUESTED = 0x80 + ALL = 0xFF + +class DTC_SEVERITY_MASK_TYPE(Enum): + MAINTENANCE_ONLY = 0x20 + CHECK_AT_NEXT_HALT = 0x40 + CHECK_IMMEDIATELY = 0x80 + ALL = 0xE0 + +def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF): + data = '' + # dtc_status_mask_type + if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or + dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or + dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: + data += chr(dtc_status_mask_type) + # dtc_mask_record + if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or + dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or + dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC: + data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes + # dtc_snapshot_record_num + if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER: + data += ord(dtc_snapshot_record_num) + # dtc_extended_record_num + if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: + data += chr(dtc_extended_record_num) + # dtc_severity_mask_type + if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or + dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: + data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type) + + resp = _request(address, service=0x19, subfunction=dtc_report_type, data=data) + + # TODO: parse response + return resp + +class CONTROL_OPTION_TYPE(Enum): RETURN_CONTROL_TO_ECU = 0 RESET_TO_DEFAULT = 1 FREEZE_CURRENT_STATE = 2 SHORT_TERM_ADJUSTMENT = 3 -def input_output_control_by_identifier(address): - raise NotImplementedError() - _request(address, service=0x2F, subfunction=0x00) +def input_output_control_by_identifier(address, data_identifier_type, control_option_record, control_enable_mask_record=''): + data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record + resp = _request(address, service=0x2F, subfunction=None, data=data) + resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + if resp_id != data_identifier_type: + raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + return resp[2:] class ROUTINE_CONTROL_TYPE(Enum): + START = 1 + STOP = 2 + REQUEST_RESULTS = 3 + +class ROUTINE_IDENTIFIER_TYPE(Enum): ERASE_MEMORY = 0xFF00 CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01 ERASE_MIRROR_MEMORY_DTCS = 0xFF02 -def routine_control(address): - raise NotImplementedError() - _request(address, service=0x31, subfunction=0x00) +def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''): + data = struct.pack('!H', routine_identifier_type) + routine_option_record + _request(address, service=0x31, subfunction=routine_control_type, data=data) + resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None + if resp_id != routine_identifier_type: + raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) + return resp[2:] -def request_download(address): - raise NotImplementedError() - _request(address, service=0x34, subfunction=0x00) +def request_download(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): + data = chr(data_format) -def request_upload(address): - raise NotImplementedError() - _request(address, service=0x35, subfunction=0x00) + if memory_address_bytes < 1 or memory_address_bytes > 4: + raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + if memory_size_bytes < 1 or memory_size_bytes > 4: + raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + + if memory_address >= 1<<(memory_address_bytes*8) + raise ValueError('invalid memory_address: {}'.format(memory_address)) + data += struct.pack('!I', memory_address)[4-memory_address_bytes:] + if memory_size >= 1<<(memory_size_bytes*8) + raise ValueError('invalid memory_size: {}'.format(memory_size)) + data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + + resp = _request(address, service=0x34, subfunction=None, data=data) + max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None + if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: + max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] + else: + raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) + + return max_num_bytes # max number of bytes per transfer data request + +def request_upload(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): + data = chr(data_format) + + if memory_address_bytes < 1 or memory_address_bytes > 4: + raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + if memory_size_bytes < 1 or memory_size_bytes > 4: + raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) + + if memory_address >= 1<<(memory_address_bytes*8) + raise ValueError('invalid memory_address: {}'.format(memory_address)) + data += struct.pack('!I', memory_address)[4-memory_address_bytes:] + if memory_size >= 1<<(memory_size_bytes*8) + raise ValueError('invalid memory_size: {}'.format(memory_size)) + data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + + resp = _request(address, service=0x35, subfunction=None, data=data) + max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None + if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: + max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] + else: + raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) + + return max_num_bytes # max number of bytes per transfer data request + +def transfer_data(address, block_sequence_count, data=''): + resp = _request(address, service=0x36, subfunction=None, data=data) + resp_id = ord(resp[0]) if len(resp) > 0 else None + if resp_id != block_sequence_count: + raise ValueError('invalid block_sequence_count: {}'.format(resp_id)) + return resp[1:] -def transfer_data(address): - raise NotImplementedError() - _request(address, service=0x36, subfunction=0x00) - def request_transfer_exit(address) - raise NotImplementedError() - _request(address, service=0x37, subfunction=0x00) + _request(address, service=0x37, subfunction=None) if __name__ == "__main__": from . import uds