From a618e64d1a1778f5b3ac95dcd8a41b64d1a18a3f Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 28 May 2020 14:06:13 -0700 Subject: [PATCH] fix typing errors --- .gitignore | 1 + python/uds.py | 44 +++++++++++++++++++++++++------------------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index 397996a..503ce91 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ examples/output.csv .DS_Store .vscode nosetests.xml +.mypy_cache/ diff --git a/python/uds.py b/python/uds.py index 92fc46b..6c7b21f 100644 --- a/python/uds.py +++ b/python/uds.py @@ -2,7 +2,7 @@ import time import struct from collections import deque -from typing import Callable, NamedTuple, Tuple, List +from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast from enum import IntEnum class SERVICE_TYPE(IntEnum): @@ -271,12 +271,12 @@ _negative_response_codes = { class CanClient(): - def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False): + def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False): self.tx = can_send self.rx = can_recv self.tx_addr = tx_addr self.rx_addr = rx_addr - self.rx_buff = deque() + self.rx_buff = deque() # type: Deque[bytes] self.sub_addr = sub_addr self.bus = bus self.debug = debug @@ -320,7 +320,7 @@ class CanClient(): if len(msgs) < 254: return - def recv(self, drain: bool=False) -> List[bytes]: + def recv(self, drain: bool=False) -> Generator[bytes, None, None]: # buffer rx messages in case two response messages are received at once # (e.g. response pending and success/failure response) self._recv_buffer(drain) @@ -383,7 +383,7 @@ class IsoTpMessage(): msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00") self._can_client.send([msg]) - def recv(self) -> bytes: + def recv(self) -> Optional[bytes]: start_time = time.time() try: while True: @@ -505,6 +505,10 @@ class UdsClient(): isotp_msg.send(req) while True: resp = isotp_msg.recv() + + if resp is None: + continue + resp_sid = resp[0] if len(resp) > 0 else None # negative response @@ -518,7 +522,7 @@ class UdsClient(): try: error_desc = _negative_response_codes[error_code] except BaseException: - error_desc = resp[3:] + error_desc = resp[3:].hex() # wait for another message if response pending if error_code == 0x78: if self.debug: print("UDS-RX: response pending") @@ -534,7 +538,7 @@ class UdsClient(): resp_sfn = resp[1] if len(resp) > 1 else None if subfunction != resp_sfn: resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None - raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn_hex))) + raise InvalidSubFunctioneError(f'invalid response subfunction: {resp_sfn_hex:x}') # return data (exclude service id and sub-function id) return resp[(1 if subfunction is None else 2):] @@ -595,7 +599,7 @@ class UdsClient(): def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int): if store_event: - response_event_type |= 0x20 + response_event_type |= 0x20 # type: ignore # TODO: split record parameters into arrays data = bytes([window_time, event_type_record, service_response_record]) resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data) @@ -613,9 +617,11 @@ class UdsClient(): } def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None): + data : Optional[bytes] + if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: # baud_rate_type = BAUD_RATE_TYPE - data = bytes([baud_rate_type]) + data = bytes([cast(int, baud_rate_type)]) elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE: # baud_rate_type = custom value (3 bytes big-endian) data = struct.pack('!I', baud_rate_type)[1:] @@ -671,16 +677,16 @@ class UdsClient(): data = struct.pack('!H', dynamic_data_identifier) if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: for s in source_definitions: - data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]]) + data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size]) elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: data += bytes([memory_size_bytes<<4 | memory_address_bytes]) for s in source_definitions: - if s["memory_address"] >= 1<<(memory_address_bytes*8): - raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) - data += struct.pack('!I', s["memory_address"])[4-memory_address_bytes:] - if s["memory_size"] >= 1<<(memory_size_bytes*8): - raise ValueError('invalid memory_size: {}'.format(s["memory_size"])) - data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:] + if s.memory_address >= 1<<(memory_address_bytes*8): + raise ValueError('invalid memory_address: {}'.format(s.memory_address)) + data += struct.pack('!I', s.memory_address)[4-memory_address_bytes:] + if s.memory_size >= 1<<(memory_size_bytes*8): + raise ValueError('invalid memory_size: {}'.format(s.memory_size)) + data += struct.pack('!I', s.memory_size)[4-memory_size_bytes:] elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: pass else: @@ -736,7 +742,7 @@ class UdsClient(): if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER: - data += ord(dtc_snapshot_record_num) + data += bytes([dtc_snapshot_record_num]) # dtc_extended_record_num if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: @@ -784,7 +790,7 @@ class UdsClient(): data += struct.pack('!I', memory_size)[4-memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) - max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None + max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0 if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] else: @@ -809,7 +815,7 @@ class UdsClient(): data += struct.pack('!I', memory_size)[4-memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) - max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None + max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0 if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] else: