Enable almost all Flake8 checks (#548)

* fix W391

* E262

* E703

* W293

* some E

* E231

* some more E

* E225

* more E

* E252

* no tabs

* more tabs

* E701

* uds.py

* almost all of them

* only e265 left

* not sure why this is triggering on commented out code

* ignore esptool
master
Adeeb 2020-06-01 01:49:26 -07:00 committed by GitHub
parent 3d5a7179b0
commit d7f7b14118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
71 changed files with 448 additions and 431 deletions

View File

@ -17,7 +17,9 @@ repos:
- id: flake8
exclude: '^(tests/automated)/'
args:
- --select=F
- --ignore=E111,E114,E121,E501,E302,E305,W504
- --exclude=python/esptool.py,tests/gmbitbang/*
- --statistics
- repo: local
hooks:
- id: pylint

View File

@ -26,7 +26,7 @@ def find_first_panda(context=None):
if __name__ == "__main__":
panda_dev = find_first_panda()
if panda_dev == None:
if panda_dev is None:
print("no device found")
sys.exit(0)
print("found device")

View File

@ -41,5 +41,3 @@ for fn in sys.argv[1:]:
print(' .rr = %s,' % to_c_uint32(rr))
print(' .exponent = %d,' % rsa.e)
print('};')

View File

@ -34,4 +34,3 @@ with open(sys.argv[2], "wb") as f:
sig = (hex(rsa_out)[2:].rjust(0x100, '0'))
x += binascii.unhexlify(sig)
f.write(x)

View File

@ -4,6 +4,7 @@ import sys
class Message():
"""Details about a specific message ID."""
def __init__(self, message_id):
self.message_id = message_id
self.ones = [0] * 8 # bit set if 1 is always seen
@ -32,7 +33,8 @@ class Info():
reader = csv.reader(input)
next(reader, None) # skip the CSV header
for row in reader:
if not len(row): continue
if not len(row):
continue
time = float(row[0])
bus = int(row[2])
if time < start or bus > 127:
@ -76,7 +78,8 @@ def PrintUnique(log_file, low_range, high_range):
if message_id in low.messages:
high.messages[message_id].printBitDiff(low.messages[message_id])
found = True
if not found: print('No messages that transition from always low to always high found!')
if not found:
print('No messages that transition from always low to always high found!')
if __name__ == "__main__":
if len(sys.argv) < 4:

View File

@ -15,7 +15,7 @@ def can_logger():
try:
p = Panda("WIFI")
except:
except Exception:
print("WiFi connection timed out. Please make sure your Panda is connected and try again.")
sys.exit(0)

View File

@ -24,6 +24,7 @@ import sys
class Message():
"""Details about a specific message ID."""
def __init__(self, message_id):
self.message_id = message_id
self.data = {} # keyed by hex string encoded message data

View File

@ -13,7 +13,6 @@ def get_panda_password():
sys.exit(0)
wifi = p.get_serial()
#print('[%s]' % ', '.join(map(str, wifi)))
print("SSID: " + wifi[0])
print("Password: " + wifi[1])

View File

@ -31,37 +31,43 @@ if __name__ == "__main__":
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data
if data:
resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data
if data:
resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data
if data:
resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data
if data:
resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data
if data:
resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data
if data:
resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data
except NegativeResponseError:
pass

View File

@ -55,6 +55,3 @@ if __name__ == "__main__":
load = struct.unpack(">B", get_current_data_for_pid(4)[2:])[0] / 255.0 * 100 # percent
print("%d KPH, %d RPM, %.1f%% Throttle, %d deg C, %.1f%% load" % (speed, rpm, throttle, temp, load))
time.sleep(0.2)

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3
# flake8: noqa
import sys
import binascii
from panda import Panda

View File

@ -161,7 +161,7 @@ class Panda(object):
self._handle = None
def connect(self, claim=True, wait=False):
if self._handle != None:
if self._handle is not None:
self.close()
if self._serial == "WIFI":
@ -176,7 +176,6 @@ class Panda(object):
while 1:
try:
for device in context.getDeviceList(skip_on_error=True):
#print(device)
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
try:
this_serial = device.getSerialNumber()
@ -189,7 +188,7 @@ class Panda(object):
self.bootstub = device.getProductID() == 0xddee
self.legacy = (device.getbcdDevice() != 0x2300)
self._handle = device.open()
if not sys.platform in ["win32", "cygwin", "msys"]:
if sys.platform not in ["win32", "cygwin", "msys"]:
self._handle.setAutoDetachKernelDriver(True)
if claim:
self._handle.claimInterface(0)
@ -198,10 +197,10 @@ class Panda(object):
except Exception as e:
print("exception", e)
traceback.print_exc()
if wait == False or self._handle != None:
if not wait or self._handle is not None:
break
context = usb1.USBContext() # New context needed so new devices show up
assert(self._handle != None)
assert(self._handle is not None)
print("connected")
def reset(self, enter_bootstub=False, enter_bootloader=False):
@ -504,7 +503,6 @@ class Panda(object):
while True:
try:
#print("DAT: %s"%b''.join(snds).__repr__())
if self.wifi:
for s in snds:
self._handle.bulkWrite(3, s)

View File

@ -43,7 +43,7 @@ class PandaDFU(object):
@staticmethod
def st_serial_to_dfu_serial(st):
if st == None or st == "none":
if st is None or st == "none":
return None
uid_base = struct.unpack("H" * 6, bytes.fromhex(st))
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
@ -69,7 +69,7 @@ class PandaDFU(object):
self.status()
def program(self, address, dat, block_size=None):
if block_size == None:
if block_size is None:
block_size = len(dat)
# Set Address Pointer

View File

@ -18,6 +18,8 @@
# this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
# Street, Fifth Floor, Boston, MA 02110-1301 USA.
# flake8: noqa
import argparse
import hashlib
import inspect

View File

@ -13,7 +13,7 @@ def flash_release(path=None, st_serial=None):
def status(x):
print("\033[1;32;40m" + x + "\033[00m")
if st_serial == None:
if st_serial is not None:
# look for Panda
panda_list = Panda.list()
if len(panda_list) == 0:
@ -23,7 +23,7 @@ def flash_release(path=None, st_serial=None):
st_serial = panda_list[0]
print("Using panda with serial %s" % st_serial)
if path == None:
if path is not None:
print("Fetching latest firmware from github.com/commaai/panda-artifacts")
r = requests.get("https://raw.githubusercontent.com/commaai/panda-artifacts/master/latest.json")
url = json.loads(r.text)['url']
@ -67,7 +67,10 @@ def flash_release(path=None, st_serial=None):
# flashing ESP
if panda.is_white():
status("4. Flashing ESP (slow!)")
align = lambda x, sz=0x1000: x+"\xFF"*((sz-len(x)) % sz)
def align(x, sz=0x1000):
x + "\xFF" * ((sz - len(x)) % sz)
esp = ESPROM(st_serial)
esp.connect()
flasher = CesantaFlasher(esp, 230400)
@ -95,4 +98,3 @@ def flash_release(path=None, st_serial=None):
if __name__ == "__main__":
flash_release(*sys.argv[1:])

View File

@ -134,4 +134,3 @@ def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None):
print("R:", binascii.hexlify(dat))
return dat

View File

@ -7,21 +7,16 @@ class PandaSerial(object):
self.panda.set_uart_baud(self.port, baud)
self.buf = b""
def read(self, l=1):
def read(self, l=1): # noqa: E741
tt = self.panda.serial_read(self.port)
if len(tt) > 0:
#print "R: ", tt.encode("hex")
self.buf += tt
ret = self.buf[0:l]
self.buf = self.buf[l:]
return ret
def write(self, dat):
#print "W: ", dat.encode("hex")
#print ' pigeon_send("' + ''.join(map(lambda x: "\\x%02X" % ord(x), dat)) + '");'
return self.panda.serial_write(self.port, dat)
def close(self):
pass

View File

@ -286,14 +286,16 @@ class CanClient():
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
if self.debug:
print(f"switch to physical addr {hex(addr)}")
self.tx_addr = addr - 8
self.rx_addr = addr
return is_response
if self.tx_addr == 0x18DB33F1:
is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
if self.debug:
print(f"switch to physical addr {hex(addr)}")
self.tx_addr = 0x18DA00F1 + (addr << 8 & 0xFF00)
self.rx_addr = addr
return bus == self.bus and addr == self.rx_addr
@ -302,14 +304,16 @@ class CanClient():
while True:
msgs = self.rx()
if drain:
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
if self.debug:
print("CAN-RX: drain - {}".format(len(msgs)))
self.rx_buff.clear()
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
# Cut off sub addr in first byte
if self.sub_addr is not None:
@ -333,13 +337,15 @@ class CanClient():
def send(self, msgs: List[bytes], delay: float = 0) -> None:
for i, msg in enumerate(msgs):
if delay and i != 0:
if self.debug: print(f"CAN-TX: delay - {delay}")
if self.debug:
print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.sub_addr is not None:
msg = bytes([self.sub_addr]) + msg
if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
if self.debug:
print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
assert len(msg) <= 8
self.tx(self.tx_addr, msg, self.bus)
@ -368,18 +374,21 @@ class IsoTpMessage():
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}")
if self.debug:
print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}")
self._tx_first_frame()
def _tx_first_frame(self) -> None:
if self.tx_len < self.max_len:
# single frame (send all bytes)
if self.debug: print("ISO-TP: TX - single frame")
if self.debug:
print("ISO-TP: TX - single frame")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00")
self.tx_done = True
else:
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
if self.debug:
print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00")
self._can_client.send([msg])
@ -397,7 +406,8 @@ class IsoTpMessage():
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
if self.debug and self.rx_dat:
print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame
@ -406,7 +416,8 @@ class IsoTpMessage():
self.rx_dat = rx_data[1:1 + self.rx_len]
self.rx_idx = 0
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug:
print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}")
return
# first rx_frame
@ -415,8 +426,10 @@ class IsoTpMessage():
self.rx_dat = rx_data[2:]
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug: print("ISO-TP: TX - flow control continue")
if self.debug:
print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug:
print("ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(self.max_len, b"\x00")
self._can_client.send([msg])
@ -424,23 +437,25 @@ class IsoTpMessage():
# consecutive rx frame
if rx_data[0] >> 4 == 0x2:
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
assert not self.rx_done, "isotp - rx: consecutive frame with no active frame"
self.rx_idx += 1
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
rx_size = self.rx_len - len(self.rx_dat)
self.rx_dat += rx_data[1:1 + rx_size]
if self.rx_len == len(self.rx_dat):
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug:
print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
return
# flow control
if rx_data[0] >> 4 == 0x3:
assert self.tx_done == False, "isotp - rx: flow control with no active frame"
assert not self.tx_done, "isotp - rx: flow control with no active frame"
assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort"
assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid"
if rx_data[0] == 0x30:
if self.debug: print("ISO-TP: RX - flow control continue")
if self.debug:
print("ISO-TP: RX - flow control continue")
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
@ -461,10 +476,12 @@ class IsoTpMessage():
self._can_client.send(tx_msgs, delay=delay_sec)
if end >= self.tx_len:
self.tx_done = True
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
if self.debug:
print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
elif rx_data[0] == 0x31:
# wait (do nothing until next flow control message)
if self.debug: print("ISO-TP: TX - flow control wait")
if self.debug:
print("ISO-TP: TX - flow control wait")
FUNCTIONAL_ADDRS = [0x7DF, 0x18DB33F1]
@ -525,7 +542,8 @@ class UdsClient():
error_desc = resp[3:].hex()
# wait for another message if response pending
if error_code == 0x78:
if self.debug: print("UDS-RX: response pending")
if self.debug:
print("UDS-RX: response pending")
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
@ -574,10 +592,8 @@ class UdsClient():
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes = None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET
)
read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET)
if not write_custom_values and parameter_values is not None:
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:

View File

@ -42,4 +42,3 @@ def ensure_st_up_to_date():
if __name__ == "__main__":
ensure_st_up_to_date()

View File

@ -5,4 +5,3 @@ def test_build_panda():
def test_build_bootstub_panda():
build_st("obj/bootstub.panda.bin")

View File

@ -12,7 +12,7 @@ import random
import argparse
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)

View File

@ -12,7 +12,7 @@ import random
import argparse
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)

View File

@ -12,7 +12,7 @@ import random
import argparse
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)

View File

@ -11,7 +11,7 @@ import random
import argparse
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)
@ -25,7 +25,6 @@ def run_test(sleep_duration):
global counter, open_errors, closed_errors, content_errors
pandas = Panda.list()
#pandas = ["540046000c51363338383037", "07801b800f51363038363036"]
print(pandas)
# make sure two pandas are connected

View File

@ -7,7 +7,7 @@ from collections import defaultdict
import binascii
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
# fake
def sec_since_boot():

View File

@ -6,7 +6,7 @@ import time
import select
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
setcolor = ["\033[1;32;40m", "\033[1;31;40m"]
unsetcolor = "\033[00m"
@ -46,4 +46,4 @@ if __name__ == "__main__":
time.sleep(0.01)
except Exception:
print("panda disconnected!")
time.sleep(0.5);
time.sleep(0.5)

View File

@ -1,4 +1,3 @@
#!/usr/bin/env python3
from panda import Panda
Panda().set_esp_power(False)

View File

@ -5,7 +5,7 @@ import time
import _thread
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
# This script is intended to be used in conjunction with the echo_loopback_test.py test script from panda jungle.
# It sends a reversed response back for every message received containing b"test".
@ -15,7 +15,7 @@ def heartbeat_thread(p):
try:
p.send_heartbeat()
time.sleep(1)
except:
except Exception:
break
# Resend every CAN message that has been received on the same bus, but with the data reversed
@ -31,5 +31,3 @@ if __name__ == "__main__":
address, notused, data, bus = message
if b'test' in data:
p.can_send(address, data[::-1], bus)

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3
# flake8: noqa
"""Used to Reverse/Test ELM protocol auto detect and OBD message response without a car."""
import sys
@ -10,7 +12,7 @@ import threading
from collections import deque
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def lin_checksum(dat):
return sum(dat) % 0x100
@ -225,7 +227,7 @@ class ELMCarSimulator():
outmsg = None
if data[:3] == b'\x30\x00\x00' and len(self.__can_multipart_data):
if not self.__silent:
print("Request for more data");
print("Request for more data")
outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
msgnum = 1
while(self.__can_multipart_data):
@ -312,6 +314,7 @@ if __name__ == "__main__":
sim.can_mode_29b()
import signal
def signal_handler(signal, frame):
print('\nShutting down simulator')
sim.stop()

View File

@ -31,8 +31,6 @@ def send_msg(s, msg):
if __name__ == "__main__":
s = socket.create_connection(("192.168.0.10", 35000))
#t1 = Reader(s)
#t1.start()
send_msg(s, b"ATZ\r")
send_msg(s, b"ATL1\r")
print(send_msg(s, b"ATE0\r"))

View File

@ -1,3 +1,5 @@
# flake8: noqa
import os
import sys
import time
@ -32,7 +34,7 @@ def send_compare(s, dat, ret, timeout=4):
ready = select.select([s], [], [], timeout)
if not ready[0]:
print("current recv data:", repr(res))
break;
break
res += s.recv(1000)
#print("final recv data: '%s'" % repr(res))
assert ret == res # , "Data does not agree (%s) (%s)"%(repr(ret), repr(res))
@ -299,7 +301,7 @@ def test_elm_panda_safety_mode_KWPFast():
p_car.kline_drain()
p_elm = Panda("WIFI")
p_elm.set_safety_mode(Panda.SAFETY_ELM327);
p_elm.set_safety_mode(Panda.SAFETY_ELM327)
def get_checksum(dat):
result = 0
@ -623,7 +625,7 @@ def test_elm_panda_safety_mode_ISO15765():
p_car.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_elm = Panda("WIFI")
p_elm.set_safety_mode(Panda.SAFETY_ELM327);
p_elm.set_safety_mode(Panda.SAFETY_ELM327)
#sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
#sim.start()

View File

@ -4,7 +4,7 @@ import sys
import time
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
power = 0
if __name__ == "__main__":

View File

@ -5,5 +5,3 @@ if __name__ == "__main__":
for p in Panda.list():
pp = Panda(p)
print("%s: %s" % (pp.get_serial()[0], pp.get_version()))

View File

@ -33,4 +33,3 @@ plt.show()
#data = (data - 130.0 - voltoffset/voltscale*25) / 25 * voltscale
print(data)

View File

@ -30,4 +30,3 @@ while 1:
time.sleep(0.01)
print(p2.can_recv())
#exit(0)

View File

@ -20,4 +20,3 @@ while 1:
iden += 1
p.can_send(iden, dat, bus=3)
time.sleep(0.01)

View File

@ -7,7 +7,7 @@ import random
import threading
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda, PandaSerial
from panda import Panda, PandaSerial # noqa: E402
INIT_GPS_BAUD = 9600
GPS_BAUD = 460800
@ -116,10 +116,9 @@ def gps_read_thread(panda):
while True:
ret = ser.read(1024)
time.sleep(0.001)
l = len(ret)
if l > 0:
if len(ret):
received_messages += 1
received_bytes+=l
received_bytes += len(ret)
if send_something:
ser.write("test")
send_something = False

View File

@ -16,4 +16,3 @@ if __name__ == "__main__":
print(panda.health())
print("\n")
time.sleep(0.5)

View File

@ -4,7 +4,7 @@ import sys
import time
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
power = 0
if __name__ == "__main__":

View File

@ -4,7 +4,7 @@ import time
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda, PandaSerial
from panda import Panda, PandaSerial # noqa: 402
def add_nmea_checksum(msg):
d = msg[1:]
@ -43,5 +43,3 @@ if __name__ == "__main__":
if len(ret) > 0:
sys.stdout.write(ret.decode('ascii', 'ignore'))
sys.stdout.flush()
#print str(ret).encode("hex")

View File

@ -10,7 +10,7 @@ from hexdump import hexdump
from itertools import permutations
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)

View File

@ -10,7 +10,6 @@ class CanHandle(object):
self.p = p
def transact(self, dat):
#print "W:",dat.encode("hex")
self.p.isotp_send(1, dat, 0, recvaddr=2)
def _handle_timeout(signum, frame):
@ -24,7 +23,6 @@ class CanHandle(object):
finally:
signal.alarm(0)
#print "R:",ret.encode("hex")
return ret
def controlWrite(self, request_type, request, value, index, data, timeout=0):
@ -71,5 +69,3 @@ if __name__ == "__main__":
Panda.flash_static(CanHandle(p), code)
print("can flash done")

View File

@ -10,20 +10,24 @@ if __name__ == "__main__":
len = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, 1)
print('Microsoft OS String Descriptor')
dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, len[0])
if DEBUG: print('LEN: {}'.format(hex(len[0])))
if DEBUG:
print('LEN: {}'.format(hex(len[0])))
hexdump("".join(map(chr, dat)))
ms_vendor_code = dat[16]
if DEBUG: print('MS_VENDOR_CODE: {}'.format(hex(len[0])))
if DEBUG:
print('MS_VENDOR_CODE: {}'.format(hex(len[0])))
print('\nMicrosoft Compatible ID Feature Descriptor')
len = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1)
if DEBUG: print('LEN: {}'.format(hex(len[0])))
if DEBUG:
print('LEN: {}'.format(hex(len[0])))
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, len[0])
hexdump("".join(map(chr, dat)))
print('\nMicrosoft Extended Properties Feature Descriptor')
len = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1)
if DEBUG: print('LEN: {}'.format(hex(len[0])))
if DEBUG:
print('LEN: {}'.format(hex(len[0])))
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, len[0])
hexdump("".join(map(chr, dat)))

View File

@ -4,7 +4,7 @@ import sys
import datetime
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
if __name__ == "__main__":
p = Panda()

View File

@ -39,9 +39,14 @@ class TestGmSafety(common.PandaSafetyTest):
self.safety.init_tests()
# override these tests from PandaSafetyTest, GM uses button enable
def test_disable_control_allowed_from_cruise(self): pass
def test_enable_control_allowed_from_cruise(self): pass
def test_cruise_engaged_prev(self): pass
def test_disable_control_allowed_from_cruise(self):
pass
def test_enable_control_allowed_from_cruise(self):
pass
def test_cruise_engaged_prev(self):
pass
def _speed_msg(self, speed):
values = {"%sWheelSpd" % s: speed for s in ["RL", "RR"]}

View File

@ -5,8 +5,7 @@ import numpy as np
from panda import Panda
from panda.tests.safety import libpandasafety_py
import panda.tests.safety.common as common
from panda.tests.safety.common import CANPackerPanda, make_msg, \
MAX_WRONG_COUNTERS, UNSAFE_MODE
from panda.tests.safety.common import CANPackerPanda, make_msg, MAX_WRONG_COUNTERS, UNSAFE_MODE
class Btn:
CANCEL = 2
@ -36,9 +35,14 @@ class TestHondaSafety(common.PandaSafetyTest):
raise unittest.SkipTest
# override these inherited tests. honda doesn't use pcm enable
def test_disable_control_allowed_from_cruise(self): pass
def test_enable_control_allowed_from_cruise(self): pass
def test_cruise_engaged_prev(self): pass
def test_disable_control_allowed_from_cruise(self):
pass
def test_enable_control_allowed_from_cruise(self):
pass
def test_cruise_engaged_prev(self):
pass
def _speed_msg(self, speed):
values = {"XMISSION_SPEED": speed, "COUNTER": self.cnt_speed % 4}

View File

@ -180,11 +180,18 @@ class TestSubaruLegacySafety(TestSubaruSafety):
self.safety.init_tests()
# subaru legacy safety doesn't have brake checks
def test_prev_brake(self): pass
def test_not_allow_brake_when_moving(self): pass
def test_allow_brake_at_zero_speed(self): pass
def test_prev_brake(self):
pass
def test_not_allow_brake_when_moving(self):
pass
def test_allow_brake_at_zero_speed(self):
pass
# doesn't have speed checks either
def test_sample_speed(self): pass
def test_sample_speed(self):
pass
def _torque_driver_msg(self, torque):
# TODO: figure out if this scaling factor from the DBC is right.

View File

@ -48,7 +48,8 @@ class TestVolkswagenMqbSafety(common.PandaSafetyTest):
self.safety.init_tests()
# override these inherited tests from PandaSafetyTest
def test_cruise_engaged_prev(self): pass
def test_cruise_engaged_prev(self):
pass
def _set_prev_torque(self, t):
self.safety.set_desired_torque_last(t)

View File

@ -53,7 +53,8 @@ class TestVolkswagenPqSafety(common.PandaSafetyTest):
self.safety.init_tests()
# override these inherited tests from PandaSafetyTest
def test_cruise_engaged_prev(self): pass
def test_cruise_engaged_prev(self):
pass
def _set_prev_torque(self, t):
self.safety.set_desired_torque_last(t)

View File

@ -87,4 +87,3 @@ def init_segment(safety, lr, mode):
safety.set_controls_allowed(1)
set_desired_torque_last(safety, mode, torque)
assert safety.safety_tx_hook(to_send), "failed to initialize panda safety for segment"

View File

@ -75,4 +75,3 @@ if __name__ == "__main__":
print("replaying drive %s with safety mode %d and param %d" % (sys.argv[1], mode, param))
replay_drive(lr, mode, param)

View File

@ -1,11 +1,10 @@
#!/usr/bin/env python3
import os
import sys
import random
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)

View File

@ -5,7 +5,7 @@ import struct
import time
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
if __name__ == "__main__":
if os.getenv("WIFI") is not None:

View File

@ -7,7 +7,7 @@ import time
from tqdm import tqdm
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda, PandaWifiStreaming
from panda import Panda, PandaWifiStreaming # noqa: E402
# test throughput between USB and wifi
@ -15,16 +15,11 @@ if __name__ == "__main__":
print(Panda.list())
p_out = Panda("108018800f51363038363036")
print(p_out.get_serial())
#p_in = Panda("02001b000f51363038363036")
p_in = Panda("WIFI")
print(p_in.get_serial())
p_in = PandaWifiStreaming() # type: ignore
#while True:
# p_in.can_recv()
#sys.exit(0)
p_out.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
set_out, set_in = set(), set()

View File

@ -10,7 +10,7 @@ from hexdump import hexdump
from itertools import permutations
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
from panda import Panda # noqa: E402
def get_test_string():
return b"test" + os.urandom(10)