panda/python/uds.py

738 lines
32 KiB
Python
Raw Normal View History

2019-10-12 19:52:17 -06:00
#!/usr/bin/env python3
2018-11-18 00:18:48 -07:00
import time
import struct
2019-10-13 21:43:55 -06:00
from typing import NamedTuple, List
2018-11-25 13:53:56 -07:00
from enum import IntEnum
2019-10-12 19:52:17 -06:00
from queue import Queue, Empty
from threading import Thread
2018-12-03 23:33:14 -07:00
from binascii import hexlify
2018-11-18 00:18:48 -07:00
2018-11-25 13:53:56 -07:00
class SERVICE_TYPE(IntEnum):
2018-11-25 12:51:39 -07:00
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
SECURITY_ACCESS = 0x27
COMMUNICATION_CONTROL = 0x28
TESTER_PRESENT = 0x3E
ACCESS_TIMING_PARAMETER = 0x83
SECURED_DATA_TRANSMISSION = 0x84
CONTROL_DTC_SETTING = 0x85
RESPONSE_ON_EVENT = 0x86
LINK_CONTROL = 0x87
READ_DATA_BY_IDENTIFIER = 0x22
READ_MEMORY_BY_ADDRESS = 0x23
READ_SCALING_DATA_BY_IDENTIFIER = 0x24
READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A
DYNAMICALLY_DEFINE_DATA_IDENTIFIER = 0x2C
WRITE_DATA_BY_IDENTIFIER = 0x2E
WRITE_MEMORY_BY_ADDRESS = 0x3D
CLEAR_DIAGNOSTIC_INFORMATION = 0x14
READ_DTC_INFORMATION = 0x19
INPUT_OUTPUT_CONTROL_BY_IDENTIFIER = 0x2F
ROUTINE_CONTROL = 0x31
REQUEST_DOWNLOAD = 0x34
REQUEST_UPLOAD = 0x35
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37
2018-11-18 00:18:48 -07:00
2018-11-25 13:53:56 -07:00
class SESSION_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
2018-11-25 13:53:56 -07:00
class RESET_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
2018-11-25 13:53:56 -07:00
class ACCESS_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
REQUEST_SEED = 1
SEND_KEY = 2
2018-11-25 13:53:56 -07:00
class CONTROL_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
2018-11-25 13:53:56 -07:00
class MESSAGE_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
2018-11-25 13:53:56 -07:00
class TIMING_PARAMETER_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
2018-11-25 13:53:56 -07:00
class DTC_SETTING_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
ON = 1
OFF = 2
2018-11-25 13:53:56 -07:00
class RESPONSE_EVENT_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
ON_CHANGE_OF_DATA_IDENTIFIER = 3
REPORT_ACTIVATED_EVENTS = 4
START_RESPONSE_ON_EVENT = 5
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
2018-11-25 13:53:56 -07:00
class LINK_CONTROL_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
2018-11-25 13:53:56 -07:00
class BAUD_RATE_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
PC9600 = 1
PC19200 = 2
PC38400 = 3
PC57600 = 4
PC115200 = 5
CAN125000 = 16
CAN250000 = 17
CAN500000 = 18
CAN1000000 = 19
2018-11-25 13:53:56 -07:00
class DATA_IDENTIFIER_TYPE(IntEnum):
2019-10-13 21:54:02 -06:00
BOOT_SOFTWARE_IDENTIFICATION = 0xF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0xF181
APPLICATION_DATA_IDENTIFICATION = 0xF182
BOOT_SOFTWARE_FINGERPRINT = 0xF183
APPLICATION_SOFTWARE_FINGERPRINT = 0xF184
APPLICATION_DATA_FINGERPRINT = 0xF185
ACTIVE_DIAGNOSTIC_SESSION = 0xF186
VEHICLE_MANUFACTURER_SPARE_PART_NUMBER = 0xF187
VEHICLE_MANUFACTURER_ECU_SOFTWARE_NUMBER = 0xF188
VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER = 0xF189
SYSTEM_SUPPLIER_IDENTIFIER = 0xF18A
ECU_MANUFACTURING_DATE = 0xF18B
ECU_SERIAL_NUMBER = 0xF18C
SUPPORTED_FUNCTIONAL_UNITS = 0xF18D
VEHICLE_MANUFACTURER_KIT_ASSEMBLY_PART_NUMBER = 0xF18E
VIN = 0xF190
VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER = 0xF191
SYSTEM_SUPPLIER_ECU_HARDWARE_NUMBER = 0xF192
SYSTEM_SUPPLIER_ECU_HARDWARE_VERSION_NUMBER = 0xF193
SYSTEM_SUPPLIER_ECU_SOFTWARE_NUMBER = 0xF194
SYSTEM_SUPPLIER_ECU_SOFTWARE_VERSION_NUMBER = 0xF195
EXHAUST_REGULATION_OR_TYPE_APPROVAL_NUMBER = 0xF196
SYSTEM_NAME_OR_ENGINE_TYPE = 0xF197
REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER = 0xF198
PROGRAMMING_DATE = 0xF199
CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER = 0xF19A
CALIBRATION_DATE = 0xF19B
CALIBRATION_EQUIPMENT_SOFTWARE_NUMBER = 0xF19C
ECU_INSTALLATION_DATE = 0xF19D
ODX_FILE = 0xF19E
ENTITY = 0xF19F
2018-11-18 00:18:48 -07:00
2018-11-25 13:53:56 -07:00
class TRANSMISSION_MODE_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
2018-11-18 00:18:48 -07:00
2018-11-25 13:53:56 -07:00
class DYNAMIC_DEFINITION_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
2018-11-18 00:18:48 -07:00
2019-10-13 21:43:55 -06:00
class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int
2018-11-25 13:53:56 -07:00
class DTC_GROUP_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
EMISSIONS = 0x000000
ALL = 0xFFFFFF
2018-11-25 13:53:56 -07:00
class DTC_REPORT_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04
DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05
DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06
NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07
DTC_BY_SEVERITY_MASK_RECORD = 0x08
SEVERITY_INFORMATION_OF_DTC = 0x09
SUPPORTED_DTC = 0x0A
FIRST_TEST_FAILED_DTC = 0x0B
FIRST_CONFIRMED_DTC = 0x0C
MOST_RECENT_TEST_FAILED_DTC = 0x0D
MOST_RECENT_CONFIRMED_DTC = 0x0E
MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F
MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10
NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11
NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12
EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
2018-11-25 13:53:56 -07:00
class DTC_STATUS_MASK_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
CONFIRMED_DTC = 0x08
TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10
TEST_FAILED_SINCE_LAST_CLEAR = 0x20
TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40
2019-10-14 18:30:30 -06:00
WARNING_INDICATOR_REQUESTED = 0x80
2018-11-25 03:46:20 -07:00
ALL = 0xFF
2018-11-25 13:53:56 -07:00
class DTC_SEVERITY_MASK_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
2019-10-13 21:43:55 -06:00
class CONTROL_PARAMETER_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
2018-11-25 13:53:56 -07:00
class ROUTINE_CONTROL_TYPE(IntEnum):
2018-11-25 03:46:20 -07:00
START = 1
STOP = 2
REQUEST_RESULTS = 3
2018-11-25 13:53:56 -07:00
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
2018-11-18 00:18:48 -07:00
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
2018-12-12 10:08:09 -07:00
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super(Exception, self).__init__(message)
self.service_id = service_id
self.error_code = error_code
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctioneError(Exception):
pass
_negative_response_codes = {
0x00: 'positive response',
0x10: 'general reject',
0x11: 'service not supported',
0x12: 'sub-function not supported',
0x13: 'incorrect message length or invalid format',
0x14: 'response too long',
0x21: 'busy repeat request',
0x22: 'conditions not correct',
0x24: 'request sequence error',
0x25: 'no response from subnet component',
0x26: 'failure prevents execution of requested action',
0x31: 'request out of range',
0x33: 'security access denied',
0x35: 'invalid key',
0x36: 'exceed numebr of attempts',
0x37: 'required time delay not expired',
0x70: 'upload download not accepted',
0x71: 'transfer data suspended',
0x72: 'general programming failure',
0x73: 'wrong block sequence counter',
0x78: 'request correctly received - response pending',
0x7e: 'sub-function not supported in active session',
0x7f: 'service not supported in active session',
0x81: 'rpm too high',
0x82: 'rpm too low',
0x83: 'engine is running',
0x84: 'engine is not running',
0x85: 'engine run time too low',
0x86: 'temperature too high',
0x87: 'temperature too low',
0x88: 'vehicle speed too high',
0x89: 'vehicle speed too low',
0x8a: 'throttle/pedal too high',
0x8b: 'throttle/pedal too low',
0x8c: 'transmission not in neutral',
0x8d: 'transmission not in gear',
0x8f: 'brake switch(es) not closed',
0x90: 'shifter lever not in park',
0x91: 'torque converter clutch locked',
0x92: 'voltage too high',
0x93: 'voltage too low',
}
class UdsClient():
2019-10-13 21:43:55 -06:00
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False):
2018-12-12 10:08:09 -07:00
self.panda = panda
self.bus = bus
self.tx_addr = tx_addr
if rx_addr == None:
if tx_addr < 0xFFF8:
2019-10-12 19:52:17 -06:00
# standard 11 bit response addr (add 8)
2018-12-12 10:08:09 -07:00
self.rx_addr = tx_addr+8
elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
2019-10-13 13:13:44 -06:00
# standard 29 bit response addr (flip last two bytes)
2018-12-12 10:08:09 -07:00
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
else:
raise ValueError("invalid tx_addr: {}".format(tx_addr))
self.tx_queue = Queue()
self.rx_queue = Queue()
2018-12-20 01:22:55 -07:00
self.timeout = timeout
2018-12-12 10:08:09 -07:00
self.debug = debug
2019-10-12 19:52:17 -06:00
self.can_reader_t = Thread(target=self._isotp_thread, args=(self.debug,))
2018-12-12 10:08:09 -07:00
self.can_reader_t.daemon = True
self.can_reader_t.start()
2019-10-13 21:43:55 -06:00
def _isotp_thread(self, debug: bool=False):
2018-12-12 10:08:09 -07:00
try:
2019-10-13 09:30:44 -06:00
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
2018-12-12 10:08:09 -07:00
# allow all output
2018-12-17 01:45:10 -07:00
self.panda.set_safety_mode(0x1337)
2018-12-12 10:08:09 -07:00
# clear tx buffer
2018-12-17 01:45:10 -07:00
self.panda.can_clear(self.bus)
2018-12-12 10:08:09 -07:00
# clear rx buffer
2018-12-17 01:45:10 -07:00
self.panda.can_clear(0xFFFF)
2018-12-12 10:08:09 -07:00
while True:
2018-12-17 01:45:10 -07:00
messages = self.panda.can_recv()
2018-12-12 10:08:09 -07:00
for rx_addr, rx_ts, rx_data, rx_bus in messages:
2018-12-17 01:45:10 -07:00
if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0:
2018-12-12 10:08:09 -07:00
continue
if (debug): print("R: {} {}".format(hex(rx_addr), hexlify(rx_data)))
if rx_data[0] >> 4 == 0x0:
# single rx_frame
rx_frame["size"] = rx_data[0] & 0xFF
rx_frame["data"] = rx_data[1:1+rx_frame["size"]]
rx_frame["idx"] = 0
rx_frame["done"] = True
2018-12-17 01:45:10 -07:00
self.rx_queue.put(rx_frame["data"])
2018-12-12 10:08:09 -07:00
elif rx_data[0] >> 4 == 0x1:
# first rx_frame
rx_frame["size"] = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
rx_frame["data"] = rx_data[2:]
rx_frame["idx"] = 0
rx_frame["done"] = False
# send flow control message (send all bytes)
2019-10-12 19:52:17 -06:00
msg = b"\x30\x00\x00".ljust(8, b"\x00")
2018-12-17 01:45:10 -07:00
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
2018-12-12 10:08:09 -07:00
elif rx_data[0] >> 4 == 0x2:
# consecutive rx frame
assert rx_frame["done"] == False, "rx: no active frame"
# validate frame index
rx_frame["idx"] += 1
assert rx_frame["idx"] & 0xF == rx_data[0] & 0xF, "rx: invalid consecutive frame index"
rx_size = rx_frame["size"] - len(rx_frame["data"])
rx_frame["data"] += rx_data[1:1+min(rx_size, 7)]
if rx_frame["size"] == len(rx_frame["data"]):
rx_frame["done"] = True
2018-12-17 01:45:10 -07:00
self.rx_queue.put(rx_frame["data"])
2018-12-12 10:08:09 -07:00
elif rx_data[0] >> 4 == 0x3:
# flow control
2019-10-13 09:30:44 -06:00
if tx_frame["done"] != False:
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: no active frame")
if rx_data[0] == 0x32:
2019-10-13 13:13:44 -06:00
# 0x32 = overflow/abort
2019-10-13 09:30:44 -06:00
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - overflow/abort")
2019-10-13 13:13:44 -06:00
if rx_data[0] != 0x30 and rx_data[0] != 0x31:
# 0x30 = continue
# 0x31 = wait
tx_frame["done"] = True
self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - invalid transfer state indicator")
if rx_data[0] == 0x30:
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + tx_frame["idx"] * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else tx_frame["size"]
for i in range(start, end, 7):
if delay_ts > 0 and i > start:
if (debug): print("D: {}".format(delay_ts / delay_div))
time.sleep(delay_ts / delay_div)
tx_frame["idx"] += 1
# consecutive tx frames
msg = (bytes([0x20 | (tx_frame["idx"] & 0xF)]) + tx_frame["data"][i:i+7]).ljust(8, b"\x00")
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
if end >= tx_frame["size"]:
tx_frame["done"] = True
2018-12-12 10:08:09 -07:00
2018-12-17 01:45:10 -07:00
if not self.tx_queue.empty():
req = self.tx_queue.get(block=False)
2018-12-12 10:08:09 -07:00
# reset rx and tx frames
2019-10-13 09:30:44 -06:00
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
2018-12-12 10:08:09 -07:00
tx_frame = {"size": len(req), "data": req, "idx": 0, "done": False}
if tx_frame["size"] < 8:
# single frame
tx_frame["done"] = True
2019-10-13 13:13:44 -06:00
msg = (bytes([tx_frame["size"]]) + tx_frame["data"]).ljust(8, b"\x00")
2018-12-17 01:45:10 -07:00
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
2018-12-12 10:08:09 -07:00
else:
# first rx_frame
tx_frame["done"] = False
2019-10-12 19:52:17 -06:00
msg = (struct.pack("!H", 0x1000 | tx_frame["size"]) + tx_frame["data"][:6]).ljust(8, b"\x00")
2018-12-17 01:45:10 -07:00
if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
2018-12-12 10:08:09 -07:00
else:
time.sleep(0.01)
finally:
2018-12-17 01:45:10 -07:00
self.panda.close()
self.rx_queue.put(None)
2018-12-12 10:08:09 -07:00
# generic uds request
2019-10-13 21:43:55 -06:00
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
2019-10-13 13:13:44 -06:00
req = bytes([service_type])
2018-12-12 10:08:09 -07:00
if subfunction is not None:
2019-10-13 13:13:44 -06:00
req += bytes([subfunction])
2018-12-12 10:08:09 -07:00
if data is not None:
req += data
self.tx_queue.put(req)
while True:
try:
2018-12-20 01:22:55 -07:00
resp = self.rx_queue.get(block=True, timeout=self.timeout)
2018-12-12 10:08:09 -07:00
except Empty:
raise MessageTimeoutError("timeout waiting for response")
if resp is None:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
if resp_sid == 0x7F:
service_id = resp[1] if len(resp) > 1 else -1
try:
service_desc = SERVICE_TYPE(service_id).name
except Exception:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else -1
try:
error_desc = _negative_response_codes[error_code]
except Exception:
2019-10-13 09:30:44 -06:00
error_desc = resp[3:]
2018-12-12 10:08:09 -07:00
# wait for another message if response pending
if error_code == 0x78:
time.sleep(0.1)
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
break
# positive response
if service_type+0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# services
2019-10-13 21:43:55 -06:00
def diagnostic_session_control(self, session_type: SESSION_TYPE):
2018-12-12 10:08:09 -07:00
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
2019-10-13 21:43:55 -06:00
def ecu_reset(self, reset_type: RESET_TYPE):
2018-12-12 10:08:09 -07:00
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time
2019-10-13 21:43:55 -06:00
def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None):
2018-12-12 10:08:09 -07:00
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
raise ValueError('security_key is missing')
resp = self._uds_request(SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
if request_seed:
security_seed = resp
return security_seed
2019-10-13 21:43:55 -06:00
def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE):
2019-10-13 13:13:44 -06:00
data = bytes([message_type])
2018-12-12 10:08:09 -07:00
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
2019-10-13 21:43:55 -06:00
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None):
2018-12-19 23:46:40 -07:00
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
2018-12-12 10:08:09 -07:00
read_values = (
2018-12-19 23:46:40 -07:00
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET
2018-12-12 10:08:09 -07:00
)
if not write_custom_values and parameter_values is not None:
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = self._uds_request(SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
return parameter_values
2019-10-13 21:43:55 -06:00
def secured_data_transmission(self, data: bytes):
2018-12-12 10:08:09 -07:00
# TODO: split data into multiple input parameters?
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
2019-10-13 21:43:55 -06:00
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
2018-12-12 10:08:09 -07:00
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
2019-10-13 21:43:55 -06:00
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
2018-12-12 10:08:09 -07:00
if store_event:
response_event_type |= 0x20
# TODO: split record parameters into arrays
2019-10-13 13:13:44 -06:00
data = bytes([window_time, event_type_record, service_response_record])
2018-12-12 10:08:09 -07:00
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
2018-12-19 23:46:40 -07:00
if response_event_type == RESPONSE_EVENT_TYPE.REPORT_ACTIVATED_EVENTS:
2018-12-12 10:08:09 -07:00
return {
"num_of_activated_events": resp[0],
"data": resp[1:], # TODO: parse the reset of response
}
return {
"num_of_identified_events": resp[0],
"event_window_time": resp[1],
"data": resp[2:], # TODO: parse the reset of response
}
2019-10-13 21:43:55 -06:00
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
2018-12-19 23:46:40 -07:00
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
2018-12-12 10:08:09 -07:00
# baud_rate_type = BAUD_RATE_TYPE
2019-10-13 13:13:44 -06:00
data = bytes([baud_rate_type])
2018-12-19 23:46:40 -07:00
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
2018-12-12 10:08:09 -07:00
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
2019-10-13 21:43:55 -06:00
def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
2018-12-12 10:08:09 -07:00
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
2019-10-13 21:43:55 -06:00
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1):
2018-12-12 10:08:09 -07:00
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
2019-10-13 13:13:44 -06:00
data = bytes([memory_size_bytes<<4 | memory_address_bytes])
2018-12-12 10:08:09 -07:00
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
2019-10-13 21:43:55 -06:00
def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
2018-12-12 10:08:09 -07:00
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
2019-10-13 21:43:55 -06:00
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
2018-12-12 10:08:09 -07:00
# TODO: support list of identifiers
2019-10-13 13:13:44 -06:00
data = bytes([transmission_mode_type, periodic_data_identifier])
2018-12-12 10:08:09 -07:00
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
2019-10-13 21:43:55 -06:00
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1):
2018-12-12 10:08:09 -07:00
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
2019-10-13 13:13:44 -06:00
data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]])
2018-12-12 10:08:09 -07:00
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
2019-10-13 13:13:44 -06:00
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
2018-12-12 10:08:09 -07:00
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
2019-10-13 21:43:55 -06:00
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
2018-12-12 10:08:09 -07:00
data = struct.pack('!H', data_identifier_type) + data_record
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
2019-10-13 21:43:55 -06:00
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1):
2018-12-12 10:08:09 -07:00
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
2019-10-13 13:13:44 -06:00
data = bytes([memory_size_bytes<<4 | memory_address_bytes])
2018-12-12 10:08:09 -07:00
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
data += data_record
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
2019-10-13 21:43:55 -06:00
def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE):
2018-12-12 10:08:09 -07:00
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
2019-10-13 21:43:55 -06:00
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF):
2019-10-13 09:30:44 -06:00
data = b''
2018-12-12 10:08:09 -07:00
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
2019-10-13 13:13:44 -06:00
data += bytes([dtc_status_mask_type])
2018-12-12 10:08:09 -07:00
# dtc_mask_record
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC:
data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes
# dtc_snapshot_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
2019-10-13 13:13:44 -06:00
data += bytes([dtc_extended_record_num])
2018-12-12 10:08:09 -07:00
# dtc_severity_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
2019-10-13 13:13:44 -06:00
data += bytes([dtc_severity_mask_type, dtc_status_mask_type])
2018-12-12 10:08:09 -07:00
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response
return resp
2019-10-13 21:43:55 -06:00
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
2018-12-12 10:08:09 -07:00
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
2019-10-13 21:43:55 -06:00
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''):
2018-12-12 10:08:09 -07:00
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
2019-10-13 21:43:55 -06:00
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
2019-10-13 13:13:44 -06:00
data = bytes([data_format])
2018-12-12 10:08:09 -07:00
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
2019-10-13 13:13:44 -06:00
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
2018-12-12 10:08:09 -07:00
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
2019-10-12 21:52:51 -06:00
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
2018-12-12 10:08:09 -07:00
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
2019-10-13 21:43:55 -06:00
def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
2019-10-13 13:13:44 -06:00
data = bytes([data_format])
2018-12-12 10:08:09 -07:00
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
2019-10-13 13:13:44 -06:00
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
2018-12-12 10:08:09 -07:00
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
2019-10-12 21:52:51 -06:00
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
2018-12-12 10:08:09 -07:00
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
2019-10-13 21:43:55 -06:00
def transfer_data(self, block_sequence_count: int, data: bytes=b''):
2019-10-13 13:13:44 -06:00
data = bytes([block_sequence_count]) + data
2018-12-12 10:08:09 -07:00
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
def request_transfer_exit(self):
self._uds_request(SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)