diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 82bfe4e..fa7229f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,8 +19,9 @@ repos: - id: flake8 exclude: '^(tests/automated)/' args: - - --ignore=E111,E114,E121,E501,E302,E305,W504 + - --select=E112,E113,E304,E501,E502,E701,E702,E703,E71,E72,E731,W191,W6 - --exclude=python/esptool.py,tests/gmbitbang/* + - --max-line-length=160 - --statistics - repo: local hooks: diff --git a/python/uds.py b/python/uds.py index f46e643..f83c11f 100644 --- a/python/uds.py +++ b/python/uds.py @@ -271,7 +271,8 @@ _negative_response_codes = { class CanClient(): - def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int = None, debug: bool = False): + def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], + tx_addr: int, rx_addr: int, bus: int, sub_addr: int = None, debug: bool = False): self.tx = can_send self.rx = can_recv self.tx_addr = tx_addr @@ -613,7 +614,8 @@ class UdsClient(): def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE): self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type) - def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int): + def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, + event_type_record: int, service_response_record: int): if store_event: response_event_type |= 0x20 # type: ignore # TODO: split record parameters into arrays @@ -684,7 +686,8 @@ class UdsClient(): data = bytes([transmission_mode_type, periodic_data_identifier]) self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) - def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): + def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, + source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -737,7 +740,9 @@ class UdsClient(): data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data) - def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF, dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF): + def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL, + dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF, + dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF): data = b'' # dtc_status_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \ @@ -773,7 +778,8 @@ class UdsClient(): # TODO: parse response return resp - def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes = b''): + def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, + control_option_record: bytes, control_enable_mask_record: bytes = b''): data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None diff --git a/tests/black_white_relay_endurance.py b/tests/black_white_relay_endurance.py index f86182d..32c77f2 100755 --- a/tests/black_white_relay_endurance.py +++ b/tests/black_white_relay_endurance.py @@ -68,7 +68,8 @@ def run_test(sleep_duration): counter += 1 runtime = time.time() - start_time - print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime) + print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, + "Content errors:", content_errors, "Runtime: ", runtime) if (time.time() - temp_start_time) > 3600 * 6: # Toggle relay diff --git a/tests/gps_stability_test.py b/tests/gps_stability_test.py index 49e5839..d113f7e 100755 --- a/tests/gps_stability_test.py +++ b/tests/gps_stability_test.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +# flake8: noqa import os import sys