diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e8464da..14dc690 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,9 @@ repos: - id: flake8 exclude: '^(tests/automated)/' args: - - --select=F + - --ignore=E111,E114,E121,E501,E302,E305,W504 + - --exclude=python/esptool.py,tests/gmbitbang/* + - --statistics - repo: local hooks: - id: pylint diff --git a/board/tools/enter_download_mode.py b/board/tools/enter_download_mode.py index 17d30dd..4e6a2b4 100755 --- a/board/tools/enter_download_mode.py +++ b/board/tools/enter_download_mode.py @@ -21,12 +21,12 @@ def enter_download_mode(device): def find_first_panda(context=None): context = context or usb1.USBContext() for device in context.getDeviceList(skip_on_error=True): - if device.getVendorID() == 0xbbaa and device.getProductID()&0xFF00 == 0xdd00: + if device.getVendorID() == 0xbbaa and device.getProductID() & 0xFF00 == 0xdd00: return device if __name__ == "__main__": panda_dev = find_first_panda() - if panda_dev == None: + if panda_dev is None: print("no device found") sys.exit(0) print("found device") diff --git a/crypto/getcertheader.py b/crypto/getcertheader.py index bbbe475..54b1025 100755 --- a/crypto/getcertheader.py +++ b/crypto/getcertheader.py @@ -18,15 +18,15 @@ def modinv(a, m): def to_c_string(x): mod = (hex(x)[2:-1].rjust(0x100, '0')) - hh = ''.join('\\x'+mod[i:i+2] for i in range(0, 0x100, 2)) + hh = ''.join('\\x' + mod[i:i + 2] for i in range(0, 0x100, 2)) return hh def to_c_uint32(x): nums = [] for i in range(0x20): - nums.append(x%(2**32)) + nums.append(x % (2**32)) x //= (2**32) - return "{"+'U,'.join(map(str, nums))+"U}" + return "{" + 'U,'.join(map(str, nums)) + "U}" for fn in sys.argv[1:]: rsa = RSA.importKey(open(fn).read()) @@ -35,11 +35,9 @@ for fn in sys.argv[1:]: cname = fn.split("/")[-1].split(".")[0] + "_rsa_key" - print('RSAPublicKey '+cname+' = {.len = 0x20,') + print('RSAPublicKey ' + cname + ' = {.len = 0x20,') print(' .n0inv = %dU,' % n0inv) print(' .n = %s,' % to_c_uint32(rsa.n)) print(' .rr = %s,' % to_c_uint32(rr)) print(' .exponent = %d,' % rsa.e) print('};') - - diff --git a/crypto/sign.py b/crypto/sign.py index 8cbe6f4..e199a6b 100755 --- a/crypto/sign.py +++ b/crypto/sign.py @@ -29,9 +29,8 @@ with open(sys.argv[2], "wb") as f: dd = hashlib.sha1(dat).digest() print("hash:", str(binascii.hexlify(dd), "utf-8")) - dd = b"\x00\x01" + b"\xff"*0x69 + b"\x00" + dd + dd = b"\x00\x01" + b"\xff" * 0x69 + b"\x00" + dd rsa_out = pow(int.from_bytes(dd, byteorder='big', signed=False), rsa.d, rsa.n) sig = (hex(rsa_out)[2:].rjust(0x100, '0')) x += binascii.unhexlify(sig) f.write(x) - diff --git a/examples/can_bit_transition.py b/examples/can_bit_transition.py index aa4aa9d..f40b306 100755 --- a/examples/can_bit_transition.py +++ b/examples/can_bit_transition.py @@ -4,6 +4,7 @@ import sys class Message(): """Details about a specific message ID.""" + def __init__(self, message_id): self.message_id = message_id self.ones = [0] * 8 # bit set if 1 is always seen @@ -32,7 +33,8 @@ class Info(): reader = csv.reader(input) next(reader, None) # skip the CSV header for row in reader: - if not len(row): continue + if not len(row): + continue time = float(row[0]) bus = int(row[2]) if time < start or bus > 127: @@ -76,7 +78,8 @@ def PrintUnique(log_file, low_range, high_range): if message_id in low.messages: high.messages[message_id].printBitDiff(low.messages[message_id]) found = True - if not found: print('No messages that transition from always low to always high found!') + if not found: + print('No messages that transition from always low to always high found!') if __name__ == "__main__": if len(sys.argv) < 4: diff --git a/examples/can_logger.py b/examples/can_logger.py index 12dcf58..8f48cd4 100755 --- a/examples/can_logger.py +++ b/examples/can_logger.py @@ -15,14 +15,14 @@ def can_logger(): try: p = Panda("WIFI") - except: + except Exception: print("WiFi connection timed out. Please make sure your Panda is connected and try again.") sys.exit(0) try: outputfile = open('output.csv', 'w') csvwriter = csv.writer(outputfile) - #Write Header + # Write Header csvwriter.writerow(['Bus', 'MessageID', 'Message', 'MessageLength']) print("Writing csv file output.csv. Press Ctrl-C to exit...\n") @@ -33,7 +33,7 @@ def can_logger(): while True: can_recv = p.can_recv() - for address, _, dat, src in can_recv: + for address, _, dat, src in can_recv: csvwriter.writerow([str(src), str(hex(address)), f"0x{dat.hex()}", len(dat)]) if src == 0: diff --git a/examples/can_unique.py b/examples/can_unique.py index ed5c6bc..e112046 100755 --- a/examples/can_unique.py +++ b/examples/can_unique.py @@ -24,6 +24,7 @@ import sys class Message(): """Details about a specific message ID.""" + def __init__(self, message_id): self.message_id = message_id self.data = {} # keyed by hex string encoded message data diff --git a/examples/get_panda_password.py b/examples/get_panda_password.py index 54cbe57..817adf6 100644 --- a/examples/get_panda_password.py +++ b/examples/get_panda_password.py @@ -13,7 +13,6 @@ def get_panda_password(): sys.exit(0) wifi = p.get_serial() - #print('[%s]' % ', '.join(map(str, wifi))) print("SSID: " + wifi[0]) print("Password: " + wifi[1]) diff --git a/examples/query_fw_versions.py b/examples/query_fw_versions.py index 2177335..38dda5b 100755 --- a/examples/query_fw_versions.py +++ b/examples/query_fw_versions.py @@ -5,7 +5,7 @@ from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseErr if __name__ == "__main__": addrs = [0x700 + i for i in range(256)] - addrs += [0x18da0000 + (i<<8) + 0xf1 for i in range(256)] + addrs += [0x18da0000 + (i << 8) + 0xf1 for i in range(256)] results = {} panda = Panda() @@ -31,37 +31,43 @@ if __name__ == "__main__": try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION) - if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data + if data: + resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data except NegativeResponseError: pass try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION) - if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data + if data: + resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data except NegativeResponseError: pass try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION) - if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data + if data: + resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data except NegativeResponseError: pass try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT) - if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data + if data: + resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data except NegativeResponseError: pass try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT) - if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data + if data: + resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data except NegativeResponseError: pass try: data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT) - if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data + if data: + resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data except NegativeResponseError: pass diff --git a/examples/query_vin_and_stats.py b/examples/query_vin_and_stats.py index f8b5d9b..564b5b9 100755 --- a/examples/query_vin_and_stats.py +++ b/examples/query_vin_and_stats.py @@ -11,7 +11,7 @@ from panda.python.isotp import isotp_send, isotp_recv def get_current_data_for_pid(pid): # 01 xx = Show current data - isotp_send(panda, b"\x01"+ bytes([pid]), 0x7e0) + isotp_send(panda, b"\x01" + bytes([pid]), 0x7e0) return isotp_recv(panda, 0x7e8) def get_supported_pids(): @@ -19,7 +19,7 @@ def get_supported_pids(): pid = 0 while 1: supported = struct.unpack(">I", get_current_data_for_pid(pid)[2:])[0] - for i in range(1+pid, 0x21+pid): + for i in range(1 + pid, 0x21 + pid): if supported & 0x80000000: ret.append(i) supported <<= 1 @@ -45,16 +45,13 @@ if __name__ == "__main__": print("DTCs:", "".join(map(chr, dtcs[:2]))) supported_pids = get_supported_pids() - print("Supported PIDs:",supported_pids) + print("Supported PIDs:", supported_pids) while 1: speed = struct.unpack(">B", get_current_data_for_pid(13)[2:])[0] # kph - rpm = struct.unpack(">H", get_current_data_for_pid(12)[2:])[0]/4.0 # revs - throttle = struct.unpack(">B", get_current_data_for_pid(17)[2:])[0]/255.0 * 100 # percent + rpm = struct.unpack(">H", get_current_data_for_pid(12)[2:])[0] / 4.0 # revs + throttle = struct.unpack(">B", get_current_data_for_pid(17)[2:])[0] / 255.0 * 100 # percent temp = struct.unpack(">B", get_current_data_for_pid(5)[2:])[0] - 40 # degrees C - load = struct.unpack(">B", get_current_data_for_pid(4)[2:])[0]/255.0 * 100 # percent + load = struct.unpack(">B", get_current_data_for_pid(4)[2:])[0] / 255.0 * 100 # percent print("%d KPH, %d RPM, %.1f%% Throttle, %d deg C, %.1f%% load" % (speed, rpm, throttle, temp, load)) time.sleep(0.2) - - - diff --git a/examples/tesla_tester.py b/examples/tesla_tester.py index d5b09e3..4d1c602 100644 --- a/examples/tesla_tester.py +++ b/examples/tesla_tester.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 +# flake8: noqa + import sys import binascii from panda import Panda @@ -45,7 +47,7 @@ def tesla_tester(): while True: #Read the VIN can_recv = p.can_recv() - for address, _, dat, src in can_recv: + for address, _, dat, src in can_recv: if src == body_bus_num: if address == 1384: # 0x568 is VIN vin_index = int(binascii.hexlify(dat)[:2]) # first byte is the index, 00, 01, 02 diff --git a/python/__init__.py b/python/__init__.py index e0b136a..facdd60 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -38,17 +38,17 @@ def build_st(target, mkfile="Makefile", clean=True): def parse_can_buffer(dat): ret = [] for j in range(0, len(dat), 0x10): - ddat = dat[j:j+0x10] + ddat = dat[j:j + 0x10] f1, f2 = struct.unpack("II", ddat[0:8]) extended = 4 if f1 & extended: address = f1 >> 3 else: address = f1 >> 21 - dddat = ddat[8:8+(f2&0xF)] + dddat = ddat[8:8 + (f2 & 0xF)] if DEBUG: print(" R %x: %s" % (address, binascii.hexlify(dddat))) - ret.append((address, f2>>16, dddat, (f2>>4)&0xFF)) + ret.append((address, f2 >> 16, dddat, (f2 >> 4) & 0xFF)) return ret class PandaWifiStreaming(object): @@ -67,7 +67,7 @@ class PandaWifiStreaming(object): ret = [] while True: try: - dat, addr = self.sock.recvfrom(0x200*0x10) + dat, addr = self.sock.recvfrom(0x200 * 0x10) if addr == (self.ip, self.port): ret += parse_can_buffer(dat) except socket.error as e: @@ -84,7 +84,7 @@ class WifiHandle(object): def __recv(self): ret = self.sock.recv(0x44) length = struct.unpack("I", ret[0:4])[0] - return ret[4:4+length] + return ret[4:4 + length] def controlWrite(self, request_type, request, value, index, data, timeout=0): # ignore data in reply, panda doesn't use it @@ -97,7 +97,7 @@ class WifiHandle(object): def bulkWrite(self, endpoint, data, timeout=0): if len(data) > 0x10: raise ValueError("Data must not be longer than 0x10") - self.sock.send(struct.pack("HH", endpoint, len(data))+data) + self.sock.send(struct.pack("HH", endpoint, len(data)) + data) self.__recv() # to /dev/null def bulkRead(self, endpoint, length, timeout=0): @@ -161,7 +161,7 @@ class Panda(object): self._handle = None def connect(self, claim=True, wait=False): - if self._handle != None: + if self._handle is not None: self.close() if self._serial == "WIFI": @@ -176,7 +176,6 @@ class Panda(object): while 1: try: for device in context.getDeviceList(skip_on_error=True): - #print(device) if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]: try: this_serial = device.getSerialNumber() @@ -189,19 +188,19 @@ class Panda(object): self.bootstub = device.getProductID() == 0xddee self.legacy = (device.getbcdDevice() != 0x2300) self._handle = device.open() - if not sys.platform in ["win32", "cygwin", "msys"]: + if sys.platform not in ["win32", "cygwin", "msys"]: self._handle.setAutoDetachKernelDriver(True) if claim: self._handle.claimInterface(0) - #self._handle.setInterfaceAltSetting(0, 0) #Issue in USB stack + # self._handle.setInterfaceAltSetting(0, 0) # Issue in USB stack break except Exception as e: print("exception", e) traceback.print_exc() - if wait == False or self._handle != None: + if not wait or self._handle is not None: break context = usb1.USBContext() # New context needed so new devices show up - assert(self._handle != None) + assert(self._handle is not None) print("connected") def reset(self, enter_bootstub=False, enter_bootloader=False): @@ -230,7 +229,7 @@ class Panda(object): success = True break except Exception: - print("reconnecting is taking %d seconds..." % (i+1)) + print("reconnecting is taking %d seconds..." % (i + 1)) try: dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial)) dfu.recover() @@ -259,7 +258,7 @@ class Panda(object): STEP = 0x10 print("flash: flashing") for i in range(0, len(code), STEP): - handle.bulkWrite(2, code[i:i+STEP]) + handle.bulkWrite(2, code[i:i + STEP]) # reset print("flash: resetting") @@ -321,14 +320,14 @@ class Panda(object): def flash_ota_st(): ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board"))) time.sleep(1) - return ret==0 + return ret == 0 @staticmethod def flash_ota_wifi(release=False): release_str = "RELEASE=1" if release else "" - ret = os.system("cd {} && make clean && {} make ota".format(os.path.join(BASEDIR, "boardesp"),release_str)) + ret = os.system("cd {} && make clean && {} make ota".format(os.path.join(BASEDIR, "boardesp"), release_str)) time.sleep(1) - return ret==0 + return ret == 0 @staticmethod def list(): @@ -344,7 +343,7 @@ class Panda(object): except Exception: pass # TODO: detect if this is real - #ret += ["WIFI"] + # ret += ["WIFI"] return ret def call_control_api(self, msg): @@ -420,7 +419,7 @@ class Panda(object): dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20) hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4] assert(hashsig == calc_hash) - return [dat[0:0x10].decode("utf8"), dat[0x10:0x10+10].decode("utf8")] + return [dat[0:0x10].decode("utf8"), dat[0x10:0x10 + 10].decode("utf8")] def get_secret(self): return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10) @@ -467,10 +466,10 @@ class Panda(object): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf4, int(bus_num), int(enable), b'') def set_can_speed_kbps(self, bus, speed): - self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'') + self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed * 10), b'') def set_uart_baud(self, uart, rate): - self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate/300), b'') + self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate / 300), b'') def set_uart_parity(self, uart, parity): # parity, 0=off, 1=even, 2=odd @@ -504,7 +503,6 @@ class Panda(object): while True: try: - #print("DAT: %s"%b''.join(snds).__repr__()) if self.wifi: for s in snds: self._handle.bulkWrite(3, s) @@ -521,7 +519,7 @@ class Panda(object): dat = bytearray() while True: try: - dat = self._handle.bulkRead(1, 0x10*256) + dat = self._handle.bulkRead(1, 0x10 * 256) break except (usb1.USBErrorIO, usb1.USBErrorOverflow): print("CAN: BAD RECV, RETRYING") @@ -561,7 +559,7 @@ class Panda(object): def serial_write(self, port_number, ln): ret = 0 for i in range(0, len(ln), 0x20): - ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i+0x20]) + ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20]) return ret def serial_clear(self, port_number): @@ -599,7 +597,7 @@ class Panda(object): def kline_ll_recv(self, cnt, bus=2): echo = bytearray() while len(echo) != cnt: - ret = self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt-len(echo)) + ret = self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt - len(echo)) if DEBUG and len(ret) > 0: print("kline recv: " + binascii.hexlify(ret)) echo += ret @@ -616,7 +614,7 @@ class Panda(object): if checksum: x += get_checksum(x) for i in range(0, len(x), 0xf): - ts = x[i:i+0xf] + ts = x[i:i + 0xf] if DEBUG: print("kline send: " + binascii.hexlify(ts)) self._handle.bulkWrite(2, bytes([bus]) + ts) @@ -629,7 +627,7 @@ class Panda(object): def kline_recv(self, bus=2): msg = self.kline_ll_recv(2, bus=bus) - msg += self.kline_ll_recv(ord(msg[1])-2, bus=bus) + msg += self.kline_ll_recv(ord(msg[1]) - 2, bus=bus) return msg def send_heartbeat(self): @@ -663,6 +661,6 @@ class Panda(object): a = struct.unpack("H", dat) return a[0] -# ****************** Phone ***************** + # ****************** Phone ***************** def set_phone_power(self, enabled): self._handle.controlWrite(Panda.REQUEST_OUT, 0xb3, int(enabled), 0, b'') diff --git a/python/dfu.py b/python/dfu.py index d9d619a..658b6a9 100644 --- a/python/dfu.py +++ b/python/dfu.py @@ -24,7 +24,7 @@ class PandaDFU(object): self._handle = device.open() self.legacy = "07*128Kg" in self._handle.getASCIIStringDescriptor(4) return - raise Exception("failed to open "+dfu_serial) + raise Exception("failed to open " + dfu_serial) @staticmethod def list(): @@ -43,9 +43,9 @@ class PandaDFU(object): @staticmethod def st_serial_to_dfu_serial(st): - if st == None or st == "none": + if st is None or st == "none": return None - uid_base = struct.unpack("H"*6, bytes.fromhex(st)) + uid_base = struct.unpack("H" * 6, bytes.fromhex(st)) return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8") def status(self): @@ -69,7 +69,7 @@ class PandaDFU(object): self.status() def program(self, address, dat, block_size=None): - if block_size == None: + if block_size is None: block_size = len(dat) # Set Address Pointer @@ -77,11 +77,11 @@ class PandaDFU(object): self.status() # Program - dat += b"\xFF"*((block_size-len(dat)) % block_size) - for i in range(0, len(dat)//block_size): - ldat = dat[i*block_size:(i+1)*block_size] + dat += b"\xFF" * ((block_size - len(dat)) % block_size) + for i in range(0, len(dat) // block_size): + ldat = dat[i * block_size:(i + 1) * block_size] print("programming %d with length %d" % (i, len(ldat))) - self._handle.controlWrite(0x21, DFU_DNLOAD, 2+i, 0, ldat) + self._handle.controlWrite(0x21, DFU_DNLOAD, 2 + i, 0, ldat) self.status() def program_bootstub(self, code_bootstub): diff --git a/python/esptool.py b/python/esptool.py index 74e383e..9afe36e 100755 --- a/python/esptool.py +++ b/python/esptool.py @@ -18,6 +18,8 @@ # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # Street, Fifth Floor, Boston, MA 02110-1301 USA. +# flake8: noqa + import argparse import hashlib import inspect diff --git a/python/flash_release.py b/python/flash_release.py index 85bc553..961a83a 100755 --- a/python/flash_release.py +++ b/python/flash_release.py @@ -11,9 +11,9 @@ def flash_release(path=None, st_serial=None): from zipfile import ZipFile def status(x): - print("\033[1;32;40m"+x+"\033[00m") + print("\033[1;32;40m" + x + "\033[00m") - if st_serial == None: + if st_serial is not None: # look for Panda panda_list = Panda.list() if len(panda_list) == 0: @@ -23,7 +23,7 @@ def flash_release(path=None, st_serial=None): st_serial = panda_list[0] print("Using panda with serial %s" % st_serial) - if path == None: + if path is not None: print("Fetching latest firmware from github.com/commaai/panda-artifacts") r = requests.get("https://raw.githubusercontent.com/commaai/panda-artifacts/master/latest.json") url = json.loads(r.text)['url'] @@ -35,7 +35,7 @@ def flash_release(path=None, st_serial=None): zf.printdir() version = zf.read("version") - status("0. Preparing to flash "+version) + status("0. Preparing to flash " + version) code_bootstub = zf.read("bootstub.panda.bin") code_panda = zf.read("panda.bin") @@ -67,14 +67,17 @@ def flash_release(path=None, st_serial=None): # flashing ESP if panda.is_white(): status("4. Flashing ESP (slow!)") - align = lambda x, sz=0x1000: x+"\xFF"*((sz-len(x)) % sz) + + def align(x, sz=0x1000): + x + "\xFF" * ((sz - len(x)) % sz) + esp = ESPROM(st_serial) esp.connect() flasher = CesantaFlasher(esp, 230400) flasher.flash_write(0x0, align(code_boot_15), True) flasher.flash_write(0x1000, align(code_user1), True) flasher.flash_write(0x81000, align(code_user2), True) - flasher.flash_write(0x3FE000, "\xFF"*0x1000) + flasher.flash_write(0x3FE000, "\xFF" * 0x1000) flasher.boot_fw() del flasher del esp @@ -95,4 +98,3 @@ def flash_release(path=None, st_serial=None): if __name__ == "__main__": flash_release(*sys.argv[1:]) - diff --git a/python/isotp.py b/python/isotp.py index 69cddc4..0bed3bb 100644 --- a/python/isotp.py +++ b/python/isotp.py @@ -34,22 +34,22 @@ def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr): # TODO: handle other subaddr also communicating assert msg[0] == subaddr - if msg[1]&0xf0 == 0x10: + if msg[1] & 0xf0 == 0x10: # first tlen = ((msg[1] & 0xf) << 8) | msg[2] dat = msg[3:] # 0 block size? - CONTINUE = bytes([subaddr]) + b"\x30" + b"\x00"*6 + CONTINUE = bytes([subaddr]) + b"\x30" + b"\x00" * 6 panda.can_send(sendaddr, CONTINUE, bus) idx = 1 - for mm in recv(panda, (tlen-len(dat) + 5)//6, addr, bus): + for mm in recv(panda, (tlen - len(dat) + 5) // 6, addr, bus): assert mm[0] == subaddr - assert mm[1] == (0x20 | (idx&0xF)) + assert mm[1] == (0x20 | (idx & 0xF)) dat += mm[2:] idx += 1 - elif msg[1]&0xf0 == 0x00: + elif msg[1] & 0xf0 == 0x00: # single tlen = msg[1] & 0xf dat = msg[2:] @@ -63,7 +63,7 @@ def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr): def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None): if recvaddr is None: - recvaddr = addr+8 + recvaddr = addr + 8 if len(x) <= 7 and subaddr is None: panda.can_send(addr, msg(x), bus) @@ -100,7 +100,7 @@ def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None): def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None): if sendaddr is None: - sendaddr = addr-8 + sendaddr = addr - 8 if subaddr is not None: dat = isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr) @@ -113,13 +113,13 @@ def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None): dat = msg[2:] # 0 block size? - CONTINUE = b"\x30" + b"\x00"*7 + CONTINUE = b"\x30" + b"\x00" * 7 panda.can_send(sendaddr, CONTINUE, bus) idx = 1 - for mm in recv(panda, (tlen-len(dat) + 6)//7, addr, bus): - assert mm[0] == (0x20 | (idx&0xF)) + for mm in recv(panda, (tlen - len(dat) + 6) // 7, addr, bus): + assert mm[0] == (0x20 | (idx & 0xF)) dat += mm[1:] idx += 1 elif msg[0] & 0xf0 == 0x00: @@ -134,4 +134,3 @@ def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None): print("R:", binascii.hexlify(dat)) return dat - diff --git a/python/serial.py b/python/serial.py index 4d900cf..93ed2df 100644 --- a/python/serial.py +++ b/python/serial.py @@ -7,21 +7,16 @@ class PandaSerial(object): self.panda.set_uart_baud(self.port, baud) self.buf = b"" - def read(self, l=1): + def read(self, l=1): # noqa: E741 tt = self.panda.serial_read(self.port) if len(tt) > 0: - #print "R: ", tt.encode("hex") self.buf += tt ret = self.buf[0:l] self.buf = self.buf[l:] return ret def write(self, dat): - #print "W: ", dat.encode("hex") - #print ' pigeon_send("' + ''.join(map(lambda x: "\\x%02X" % ord(x), dat)) + '");' return self.panda.serial_write(self.port, dat) def close(self): pass - - diff --git a/python/uds.py b/python/uds.py index a29de93..6a13a09 100644 --- a/python/uds.py +++ b/python/uds.py @@ -6,31 +6,31 @@ from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional from enum import IntEnum class SERVICE_TYPE(IntEnum): - DIAGNOSTIC_SESSION_CONTROL = 0x10 - ECU_RESET = 0x11 - SECURITY_ACCESS = 0x27 - COMMUNICATION_CONTROL = 0x28 - TESTER_PRESENT = 0x3E - ACCESS_TIMING_PARAMETER = 0x83 - SECURED_DATA_TRANSMISSION = 0x84 - CONTROL_DTC_SETTING = 0x85 - RESPONSE_ON_EVENT = 0x86 - LINK_CONTROL = 0x87 - READ_DATA_BY_IDENTIFIER = 0x22 - READ_MEMORY_BY_ADDRESS = 0x23 - READ_SCALING_DATA_BY_IDENTIFIER = 0x24 - READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A + DIAGNOSTIC_SESSION_CONTROL = 0x10 + ECU_RESET = 0x11 + SECURITY_ACCESS = 0x27 + COMMUNICATION_CONTROL = 0x28 + TESTER_PRESENT = 0x3E + ACCESS_TIMING_PARAMETER = 0x83 + SECURED_DATA_TRANSMISSION = 0x84 + CONTROL_DTC_SETTING = 0x85 + RESPONSE_ON_EVENT = 0x86 + LINK_CONTROL = 0x87 + READ_DATA_BY_IDENTIFIER = 0x22 + READ_MEMORY_BY_ADDRESS = 0x23 + READ_SCALING_DATA_BY_IDENTIFIER = 0x24 + READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A DYNAMICALLY_DEFINE_DATA_IDENTIFIER = 0x2C - WRITE_DATA_BY_IDENTIFIER = 0x2E - WRITE_MEMORY_BY_ADDRESS = 0x3D - CLEAR_DIAGNOSTIC_INFORMATION = 0x14 - READ_DTC_INFORMATION = 0x19 + WRITE_DATA_BY_IDENTIFIER = 0x2E + WRITE_MEMORY_BY_ADDRESS = 0x3D + CLEAR_DIAGNOSTIC_INFORMATION = 0x14 + READ_DTC_INFORMATION = 0x19 INPUT_OUTPUT_CONTROL_BY_IDENTIFIER = 0x2F - ROUTINE_CONTROL = 0x31 - REQUEST_DOWNLOAD = 0x34 - REQUEST_UPLOAD = 0x35 - TRANSFER_DATA = 0x36 - REQUEST_TRANSFER_EXIT = 0x37 + ROUTINE_CONTROL = 0x31 + REQUEST_DOWNLOAD = 0x34 + REQUEST_UPLOAD = 0x35 + TRANSFER_DATA = 0x36 + REQUEST_TRANSFER_EXIT = 0x37 class SESSION_TYPE(IntEnum): DEFAULT = 1 @@ -271,7 +271,7 @@ _negative_response_codes = { class CanClient(): - def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False): + def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int = None, debug: bool = False): self.tx = can_send self.rx = can_recv self.tx_addr = tx_addr @@ -286,30 +286,34 @@ class CanClient(): if self.tx_addr == 0x7DF: is_response = addr >= 0x7E8 and addr <= 0x7EF if is_response: - if self.debug: print(f"switch to physical addr {hex(addr)}") - self.tx_addr = addr-8 + if self.debug: + print(f"switch to physical addr {hex(addr)}") + self.tx_addr = addr - 8 self.rx_addr = addr return is_response if self.tx_addr == 0x18DB33F1: is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF if is_response: - if self.debug: print(f"switch to physical addr {hex(addr)}") - self.tx_addr = 0x18DA00F1 + (addr<<8 & 0xFF00) + if self.debug: + print(f"switch to physical addr {hex(addr)}") + self.tx_addr = 0x18DA00F1 + (addr << 8 & 0xFF00) self.rx_addr = addr return bus == self.bus and addr == self.rx_addr - def _recv_buffer(self, drain: bool=False) -> None: + def _recv_buffer(self, drain: bool = False) -> None: while True: msgs = self.rx() if drain: - if self.debug: print("CAN-RX: drain - {}".format(len(msgs))) + if self.debug: + print("CAN-RX: drain - {}".format(len(msgs))) self.rx_buff.clear() else: for rx_addr, rx_ts, rx_data, rx_bus in msgs or []: if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0: rx_data = bytes(rx_data) # convert bytearray to bytes - if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}") + if self.debug: + print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}") # Cut off sub addr in first byte if self.sub_addr is not None: @@ -320,7 +324,7 @@ class CanClient(): if len(msgs) < 254: return - def recv(self, drain: bool=False) -> Generator[bytes, None, None]: + def recv(self, drain: bool = False) -> Generator[bytes, None, None]: # buffer rx messages in case two response messages are received at once # (e.g. response pending and success/failure response) self._recv_buffer(drain) @@ -330,16 +334,18 @@ class CanClient(): except IndexError: pass # empty - def send(self, msgs: List[bytes], delay: float=0) -> None: + def send(self, msgs: List[bytes], delay: float = 0) -> None: for i, msg in enumerate(msgs): if delay and i != 0: - if self.debug: print(f"CAN-TX: delay - {delay}") + if self.debug: + print(f"CAN-TX: delay - {delay}") time.sleep(delay) if self.sub_addr is not None: msg = bytes([self.sub_addr]) + msg - if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}") + if self.debug: + print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}") assert len(msg) <= 8 self.tx(self.tx_addr, msg, self.bus) @@ -348,7 +354,7 @@ class CanClient(): self._recv_buffer() class IsoTpMessage(): - def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False, max_len: int=8): + def __init__(self, can_client: CanClient, timeout: float = 1, debug: bool = False, max_len: int = 8): self._can_client = can_client self.timeout = timeout self.debug = debug @@ -368,18 +374,21 @@ class IsoTpMessage(): self.rx_idx = 0 self.rx_done = False - if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}") + if self.debug: + print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}") self._tx_first_frame() def _tx_first_frame(self) -> None: if self.tx_len < self.max_len: # single frame (send all bytes) - if self.debug: print("ISO-TP: TX - single frame") + if self.debug: + print("ISO-TP: TX - single frame") msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00") self.tx_done = True else: # first frame (send first 6 bytes) - if self.debug: print("ISO-TP: TX - first frame") + if self.debug: + print("ISO-TP: TX - first frame") msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00") self._can_client.send([msg]) @@ -397,16 +406,18 @@ class IsoTpMessage(): if time.time() - start_time > self.timeout: raise MessageTimeoutError("timeout waiting for response") finally: - if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}") + if self.debug and self.rx_dat: + print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}") def _isotp_rx_next(self, rx_data: bytes) -> None: # single rx_frame if rx_data[0] >> 4 == 0x0: self.rx_len = rx_data[0] & 0xFF - self.rx_dat = rx_data[1:1+self.rx_len] + self.rx_dat = rx_data[1:1 + self.rx_len] self.rx_idx = 0 self.rx_done = True - if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}") + if self.debug: + print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}") return # first rx_frame @@ -415,8 +426,10 @@ class IsoTpMessage(): self.rx_dat = rx_data[2:] self.rx_idx = 0 self.rx_done = False - if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}") - if self.debug: print("ISO-TP: TX - flow control continue") + if self.debug: + print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}") + if self.debug: + print("ISO-TP: TX - flow control continue") # send flow control message (send all bytes) msg = b"\x30\x00\x00".ljust(self.max_len, b"\x00") self._can_client.send([msg]) @@ -424,23 +437,25 @@ class IsoTpMessage(): # consecutive rx frame if rx_data[0] >> 4 == 0x2: - assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame" + assert not self.rx_done, "isotp - rx: consecutive frame with no active frame" self.rx_idx += 1 assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index" rx_size = self.rx_len - len(self.rx_dat) - self.rx_dat += rx_data[1:1+rx_size] + self.rx_dat += rx_data[1:1 + rx_size] if self.rx_len == len(self.rx_dat): self.rx_done = True - if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}") + if self.debug: + print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}") return # flow control if rx_data[0] >> 4 == 0x3: - assert self.tx_done == False, "isotp - rx: flow control with no active frame" + assert not self.tx_done, "isotp - rx: flow control with no active frame" assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort" assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid" if rx_data[0] == 0x30: - if self.debug: print("ISO-TP: RX - flow control continue") + if self.debug: + print("ISO-TP: RX - flow control continue") delay_ts = rx_data[2] & 0x7F # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. @@ -455,16 +470,18 @@ class IsoTpMessage(): for i in range(start, end, num_bytes): self.tx_idx += 1 # consecutive tx messages - msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+num_bytes]).ljust(self.max_len, b"\x00") + msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i + num_bytes]).ljust(self.max_len, b"\x00") tx_msgs.append(msg) # send consecutive tx messages self._can_client.send(tx_msgs, delay=delay_sec) if end >= self.tx_len: self.tx_done = True - if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}") + if self.debug: + print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}") elif rx_data[0] == 0x31: # wait (do nothing until next flow control message) - if self.debug: print("ISO-TP: TX - flow control wait") + if self.debug: + print("ISO-TP: TX - flow control wait") FUNCTIONAL_ADDRS = [0x7DF, 0x18DB33F1] @@ -478,13 +495,13 @@ def get_rx_addr_for_tx_addr(tx_addr): if tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF: # standard 29 bit response addr (flip last two bytes) - return (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF) + return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF) raise ValueError("invalid tx_addr: {}".format(tx_addr)) class UdsClient(): - def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False): + def __init__(self, panda, tx_addr: int, rx_addr: int = None, bus: int = 0, timeout: float = 1, debug: bool = False): self.bus = bus self.tx_addr = tx_addr self.rx_addr = rx_addr if rx_addr is not None else get_rx_addr_for_tx_addr(tx_addr) @@ -493,7 +510,7 @@ class UdsClient(): self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, self.rx_addr, self.bus, debug=self.debug) # generic uds request - def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes: + def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int = None, data: bytes = None) -> bytes: req = bytes([service_type]) if subfunction is not None: req += bytes([subfunction]) @@ -525,12 +542,13 @@ class UdsClient(): error_desc = resp[3:].hex() # wait for another message if response pending if error_code == 0x78: - if self.debug: print("UDS-RX: response pending") + if self.debug: + print("UDS-RX: response pending") continue raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) # positive response - if service_type+0x40 != resp_sid: + if service_type + 0x40 != resp_sid: resp_sid_hex = hex(resp_sid) if resp_sid is not None else None raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) @@ -554,7 +572,7 @@ class UdsClient(): power_down_time = resp[0] return power_down_time - def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None): + def security_access(self, access_type: ACCESS_TYPE, security_key: bytes = None): request_seed = access_type % 2 != 0 if request_seed and security_key is not None: raise ValueError('security_key not allowed') @@ -572,12 +590,10 @@ class UdsClient(): def tester_present(self, ): self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) - def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None): + def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes = None): write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES - read_values = ( - timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or - timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET - ) + read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or + timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET) if not write_custom_values and parameter_values is not None: raise ValueError('parameter_values not allowed') if write_custom_values and parameter_values is None: @@ -616,7 +632,7 @@ class UdsClient(): "data": resp[2:], # TODO: parse the reset of response } - def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None): + def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE = None): data: Optional[bytes] if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: @@ -638,19 +654,19 @@ class UdsClient(): raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] - def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1): + def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = bytes([memory_size_bytes<<4 | memory_address_bytes]) + data = bytes([memory_size_bytes << 4 | memory_address_bytes]) - if memory_address >= 1<<(memory_address_bytes*8): + if memory_address >= 1 << (memory_address_bytes * 8): raise ValueError('invalid memory_address: {}'.format(memory_address)) - data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8): + data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] + if memory_size >= 1 << (memory_size_bytes * 8): raise ValueError('invalid memory_size: {}'.format(memory_size)) - data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) return resp @@ -668,7 +684,7 @@ class UdsClient(): data = bytes([transmission_mode_type, periodic_data_identifier]) self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) - def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1): + def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -679,14 +695,14 @@ class UdsClient(): for s in source_definitions: data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size]) elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: - data += bytes([memory_size_bytes<<4 | memory_address_bytes]) + data += bytes([memory_size_bytes << 4 | memory_address_bytes]) for s in source_definitions: - if s.memory_address >= 1<<(memory_address_bytes*8): + if s.memory_address >= 1 << (memory_address_bytes * 8): raise ValueError('invalid memory_address: {}'.format(s.memory_address)) - data += struct.pack('!I', s.memory_address)[4-memory_address_bytes:] - if s.memory_size >= 1<<(memory_size_bytes*8): + data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:] + if s.memory_size >= 1 << (memory_size_bytes * 8): raise ValueError('invalid memory_size: {}'.format(s.memory_size)) - data += struct.pack('!I', s.memory_size)[4-memory_size_bytes:] + data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:] elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: pass else: @@ -700,19 +716,19 @@ class UdsClient(): if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) - def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1): + def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = bytes([memory_size_bytes<<4 | memory_address_bytes]) + data = bytes([memory_size_bytes << 4 | memory_address_bytes]) - if memory_address >= 1<<(memory_address_bytes*8): + if memory_address >= 1 << (memory_address_bytes * 8): raise ValueError('invalid memory_address: {}'.format(memory_address)) - data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8): + data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] + if memory_size >= 1 << (memory_size_bytes * 8): raise ValueError('invalid memory_size: {}'.format(memory_size)) - data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += data_record self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data) @@ -721,43 +737,43 @@ class UdsClient(): data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data) - def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF): + def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE = DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE = DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int = 0xFFFFFF, dtc_snapshot_record_num: int = 0xFF, dtc_extended_record_num: int = 0xFF): data = b'' # dtc_status_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \ - dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ - dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ - dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \ - dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: - data += bytes([dtc_status_mask_type]) + dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: + data += bytes([dtc_status_mask_type]) # dtc_mask_record if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ - dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ - dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC: - data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC: + data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes # dtc_snapshot_record_num if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER: - data += bytes([dtc_snapshot_record_num]) + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER: + data += bytes([dtc_snapshot_record_num]) # dtc_extended_record_num if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ - dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: - data += bytes([dtc_extended_record_num]) + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: + data += bytes([dtc_extended_record_num]) # dtc_severity_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \ - dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: - data += bytes([dtc_severity_mask_type, dtc_status_mask_type]) + dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: + data += bytes([dtc_severity_mask_type, dtc_status_mask_type]) resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data) # TODO: parse response return resp - def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''): + def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes = b''): data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None @@ -765,7 +781,7 @@ class UdsClient(): raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] - def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''): + def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''): data = struct.pack('!H', routine_identifier_type) + routine_option_record resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None @@ -773,57 +789,57 @@ class UdsClient(): raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) return resp[2:] - def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00): + def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += bytes([memory_size_bytes<<4 | memory_address_bytes]) + data += bytes([memory_size_bytes << 4 | memory_address_bytes]) - if memory_address >= 1<<(memory_address_bytes*8): + if memory_address >= 1 << (memory_address_bytes * 8): raise ValueError('invalid memory_address: {}'.format(memory_address)) - data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8): + data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] + if memory_size >= 1 << (memory_size_bytes * 8): raise ValueError('invalid memory_size: {}'.format(memory_size)) - data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0 if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: - max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] + max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] else: raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) return max_num_bytes # max number of bytes per transfer data request - def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00): + def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += bytes([memory_size_bytes<<4 | memory_address_bytes]) + data += bytes([memory_size_bytes << 4 | memory_address_bytes]) - if memory_address >= 1<<(memory_address_bytes*8): + if memory_address >= 1 << (memory_address_bytes * 8): raise ValueError('invalid memory_address: {}'.format(memory_address)) - data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8): + data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] + if memory_size >= 1 << (memory_size_bytes * 8): raise ValueError('invalid memory_size: {}'.format(memory_size)) - data += struct.pack('!I', memory_size)[4-memory_size_bytes:] + data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0 if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: - max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] + max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] else: raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) return max_num_bytes # max number of bytes per transfer data request - def transfer_data(self, block_sequence_count: int, data: bytes=b''): + def transfer_data(self, block_sequence_count: int, data: bytes = b''): data = bytes([block_sequence_count]) + data resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = resp[0] if len(resp) > 0 else None diff --git a/python/update.py b/python/update.py index 7d8d2c0..f8e3280 100755 --- a/python/update.py +++ b/python/update.py @@ -42,4 +42,3 @@ def ensure_st_up_to_date(): if __name__ == "__main__": ensure_st_up_to_date() - diff --git a/setup.py b/setup.py index 6c9de15..ecb2699 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -#-*- coding: utf-8 -*- +# -*- coding: utf-8 -*- """ Panda CAN Controller Dongle @@ -41,7 +41,7 @@ setup( author_email='', packages=[ 'panda', - ], + ], package_dir={'panda': 'python'}, platforms='any', license='MIT', diff --git a/tests/automated/0_builds.py b/tests/automated/0_builds.py index fa3a112..95030ef 100644 --- a/tests/automated/0_builds.py +++ b/tests/automated/0_builds.py @@ -5,4 +5,3 @@ def test_build_panda(): def test_build_bootstub_panda(): build_st("obj/bootstub.panda.bin") - diff --git a/tests/automated/3_usb_to_can.py b/tests/automated/3_usb_to_can.py index 282a37e..86eb9ec 100644 --- a/tests/automated/3_usb_to_can.py +++ b/tests/automated/3_usb_to_can.py @@ -21,9 +21,9 @@ def test_can_loopback(p): p.set_can_loopback(True) if p.legacy: - busses = [0,1] + busses = [0, 1] else: - busses = [0,1,2] + busses = [0, 1, 2] for bus in busses: # set bus 0 speed to 250 @@ -78,8 +78,8 @@ def test_reliability(p): p.set_can_loopback(True) p.set_can_speed_kbps(0, 1000) - addrs = list(range(100, 100+MSG_COUNT)) - ts = [(j, 0, b"\xaa"*8, 0) for j in addrs] + addrs = list(range(100, 100 + MSG_COUNT)) + ts = [(j, 0, b"\xaa" * 8, 0) for j in addrs] # 100 loops for i in range(LOOP_COUNT): @@ -99,7 +99,7 @@ def test_reliability(p): assert_equal(len(r), 200) # take sub 20ms - et = (time.time()-st)*1000.0 + et = (time.time() - st) * 1000.0 assert_less(et, 20) sys.stdout.write("P") @@ -117,7 +117,7 @@ def test_throughput(p): # enable CAN loopback mode p.set_can_loopback(True) - for speed in [100,250,500,750,1000]: + for speed in [100, 250, 500, 750, 1000]: # set bus 0 speed to speed p.set_can_speed_kbps(0, speed) time.sleep(0.05) @@ -125,7 +125,7 @@ def test_throughput(p): comp_kbps = time_many_sends(p, 0) # bit count from https://en.wikipedia.org/wiki/CAN_bus - saturation_pct = (comp_kbps/speed) * 100.0 + saturation_pct = (comp_kbps / speed) * 100.0 assert_greater(saturation_pct, 80) assert_less(saturation_pct, 100) diff --git a/tests/automated/5_wifi_functionality.py b/tests/automated/5_wifi_functionality.py index 88c81dc..f467400 100644 --- a/tests/automated/5_wifi_functionality.py +++ b/tests/automated/5_wifi_functionality.py @@ -31,7 +31,7 @@ def test_throughput(serials=None): p = Panda("WIFI") - for speed in [100,250,500,750,1000]: + for speed in [100, 250, 500, 750, 1000]: # set bus 0 speed to speed p.set_can_speed_kbps(0, speed) time.sleep(0.1) @@ -39,7 +39,7 @@ def test_throughput(serials=None): comp_kbps = time_many_sends(p, 0) # bit count from https://en.wikipedia.org/wiki/CAN_bus - saturation_pct = (comp_kbps/speed) * 100.0 + saturation_pct = (comp_kbps / speed) * 100.0 #assert_greater(saturation_pct, 80) #assert_less(saturation_pct, 100) @@ -60,10 +60,10 @@ def test_recv_only(serials=None): pwifi = Panda("WIFI") # TODO: msg_count=1000 drops packets, is this fixable? - for msg_count in [10,100,200]: + for msg_count in [10, 100, 200]: speed = 500 p.set_can_speed_kbps(0, speed) comp_kbps = time_many_sends(p, 0, pwifi, msg_count) - saturation_pct = (comp_kbps/speed) * 100.0 + saturation_pct = (comp_kbps / speed) * 100.0 print("HT WIFI loopback %d messages at speed %d, comp speed is %.2f, percent %.2f" % (msg_count, speed, comp_kbps, saturation_pct)) diff --git a/tests/automated/6_wifi_udp.py b/tests/automated/6_wifi_udp.py index 5361ae5..bee5130 100644 --- a/tests/automated/6_wifi_udp.py +++ b/tests/automated/6_wifi_udp.py @@ -33,8 +33,8 @@ def test_udp_doesnt_drop(serials=None): speed = 500 p.set_can_speed_kbps(0, speed) - comp_kbps = time_many_sends(p, 0, pwifi, msg_count=msg_count, msg_id=0x100+i) - saturation_pct = (comp_kbps/speed) * 100.0 + comp_kbps = time_many_sends(p, 0, pwifi, msg_count=msg_count, msg_id=0x100 + i) + saturation_pct = (comp_kbps / speed) * 100.0 if msg_count == 1: sys.stdout.write(".") @@ -45,7 +45,7 @@ def test_udp_doesnt_drop(serials=None): assert_less(saturation_pct, 100) saturation_pcts.append(saturation_pct) if len(saturation_pcts) > 0: - assert_greater(sum(saturation_pcts)/len(saturation_pcts), 60) + assert_greater(sum(saturation_pcts) / len(saturation_pcts), 60) time.sleep(5) usb_ok_cnt = 0 diff --git a/tests/automated/7_can_loopback.py b/tests/automated/7_can_loopback.py index f3f79ae..691ef57 100644 --- a/tests/automated/7_can_loopback.py +++ b/tests/automated/7_can_loopback.py @@ -17,12 +17,12 @@ def test_send_recv(p): p_send.set_can_loopback(False) p_recv.set_can_loopback(False) - p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2) + p_send.can_send_many([(0x1ba, 0, b"message", 0)] * 2) time.sleep(0.05) p_recv.can_recv() p_send.can_recv() - busses = [0,1,2] + busses = [0, 1, 2] for bus in busses: for speed in [100, 250, 500, 750, 1000]: @@ -35,7 +35,7 @@ def test_send_recv(p): comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True) - saturation_pct = (comp_kbps/speed) * 100.0 + saturation_pct = (comp_kbps / speed) * 100.0 assert_greater(saturation_pct, 80) assert_less(saturation_pct, 100) @@ -70,12 +70,12 @@ def test_latency(p): p_recv.set_can_speed_kbps(0, 100) time.sleep(0.05) - p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10) + p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)] * 10) time.sleep(0.05) p_recv.can_recv() p_send.can_recv() - busses = [0,1,2] + busses = [0, 1, 2] for bus in busses: for speed in [100, 250, 500, 750, 1000]: @@ -107,24 +107,24 @@ def test_latency(p): if len(r) == 0 or len(r_echo) == 0: print("r: {}, r_echo: {}".format(r, r_echo)) - assert_equal(len(r),1) - assert_equal(len(r_echo),1) + assert_equal(len(r), 1) + assert_equal(len(r_echo), 1) - et = (et - st)*1000.0 - comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et - latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed) + et = (et - st) * 1000.0 + comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / et + latency = et - ((1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / speed) assert_less(latency, 5.0) - saturation_pct = (comp_kbps/speed) * 100.0 + saturation_pct = (comp_kbps / speed) * 100.0 latencies.append(latency) comp_kbps_list.append(comp_kbps) saturation_pcts.append(saturation_pct) - average_latency = sum(latencies)/num_messages + average_latency = sum(latencies) / num_messages assert_less(average_latency, 1.0) - average_comp_kbps = sum(comp_kbps_list)/num_messages - average_saturation_pct = sum(saturation_pcts)/num_messages + average_comp_kbps = sum(comp_kbps_list) / num_messages + average_saturation_pct = sum(saturation_pcts) / num_messages print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}" .format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct)) @@ -157,14 +157,14 @@ def test_gen2_loopback(p): if bus == 3: obd = True bus = 1 - + # Clear buses clear_can_buffers(p_send) clear_can_buffers(p_recv) # Send a random string addr = random.randint(1, 2000) - string = b"test"+os.urandom(4) + string = b"test" + os.urandom(4) p_send.set_obd(obd) p_recv.set_obd(obd) time.sleep(0.2) @@ -178,7 +178,7 @@ def test_gen2_loopback(p): # Check content assert content[0][0] == addr and content[0][2] == string - + # Check bus assert content[0][3] == bus @@ -210,20 +210,20 @@ def test_bulk_write(p): def flood_tx(panda): print('Sending!') - msg = b"\xaa"*4 + msg = b"\xaa" * 4 packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS # Disable timeout panda.can_send_many(packet, timeout=0) print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!") - + # Start heartbeat start_heartbeat_thread(p) # Set safety mode and power saving p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_power_save(False) - + # Start transmisson threading.Thread(target=flood_tx, args=(p,)).start() @@ -237,7 +237,7 @@ def test_bulk_write(p): print(f"Received {len(rx)} messages") # All messages should have been received - if len(rx) != 3*NUM_MESSAGES_PER_BUS: + if len(rx) != 3 * NUM_MESSAGES_PER_BUS: Exception("Did not receive all messages!") # Set back to silent mode diff --git a/tests/automated/helpers.py b/tests/automated/helpers.py index a5e1506..dbe9e72 100644 --- a/tests/automated/helpers.py +++ b/tests/automated/helpers.py @@ -86,10 +86,10 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas= raise ValueError("Cannot have two pandas that are the same panda") start_time = time.time() - p.can_send_many([(msg_id, 0, b"\xaa"*8, bus)]*msg_count) + p.can_send_many([(msg_id, 0, b"\xaa" * 8, bus)] * msg_count) r = [] r_echo = [] - r_len_expected = msg_count if two_pandas else msg_count*2 + r_len_expected = msg_count if two_pandas else msg_count * 2 r_echo_len_exected = msg_count if two_pandas else 0 while len(r) < r_len_expected and (time.time() - start_time) < 5: @@ -109,8 +109,8 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas= assert_equal(len(resp), msg_count) assert_equal(len(sent_echo), msg_count) - end_time = (end_time-start_time)*1000.0 - comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / end_time + end_time = (end_time - start_time) * 1000.0 + comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) * msg_count / end_time return comp_kbps diff --git a/tests/automated/timeout.py b/tests/automated/timeout.py index f937844..f20f770 100644 --- a/tests/automated/timeout.py +++ b/tests/automated/timeout.py @@ -10,16 +10,16 @@ def run_with_timeout(timeout, fn, *kwargs): except Exception as e: print(e) raise e - + process = Process(target=runner, args=(fn, kwargs)) process.start() counter = 0 while process.is_alive(): time.sleep(INTERVAL) - counter+=1 + counter += 1 if (counter * INTERVAL) > timeout: process.terminate() raise TimeoutError("Function timed out!") if process.exitcode != 0: - raise RuntimeError("Test failed with exit code: ", str(process.exitcode)) \ No newline at end of file + raise RuntimeError("Test failed with exit code: ", str(process.exitcode)) diff --git a/tests/automated/wifi_helpers.py b/tests/automated/wifi_helpers.py index d3be8ff..f92aee9 100644 --- a/tests/automated/wifi_helpers.py +++ b/tests/automated/wifi_helpers.py @@ -32,7 +32,7 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False): print("WIFI: scanning %d" % cnt) os.system("iwlist %s scanning > /dev/null" % wlan_interface) os.system("nmcli device wifi rescan") - wifi_networks = [x.decode("utf8") for x in subprocess.check_output(["nmcli","dev", "wifi", "list"]).split(b"\n")] + wifi_networks = [x.decode("utf8") for x in subprocess.check_output(["nmcli", "dev", "wifi", "list"]).split(b"\n")] wifi_scan = [x for x in wifi_networks if ssid in x] if len(wifi_scan) != 0: break @@ -60,7 +60,7 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False): r = requests.get("http://192.168.0.10/") except requests.ConnectionError: r = requests.get("http://192.168.0.10/") - assert r.status_code==200 + assert r.status_code == 200 print("securing") try: @@ -87,4 +87,4 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False): if ping_ok: break - # TODO: confirm that it's connected to the right panda \ No newline at end of file + # TODO: confirm that it's connected to the right panda diff --git a/tests/black_loopback_test.py b/tests/black_loopback_test.py index 5fe0c48..bcfcaef 100755 --- a/tests/black_loopback_test.py +++ b/tests/black_loopback_test.py @@ -12,10 +12,10 @@ import random import argparse sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) def run_test(sleep_duration): pandas = Panda.list() diff --git a/tests/black_white_loopback_test.py b/tests/black_white_loopback_test.py index d700068..9ef9f06 100755 --- a/tests/black_white_loopback_test.py +++ b/tests/black_white_loopback_test.py @@ -12,10 +12,10 @@ import random import argparse sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) counter = 0 nonzero_bus_errors = 0 @@ -124,7 +124,7 @@ def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): loop_buses = [] for loop in cans_loop: if (loop[0] != at) or (loop[2] != st): - content_errors += 1 + content_errors += 1 print(" Loop on bus", str(loop[3])) loop_buses.append(loop[3]) @@ -138,9 +138,9 @@ def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): loop_buses.sort() if(recv_buses != loop_buses): if len(loop_buses) == 0: - zero_bus_errors += 1 + zero_bus_errors += 1 else: - nonzero_bus_errors += 1 + nonzero_bus_errors += 1 if not os.getenv("NOASSERT"): assert False else: diff --git a/tests/black_white_relay_endurance.py b/tests/black_white_relay_endurance.py index d3d61be..f86182d 100755 --- a/tests/black_white_relay_endurance.py +++ b/tests/black_white_relay_endurance.py @@ -12,10 +12,10 @@ import random import argparse sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) counter = 0 nonzero_bus_errors = 0 @@ -70,8 +70,8 @@ def run_test(sleep_duration): runtime = time.time() - start_time print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime) - if (time.time() - temp_start_time) > 3600*6: - # Toggle relay + if (time.time() - temp_start_time) > 3600 * 6: + # Toggle relay black_panda.set_safety_mode(Panda.SAFETY_SILENT) time.sleep(1) black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) @@ -105,7 +105,7 @@ def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): if direction: other_panda.can_clear(recv_bus) else: - black_panda.can_clear(recv_bus) + black_panda.can_clear(recv_bus) black_panda.can_recv() other_panda.can_recv() @@ -130,7 +130,7 @@ def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): loop_buses = [] for loop in cans_loop: if (loop[0] != at) or (loop[2] != st): - content_errors += 1 + content_errors += 1 print(" Loop on bus", str(loop[3])) loop_buses.append(loop[3]) @@ -144,9 +144,9 @@ def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): loop_buses.sort() if(recv_buses != loop_buses): if len(loop_buses) == 0: - zero_bus_errors += 1 + zero_bus_errors += 1 else: - nonzero_bus_errors += 1 + nonzero_bus_errors += 1 if not os.getenv("NOASSERT"): assert False else: diff --git a/tests/black_white_relay_test.py b/tests/black_white_relay_test.py index f12e42d..3480d0e 100755 --- a/tests/black_white_relay_test.py +++ b/tests/black_white_relay_test.py @@ -11,10 +11,10 @@ import random import argparse sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) counter = 0 open_errors = 0 @@ -25,7 +25,6 @@ def run_test(sleep_duration): global counter, open_errors, closed_errors, content_errors pandas = Panda.list() - #pandas = ["540046000c51363338383037", "07801b800f51363038363036"] print(pandas) # make sure two pandas are connected diff --git a/tests/bulk_write_test.py b/tests/bulk_write_test.py index a99ab28..45761d2 100755 --- a/tests/bulk_write_test.py +++ b/tests/bulk_write_test.py @@ -10,7 +10,7 @@ NUM_MESSAGES_PER_BUS = 10000 def flood_tx(panda): print('Sending!') - msg = b"\xaa"*4 + msg = b"\xaa" * 4 packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS panda.can_send_many(packet) print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!") diff --git a/tests/can_printer.py b/tests/can_printer.py index c8bbc3f..749ad14 100755 --- a/tests/can_printer.py +++ b/tests/can_printer.py @@ -7,7 +7,7 @@ from collections import defaultdict import binascii sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 # fake def sec_since_boot(): @@ -23,15 +23,15 @@ def can_printer(): canbus = int(os.getenv("CAN", 0)) while True: can_recv = p.can_recv() - for address, _, dat, src in can_recv: + for address, _, dat, src in can_recv: if src == canbus: msgs[address].append(dat) if sec_since_boot() - lp > 0.1: dd = chr(27) + "[2J" dd += "%5.2f\n" % (sec_since_boot() - start) - for k,v in sorted(zip(list(msgs.keys()), [binascii.hexlify(x[-1]) for x in list(msgs.values())])): - dd += "%s(%6d) %s\n" % ("%04X(%4d)" % (k,k),len(msgs[k]), v) + for k, v in sorted(zip(list(msgs.keys()), [binascii.hexlify(x[-1]) for x in list(msgs.values())])): + dd += "%s(%6d) %s\n" % ("%04X(%4d)" % (k, k), len(msgs[k]), v) print(dd) lp = sec_since_boot() diff --git a/tests/debug_console.py b/tests/debug_console.py index 724e650..7fcb212 100755 --- a/tests/debug_console.py +++ b/tests/debug_console.py @@ -6,7 +6,7 @@ import time import select sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 setcolor = ["\033[1;32;40m", "\033[1;31;40m"] unsetcolor = "\033[00m" @@ -19,7 +19,7 @@ if __name__ == "__main__": serials = Panda.list() if os.getenv("SERIAL"): - serials = [x for x in serials if x==os.getenv("SERIAL")] + serials = [x for x in serials if x == os.getenv("SERIAL")] pandas = list([Panda(x, claim=claim) for x in serials]) @@ -46,4 +46,4 @@ if __name__ == "__main__": time.sleep(0.01) except Exception: print("panda disconnected!") - time.sleep(0.5); + time.sleep(0.5) diff --git a/tests/development/register_hashmap_spread.py b/tests/development/register_hashmap_spread.py index 2894849..ee4fc7b 100755 --- a/tests/development/register_hashmap_spread.py +++ b/tests/development/register_hashmap_spread.py @@ -32,7 +32,7 @@ def hash(reg_addr): hashes = [] double_hashes = [] for (start_addr, stop_addr) in REGISTER_ADDRESS_REGIONS: - for addr in range(start_addr, stop_addr+1, BYTES_PER_REG): + for addr in range(start_addr, stop_addr + 1, BYTES_PER_REG): h = hash(addr) hashes.append(h) double_hashes.append(hash(h)) diff --git a/tests/disable_esp.py b/tests/disable_esp.py index ce1dd6d..1194c2e 100755 --- a/tests/disable_esp.py +++ b/tests/disable_esp.py @@ -1,4 +1,3 @@ #!/usr/bin/env python3 from panda import Panda Panda().set_esp_power(False) - diff --git a/tests/echo.py b/tests/echo.py index 9ef0cf1..f89c5da 100755 --- a/tests/echo.py +++ b/tests/echo.py @@ -5,7 +5,7 @@ import time import _thread sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 # This script is intended to be used in conjunction with the echo_loopback_test.py test script from panda jungle. # It sends a reversed response back for every message received containing b"test". @@ -15,7 +15,7 @@ def heartbeat_thread(p): try: p.send_heartbeat() time.sleep(1) - except: + except Exception: break # Resend every CAN message that has been received on the same bus, but with the data reversed @@ -31,5 +31,3 @@ if __name__ == "__main__": address, notused, data, bus = message if b'test' in data: p.can_send(address, data[::-1], bus) - - diff --git a/tests/elm_car_simulator.py b/tests/elm_car_simulator.py index f9aba19..8c8360a 100755 --- a/tests/elm_car_simulator.py +++ b/tests/elm_car_simulator.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 +# flake8: noqa + """Used to Reverse/Test ELM protocol auto detect and OBD message response without a car.""" import sys @@ -10,7 +12,7 @@ import threading from collections import deque sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def lin_checksum(dat): return sum(dat) % 0x100 @@ -114,8 +116,8 @@ class ELMCarSimulator(): #SEND = 0x33 # Car OBD Functional Address headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr) if not self.__silent: - print(" Sending LIN", binascii.hexlify(headers+msg), - hex(sum(bytearray(headers+msg))%0x100)) + print(" Sending LIN", binascii.hexlify(headers + msg), + hex(sum(bytearray(headers + msg)) % 0x100)) self.panda.kline_send(headers + msg) def __reset_lin_timeout(self): @@ -152,14 +154,14 @@ class ELMCarSimulator(): if len(outmsg) <= 5: self._lin_send(0x10, obd_header + outmsg) else: - first_msg_len = min(4, len(outmsg)%4) or 4 + first_msg_len = min(4, len(outmsg) % 4) or 4 self._lin_send(0x10, obd_header + b'\x01' + - b'\x00'*(4-first_msg_len) + + b'\x00' * (4 - first_msg_len) + outmsg[:first_msg_len]) for num, i in enumerate(range(first_msg_len, len(outmsg), 4)): self._lin_send(0x10, obd_header + - struct.pack('B', (num+2)%0x100) + outmsg[i:i+4]) + struct.pack('B', (num + 2) % 0x100) + outmsg[i:i + 4]) ######################### # CAN related functions # @@ -201,11 +203,11 @@ class ELMCarSimulator(): def _can_send(self, addr, msg): if not self.__silent: print(" CAN Reply (%x)" % addr, binascii.hexlify(msg)) - self.panda.can_send(addr, msg + b'\x00'*(8-len(msg)), 0) + self.panda.can_send(addr, msg + b'\x00' * (8 - len(msg)), 0) if self.__can_extra_noise_msgs: noise = self.__can_extra_noise_msgs.popleft() self.panda.can_send(noise[0] if noise[0] is not None else addr, - noise[1] + b'\x00'*(8-len(noise[1])), 0) + noise[1] + b'\x00' * (8 - len(noise[1])), 0) def _can_addr_matches(self, addr): if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0): @@ -216,7 +218,7 @@ class ELMCarSimulator(): def __can_process_msg(self, mode, pid, address, ts, data, src): if not self.__silent: - print("CAN MSG", binascii.hexlify(data[1:1+data[0]]), + print("CAN MSG", binascii.hexlify(data[1:1 + data[0]]), "Addr:", hex(address), "Mode:", hex(mode)[2:].zfill(2), "PID:", hex(pid)[2:].zfill(2), "canLen:", len(data), binascii.hexlify(data)) @@ -225,7 +227,7 @@ class ELMCarSimulator(): outmsg = None if data[:3] == b'\x30\x00\x00' and len(self.__can_multipart_data): if not self.__silent: - print("Request for more data"); + print("Request for more data") outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110 msgnum = 1 while(self.__can_multipart_data): @@ -233,7 +235,7 @@ class ELMCarSimulator(): msgpiece = struct.pack("B", 0x20 | msgnum) + self.__can_multipart_data[:datalen] self._can_send(outaddr, msgpiece) self.__can_multipart_data = self.__can_multipart_data[7:] - msgnum = (msgnum+1)%0x10 + msgnum = (msgnum + 1) % 0x10 time.sleep(0.01) else: @@ -244,13 +246,13 @@ class ELMCarSimulator(): if len(outmsg) <= 5: self._can_send(outaddr, - struct.pack("BBB", len(outmsg)+2, 0x40|data[1], pid) + outmsg) + struct.pack("BBB", len(outmsg) + 2, 0x40 | data[1], pid) + outmsg) else: - first_msg_len = min(3, len(outmsg)%7) - payload_len = len(outmsg)+3 - msgpiece = struct.pack("BBBBB", 0x10 | ((payload_len>>8)&0xF), - payload_len&0xFF, - 0x40|data[1], pid, 1) + outmsg[:first_msg_len] + first_msg_len = min(3, len(outmsg) % 7) + payload_len = len(outmsg) + 3 + msgpiece = struct.pack("BBBBB", 0x10 | ((payload_len >> 8) & 0xF), + payload_len & 0xFF, + 0x40 | data[1], pid, 1) + outmsg[:first_msg_len] self._can_send(outaddr, msgpiece) self.__can_multipart_data = outmsg[first_msg_len:] @@ -260,7 +262,7 @@ class ELMCarSimulator(): def _process_obd(self, mode, pid): if mode == 0x01: # Mode: Show current data - if pid == 0x00: #List supported things + if pid == 0x00: # List supported things return b"\xff\xff\xff\xfe" # b"\xBE\x1F\xB8\x10" #Bitfield, random features elif pid == 0x01: # Monitor Status since DTC cleared return b"\x00\x00\x00\x00" # Bitfield, random features @@ -284,7 +286,7 @@ class ELMCarSimulator(): if pid == 0x02: # Show VIN return b"1D4GP00R55B123456" if pid == 0xFC: # test long multi message. Ligned up for LIN responses - return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num+1) for num in range(80))) + return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80))) if pid == 0xFD: # test long multi message parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80)) return b'\xAA\xAA\xAA' + b''.join(parts) @@ -292,8 +294,8 @@ class ELMCarSimulator(): parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584)) return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA' if pid == 0xFF: - return b'\xAA\x00\x00' +\ - b"".join(((b'\xAA'*5)+struct.pack(">H", num+1) for num in range(584))) + return b'\xAA\x00\x00' + \ + b"".join(((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584))) #return b"\xAA"*100#(0xFFF-3) @@ -312,6 +314,7 @@ if __name__ == "__main__": sim.can_mode_29b() import signal + def signal_handler(signal, frame): print('\nShutting down simulator') sim.stop() diff --git a/tests/elm_throughput.py b/tests/elm_throughput.py index 2895b09..75fb1c6 100755 --- a/tests/elm_throughput.py +++ b/tests/elm_throughput.py @@ -31,8 +31,6 @@ def send_msg(s, msg): if __name__ == "__main__": s = socket.create_connection(("192.168.0.10", 35000)) - #t1 = Reader(s) - #t1.start() send_msg(s, b"ATZ\r") send_msg(s, b"ATL1\r") print(send_msg(s, b"ATE0\r")) diff --git a/tests/elm_wifi.py b/tests/elm_wifi.py index 6fad90d..726795a 100644 --- a/tests/elm_wifi.py +++ b/tests/elm_wifi.py @@ -1,3 +1,5 @@ +# flake8: noqa + import os import sys import time @@ -32,7 +34,7 @@ def send_compare(s, dat, ret, timeout=4): ready = select.select([s], [], [], timeout) if not ready[0]: print("current recv data:", repr(res)) - break; + break res += s.recv(1000) #print("final recv data: '%s'" % repr(res)) assert ret == res # , "Data does not agree (%s) (%s)"%(repr(ret), repr(res)) @@ -64,7 +66,7 @@ def test_elm_cli(): #Test Echo Off #Expected to be misimplimentation, but this is how the reference device behaved. send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>') # Here is the odd part - send_compare(s, b'ATE0\r', b'OK\r\r>') #Should prob show this immediately + send_compare(s, b'ATE0\r', b'OK\r\r>') # Should prob show this immediately send_compare(s, b'ATI\r', b'ELM327 v1.5\r\r>') #Test Newline On @@ -281,8 +283,8 @@ def test_elm_send_lin_multiline_msg_throughput(): send_compare(s, b'09fc\r', # headers OFF, Spaces OFF b"BUS INIT: OK\r" + - b''.join((b'49FC' + hex(num+1)[2:].upper().zfill(2) + - b'AAAA' + hex(num+1)[2:].upper().zfill(4) + b'\r' + b''.join((b'49FC' + hex(num + 1)[2:].upper().zfill(2) + + b'AAAA' + hex(num + 1)[2:].upper().zfill(4) + b'\r' for num in range(80))) + b"\r>", timeout=10 @@ -299,7 +301,7 @@ def test_elm_panda_safety_mode_KWPFast(): p_car.kline_drain() p_elm = Panda("WIFI") - p_elm.set_safety_mode(Panda.SAFETY_ELM327); + p_elm.set_safety_mode(Panda.SAFETY_ELM327) def get_checksum(dat): result = 0 @@ -310,8 +312,8 @@ def test_elm_panda_safety_mode_KWPFast(): t = time.time() msg = bytearray() - while time.time()-t < 0.5 and len(msg) != len(goodmsg): - msg += p._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, len(goodmsg)-len(msg)) + while time.time() - t < 0.5 and len(msg) != len(goodmsg): + msg += p._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, len(goodmsg) - len(msg)) #print("Received", repr(msg)) if msg == goodmsg: return True @@ -543,10 +545,10 @@ def test_elm_send_can_multiline_msg_throughput(): rows = 584 send_compare(s, b'09ff\r', # headers ON, Spaces OFF - ("7E8" + "1" + hex((rows*7)+6)[2:].upper().zfill(3) + "49FF01"+"AA0000\r" + + ("7E8" + "1" + hex((rows * 7) + 6)[2:].upper().zfill(3) + "49FF01" + "AA0000\r" + "".join( - ("7E82"+hex((num+1)%0x10)[2:].upper()+("AA"*5) + - hex(num+1)[2:].upper().zfill(4) + "\r" for num in range(rows)) + ("7E82" + hex((num + 1) % 0x10)[2:].upper() + ("AA" * 5) + + hex(num + 1)[2:].upper().zfill(4) + "\r" for num in range(rows)) ) + "\r>").encode(), timeout=10 ) @@ -623,7 +625,7 @@ def test_elm_panda_safety_mode_ISO15765(): p_car.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p_elm = Panda("WIFI") - p_elm.set_safety_mode(Panda.SAFETY_ELM327); + p_elm.set_safety_mode(Panda.SAFETY_ELM327) #sim = elm_car_simulator.ELMCarSimulator(serial, lin=False) #sim.start() @@ -631,7 +633,7 @@ def test_elm_panda_safety_mode_ISO15765(): def did_send(p, addr, dat, bus): p.can_send(addr, dat, bus) t = time.time() - while time.time()-t < 0.5: + while time.time() - t < 0.5: msg = p.can_recv() for addrin, _, datin, busin in msg: if (0x80 | bus) == busin and addr == addrin and datin == dat: diff --git a/tests/fan_test.py b/tests/fan_test.py index 7385698..410c63b 100755 --- a/tests/fan_test.py +++ b/tests/fan_test.py @@ -4,7 +4,7 @@ import sys import time sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 power = 0 if __name__ == "__main__": @@ -14,4 +14,4 @@ if __name__ == "__main__": time.sleep(5) print("Power: ", power, "RPM: ", str(p.get_fan_rpm())) power += 10 - power %=100 + power %= 100 diff --git a/tests/get_version.py b/tests/get_version.py index 73e51e1..116d8ab 100755 --- a/tests/get_version.py +++ b/tests/get_version.py @@ -5,5 +5,3 @@ if __name__ == "__main__": for p in Panda.list(): pp = Panda(p) print("%s: %s" % (pp.get_serial()[0], pp.get_version())) - - diff --git a/tests/gmbitbang/rigol.py b/tests/gmbitbang/rigol.py index 5fcdf24..b061d95 100755 --- a/tests/gmbitbang/rigol.py +++ b/tests/gmbitbang/rigol.py @@ -33,4 +33,3 @@ plt.show() #data = (data - 130.0 - voltoffset/voltscale*25) / 25 * voltscale print(data) - diff --git a/tests/gmbitbang/test.py b/tests/gmbitbang/test.py index 0934c64..b804113 100755 --- a/tests/gmbitbang/test.py +++ b/tests/gmbitbang/test.py @@ -30,4 +30,3 @@ while 1: time.sleep(0.01) print(p2.can_recv()) #exit(0) - diff --git a/tests/gmbitbang/test_one.py b/tests/gmbitbang/test_one.py index 635cd2c..981edc5 100755 --- a/tests/gmbitbang/test_one.py +++ b/tests/gmbitbang/test_one.py @@ -20,4 +20,3 @@ while 1: iden += 1 p.can_send(iden, dat, bus=3) time.sleep(0.01) - diff --git a/tests/gps_stability_test.py b/tests/gps_stability_test.py index 529970e..49e5839 100755 --- a/tests/gps_stability_test.py +++ b/tests/gps_stability_test.py @@ -7,7 +7,7 @@ import random import threading sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda, PandaSerial +from panda import Panda, PandaSerial # noqa: E402 INIT_GPS_BAUD = 9600 GPS_BAUD = 460800 @@ -45,7 +45,7 @@ def spam_buses_thread(panda): panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) while True: at = random.randint(1, 2000) - st = (b"test"+os.urandom(10))[0:8] + st = (b"test" + os.urandom(10))[0:8] bus = random.randint(0, 2) panda.can_send(at, st, bus) except Exception as e: @@ -77,7 +77,7 @@ def init_gps(panda): # Upping baud rate print("Upping GPS baud rate") - msg = str.encode(add_nmea_checksum("$PUBX,41,1,0007,0003,%d,0" % GPS_BAUD)+"\r\n") + msg = str.encode(add_nmea_checksum("$PUBX,41,1,0007,0003,%d,0" % GPS_BAUD) + "\r\n") ser.write(msg) time.sleep(1) # needs a wait for it to actually send @@ -116,10 +116,9 @@ def gps_read_thread(panda): while True: ret = ser.read(1024) time.sleep(0.001) - l = len(ret) - if l > 0: - received_messages+=1 - received_bytes+=l + if len(ret): + received_messages += 1 + received_bytes += len(ret) if send_something: ser.write("test") send_something = False @@ -149,11 +148,11 @@ if __name__ == "__main__": if(received_bytes < MIN_BYTES): print("Panda is not sending out enough data! Got " + str(received_messages) + " (" + str(received_bytes) + "B) in the last " + str(CHECK_PERIOD) + " seconds") send_something = True - min_failures+=1 + min_failures += 1 elif(received_bytes > MAX_BYTES): print("Panda is not sending out too much data! Got " + str(received_messages) + " (" + str(received_bytes) + "B) in the last " + str(CHECK_PERIOD) + " seconds") print("Probably not on the right baud rate, got reset somehow? Resetting...") - max_failures+=1 + max_failures += 1 init_gps(gps_panda) else: print("Got " + str(received_messages) + " (" + str(received_bytes) + "B) messages in the last " + str(CHECK_PERIOD) + " seconds.") diff --git a/tests/health_test.py b/tests/health_test.py index 69234c7..86e56f9 100755 --- a/tests/health_test.py +++ b/tests/health_test.py @@ -16,4 +16,3 @@ if __name__ == "__main__": print(panda.health()) print("\n") time.sleep(0.5) - diff --git a/tests/ir_test.py b/tests/ir_test.py index a65fe06..8566f37 100755 --- a/tests/ir_test.py +++ b/tests/ir_test.py @@ -4,7 +4,7 @@ import sys import time sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 power = 0 if __name__ == "__main__": @@ -14,4 +14,4 @@ if __name__ == "__main__": print("Power: ", str(power)) time.sleep(1) power += 10 - power %=100 + power %= 100 diff --git a/tests/location_listener.py b/tests/location_listener.py index 1a784ba..d997327 100755 --- a/tests/location_listener.py +++ b/tests/location_listener.py @@ -4,7 +4,7 @@ import time import sys sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda, PandaSerial +from panda import Panda, PandaSerial # noqa: 402 def add_nmea_checksum(msg): d = msg[1:] @@ -30,10 +30,10 @@ if __name__ == "__main__": baudrate = 460800 print("upping baud rate") - msg = str.encode(add_nmea_checksum("$PUBX,41,1,0007,0003,%d,0" % baudrate)+"\r\n") + msg = str.encode(add_nmea_checksum("$PUBX,41,1,0007,0003,%d,0" % baudrate) + "\r\n") print(msg) ser.write(msg) - time.sleep(0.1) # needs a wait for it to actually send + time.sleep(0.1) # needs a wait for it to actually send # new panda serial ser = PandaSerial(panda, 1, baudrate) @@ -43,5 +43,3 @@ if __name__ == "__main__": if len(ret) > 0: sys.stdout.write(ret.decode('ascii', 'ignore')) sys.stdout.flush() - #print str(ret).encode("hex") - diff --git a/tests/loopback_test.py b/tests/loopback_test.py index 60009fa..d7731d0 100755 --- a/tests/loopback_test.py +++ b/tests/loopback_test.py @@ -10,10 +10,10 @@ from hexdump import hexdump from itertools import permutations sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) def run_test(sleep_duration): pandas = Panda.list() @@ -49,7 +49,7 @@ def run_test_w_pandas(pandas, sleep_duration): print("health", ho[0], h[ho[0]].health()) # **** test K/L line loopback **** - for bus in [2,3]: + for bus in [2, 3]: # flush the output h[ho[1]].kline_drain(bus=bus) diff --git a/tests/pedal/enter_canloader.py b/tests/pedal/enter_canloader.py index 4488c32..747aa0f 100755 --- a/tests/pedal/enter_canloader.py +++ b/tests/pedal/enter_canloader.py @@ -10,7 +10,6 @@ class CanHandle(object): self.p = p def transact(self, dat): - #print "W:",dat.encode("hex") self.p.isotp_send(1, dat, 0, recvaddr=2) def _handle_timeout(signum, frame): @@ -24,7 +23,6 @@ class CanHandle(object): finally: signal.alarm(0) - #print "R:",ret.encode("hex") return ret def controlWrite(self, request_type, request, value, index, data, timeout=0): @@ -38,7 +36,7 @@ class CanHandle(object): def bulkWrite(self, endpoint, data, timeout=0): if len(data) > 0x10: raise ValueError("Data must not be longer than 0x10") - dat = struct.pack("HH", endpoint, len(data))+data + dat = struct.pack("HH", endpoint, len(data)) + data return self.transact(dat) def bulkRead(self, endpoint, length, timeout=0): @@ -71,5 +69,3 @@ if __name__ == "__main__": Panda.flash_static(CanHandle(p), code) print("can flash done") - - diff --git a/tests/read_winusb_descriptors.py b/tests/read_winusb_descriptors.py index ea5e93f..c0ed1d6 100644 --- a/tests/read_winusb_descriptors.py +++ b/tests/read_winusb_descriptors.py @@ -10,20 +10,24 @@ if __name__ == "__main__": len = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, 1) print('Microsoft OS String Descriptor') dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, len[0]) - if DEBUG: print('LEN: {}'.format(hex(len[0]))) + if DEBUG: + print('LEN: {}'.format(hex(len[0]))) hexdump("".join(map(chr, dat))) ms_vendor_code = dat[16] - if DEBUG: print('MS_VENDOR_CODE: {}'.format(hex(len[0]))) + if DEBUG: + print('MS_VENDOR_CODE: {}'.format(hex(len[0]))) print('\nMicrosoft Compatible ID Feature Descriptor') len = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1) - if DEBUG: print('LEN: {}'.format(hex(len[0]))) + if DEBUG: + print('LEN: {}'.format(hex(len[0]))) dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, len[0]) hexdump("".join(map(chr, dat))) print('\nMicrosoft Extended Properties Feature Descriptor') len = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1) - if DEBUG: print('LEN: {}'.format(hex(len[0]))) + if DEBUG: + print('LEN: {}'.format(hex(len[0]))) dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, len[0]) hexdump("".join(map(chr, dat))) diff --git a/tests/rtc_test.py b/tests/rtc_test.py index 3732cb7..24edf09 100755 --- a/tests/rtc_test.py +++ b/tests/rtc_test.py @@ -4,10 +4,10 @@ import sys import datetime sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 if __name__ == "__main__": p = Panda() - + p.set_datetime(datetime.datetime.now()) print(p.get_datetime()) diff --git a/tests/safety/common.py b/tests/safety/common.py index e2a397e..709427e 100644 --- a/tests/safety/common.py +++ b/tests/safety/common.py @@ -30,7 +30,7 @@ def package_can_msg(msg): return ret def make_msg(bus, addr, length=8): - return package_can_msg([addr, 0, b'\x00'*length, bus]) + return package_can_msg([addr, 0, b'\x00' * length, bus]) class CANPackerPanda(CANPacker): def make_can_msg_panda(self, name_or_addr, bus, values, counter=-1, fix_checksum=None): @@ -143,7 +143,7 @@ class TorqueSteeringSafetyTest(PandaSafetyTestBase): def test_steer_safety_check(self): for enabled in [0, 1]: - for t in range(-self.MAX_TORQUE*2, self.MAX_TORQUE*2): + for t in range(-self.MAX_TORQUE * 2, self.MAX_TORQUE * 2): self.safety.set_controls_allowed(enabled) self._set_prev_torque(t) if abs(t) > self.MAX_TORQUE or (not enabled and abs(t) > 0): @@ -207,14 +207,14 @@ class TorqueSteeringSafetyTest(PandaSafetyTestBase): for sign in [-1, 1]: self.safety.init_tests() self._set_prev_torque(0) - for t in np.arange(0, self.MAX_RT_DELTA+1, 1): + for t in np.arange(0, self.MAX_RT_DELTA + 1, 1): t *= sign self.safety.set_torque_meas(t, t) self.assertTrue(self._tx(self._torque_msg(t))) self.assertFalse(self._tx(self._torque_msg(sign * (self.MAX_RT_DELTA + 1)))) self._set_prev_torque(0) - for t in np.arange(0, self.MAX_RT_DELTA+1, 1): + for t in np.arange(0, self.MAX_RT_DELTA + 1, 1): t *= sign self.safety.set_torque_meas(t, t) self.assertTrue(self._tx(self._torque_msg(t))) @@ -230,17 +230,17 @@ class TorqueSteeringSafetyTest(PandaSafetyTestBase): self._rx(self._torque_meas_msg(t)) max_range = range(trq, trq + self.TORQUE_MEAS_TOLERANCE + 1) - min_range = range(-(trq+self.TORQUE_MEAS_TOLERANCE), -trq + 1) + min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1) self.assertTrue(self.safety.get_torque_meas_min() in min_range) self.assertTrue(self.safety.get_torque_meas_max() in max_range) - max_range = range(0, self.TORQUE_MEAS_TOLERANCE+1) - min_range = range(-(trq+self.TORQUE_MEAS_TOLERANCE), -trq + 1) + max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1) + min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1) self._rx(self._torque_meas_msg(0)) self.assertTrue(self.safety.get_torque_meas_min() in min_range) self.assertTrue(self.safety.get_torque_meas_max() in max_range) - max_range = range(0, self.TORQUE_MEAS_TOLERANCE+1) + max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1) min_range = range(-self.TORQUE_MEAS_TOLERANCE, 0 + 1) self._rx(self._torque_meas_msg(0)) self.assertTrue(self.safety.get_torque_meas_min() in min_range) @@ -320,7 +320,7 @@ class PandaSafetyTest(PandaSafetyTestBase): def test_prev_gas(self): self.assertFalse(self.safety.get_gas_pressed_prev()) - for pressed in [self.GAS_PRESSED_THRESHOLD+1, 0]: + for pressed in [self.GAS_PRESSED_THRESHOLD + 1, 0]: self._rx(self._gas_msg(pressed)) self.assertEqual(bool(pressed), self.safety.get_gas_pressed_prev()) @@ -335,14 +335,14 @@ class PandaSafetyTest(PandaSafetyTestBase): def test_disengage_on_gas(self): self._rx(self._gas_msg(0)) self.safety.set_controls_allowed(True) - self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD+1)) + self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD + 1)) self.assertFalse(self.safety.get_controls_allowed()) def test_unsafe_mode_no_disengage_on_gas(self): self._rx(self._gas_msg(0)) self.safety.set_controls_allowed(True) self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS) - self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD+1)) + self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD + 1)) self.assertTrue(self.safety.get_controls_allowed()) def test_prev_brake(self): @@ -409,5 +409,5 @@ class PandaSafetyTest(PandaSafetyTestBase): self.assertFalse(self.safety.get_vehicle_moving()) # past threshold - self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD+1)) + self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD + 1)) self.assertTrue(self.safety.get_vehicle_moving()) diff --git a/tests/safety/test_chrysler.py b/tests/safety/test_chrysler.py index c3981a8..ca38b5c 100755 --- a/tests/safety/test_chrysler.py +++ b/tests/safety/test_chrysler.py @@ -53,7 +53,7 @@ class TestChryslerSafety(common.PandaSafetyTest, common.TorqueSteeringSafetyTest def _brake_msg(self, brake): values = {"BRAKE_PRESSED_2": 5 if brake else 0, - "COUNTER": self.cnt_brake % 16} + "COUNTER": self.cnt_brake % 16} self.__class__.cnt_brake += 1 return self.packer.make_can_msg_panda("BRAKE_2", 0, values) diff --git a/tests/safety/test_gm.py b/tests/safety/test_gm.py index 0a1fca5..897ff03 100644 --- a/tests/safety/test_gm.py +++ b/tests/safety/test_gm.py @@ -39,12 +39,17 @@ class TestGmSafety(common.PandaSafetyTest): self.safety.init_tests() # override these tests from PandaSafetyTest, GM uses button enable - def test_disable_control_allowed_from_cruise(self): pass - def test_enable_control_allowed_from_cruise(self): pass - def test_cruise_engaged_prev(self): pass + def test_disable_control_allowed_from_cruise(self): + pass + + def test_enable_control_allowed_from_cruise(self): + pass + + def test_cruise_engaged_prev(self): + pass def _speed_msg(self, speed): - values = {"%sWheelSpd"%s: speed for s in ["RL", "RR"]} + values = {"%sWheelSpd" % s: speed for s in ["RL", "RR"]} return self.packer.make_can_msg_panda("EBCMWheelSpdRear", 0, values) def _button_msg(self, buttons): @@ -109,7 +114,7 @@ class TestGmSafety(common.PandaSafetyTest): def test_gas_safety_check(self): for enabled in [0, 1]: - for g in range(0, 2**12-1): + for g in range(0, 2**12 - 1): self.safety.set_controls_allowed(enabled) if abs(g) > MAX_GAS or (not enabled and g != MAX_REGEN): self.assertFalse(self._tx(self._send_gas_msg(g))) diff --git a/tests/safety/test_honda.py b/tests/safety/test_honda.py index 755c215..aca985a 100755 --- a/tests/safety/test_honda.py +++ b/tests/safety/test_honda.py @@ -5,8 +5,7 @@ import numpy as np from panda import Panda from panda.tests.safety import libpandasafety_py import panda.tests.safety.common as common -from panda.tests.safety.common import CANPackerPanda, make_msg, \ - MAX_WRONG_COUNTERS, UNSAFE_MODE +from panda.tests.safety.common import CANPackerPanda, make_msg, MAX_WRONG_COUNTERS, UNSAFE_MODE class Btn: CANCEL = 2 @@ -36,9 +35,14 @@ class TestHondaSafety(common.PandaSafetyTest): raise unittest.SkipTest # override these inherited tests. honda doesn't use pcm enable - def test_disable_control_allowed_from_cruise(self): pass - def test_enable_control_allowed_from_cruise(self): pass - def test_cruise_engaged_prev(self): pass + def test_disable_control_allowed_from_cruise(self): + pass + + def test_enable_control_allowed_from_cruise(self): + pass + + def test_cruise_engaged_prev(self): + pass def _speed_msg(self, speed): values = {"XMISSION_SPEED": speed, "COUNTER": self.cnt_speed % 4} @@ -198,7 +202,7 @@ class TestHondaNidecSafety(TestHondaSafety, common.InterceptorSafetyTest): to_send = make_msg(0, addr, 6) gas2 = gas * 2 to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \ - ((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8) + ((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8) return to_send def _send_brake_msg(self, brake): @@ -236,8 +240,8 @@ class TestHondaNidecSafety(TestHondaSafety, common.InterceptorSafetyTest): for mode in [UNSAFE_MODE.DEFAULT, UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS]: self.safety.set_unsafe_mode(mode) # gas_interceptor_prev > INTERCEPTOR_THRESHOLD - self._rx(self._interceptor_msg(self.INTERCEPTOR_THRESHOLD+1, 0x201)) - self._rx(self._interceptor_msg(self.INTERCEPTOR_THRESHOLD+1, 0x201)) + self._rx(self._interceptor_msg(self.INTERCEPTOR_THRESHOLD + 1, 0x201)) + self._rx(self._interceptor_msg(self.INTERCEPTOR_THRESHOLD + 1, 0x201)) allow_ctrl = mode == UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS self.safety.set_controls_allowed(1) diff --git a/tests/safety/test_hyundai.py b/tests/safety/test_hyundai.py index 8da310f..2910179 100644 --- a/tests/safety/test_hyundai.py +++ b/tests/safety/test_hyundai.py @@ -68,7 +68,7 @@ class TestHyundaiSafety(common.PandaSafetyTest): def _speed_msg(self, speed): # panda safety doesn't scale, so undo the scaling - values = {"WHL_SPD_%s"%s: speed*0.03125 for s in ["FL", "FR", "RL", "RR"]} + values = {"WHL_SPD_%s" % s: speed * 0.03125 for s in ["FL", "FR", "RL", "RR"]} values["WHL_SPD_AliveCounter_LSB"] = (self.cnt_speed % 16) & 0x3 values["WHL_SPD_AliveCounter_MSB"] = (self.cnt_speed % 16) >> 2 self.__class__.cnt_speed += 1 diff --git a/tests/safety/test_mazda.py b/tests/safety/test_mazda.py index f82816c..3ca0f6b 100755 --- a/tests/safety/test_mazda.py +++ b/tests/safety/test_mazda.py @@ -24,7 +24,7 @@ class TestMazdaSafety(common.PandaSafetyTest): RELAY_MALFUNCTION_BUS = 0 FWD_BLACKLISTED_ADDRS = {2: [0x243]} FWD_BUS_LOOKUP = {0: 2, 2: 0} - LKAS_ENABLE_SPEED = 52 + LKAS_ENABLE_SPEED = 52 LKAS_DISABLE_SPEED = 45 def setUp(self): diff --git a/tests/safety/test_nissan.py b/tests/safety/test_nissan.py index 63a32cc..971ec90 100644 --- a/tests/safety/test_nissan.py +++ b/tests/safety/test_nissan.py @@ -53,7 +53,7 @@ class TestNissanSafety(common.PandaSafetyTest): def _speed_msg(self, speed): # TODO: why the 3.6? m/s to kph? not in dbc - values = {"WHEEL_SPEED_%s"%s: speed*3.6 for s in ["RR", "RL"]} + values = {"WHEEL_SPEED_%s" % s: speed * 3.6 for s in ["RR", "RL"]} return self.packer.make_can_msg_panda("WHEEL_SPEEDS_REAR", 0, values) def _brake_msg(self, brake): diff --git a/tests/safety/test_subaru.py b/tests/safety/test_subaru.py index 6984746..38078d4 100644 --- a/tests/safety/test_subaru.py +++ b/tests/safety/test_subaru.py @@ -48,7 +48,7 @@ class TestSubaruSafety(common.PandaSafetyTest): def _speed_msg(self, speed): # subaru safety doesn't use the scaled value, so undo the scaling - values = {s: speed*0.057 for s in ["FR", "FL", "RR", "RL"]} + values = {s: speed * 0.057 for s in ["FR", "FL", "RR", "RL"]} values["Counter"] = self.cnt_speed % 4 self.__class__.cnt_speed += 1 return self.packer.make_can_msg_panda("Wheel_Speeds", 0, values) @@ -180,16 +180,23 @@ class TestSubaruLegacySafety(TestSubaruSafety): self.safety.init_tests() # subaru legacy safety doesn't have brake checks - def test_prev_brake(self): pass - def test_not_allow_brake_when_moving(self): pass - def test_allow_brake_at_zero_speed(self): pass + def test_prev_brake(self): + pass + + def test_not_allow_brake_when_moving(self): + pass + + def test_allow_brake_at_zero_speed(self): + pass + # doesn't have speed checks either - def test_sample_speed(self): pass + def test_sample_speed(self): + pass def _torque_driver_msg(self, torque): # TODO: figure out if this scaling factor from the DBC is right. # if it is, remove the scaling from here and put it in the safety code - values = {"Steer_Torque_Sensor": torque*8} + values = {"Steer_Torque_Sensor": torque * 8} return self.packer.make_can_msg_panda("Steering_Torque", 0, values) def _torque_msg(self, torque): diff --git a/tests/safety/test_toyota.py b/tests/safety/test_toyota.py index 52370e9..2b6bcea 100755 --- a/tests/safety/test_toyota.py +++ b/tests/safety/test_toyota.py @@ -53,7 +53,7 @@ class TestToyotaSafety(common.PandaSafetyTest, common.InterceptorSafetyTest, return self.packer.make_can_msg_panda("ACC_CONTROL", 0, values) def _speed_msg(self, s): - values = {("WHEEL_SPEED_%s"%n): s for n in ["FR", "FL", "RR", "RL"]} + values = {("WHEEL_SPEED_%s" % n): s for n in ["FR", "FL", "RR", "RL"]} return self.packer.make_can_msg_panda("WHEEL_SPEEDS", 0, values) def _brake_msg(self, pressed): @@ -86,7 +86,7 @@ class TestToyotaSafety(common.PandaSafetyTest, common.InterceptorSafetyTest, self.safety.set_controls_allowed(controls_allowed) self.safety.set_unsafe_mode(unsafe_mode) if controls_allowed: - should_tx = int(min_accel*1000) <= int(accel*1000) <= int(max_accel*1000) + should_tx = int(min_accel * 1000) <= int(accel * 1000) <= int(max_accel * 1000) else: should_tx = np.isclose(accel, 0, atol=0.0001) self.assertEqual(should_tx, self._tx(self._accel_msg(accel))) diff --git a/tests/safety/test_volkswagen_mqb.py b/tests/safety/test_volkswagen_mqb.py index 237891d..f8e7291 100644 --- a/tests/safety/test_volkswagen_mqb.py +++ b/tests/safety/test_volkswagen_mqb.py @@ -48,7 +48,8 @@ class TestVolkswagenMqbSafety(common.PandaSafetyTest): self.safety.init_tests() # override these inherited tests from PandaSafetyTest - def test_cruise_engaged_prev(self): pass + def test_cruise_engaged_prev(self): + pass def _set_prev_torque(self, t): self.safety.set_desired_torque_last(t) @@ -56,7 +57,7 @@ class TestVolkswagenMqbSafety(common.PandaSafetyTest): # Wheel speeds _esp_19_msg def _speed_msg(self, speed): - values = {"ESP_%s_Radgeschw_02"%s: speed for s in ["HL", "HR", "VL", "VR"]} + values = {"ESP_%s_Radgeschw_02" % s: speed for s in ["HL", "HR", "VL", "VR"]} return self.packer.make_can_msg_panda("ESP_19", 0, values) # Brake light switch _esp_05_msg @@ -80,21 +81,21 @@ class TestVolkswagenMqbSafety(common.PandaSafetyTest): # Driver steering input torque def _eps_01_msg(self, torque): values = {"Driver_Strain": abs(torque), "Driver_Strain_VZ": torque < 0, - "COUNTER": self.cnt_eps_01 % 16} + "COUNTER": self.cnt_eps_01 % 16} self.__class__.cnt_eps_01 += 1 return self.packer.make_can_msg_panda("EPS_01", 0, values) # openpilot steering output torque def _hca_01_msg(self, torque): values = {"Assist_Torque": abs(torque), "Assist_VZ": torque < 0, - "COUNTER": self.cnt_hca_01 % 16} + "COUNTER": self.cnt_hca_01 % 16} self.__class__.cnt_hca_01 += 1 return self.packer.make_can_msg_panda("HCA_01", 0, values) # Cruise control buttons def _gra_acc_01_msg(self, cancel=0, resume=0, _set=0): values = {"GRA_Abbrechen": cancel, "GRA_Tip_Setzen": _set, - "GRA_Tip_Wiederaufnahme": resume, "COUNTER": self.cnt_gra_acc_01 % 16} + "GRA_Tip_Wiederaufnahme": resume, "COUNTER": self.cnt_gra_acc_01 % 16} self.__class__.cnt_gra_acc_01 += 1 return self.packer.make_can_msg_panda("GRA_ACC_01", 0, values) diff --git a/tests/safety/test_volkswagen_pq.py b/tests/safety/test_volkswagen_pq.py index 11310e0..3ca4e4e 100644 --- a/tests/safety/test_volkswagen_pq.py +++ b/tests/safety/test_volkswagen_pq.py @@ -53,7 +53,8 @@ class TestVolkswagenPqSafety(common.PandaSafetyTest): self.safety.init_tests() # override these inherited tests from PandaSafetyTest - def test_cruise_engaged_prev(self): pass + def test_cruise_engaged_prev(self): + pass def _set_prev_torque(self, t): self.safety.set_desired_torque_last(t) diff --git a/tests/safety_replay/helpers.py b/tests/safety_replay/helpers.py index ee29469..572ab7d 100644 --- a/tests/safety_replay/helpers.py +++ b/tests/safety_replay/helpers.py @@ -87,4 +87,3 @@ def init_segment(safety, lr, mode): safety.set_controls_allowed(1) set_desired_torque_last(safety, mode, torque) assert safety.safety_tx_hook(to_send), "failed to initialize panda safety for segment" - diff --git a/tests/safety_replay/replay_drive.py b/tests/safety_replay/replay_drive.py index 60a8a89..c507754 100755 --- a/tests/safety_replay/replay_drive.py +++ b/tests/safety_replay/replay_drive.py @@ -23,7 +23,7 @@ def replay_drive(lr, safety_mode, param): for msg in lr: if start_t is None: start_t = msg.logMonoTime - safety.set_timer(((msg.logMonoTime // 1000)) % 0xFFFFFFFF) + safety.set_timer(((msg.logMonoTime // 1000)) % 0xFFFFFFFF) if msg.which() == 'sendcan': for canmsg in msg.sendcan: @@ -35,7 +35,7 @@ def replay_drive(lr, safety_mode, param): blocked_addrs.add(canmsg.address) if "DEBUG" in os.environ: - print("blocked bus %d msg %d at %f" % (canmsg.src, canmsg.address, (msg.logMonoTime - start_t)/(1e9))) + print("blocked bus %d msg %d at %f" % (canmsg.src, canmsg.address, (msg.logMonoTime - start_t) / (1e9))) tx_controls += safety.get_controls_allowed() tx_tot += 1 elif msg.which() == 'can': @@ -75,4 +75,3 @@ if __name__ == "__main__": print("replaying drive %s with safety mode %d and param %d" % (sys.argv[1], mode, param)) replay_drive(lr, mode, param) - diff --git a/tests/spam_can.py b/tests/spam_can.py index 1daffdb..8c80fb4 100755 --- a/tests/spam_can.py +++ b/tests/spam_can.py @@ -1,14 +1,13 @@ #!/usr/bin/env python3 - import os import sys import random sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) if __name__ == "__main__": p = Panda() @@ -20,4 +19,4 @@ if __name__ == "__main__": st = get_test_string()[0:8] bus = random.randint(0, 2) p.can_send(at, st, bus) - #print("Sent message on bus: ", bus) + # print("Sent message on bus: ", bus) diff --git a/tests/standalone_test.py b/tests/standalone_test.py index 9e181d8..b3c2510 100755 --- a/tests/standalone_test.py +++ b/tests/standalone_test.py @@ -5,7 +5,7 @@ import struct import time sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 if __name__ == "__main__": if os.getenv("WIFI") is not None: @@ -19,14 +19,14 @@ if __name__ == "__main__": for i in range(100): p.get_serial() t2 = time.time() - print("100 requests took %.2f ms" % ((t2-t1)*1000)) + print("100 requests took %.2f ms" % ((t2 - t1) * 1000)) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) a = 0 while True: # flood - msg = b"\xaa"*4 + struct.pack("I", a) + msg = b"\xaa" * 4 + struct.pack("I", a) p.can_send(0xaa, msg, 0) p.can_send(0xaa, msg, 1) p.can_send(0xaa, msg, 4) diff --git a/tests/throughput_test.py b/tests/throughput_test.py index c02f0f7..3b4617c 100755 --- a/tests/throughput_test.py +++ b/tests/throughput_test.py @@ -7,7 +7,7 @@ import time from tqdm import tqdm sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda, PandaWifiStreaming +from panda import Panda, PandaWifiStreaming # noqa: E402 # test throughput between USB and wifi @@ -15,16 +15,11 @@ if __name__ == "__main__": print(Panda.list()) p_out = Panda("108018800f51363038363036") print(p_out.get_serial()) - #p_in = Panda("02001b000f51363038363036") p_in = Panda("WIFI") print(p_in.get_serial()) p_in = PandaWifiStreaming() # type: ignore - #while True: - # p_in.can_recv() - #sys.exit(0) - p_out.set_safety_mode(Panda.SAFETY_ALLOUTPUT) set_out, set_in = set(), set() @@ -36,8 +31,8 @@ if __name__ == "__main__": BATCH_SIZE = 16 for a in tqdm(list(range(0, 10000, BATCH_SIZE))): for b in range(0, BATCH_SIZE): - msg = b"\xaa"*4 + struct.pack("I", a+b) - if a%1 == 0: + msg = b"\xaa" * 4 + struct.pack("I", a + b) + if a % 1 == 0: p_out.can_send(0xaa, msg, 0) dat_out, dat_in = p_out.can_recv(), p_in.can_recv() diff --git a/tests/tucan_loopback.py b/tests/tucan_loopback.py index cb5d1cb..9153617 100755 --- a/tests/tucan_loopback.py +++ b/tests/tucan_loopback.py @@ -10,10 +10,10 @@ from hexdump import hexdump from itertools import permutations sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) -from panda import Panda +from panda import Panda # noqa: E402 def get_test_string(): - return b"test"+os.urandom(10) + return b"test" + os.urandom(10) def run_test(sleep_duration): pandas = Panda.list() @@ -49,7 +49,7 @@ def run_test_w_pandas(pandas, sleep_duration): print("health", ho[0], h[ho[0]].health()) # **** test K/L line loopback **** - for bus in [2,3]: + for bus in [2, 3]: # flush the output h[ho[1]].kline_drain(bus=bus)