convert uds lib to class

master
Greg Hogan 2018-12-12 09:08:09 -08:00
parent 59cd2b47f9
commit 80fb6a6fa0
1 changed files with 580 additions and 594 deletions

View File

@ -6,8 +6,6 @@ from Queue import Queue, Empty
import threading
from binascii import hexlify
DEBUG = False
class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
@ -35,6 +33,189 @@ class SERVICE_TYPE(IntEnum):
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
ON_CHANGE_OF_DATA_IDENTIFIER = 3
REPORT_ACTIVATED_EVENTS = 4
START_RESPONSE_ON_EVENT = 5
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
PC38400 = 3
PC57600 = 4
PC115200 = 5
CAN125000 = 16
CAN250000 = 17
CAN500000 = 18
CAN1000000 = 19
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0XF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0XF181
APPLICATION_DATA_IDENTIFICATION = 0XF182
BOOT_SOFTWARE_FINGERPRINT = 0XF183
APPLICATION_SOFTWARE_FINGERPRINT = 0XF184
APPLICATION_DATA_FINGERPRINT = 0XF185
ACTIVE_DIAGNOSTIC_SESSION = 0XF186
VEHICLE_MANUFACTURER_SPARE_PART_NUMBER = 0XF187
VEHICLE_MANUFACTURER_ECU_SOFTWARE_NUMBER = 0XF188
VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER = 0XF189
SYSTEM_SUPPLIER_IDENTIFIER = 0XF18A
ECU_MANUFACTURING_DATE = 0XF18B
ECU_SERIAL_NUMBER = 0XF18C
SUPPORTED_FUNCTIONAL_UNITS = 0XF18D
VEHICLE_MANUFACTURER_KIT_ASSEMBLY_PART_NUMBER = 0XF18E
VIN = 0XF190
VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER = 0XF191
SYSTEM_SUPPLIER_ECU_HARDWARE_NUMBER = 0XF192
SYSTEM_SUPPLIER_ECU_HARDWARE_VERSION_NUMBER = 0XF193
SYSTEM_SUPPLIER_ECU_SOFTWARE_NUMBER = 0XF194
SYSTEM_SUPPLIER_ECU_SOFTWARE_VERSION_NUMBER = 0XF195
EXHAUST_REGULATION_OR_TYPE_APPROVAL_NUMBER = 0XF196
SYSTEM_NAME_OR_ENGINE_TYPE = 0XF197
REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER = 0XF198
PROGRAMMING_DATE = 0XF199
CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER = 0XF19A
CALIBRATION_DATE = 0XF19B
CALIBRATION_EQUIPMENT_SOFTWARE_NUMBER = 0XF19C
ECU_INSTALLATION_DATE = 0XF19D
ODX_FILE = 0XF19E
ENTITY = 0XF19F
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04
DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05
DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06
NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07
DTC_BY_SEVERITY_MASK_RECORD = 0x08
SEVERITY_INFORMATION_OF_DTC = 0x09
SUPPORTED_DTC = 0x0A
FIRST_TEST_FAILED_DTC = 0x0B
FIRST_CONFIRMED_DTC = 0x0C
MOST_RECENT_TEST_FAILED_DTC = 0x0D
MOST_RECENT_CONFIRMED_DTC = 0x0E
MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F
MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10
NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11
NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12
EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
CONFIRMED_DTC = 0x08
TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10
TEST_FAILED_SINCE_LAST_CLEAR = 0x20
TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40
WARNING_INDICATOR_uds_requestED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
class CONTROL_OPTION_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super(Exception, self).__init__(message)
self.service_id = service_id
self.error_code = error_code
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctioneError(Exception):
pass
_negative_response_codes = {
0x00: 'positive response',
0x10: 'general reject',
@ -79,33 +260,34 @@ _negative_response_codes = {
0x93: 'voltage too low',
}
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super(Exception, self).__init__(message)
self.service_id = service_id
self.error_code = error_code
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctioneError(Exception):
pass
def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue):
try:
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
class UdsClient():
def __init__(self, panda, tx_addr, rx_addr=None, bus=0, debug=False):
self.panda = panda
self.bus = bus
self.tx_addr = tx_addr
if rx_addr == None:
if tx_addr < 0xFFF8:
filter_addr = tx_addr+8
self.rx_addr = tx_addr+8
elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
filter_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
else:
raise ValueError("invalid tx_addr: {}".format(tx_addr))
self.tx_queue = Queue()
self.rx_queue = Queue()
self.debug = debug
self.can_reader_t = threading.Thread(target=self._isotp_thread, args=(self.panda, self.bus, self.tx_addr, self.tx_queue, self.rx_queue, self.debug))
self.can_reader_t.daemon = True
self.can_reader_t.start()
def _isotp_thread(self, panda, bus, tx_addr, tx_queue, rx_queue, debug):
try:
rx_frame = {"size": 0, "data": "", "idx": 0, "done": True}
tx_frame = {"size": 0, "data": "", "idx": 0, "done": True}
# allow all output
panda.set_safety_mode(0x1337)
# clear tx buffer
panda.can_clear(bus)
# clear rx buffer
@ -114,10 +296,10 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue):
while True:
messages = panda.can_recv()
for rx_addr, rx_ts, rx_data, rx_bus in messages:
if rx_bus != bus or rx_addr != filter_addr or len(rx_data) == 0:
if rx_bus != bus or rx_addr != rx_addr or len(rx_data) == 0:
continue
if (DEBUG): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data)))
if (debug): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data)))
if rx_data[0] >> 4 == 0x0:
# single rx_frame
rx_frame["size"] = rx_data[0] & 0xFF
@ -133,7 +315,7 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue):
rx_frame["done"] = False
# send flow control message (send all bytes)
msg = "\x30\x00\x00".ljust(8, "\x00")
if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
if (debug): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
panda.can_send(tx_addr, msg, bus)
elif rx_data[0] >> 4 == 0x2:
# consecutive rx frame
@ -162,7 +344,7 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue):
tx_frame["idx"] += 1
# consecutive tx frames
msg = (chr(0x20 | (tx_frame["idx"] & 0xF)) + tx_frame["data"][i:i+7]).ljust(8, "\x00")
if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
if (debug): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
panda.can_send(tx_addr, msg, bus)
if delay_ts > 0:
time.sleep(delay_ts / delay_div)
@ -177,33 +359,37 @@ def _isotp_thread(panda, bus, tx_addr, tx_queue, rx_queue):
# single frame
tx_frame["done"] = True
msg = (chr(tx_frame["size"]) + tx_frame["data"]).ljust(8, "\x00")
if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
if (debug): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
panda.can_send(tx_addr, msg, bus)
else:
# first rx_frame
tx_frame["done"] = False
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, "\x00")
if (DEBUG): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
if (debug): print("S: {} {}".format(hex(tx_addr), hexlify(msg)))
panda.can_send(tx_addr, msg, bus)
else:
time.sleep(0.01)
finally:
panda.close()
rx_queue.put(None)
# generic uds request
def _uds_request(address, service_type, subfunction=None, data=None):
def _uds_request(self, service_type, subfunction=None, data=None):
req = chr(service_type)
if subfunction is not None:
req += chr(subfunction)
if data is not None:
req += data
tx_queue.put(req)
self.tx_queue.put(req)
while True:
try:
resp = rx_queue.get(block=True, timeout=10)
resp = self.rx_queue.get(block=True, timeout=10)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
if resp is None:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
@ -240,69 +426,35 @@ def _uds_request(address, service_type, subfunction=None, data=None):
return resp[(1 if subfunction is None else 2):]
# services
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
def diagnostic_session_control(self, session_type):
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
def diagnostic_session_control(address, session_type):
_uds_request(address, SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
def ecu_reset(address, reset_type):
resp = _uds_request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
def ecu_reset(self, reset_type):
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
def security_access(address, access_type, security_key=None):
def security_access(self, access_type, security_key=None):
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
raise ValueError('security_key is missing')
resp = _uds_request(address, SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
resp = self._uds_request(SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
if request_seed:
security_seed = resp
return security_seed
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
def communication_control(address, control_type, message_type):
def communication_control(self, control_type, message_type):
data = chr(message_type)
_uds_request(address, SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(address):
_uds_request(address, SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
def access_timing_parameter(address, timing_parameter_type, parameter_values):
def access_timing_parameter(self, timing_parameter_type, parameter_values):
write_custom_values = timing_parameter_type == ACCESS_TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (
timing_parameter_type == ACCESS_TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
@ -312,41 +464,27 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values):
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = _uds_request(address, SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
resp = self._uds_request(SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
return parameter_values
def secured_data_transmission(address, data):
def secured_data_transmission(self, data):
# TODO: split data into multiple input parameters?
resp = _uds_request(address, SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
def control_dtc_setting(self, dtc_setting_type):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
def control_dtc_setting(address, dtc_setting_type):
_uds_request(address, SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
ON_CHANGE_OF_DATA_IDENTIFIER = 3
REPORT_ACTIVATED_EVENTS = 4
START_RESPONSE_ON_EVENT = 5
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
def response_on_event(address, response_event_type, store_event, window_time, event_type_record, service_response_record):
def response_on_event(self, response_event_type, store_event, window_time, event_type_record, service_response_record):
if store_event:
response_event_type |= 0x20
# TODO: split record parameters into arrays
data = char(window_time) + event_type_record + service_response_record
resp = _uds_request(address, SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
if response_event_type == REPORT_ACTIVATED_EVENTS:
return {
@ -360,23 +498,7 @@ def response_on_event(address, response_event_type, store_event, window_time, ev
"data": resp[2:], # TODO: parse the reset of response
}
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
PC38400 = 3
PC57600 = 4
PC115200 = 5
CAN125000 = 16
CAN250000 = 17
CAN500000 = 18
CAN1000000 = 19
def link_control(address, link_control_type, baud_rate_type=None):
def link_control(self, link_control_type, baud_rate_type=None):
if LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = chr(baud_rate_type)
@ -385,51 +507,18 @@ def link_control(address, link_control_type, baud_rate_type=None):
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
_uds_request(address, SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0XF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0XF181
APPLICATION_DATA_IDENTIFICATION = 0XF182
BOOT_SOFTWARE_FINGERPRINT = 0XF183
APPLICATION_SOFTWARE_FINGERPRINT = 0XF184
APPLICATION_DATA_FINGERPRINT = 0XF185
ACTIVE_DIAGNOSTIC_SESSION = 0XF186
VEHICLE_MANUFACTURER_SPARE_PART_NUMBER = 0XF187
VEHICLE_MANUFACTURER_ECU_SOFTWARE_NUMBER = 0XF188
VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER = 0XF189
SYSTEM_SUPPLIER_IDENTIFIER = 0XF18A
ECU_MANUFACTURING_DATE = 0XF18B
ECU_SERIAL_NUMBER = 0XF18C
SUPPORTED_FUNCTIONAL_UNITS = 0XF18D
VEHICLE_MANUFACTURER_KIT_ASSEMBLY_PART_NUMBER = 0XF18E
VIN = 0XF190
VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER = 0XF191
SYSTEM_SUPPLIER_ECU_HARDWARE_NUMBER = 0XF192
SYSTEM_SUPPLIER_ECU_HARDWARE_VERSION_NUMBER = 0XF193
SYSTEM_SUPPLIER_ECU_SOFTWARE_NUMBER = 0XF194
SYSTEM_SUPPLIER_ECU_SOFTWARE_VERSION_NUMBER = 0XF195
EXHAUST_REGULATION_OR_TYPE_APPROVAL_NUMBER = 0XF196
SYSTEM_NAME_OR_ENGINE_TYPE = 0XF197
REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER = 0XF198
PROGRAMMING_DATE = 0XF199
CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER = 0XF19A
CALIBRATION_DATE = 0XF19B
CALIBRATION_EQUIPMENT_SOFTWARE_NUMBER = 0XF19C
ECU_INSTALLATION_DATE = 0XF19D
ODX_FILE = 0XF19E
ENTITY = 0XF19F
def read_data_by_identifier(address, data_identifier_type):
def read_data_by_identifier(self, data_identifier_type):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = _uds_request(address, SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def read_memory_by_address(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1):
def read_memory_by_address(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -443,34 +532,23 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _uds_request(address, SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
def read_scaling_data_by_identifier(address, data_identifier_type):
def read_scaling_data_by_identifier(self, data_identifier_type):
data = struct.pack('!H', data_identifier_type)
resp = _uds_request(address, SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
def read_data_by_periodic_identifier(address, transmission_mode_type, periodic_data_identifier):
def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
# TODO: support list of identifiers
data = chr(transmission_mode_type) + chr(periodic_data_identifier)
_uds_request(address, SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -494,16 +572,16 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
_uds_request(address, SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(address, data_identifier_type, data_record):
def write_data_by_identifier(self, data_identifier_type, data_record):
data = struct.pack('!H', data_identifier_type) + data_record
resp = _uds_request(address, SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
def write_memory_by_address(address, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1):
def write_memory_by_address(self, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -518,57 +596,13 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
data += data_record
_uds_request(address, SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
def clear_diagnostic_information(address, dtc_group_type):
def clear_diagnostic_information(self, dtc_group_type):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
_uds_request(address, SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04
DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05
DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06
NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07
DTC_BY_SEVERITY_MASK_RECORD = 0x08
SEVERITY_INFORMATION_OF_DTC = 0x09
SUPPORTED_DTC = 0x0A
FIRST_TEST_FAILED_DTC = 0x0B
FIRST_CONFIRMED_DTC = 0x0C
MOST_RECENT_TEST_FAILED_DTC = 0x0D
MOST_RECENT_CONFIRMED_DTC = 0x0E
MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F
MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10
NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11
NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12
EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
CONFIRMED_DTC = 0x08
TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10
TEST_FAILED_SINCE_LAST_CLEAR = 0x20
TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40
WARNING_INDICATOR_uds_requestED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF):
def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF):
data = ''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
@ -599,44 +633,28 @@ def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STAT
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type)
resp = _uds_request(address, SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response
return resp
class CONTROL_OPTION_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
def input_output_control_by_identifier(address, data_identifier_type, control_option_record, control_enable_mask_record=''):
def input_output_control_by_identifier(self, data_identifier_type, control_option_record, control_enable_mask_record=''):
data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record
resp = _uds_request(address, SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''):
def routine_control(self, routine_control_type, routine_identifier_type, routine_option_record=''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = _uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
def request_download(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
data = chr(data_format)
if memory_address_bytes < 1 or memory_address_bytes > 4:
@ -652,7 +670,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _uds_request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
@ -661,7 +679,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
return max_num_bytes # max number of bytes per transfer data request
def request_upload(address, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
data = chr(data_format)
if memory_address_bytes < 1 or memory_address_bytes > 4:
@ -677,7 +695,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _uds_request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
@ -686,45 +704,13 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(address, block_sequence_count, data=''):
def transfer_data(self, block_sequence_count, data=''):
data = chr(block_sequence_count)+data
resp = _uds_request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
def request_transfer_exit(address):
_uds_request(address, SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)
if __name__ == "__main__":
from python import Panda
panda = Panda()
bus = 0
tx_addr = 0x18da30f1 # EPS
tx_queue = Queue()
rx_queue = Queue()
can_reader_t = threading.Thread(target=_isotp_thread, args=(panda, bus, tx_addr, tx_queue, rx_queue))
can_reader_t.daemon = True
can_reader_t.start()
# examples
print("tester present ...")
tester_present(tx_addr)
print("extended diagnostic session ...")
diagnostic_session_control(tx_addr, SESSION_TYPE.EXTENDED_DIAGNOSTIC)
print("read data by id: application software id ...")
app_id = read_data_by_identifier(tx_addr, DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
print(app_id)
# for i in range(0xF100, 0xF2FF):
# try:
# dat = read_data_by_identifier(tx_addr, i)
# desc = ""
# try:
# desc = " [" + DATA_IDENTIFIER_TYPE(i).name + "]"
# except ValueError:
# pass
# print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), "")) #, dat.decode(errors="ignore")))
# except NegativeResponseError as e:
# if e.error_code != 0x31:
# print("{}: {}".format(hex(i), e))
def request_transfer_exit(self):
self._uds_request(SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)