use same flake8 config as openpilot

master
Adeeb Shihadeh 2020-06-03 16:22:30 -07:00
parent 20eb68b179
commit 6848c7576b
4 changed files with 16 additions and 7 deletions

View File

@ -19,8 +19,9 @@ repos:
- id: flake8
exclude: '^(tests/automated)/'
args:
- --ignore=E111,E114,E121,E501,E302,E305,W504
- --select=E112,E113,E304,E501,E502,E701,E702,E703,E71,E72,E731,W191,W6
- --exclude=python/esptool.py,tests/gmbitbang/*
- --max-line-length=160
- --statistics
- repo: local
hooks:

View File

@ -271,7 +271,8 @@ _negative_response_codes = {
class CanClient():
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int = None, debug: bool = False):
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]],
tx_addr: int, rx_addr: int, bus: int, sub_addr: int = None, debug: bool = False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
@ -613,7 +614,8 @@ class UdsClient():
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int,
event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20 # type: ignore
# TODO: split record parameters into arrays
@ -684,7 +686,8 @@ class UdsClient():
data = bytes([transmission_mode_type, periodic_data_identifier])
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int,
source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -737,7 +740,9 @@ class UdsClient():
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF, dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF):
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL,
dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF,
dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF):
data = b''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
@ -773,7 +778,8 @@ class UdsClient():
# TODO: parse response
return resp
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes = b''):
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE,
control_option_record: bytes, control_enable_mask_record: bytes = b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None

View File

@ -68,7 +68,8 @@ def run_test(sleep_duration):
counter += 1
runtime = time.time() - start_time
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime)
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors,
"Content errors:", content_errors, "Runtime: ", runtime)
if (time.time() - temp_start_time) > 3600 * 6:
# Toggle relay

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
# flake8: noqa
import os
import sys