Panda Jungle testing (#394)

Rebase all open branches on this commit to make sure they are tested correctly!
master
robbederks 2019-11-26 16:18:34 -08:00 committed by GitHub
parent 2a093a39f2
commit e1c34a1a29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 404 additions and 245 deletions

View File

@ -71,6 +71,8 @@ ENV PYTHONPATH /tmp:$PYTHONPATH
COPY ./boardesp/get_sdk_ci.sh /tmp/panda/boardesp/
COPY ./boardesp/python2_make.py /tmp/panda/boardesp/
COPY ./panda_jungle /tmp/panda_jungle
RUN useradd --system -s /sbin/nologin pandauser
RUN mkdir -p /tmp/panda/boardesp/esp-open-sdk
RUN chown pandauser /tmp/panda/boardesp/esp-open-sdk

5
Jenkinsfile vendored
View File

@ -14,6 +14,11 @@ pipeline {
steps {
timeout(time: 60, unit: 'MINUTES') {
script {
try {
sh 'cp -R /home/batman/panda_jungle .'
} catch (err) {
echo "Folder already exists"
}
sh 'git archive -v -o panda.tar.gz --format=tar.gz HEAD'
dockerImage = docker.build("${env.DOCKER_IMAGE_TAG}")
}

View File

@ -1,4 +1,8 @@
from .helpers import test_all_pandas, panda_connect_and_init
from .helpers import reset_pandas, test_all_pandas, panda_connect_and_init
# Reset the pandas before flashing them
def aaaa_reset_before_tests():
reset_pandas()
@test_all_pandas
@panda_connect_and_init

View File

@ -0,0 +1,37 @@
import time
from panda_jungle import PandaJungle # pylint: disable=import-error
from .helpers import panda_jungle, reset_pandas, test_all_pandas, test_all_gen2_pandas, panda_connect_and_init
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_all_pandas
@panda_connect_and_init
def test_ignition(p):
try:
# Set harness orientation to #2, since the ignition line is on the wrong SBU bus :/
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_2)
reset_pandas()
p.reconnect()
panda_jungle.set_ignition(False)
time.sleep(2)
assert p.health()['ignition_line'] == False
panda_jungle.set_ignition(True)
time.sleep(2)
assert p.health()['ignition_line'] == True
finally:
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1)
@test_all_gen2_pandas
@panda_connect_and_init
def test_orientation_detection(p):
seen_orientations = []
for i in range(3):
panda_jungle.set_harness_orientation(i)
reset_pandas()
p.reconnect()
detected_harness_orientation = p.health()['car_harness_status']
if (i == 0 and detected_harness_orientation != 0) or detected_harness_orientation in seen_orientations:
assert False
seen_orientations.append(detected_harness_orientation)

View File

@ -2,11 +2,18 @@ import sys
import time
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from .helpers import SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init
from .helpers import start_heartbeat_thread, reset_pandas, SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_all_pandas
@panda_connect_and_init
def test_can_loopback(p):
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -40,6 +47,9 @@ def test_can_loopback(p):
@test_all_pandas
@panda_connect_and_init
def test_safety_nooutput(p):
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_SILENT)
@ -60,6 +70,9 @@ def test_reliability(p):
LOOP_COUNT = 100
MSG_COUNT = 100
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)
@ -95,6 +108,9 @@ def test_reliability(p):
@test_all_pandas
@panda_connect_and_init
def test_throughput(p):
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -122,6 +138,9 @@ def test_gmlan(p):
if p.legacy:
return
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -153,6 +172,9 @@ def test_gmlan_bad_toggle(p):
if p.legacy:
return
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)

View File

@ -1,8 +1,12 @@
import time
from panda import Panda
from .helpers import connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init
from .helpers import reset_pandas, connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init
import requests
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_all_pandas
@panda_connect_and_init
def test_get_serial(p):

View File

@ -1,6 +1,10 @@
import time
from panda import Panda
from .helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
from .helpers import start_heartbeat_thread, reset_pandas, time_many_sends, connect_wifi, test_white, panda_type_to_serial
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_white
@panda_type_to_serial
@ -16,6 +20,9 @@ def test_throughput(serials=None):
connect_wifi(serials[0])
p = Panda(serials[0])
# Start heartbeat
start_heartbeat_thread(p)
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -43,6 +50,10 @@ def test_throughput(serials=None):
def test_recv_only(serials=None):
connect_wifi(serials[0])
p = Panda(serials[0])
# Start heartbeat
start_heartbeat_thread(p)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)

View File

@ -1,195 +0,0 @@
import os
import time
import random
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from .helpers import time_many_sends, test_two_panda, test_two_black_panda, panda_type_to_serial, clear_can_buffers, panda_connect_and_init
@test_two_panda
@panda_type_to_serial
@panda_connect_and_init
def test_send_recv(p_send, p_recv):
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
assert not p_send.legacy
assert not p_recv.legacy
p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.05)
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
saturation_pct = (comp_kbps/speed) * 100.0
assert_greater(saturation_pct, 80)
assert_less(saturation_pct, 100)
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
@test_two_panda
@panda_type_to_serial
@panda_connect_and_init
def test_latency(p_send, p_recv):
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
assert not p_send.legacy
assert not p_recv.legacy
p_send.set_can_speed_kbps(0, 100)
p_recv.set_can_speed_kbps(0, 100)
time.sleep(0.05)
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
#clear can buffers
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
latencies = []
comp_kbps_list = []
saturation_pcts = []
num_messages = 100
for i in range(num_messages):
st = time.time()
p_send.can_send(0x1ab, b"message", bus)
r = []
while len(r) < 1 and (time.time() - st) < 5:
r = p_recv.can_recv()
et = time.time()
r_echo = []
while len(r_echo) < 1 and (time.time() - st) < 10:
r_echo = p_send.can_recv()
if len(r) == 0 or len(r_echo) == 0:
print("r: {}, r_echo: {}".format(r, r_echo))
assert_equal(len(r),1)
assert_equal(len(r_echo),1)
et = (et - st)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et
latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed)
assert_less(latency, 5.0)
saturation_pct = (comp_kbps/speed) * 100.0
latencies.append(latency)
comp_kbps_list.append(comp_kbps)
saturation_pcts.append(saturation_pct)
average_latency = sum(latencies)/num_messages
assert_less(average_latency, 1.0)
average_comp_kbps = sum(comp_kbps_list)/num_messages
average_saturation_pct = sum(saturation_pcts)/num_messages
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
@test_two_black_panda
@panda_type_to_serial
@panda_connect_and_init
def test_black_loopback(panda0, panda1):
# disable safety modes
panda0.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
panda1.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# disable loopback
panda0.set_can_loopback(False)
panda1.set_can_loopback(False)
# clear stuff
panda0.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
time.sleep(0.05)
panda0.can_recv()
panda1.can_recv()
# test array (send bus, sender obd, reciever obd, expected busses)
test_array = [
(0, False, False, [0]),
(1, False, False, [1]),
(2, False, False, [2]),
(0, False, True, [0, 1]),
(1, False, True, []),
(2, False, True, [2]),
(0, True, False, [0]),
(1, True, False, [0]),
(2, True, False, [2]),
(0, True, True, [0, 1]),
(1, True, True, [0, 1]),
(2, True, True, [2])
]
# test functions
def get_test_string():
return b"test"+os.urandom(10)
def _test_buses(send_panda, recv_panda, _test_array):
for send_bus, send_obd, recv_obd, recv_buses in _test_array:
print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd)
# set OBD on pandas
send_panda.set_gmlan(True if send_obd else None)
recv_panda.set_gmlan(True if recv_obd else None)
# clear buffers
clear_can_buffers(send_panda)
clear_can_buffers(recv_panda)
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
send_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
_ = send_panda.can_recv() # cans echo
cans_loop = recv_panda.can_recv()
loop_buses = []
for loop in cans_loop:
print(" Loop on bus", str(loop[3]))
loop_buses.append(loop[3])
if len(cans_loop) == 0:
print(" No loop")
# test loop buses
recv_buses.sort()
loop_buses.sort()
assert recv_buses == loop_buses
print(" TEST PASSED")
print("\n")
# test both orientations
print("***************** TESTING (0 --> 1) *****************")
_test_buses(panda0, panda1, test_array)
print("***************** TESTING (1 --> 0) *****************")
_test_buses(panda1, panda0, test_array)

View File

@ -1,16 +1,23 @@
import sys
import time
from .helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
from .helpers import start_heartbeat_thread, reset_pandas, time_many_sends, connect_wifi, test_white, panda_type_to_serial
from panda import Panda, PandaWifiStreaming
from nose.tools import assert_less, assert_greater
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_white
@panda_type_to_serial
def test_udp_doesnt_drop(serials=None):
connect_wifi(serials[0])
p = Panda(serials[0])
# Start heartbeat
start_heartbeat_thread(p)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)

View File

@ -0,0 +1,202 @@
import os
import time
import random
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from .helpers import panda_jungle, start_heartbeat_thread, reset_pandas, time_many_sends, test_all_pandas, test_all_gen2_pandas, clear_can_buffers, panda_connect_and_init
# Reset the pandas before running tests
def aaaa_reset_before_tests():
reset_pandas()
@test_all_pandas
@panda_connect_and_init
def test_send_recv(p):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.05)
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
saturation_pct = (comp_kbps/speed) * 100.0
assert_greater(saturation_pct, 80)
assert_less(saturation_pct, 100)
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
# Start heartbeat
start_heartbeat_thread(p)
# Set safety mode and power saving
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_power_save(False)
try:
# Run tests in both directions
test(p, panda_jungle)
test(panda_jungle, p)
except Exception as e:
# Raise errors again, we don't want them to get lost
raise e
finally:
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
@test_all_pandas
@panda_connect_and_init
def test_latency(p):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.set_can_speed_kbps(0, 100)
p_recv.set_can_speed_kbps(0, 100)
time.sleep(0.05)
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
# clear can buffers
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
latencies = []
comp_kbps_list = []
saturation_pcts = []
num_messages = 100
for i in range(num_messages):
st = time.time()
p_send.can_send(0x1ab, b"message", bus)
r = []
while len(r) < 1 and (time.time() - st) < 5:
r = p_recv.can_recv()
et = time.time()
r_echo = []
while len(r_echo) < 1 and (time.time() - st) < 10:
r_echo = p_send.can_recv()
if len(r) == 0 or len(r_echo) == 0:
print("r: {}, r_echo: {}".format(r, r_echo))
assert_equal(len(r),1)
assert_equal(len(r_echo),1)
et = (et - st)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et
latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed)
assert_less(latency, 5.0)
saturation_pct = (comp_kbps/speed) * 100.0
latencies.append(latency)
comp_kbps_list.append(comp_kbps)
saturation_pcts.append(saturation_pct)
average_latency = sum(latencies)/num_messages
assert_less(average_latency, 1.0)
average_comp_kbps = sum(comp_kbps_list)/num_messages
average_saturation_pct = sum(saturation_pcts)/num_messages
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
# Start heartbeat
start_heartbeat_thread(p)
# Set safety mode and power saving
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_power_save(False)
try:
# Run tests in both directions
test(p, panda_jungle)
test(panda_jungle, p)
except Exception as e:
# Raise errors again, we don't want them to get lost
raise e
finally:
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
@test_all_gen2_pandas
@panda_connect_and_init
def test_gen2_loopback(p):
def test(p_send, p_recv):
for bus in range(4):
obd = False
if bus == 3:
obd = True
bus = 1
# Clear buses
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
# Send a random string
addr = random.randint(1, 2000)
string = b"test"+os.urandom(4)
p_send.set_obd(obd)
p_recv.set_obd(obd)
time.sleep(0.2)
p_send.can_send(addr, string, bus)
time.sleep(0.2)
content = p_recv.can_recv()
# Check amount of messages
assert len(content) == 1
# Check content
assert content[0][0] == addr and content[0][2] == string
# Check bus
assert content[0][3] == bus
print("Bus:", bus, "OBD:", obd, "OK")
# Start heartbeat
start_heartbeat_thread(p)
# Set safety mode and power saving
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_power_save(False)
try:
# Run tests in both directions
test(p, panda_jungle)
test(panda_jungle, p)
except Exception as e:
# Raise errors again, we don't want them to get lost
raise e
finally:
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)

View File

@ -5,21 +5,51 @@ import random
import subprocess
import requests
import _thread
import faulthandler
from functools import wraps
from panda import Panda
from panda_jungle import PandaJungle # pylint: disable=import-error
from nose.tools import assert_equal
from parameterized import parameterized, param
from .timeout import run_with_timeout
SPEED_NORMAL = 500
SPEED_GMLAN = 33.3
BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]
TIMEOUT = 30
GEN2_HW_TYPES = [Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO]
# Enable fault debug
faulthandler.enable(all_threads=False)
# Connect to Panda Jungle
panda_jungle = PandaJungle()
# Find all panda's connected
_panda_serials = None
def init_panda_serials():
global panda_jungle, _panda_serials
_panda_serials = []
panda_jungle.set_panda_power(True)
time.sleep(5)
for serial in Panda.list():
p = Panda(serial=serial)
_panda_serials.append((serial, p.get_type()))
p.close()
print('Found', str(len(_panda_serials)), 'pandas')
init_panda_serials()
# Panda providers
test_all_types = parameterized([
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
param(panda_type=Panda.HW_TYPE_GREY_PANDA),
param(panda_type=Panda.HW_TYPE_BLACK_PANDA)
])
test_all_pandas = parameterized(
Panda.list()
list(map(lambda x: x[0], _panda_serials))
)
test_all_gen2_pandas = parameterized(
list(map(lambda x: x[0], filter(lambda x: x[1] in GEN2_HW_TYPES, _panda_serials)))
)
test_white_and_grey = parameterized([
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
@ -31,13 +61,8 @@ test_white = parameterized([
test_grey = parameterized([
param(panda_type=Panda.HW_TYPE_GREY_PANDA)
])
test_two_panda = parameterized([
param(panda_type=[Panda.HW_TYPE_GREY_PANDA, Panda.HW_TYPE_WHITE_PANDA]),
param(panda_type=[Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA]),
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
])
test_two_black_panda = parameterized([
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
test_black = parameterized([
param(panda_type=Panda.HW_TYPE_BLACK_PANDA)
])
def connect_wifi(serial=None):
@ -53,7 +78,7 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False):
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
if not r:
#Can already ping, try connecting on wifi
# Can already ping, try connecting on wifi
try:
p = Panda("WIFI")
p.get_serial()
@ -132,26 +157,26 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False):
# TODO: confirm that it's connected to the right panda
def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=False):
if precv == None:
precv = p
def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas=False):
if p_recv == None:
p_recv = p
if msg_id == None:
msg_id = random.randint(0x100, 0x200)
if p == precv and two_pandas:
if p == p_recv and two_pandas:
raise ValueError("Cannot have two pandas that are the same panda")
st = time.time()
start_time = time.time()
p.can_send_many([(msg_id, 0, b"\xaa"*8, bus)]*msg_count)
r = []
r_echo = []
r_len_expected = msg_count if two_pandas else msg_count*2
r_echo_len_exected = msg_count if two_pandas else 0
while len(r) < r_len_expected and (time.time() - st) < 5:
r.extend(precv.can_recv())
et = time.time()
while len(r) < r_len_expected and (time.time() - start_time) < 5:
r.extend(p_recv.can_recv())
end_time = time.time()
if two_pandas:
while len(r_echo) < r_echo_len_exected and (time.time() - st) < 10:
while len(r_echo) < r_echo_len_exected and (time.time() - start_time) < 10:
r_echo.extend(p.can_recv())
sent_echo = [x for x in r if x[3] == 0x80 | bus and x[0] == msg_id]
@ -164,12 +189,17 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F
assert_equal(len(resp), msg_count)
assert_equal(len(sent_echo), msg_count)
et = (et-st)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / et
end_time = (end_time-start_time)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / end_time
return comp_kbps
_panda_serials = None
def reset_pandas():
panda_jungle.set_panda_power(False)
time.sleep(2)
panda_jungle.set_panda_power(True)
time.sleep(5)
def panda_type_to_serial(fn):
@wraps(fn)
def wrapper(panda_type=None, **kwargs):
@ -181,11 +211,7 @@ def panda_type_to_serial(fn):
# If not done already, get panda serials and their type
global _panda_serials
if _panda_serials == None:
_panda_serials = []
for serial in Panda.list():
p = Panda(serial=serial)
_panda_serials.append((serial, p.get_type()))
p.close()
init_panda_serials()
# Find a panda with the correct types and add the corresponding serial
serials = []
@ -202,13 +228,15 @@ def panda_type_to_serial(fn):
return fn(serials, **kwargs)
return wrapper
def heartbeat_thread(p):
while True:
try:
p.send_heartbeat()
time.sleep(1)
except:
break
def start_heartbeat_thread(p):
def heartbeat_thread(p):
while True:
try:
p.send_heartbeat()
time.sleep(1)
except:
break
_thread.start_new_thread(heartbeat_thread, (p,))
def panda_connect_and_init(fn):
@wraps(fn)
@ -223,26 +251,33 @@ def panda_connect_and_init(fn):
for panda_serial in panda_serials:
pandas.append(Panda(serial=panda_serial))
# Initialize jungle
clear_can_buffers(panda_jungle)
panda_jungle.set_can_loopback(False)
panda_jungle.set_obd(False)
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1)
for bus, speed in BUS_SPEEDS:
panda_jungle.set_can_speed_kbps(bus, speed)
# Initialize pandas
for panda in pandas:
panda.set_can_loopback(False)
panda.set_gmlan(None)
panda.set_esp_power(False)
for bus, speed in [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]:
panda.set_power_save(False)
for bus, speed in BUS_SPEEDS:
panda.set_can_speed_kbps(bus, speed)
clear_can_buffers(panda)
_thread.start_new_thread(heartbeat_thread, (panda,))
panda.set_power_save(False)
# Run test function
ret = fn(*pandas, **kwargs)
# Close all connections
for panda in pandas:
panda.close()
# Return test function result
return ret
try:
run_with_timeout(TIMEOUT, fn, *pandas, **kwargs)
except Exception as e:
raise e
finally:
# Close all connections
for panda in pandas:
panda.close()
return wrapper
def clear_can_buffers(panda):

View File

@ -0,0 +1,25 @@
import time
from multiprocessing import Process
# Note: this does not return any return values of the function, just the exit status
INTERVAL = 0.1
def run_with_timeout(timeout, fn, *kwargs):
def runner(fn, kwargs):
try:
fn(*kwargs)
except Exception as e:
print(e)
raise e
process = Process(target=runner, args=(fn, kwargs))
process.start()
counter = 0
while process.is_alive():
time.sleep(INTERVAL)
counter+=1
if (counter * INTERVAL) > timeout:
process.terminate()
raise TimeoutError("Function timed out!")
if process.exitcode != 0:
raise RuntimeError("Test failed with exit code: ", str(process.exitcode))