From af0960ad3df3b7d712ec3858d2db773c843d0dfa Mon Sep 17 00:00:00 2001 From: rbiasini Date: Thu, 3 Oct 2019 19:46:28 -0700 Subject: [PATCH] DFU fix (#288) * DFU fix * fix test 2 * this should fix all the remaining jenkins test * Fixed pyenv shim not being a python file, but a sh script --- python/dfu.py | 32 ++++++++++++++++---------------- run_automated_tests.sh | 2 +- tests/automated/2_usb_to_can.py | 10 +++++----- tests/automated/6_two_panda.py | 14 +++++++------- tests/automated/helpers.py | 2 +- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/python/dfu.py b/python/dfu.py index b3bf05d..bbab812 100644 --- a/python/dfu.py +++ b/python/dfu.py @@ -3,6 +3,7 @@ import os import usb1 import struct import time +import binascii # *** DFU mode *** @@ -46,28 +47,27 @@ class PandaDFU(object): def st_serial_to_dfu_serial(st): if st == None or st == "none": return None - uid_base = struct.unpack("H"*6, st.decode("hex")) - return struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3]).encode("hex").upper() - + uid_base = struct.unpack("H"*6, bytes.fromhex(st)) + return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8") def status(self): while 1: - dat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)) - if dat[1] == "\x00": + dat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6) + if dat[1] == 0: break def clear_status(self): # Clear status - stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)) - if stat[4] == "\x0a": + stat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6) + if stat[4] == 0xa: self._handle.controlRead(0x21, DFU_CLRSTATUS, 0, 0, 0) - elif stat[4] == "\x09": - self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, "") + elif stat[4] == 0x9: + self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, b"") self.status() stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)) def erase(self, address): - self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x41" + struct.pack("I", address)) + self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address)) self.status() def program(self, address, dat, block_size=None): @@ -75,12 +75,12 @@ class PandaDFU(object): block_size = len(dat) # Set Address Pointer - self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x21" + struct.pack("I", address)) + self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address)) self.status() # Program - dat += "\xFF"*((block_size-len(dat)) % block_size) - for i in range(0, len(dat)/block_size): + dat += b"\xFF"*((block_size-len(dat)) % block_size) + for i in range(0, len(dat)//block_size): ldat = dat[i*block_size:(i+1)*block_size] print("programming %d with length %d" % (i, len(ldat))) self._handle.controlWrite(0x21, DFU_DNLOAD, 2+i, 0, ldat) @@ -105,17 +105,17 @@ class PandaDFU(object): build_st(fn) fn = os.path.join(BASEDIR, "board", fn) - with open(fn) as f: + with open(fn, "rb") as f: code = f.read() self.program_bootstub(code) def reset(self): # **** Reset **** - self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, "\x21" + struct.pack("I", 0x8000000)) + self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", 0x8000000)) self.status() try: - self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, "") + self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, b"") stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)) except Exception: pass diff --git a/run_automated_tests.sh b/run_automated_tests.sh index 583d6c1..e876c67 100755 --- a/run_automated_tests.sh +++ b/run_automated_tests.sh @@ -18,4 +18,4 @@ do nmcli connection delete "$NAME" done -PYTHONPATH="." python $(which nosetests) -v --with-xunit --xunit-file=./$TEST_FILENAME --xunit-testsuite-name=$TESTSUITE_NAME -s $TEST_SCRIPTS +PYTHONPATH="." $(which nosetests) -v --with-xunit --xunit-file=./$TEST_FILENAME --xunit-testsuite-name=$TESTSUITE_NAME -s $TEST_SCRIPTS diff --git a/tests/automated/2_usb_to_can.py b/tests/automated/2_usb_to_can.py index eba421e..645f686 100644 --- a/tests/automated/2_usb_to_can.py +++ b/tests/automated/2_usb_to_can.py @@ -25,7 +25,7 @@ def test_can_loopback(p): p.set_can_speed_kbps(bus, 250) # send a message on bus 0 - p.can_send(0x1aa, "message", bus) + p.can_send(0x1aa, b"message", bus) # confirm receive both on loopback and send receipt time.sleep(0.05) @@ -37,7 +37,7 @@ def test_can_loopback(p): # confirm data is correct assert 0x1aa == sr[0][0] == lb[0][0] - assert "message" == sr[0][2] == lb[0][2] + assert b"message" == sr[0][2] == lb[0][2] @test_all_pandas @panda_connect_and_init @@ -49,7 +49,7 @@ def test_safety_nooutput(p): p.set_can_loopback(True) # send a message on bus 0 - p.can_send(0x1aa, "message", 0) + p.can_send(0x1aa, b"message", 0) # confirm receive nothing time.sleep(0.05) @@ -68,7 +68,7 @@ def test_reliability(p): p.set_can_speed_kbps(0, 1000) addrs = list(range(100, 100+MSG_COUNT)) - ts = [(j, 0, "\xaa"*8, 0) for j in addrs] + ts = [(j, 0, b"\xaa"*8, 0) for j in addrs] # 100 loops for i in range(LOOP_COUNT): @@ -182,4 +182,4 @@ def test_gmlan_bad_toggle(p): def test_serial_debug(p): junk = p.serial_read(Panda.SERIAL_DEBUG) p.call_control_api(0xc0) - assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can ")) + assert(p.serial_read(Panda.SERIAL_DEBUG).startswith(b"can ")) diff --git a/tests/automated/6_two_panda.py b/tests/automated/6_two_panda.py index d15f98e..711b83f 100644 --- a/tests/automated/6_two_panda.py +++ b/tests/automated/6_two_panda.py @@ -18,7 +18,7 @@ def test_send_recv(p_send, p_recv): assert not p_send.legacy assert not p_recv.legacy - p_send.can_send_many([(0x1ba, 0, "message", 0)]*2) + p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2) time.sleep(0.05) p_recv.can_recv() p_send.can_recv() @@ -55,7 +55,7 @@ def test_latency(p_send, p_recv): p_recv.set_can_speed_kbps(0, 100) time.sleep(0.05) - p_send.can_send_many([(0x1ba, 0, "testmsg", 0)]*10) + p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10) time.sleep(0.05) p_recv.can_recv() p_send.can_recv() @@ -80,7 +80,7 @@ def test_latency(p_send, p_recv): for i in range(num_messages): st = time.time() - p_send.can_send(0x1ab, "message", bus) + p_send.can_send(0x1ab, b"message", bus) r = [] while len(r) < 1 and (time.time() - st) < 5: r = p_recv.can_recv() @@ -127,7 +127,7 @@ def test_black_loopback(panda0, panda1): panda1.set_can_loopback(False) # clear stuff - panda0.can_send_many([(0x1ba, 0, "testmsg", 0)]*10) + panda0.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10) time.sleep(0.05) panda0.can_recv() panda1.can_recv() @@ -155,7 +155,7 @@ def test_black_loopback(panda0, panda1): def _test_buses(send_panda, recv_panda, _test_array): for send_bus, send_obd, recv_obd, recv_buses in _test_array: print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd) - + # set OBD on pandas send_panda.set_gmlan(True if send_obd else None) recv_panda.set_gmlan(True if recv_obd else None) @@ -180,7 +180,7 @@ def test_black_loopback(panda0, panda1): loop_buses.append(loop[3]) if len(cans_loop) == 0: print(" No loop") - + # test loop buses recv_buses.sort() loop_buses.sort() @@ -192,4 +192,4 @@ def test_black_loopback(panda0, panda1): print("***************** TESTING (0 --> 1) *****************") _test_buses(panda0, panda1, test_array) print("***************** TESTING (1 --> 0) *****************") - _test_buses(panda1, panda0, test_array) \ No newline at end of file + _test_buses(panda1, panda0, test_array) diff --git a/tests/automated/helpers.py b/tests/automated/helpers.py index 370e696..6c46959 100644 --- a/tests/automated/helpers.py +++ b/tests/automated/helpers.py @@ -140,7 +140,7 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F raise ValueError("Cannot have two pandas that are the same panda") st = time.time() - p.can_send_many([(msg_id, 0, "\xaa"*8, bus)]*msg_count) + p.can_send_many([(msg_id, 0, b"\xaa"*8, bus)]*msg_count) r = [] r_echo = [] r_len_expected = msg_count if two_pandas else msg_count*2