DFU fix (#288)
* DFU fix * fix test 2 * this should fix all the remaining jenkins test * Fixed pyenv shim not being a python file, but a sh scriptmaster
parent
70219d7bb2
commit
af0960ad3d
|
@ -3,6 +3,7 @@ import os
|
|||
import usb1
|
||||
import struct
|
||||
import time
|
||||
import binascii
|
||||
|
||||
# *** DFU mode ***
|
||||
|
||||
|
@ -46,28 +47,27 @@ class PandaDFU(object):
|
|||
def st_serial_to_dfu_serial(st):
|
||||
if st == None or st == "none":
|
||||
return None
|
||||
uid_base = struct.unpack("H"*6, st.decode("hex"))
|
||||
return struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3]).encode("hex").upper()
|
||||
|
||||
uid_base = struct.unpack("H"*6, bytes.fromhex(st))
|
||||
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
|
||||
|
||||
def status(self):
|
||||
while 1:
|
||||
dat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
|
||||
if dat[1] == "\x00":
|
||||
dat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
|
||||
if dat[1] == 0:
|
||||
break
|
||||
|
||||
def clear_status(self):
|
||||
# Clear status
|
||||
stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
|
||||
if stat[4] == "\x0a":
|
||||
stat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
|
||||
if stat[4] == 0xa:
|
||||
self._handle.controlRead(0x21, DFU_CLRSTATUS, 0, 0, 0)
|
||||
elif stat[4] == "\x09":
|
||||
self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, "")
|
||||
elif stat[4] == 0x9:
|
||||
self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, b"")
|
||||
self.status()
|
||||
stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
|
||||
|
||||
def erase(self, address):
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x41" + struct.pack("I", address))
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address))
|
||||
self.status()
|
||||
|
||||
def program(self, address, dat, block_size=None):
|
||||
|
@ -75,12 +75,12 @@ class PandaDFU(object):
|
|||
block_size = len(dat)
|
||||
|
||||
# Set Address Pointer
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x21" + struct.pack("I", address))
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
|
||||
self.status()
|
||||
|
||||
# Program
|
||||
dat += "\xFF"*((block_size-len(dat)) % block_size)
|
||||
for i in range(0, len(dat)/block_size):
|
||||
dat += b"\xFF"*((block_size-len(dat)) % block_size)
|
||||
for i in range(0, len(dat)//block_size):
|
||||
ldat = dat[i*block_size:(i+1)*block_size]
|
||||
print("programming %d with length %d" % (i, len(ldat)))
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 2+i, 0, ldat)
|
||||
|
@ -105,17 +105,17 @@ class PandaDFU(object):
|
|||
build_st(fn)
|
||||
fn = os.path.join(BASEDIR, "board", fn)
|
||||
|
||||
with open(fn) as f:
|
||||
with open(fn, "rb") as f:
|
||||
code = f.read()
|
||||
|
||||
self.program_bootstub(code)
|
||||
|
||||
def reset(self):
|
||||
# **** Reset ****
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x21" + struct.pack("I", 0x8000000))
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", 0x8000000))
|
||||
self.status()
|
||||
try:
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, "")
|
||||
self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, b"")
|
||||
stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
|
||||
except Exception:
|
||||
pass
|
||||
|
|
|
@ -18,4 +18,4 @@ do
|
|||
nmcli connection delete "$NAME"
|
||||
done
|
||||
|
||||
PYTHONPATH="." python $(which nosetests) -v --with-xunit --xunit-file=./$TEST_FILENAME --xunit-testsuite-name=$TESTSUITE_NAME -s $TEST_SCRIPTS
|
||||
PYTHONPATH="." $(which nosetests) -v --with-xunit --xunit-file=./$TEST_FILENAME --xunit-testsuite-name=$TESTSUITE_NAME -s $TEST_SCRIPTS
|
||||
|
|
|
@ -25,7 +25,7 @@ def test_can_loopback(p):
|
|||
p.set_can_speed_kbps(bus, 250)
|
||||
|
||||
# send a message on bus 0
|
||||
p.can_send(0x1aa, "message", bus)
|
||||
p.can_send(0x1aa, b"message", bus)
|
||||
|
||||
# confirm receive both on loopback and send receipt
|
||||
time.sleep(0.05)
|
||||
|
@ -37,7 +37,7 @@ def test_can_loopback(p):
|
|||
|
||||
# confirm data is correct
|
||||
assert 0x1aa == sr[0][0] == lb[0][0]
|
||||
assert "message" == sr[0][2] == lb[0][2]
|
||||
assert b"message" == sr[0][2] == lb[0][2]
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
|
@ -49,7 +49,7 @@ def test_safety_nooutput(p):
|
|||
p.set_can_loopback(True)
|
||||
|
||||
# send a message on bus 0
|
||||
p.can_send(0x1aa, "message", 0)
|
||||
p.can_send(0x1aa, b"message", 0)
|
||||
|
||||
# confirm receive nothing
|
||||
time.sleep(0.05)
|
||||
|
@ -68,7 +68,7 @@ def test_reliability(p):
|
|||
p.set_can_speed_kbps(0, 1000)
|
||||
|
||||
addrs = list(range(100, 100+MSG_COUNT))
|
||||
ts = [(j, 0, "\xaa"*8, 0) for j in addrs]
|
||||
ts = [(j, 0, b"\xaa"*8, 0) for j in addrs]
|
||||
|
||||
# 100 loops
|
||||
for i in range(LOOP_COUNT):
|
||||
|
@ -182,4 +182,4 @@ def test_gmlan_bad_toggle(p):
|
|||
def test_serial_debug(p):
|
||||
junk = p.serial_read(Panda.SERIAL_DEBUG)
|
||||
p.call_control_api(0xc0)
|
||||
assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can "))
|
||||
assert(p.serial_read(Panda.SERIAL_DEBUG).startswith(b"can "))
|
||||
|
|
|
@ -18,7 +18,7 @@ def test_send_recv(p_send, p_recv):
|
|||
assert not p_send.legacy
|
||||
assert not p_recv.legacy
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, "message", 0)]*2)
|
||||
p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
@ -55,7 +55,7 @@ def test_latency(p_send, p_recv):
|
|||
p_recv.set_can_speed_kbps(0, 100)
|
||||
time.sleep(0.05)
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, "testmsg", 0)]*10)
|
||||
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
@ -80,7 +80,7 @@ def test_latency(p_send, p_recv):
|
|||
|
||||
for i in range(num_messages):
|
||||
st = time.time()
|
||||
p_send.can_send(0x1ab, "message", bus)
|
||||
p_send.can_send(0x1ab, b"message", bus)
|
||||
r = []
|
||||
while len(r) < 1 and (time.time() - st) < 5:
|
||||
r = p_recv.can_recv()
|
||||
|
@ -127,7 +127,7 @@ def test_black_loopback(panda0, panda1):
|
|||
panda1.set_can_loopback(False)
|
||||
|
||||
# clear stuff
|
||||
panda0.can_send_many([(0x1ba, 0, "testmsg", 0)]*10)
|
||||
panda0.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
|
||||
time.sleep(0.05)
|
||||
panda0.can_recv()
|
||||
panda1.can_recv()
|
||||
|
@ -155,7 +155,7 @@ def test_black_loopback(panda0, panda1):
|
|||
def _test_buses(send_panda, recv_panda, _test_array):
|
||||
for send_bus, send_obd, recv_obd, recv_buses in _test_array:
|
||||
print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd)
|
||||
|
||||
|
||||
# set OBD on pandas
|
||||
send_panda.set_gmlan(True if send_obd else None)
|
||||
recv_panda.set_gmlan(True if recv_obd else None)
|
||||
|
@ -180,7 +180,7 @@ def test_black_loopback(panda0, panda1):
|
|||
loop_buses.append(loop[3])
|
||||
if len(cans_loop) == 0:
|
||||
print(" No loop")
|
||||
|
||||
|
||||
# test loop buses
|
||||
recv_buses.sort()
|
||||
loop_buses.sort()
|
||||
|
@ -192,4 +192,4 @@ def test_black_loopback(panda0, panda1):
|
|||
print("***************** TESTING (0 --> 1) *****************")
|
||||
_test_buses(panda0, panda1, test_array)
|
||||
print("***************** TESTING (1 --> 0) *****************")
|
||||
_test_buses(panda1, panda0, test_array)
|
||||
_test_buses(panda1, panda0, test_array)
|
||||
|
|
|
@ -140,7 +140,7 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F
|
|||
raise ValueError("Cannot have two pandas that are the same panda")
|
||||
|
||||
st = time.time()
|
||||
p.can_send_many([(msg_id, 0, "\xaa"*8, bus)]*msg_count)
|
||||
p.can_send_many([(msg_id, 0, b"\xaa"*8, bus)]*msg_count)
|
||||
r = []
|
||||
r_echo = []
|
||||
r_len_expected = msg_count if two_pandas else msg_count*2
|
||||
|
|
Loading…
Reference in New Issue