fix typing errors

master
Willem Melching 2020-05-28 14:06:13 -07:00
parent 9bece64e8f
commit a618e64d1a
2 changed files with 26 additions and 19 deletions

1
.gitignore vendored
View File

@ -15,3 +15,4 @@ examples/output.csv
.DS_Store
.vscode
nosetests.xml
.mypy_cache/

View File

@ -2,7 +2,7 @@
import time
import struct
from collections import deque
from typing import Callable, NamedTuple, Tuple, List
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast
from enum import IntEnum
class SERVICE_TYPE(IntEnum):
@ -271,12 +271,12 @@ _negative_response_codes = {
class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.rx_buff = deque()
self.rx_buff = deque() # type: Deque[bytes]
self.sub_addr = sub_addr
self.bus = bus
self.debug = debug
@ -320,7 +320,7 @@ class CanClient():
if len(msgs) < 254:
return
def recv(self, drain: bool=False) -> List[bytes]:
def recv(self, drain: bool=False) -> Generator[bytes, None, None]:
# buffer rx messages in case two response messages are received at once
# (e.g. response pending and success/failure response)
self._recv_buffer(drain)
@ -383,7 +383,7 @@ class IsoTpMessage():
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00")
self._can_client.send([msg])
def recv(self) -> bytes:
def recv(self) -> Optional[bytes]:
start_time = time.time()
try:
while True:
@ -505,6 +505,10 @@ class UdsClient():
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()
if resp is None:
continue
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
@ -518,7 +522,7 @@ class UdsClient():
try:
error_desc = _negative_response_codes[error_code]
except BaseException:
error_desc = resp[3:]
error_desc = resp[3:].hex()
# wait for another message if response pending
if error_code == 0x78:
if self.debug: print("UDS-RX: response pending")
@ -534,7 +538,7 @@ class UdsClient():
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn_hex)))
raise InvalidSubFunctioneError(f'invalid response subfunction: {resp_sfn_hex:x}')
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
@ -595,7 +599,7 @@ class UdsClient():
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
response_event_type |= 0x20 # type: ignore
# TODO: split record parameters into arrays
data = bytes([window_time, event_type_record, service_response_record])
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
@ -613,9 +617,11 @@ class UdsClient():
}
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
data : Optional[bytes]
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
data = bytes([cast(int, baud_rate_type)])
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
@ -671,16 +677,16 @@ class UdsClient():
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]])
data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', s["memory_address"])[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
if s.memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s.memory_address))
data += struct.pack('!I', s.memory_address)[4-memory_address_bytes:]
if s.memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s.memory_size))
data += struct.pack('!I', s.memory_size)[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
@ -736,7 +742,7 @@ class UdsClient():
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
data += bytes([dtc_snapshot_record_num])
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
@ -784,7 +790,7 @@ class UdsClient():
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
@ -809,7 +815,7 @@ class UdsClient():
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else: