Black panda Jenkins (#256)

* Jenkins test refactor and black panda addition

* Added HW types needed by previous commit

* Fixed ignition interrupts when not on EON build

* Added functions for load switches

* More test scripts for black panda

* Added NONE power mode to the code

* Fixed race condition when setting GPIO pins was interrupted.

* Added relay test script

* Fixed flashing with critical sections and GPS load switch

* Fixing critical depth after reboot

* Made the loopback test asserting

* Made critical depth a local variable to avoid race conditions

* Added GPS to power savings mode

* Fixed DFU mode on white panda and bumped version

* Fixed PEDAL_USB compilation error

* Fixed misra compliance of new critical depth code

* Cleaned up heartbeat logic in the testing code. Re-added ALL_CAN_BUT_MAIN_SILENT. Bumped version. Improved critical section code.

* Fixed DFU flashing (once again)

* Fixed VERSION

* Added relay endurance test

* Changed to alloutput on ELM mode for fingerprinting.

* Fixed minor remarks
master
robbederks 2019-08-28 12:57:42 -07:00 committed by rbiasini
parent d68508c79a
commit 6f532c6d51
21 changed files with 939 additions and 342 deletions

2
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.o
*.so
*.d
*.dump
a.out
*~
.#*
@ -12,4 +13,5 @@ pandacan.egg-info/
board/obj/
examples/output.csv
.DS_Store
.vscode
nosetests.xml

View File

@ -1 +1 @@
v1.4.3
v1.4.6

View File

@ -24,7 +24,7 @@ struct board {
};
// ******************* Definitions ********************
// These should match the enum in cereal/log.capnp
// These should match the enums in cereal/log.capnp and __init__.py
#define HW_TYPE_UNKNOWN 0U
#define HW_TYPE_WHITE_PANDA 1U
#define HW_TYPE_GREY_PANDA 2U
@ -54,4 +54,4 @@ struct board {
#define CAN_MODE_OBD_CAN2 3U
// ********************* Globals **********************
uint8_t usb_power_mode = USB_POWER_NONE;
uint8_t usb_power_mode = USB_POWER_NONE;

View File

@ -23,8 +23,9 @@ void black_enable_can_transciever(uint8_t transciever, bool enabled) {
}
void black_enable_can_transcievers(bool enabled) {
for(uint8_t i=1; i<=4U; i++)
for(uint8_t i=1U; i<=4U; i++){
black_enable_can_transciever(i, enabled);
}
}
void black_set_led(uint8_t color, bool enabled) {
@ -43,26 +44,41 @@ void black_set_led(uint8_t color, bool enabled) {
}
}
void black_set_usb_power_mode(uint8_t mode){
void black_set_gps_load_switch(bool enabled) {
set_gpio_output(GPIOC, 12, enabled);
}
void black_set_usb_load_switch(bool enabled) {
set_gpio_output(GPIOB, 1, !enabled);
}
void black_set_usb_power_mode(uint8_t mode) {
usb_power_mode = mode;
puts("Trying to set USB power mode on black panda. This is not supported.\n");
if (mode == USB_POWER_NONE) {
black_set_usb_load_switch(false);
} else {
black_set_usb_load_switch(true);
}
}
void black_set_esp_gps_mode(uint8_t mode) {
switch (mode) {
case ESP_GPS_DISABLED:
// ESP OFF
// GPS OFF
set_gpio_output(GPIOC, 14, 0);
set_gpio_output(GPIOC, 5, 0);
black_set_gps_load_switch(false);
break;
case ESP_GPS_ENABLED:
// ESP ON
// GPS ON
set_gpio_output(GPIOC, 14, 1);
set_gpio_output(GPIOC, 5, 1);
black_set_gps_load_switch(true);
break;
case ESP_GPS_BOOTMODE:
set_gpio_output(GPIOC, 14, 1);
set_gpio_output(GPIOC, 5, 0);
black_set_gps_load_switch(true);
break;
default:
puts("Invalid ESP/GPS mode\n");
@ -132,9 +148,11 @@ void black_init(void) {
// C8: FAN aka TIM3_CH3
set_gpio_alternate(GPIOC, 8, GPIO_AF2_TIM3);
// C12: GPS load switch. Turn on permanently for now
set_gpio_output(GPIOC, 12, true);
//set_gpio_output(GPIOC, 12, false); //TODO: stupid inverted switch on prototype
// Turn on GPS load switch.
black_set_gps_load_switch(true);
// Turn on USB load switch.
black_set_usb_load_switch(true);
// Initialize harness
harness_init();

View File

@ -285,6 +285,13 @@ void white_init(void) {
// Set normal CAN mode
white_set_can_mode(CAN_MODE_NORMAL);
// Setup ignition interrupts
SYSCFG->EXTICR[1] = SYSCFG_EXTICR1_EXTI1_PA;
EXTI->IMR |= (1U << 1);
EXTI->RTSR |= (1U << 1);
EXTI->FTSR |= (1U << 1);
NVIC_EnableIRQ(EXTI1_IRQn);
}
const harness_configuration white_harness_config = {

View File

@ -78,9 +78,11 @@ void started_interrupt_handler(uint8_t interrupt_line) {
// jenky debounce
delay(100000);
// set power savings mode here
int power_save_state = current_board->check_ignition() ? POWER_SAVE_STATUS_DISABLED : POWER_SAVE_STATUS_ENABLED;
set_power_save_state(power_save_state);
// set power savings mode here if on EON build
#ifdef EON
int power_save_state = current_board->check_ignition() ? POWER_SAVE_STATUS_DISABLED : POWER_SAVE_STATUS_ENABLED;
set_power_save_state(power_save_state);
#endif
}
EXTI->PR = (1U << interrupt_line);
}
@ -100,14 +102,6 @@ void EXTI3_IRQHandler(void) {
started_interrupt_handler(3);
}
void started_interrupt_init(void) {
SYSCFG->EXTICR[1] = SYSCFG_EXTICR1_EXTI1_PA;
EXTI->IMR |= (1U << 1);
EXTI->RTSR |= (1U << 1);
EXTI->FTSR |= (1U << 1);
NVIC_EnableIRQ(EXTI1_IRQn);
}
// ****************************** safety mode ******************************
// this is the only way to leave silent mode
@ -116,30 +110,29 @@ void set_safety_mode(uint16_t mode, int16_t param) {
if (err == -1) {
puts("Error: safety set mode failed\n");
} else {
if (mode == SAFETY_NOOUTPUT) {
can_silent = ALL_CAN_SILENT;
} else {
can_silent = ALL_CAN_LIVE;
}
switch (mode) {
case SAFETY_NOOUTPUT:
set_intercept_relay(false);
if(hw_type == HW_TYPE_BLACK_PANDA){
current_board->set_can_mode(CAN_MODE_NORMAL);
}
can_silent = ALL_CAN_SILENT;
break;
case SAFETY_ELM327:
set_intercept_relay(false);
heartbeat_counter = 0U;
if(hw_type == HW_TYPE_BLACK_PANDA){
current_board->set_can_mode(CAN_MODE_OBD_CAN2);
}
can_silent = ALL_CAN_LIVE;
break;
default:
set_intercept_relay(true);
heartbeat_counter = 0U;
if(hw_type == HW_TYPE_BLACK_PANDA){
current_board->set_can_mode(CAN_MODE_NORMAL);
}
can_silent = ALL_CAN_LIVE;
break;
}
if (safety_ignition_hook() != -1) {
@ -464,7 +457,10 @@ int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired)
break;
// **** 0xe6: set USB power
case 0xe6:
if (setup->b.wValue.w == 1U) {
if (setup->b.wValue.w == 0U) {
puts("user setting NONE mode\n");
current_board->set_usb_power_mode(USB_POWER_NONE);
} else if (setup->b.wValue.w == 1U) {
puts("user setting CDP mode\n");
current_board->set_usb_power_mode(USB_POWER_CDP);
} else if (setup->b.wValue.w == 2U) {
@ -610,11 +606,10 @@ void TIM3_IRQHandler(void) {
pending_can_live = 0;
}
#ifdef DEBUG
//TODO: re-enable
//puts("** blink ");
//puth(can_rx_q.r_ptr); puts(" "); puth(can_rx_q.w_ptr); puts(" ");
//puth(can_tx1_q.r_ptr); puts(" "); puth(can_tx1_q.w_ptr); puts(" ");
//puth(can_tx2_q.r_ptr); puts(" "); puth(can_tx2_q.w_ptr); puts("\n");
puts("** blink ");
puth(can_rx_q.r_ptr); puts(" "); puth(can_rx_q.w_ptr); puts(" ");
puth(can_tx1_q.r_ptr); puts(" "); puth(can_tx1_q.w_ptr); puts(" ");
puth(can_tx2_q.r_ptr); puts(" "); puth(can_tx2_q.w_ptr); puts("\n");
#endif
// set green LED to be controls allowed
@ -730,11 +725,6 @@ int main(void) {
/*if (current_board->check_ignition()) {
set_power_save_state(POWER_SAVE_STATUS_ENABLED);
}*/
if (hw_type != HW_TYPE_BLACK_PANDA) {
// interrupt on started line
started_interrupt_init();
}
#endif
// 48mhz / 65536 ~= 732 / 732 = 1

View File

@ -28,6 +28,13 @@ void set_power_save_state(int state) {
// Switch CAN transcievers
current_board->enable_can_transcievers(enable);
// Switch EPS/GPS
if (enable) {
current_board->set_esp_gps_mode(ESP_GPS_ENABLED);
} else {
current_board->set_esp_gps_mode(ESP_GPS_DISABLED);
}
if(hw_type != HW_TYPE_BLACK_PANDA){
// turn on GMLAN
set_gpio_output(GPIOB, 14, enable);

View File

@ -135,6 +135,12 @@ class Panda(object):
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
HW_TYPE_UNKNOWN = '\x00'
HW_TYPE_WHITE_PANDA = '\x01'
HW_TYPE_GREY_PANDA = '\x02'
HW_TYPE_BLACK_PANDA = '\x03'
HW_TYPE_PEDAL = '\x04'
def __init__(self, serial=None, claim=True):
self._serial = serial
self._handle = None
@ -363,11 +369,14 @@ class Panda(object):
def get_type(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
def is_white(self):
return self.get_type() == Panda.HW_TYPE_WHITE_PANDA
def is_grey(self):
return self.get_type() == "\x02"
return self.get_type() == Panda.HW_TYPE_GREY_PANDA
def is_black(self):
return self.get_type() == "\x03"
return self.get_type() == Panda.HW_TYPE_BLACK_PANDA
def get_serial(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
@ -470,6 +479,7 @@ class Panda(object):
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD RECV, RETRYING")
time.sleep(0.1)
return parse_can_buffer(dat)
def can_clear(self, bus):

View File

@ -1,15 +1,13 @@
import os
from panda import Panda
from helpers import panda_color_to_serial, test_white_and_grey
from helpers import panda_type_to_serial, test_white_and_grey, test_all_pandas, panda_connect_and_init
@test_white_and_grey
@panda_color_to_serial
def test_recover(serial=None):
p = Panda(serial=serial)
@test_all_pandas
@panda_connect_and_init
def test_recover(p):
assert p.recover(timeout=30)
@test_white_and_grey
@panda_color_to_serial
def test_flash(serial=None):
p = Panda(serial=serial)
@test_all_pandas
@panda_connect_and_init
def test_flash(p):
p.flash()

View File

@ -4,16 +4,11 @@ import sys
import time
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from helpers import time_many_sends, connect_wo_esp, test_white_and_grey, panda_color_to_serial
SPEED_NORMAL = 500
SPEED_GMLAN = 33.3
@test_white_and_grey
@panda_color_to_serial
def test_can_loopback(serial=None):
p = connect_wo_esp(serial)
from helpers import SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init
@test_all_pandas
@panda_connect_and_init
def test_can_loopback(p):
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -26,9 +21,6 @@ def test_can_loopback(serial=None):
busses = [0,1,2]
for bus in busses:
# send heartbeat
p.send_heartbeat()
# set bus 0 speed to 250
p.set_can_speed_kbps(bus, 250)
@ -47,17 +39,12 @@ def test_can_loopback(serial=None):
assert 0x1aa == sr[0][0] == lb[0][0]
assert "message" == sr[0][2] == lb[0][2]
@test_white_and_grey
@panda_color_to_serial
def test_safety_nooutput(serial=None):
p = connect_wo_esp(serial)
@test_all_pandas
@panda_connect_and_init
def test_safety_nooutput(p):
# enable output mode
p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
# send heartbeat
p.send_heartbeat()
# enable CAN loopback mode
p.set_can_loopback(True)
@ -69,11 +56,9 @@ def test_safety_nooutput(serial=None):
r = p.can_recv()
assert len(r) == 0
@test_white_and_grey
@panda_color_to_serial
def test_reliability(serial=None):
p = connect_wo_esp(serial)
@test_all_pandas
@panda_connect_and_init
def test_reliability(p):
LOOP_COUNT = 100
MSG_COUNT = 100
@ -82,17 +67,11 @@ def test_reliability(serial=None):
p.set_can_loopback(True)
p.set_can_speed_kbps(0, 1000)
# send heartbeat
p.send_heartbeat()
addrs = range(100, 100+MSG_COUNT)
ts = [(j, 0, "\xaa"*8, 0) for j in addrs]
# 100 loops
for i in range(LOOP_COUNT):
# send heartbeat
p.send_heartbeat()
st = time.time()
p.can_send_many(ts)
@ -115,17 +94,12 @@ def test_reliability(serial=None):
sys.stdout.write("P")
sys.stdout.flush()
@test_white_and_grey
@panda_color_to_serial
def test_throughput(serial=None):
p = connect_wo_esp(serial)
@test_all_pandas
@panda_connect_and_init
def test_throughput(p):
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# send heartbeat
p.send_heartbeat()
# enable CAN loopback mode
p.set_can_loopback(True)
@ -134,9 +108,6 @@ def test_throughput(serial=None):
p.set_can_speed_kbps(0, speed)
time.sleep(0.05)
# send heartbeat
p.send_heartbeat()
comp_kbps = time_many_sends(p, 0)
# bit count from https://en.wikipedia.org/wiki/CAN_bus
@ -147,19 +118,15 @@ def test_throughput(serial=None):
print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
@test_white_and_grey
@panda_color_to_serial
def test_gmlan(serial=None):
p = connect_wo_esp(serial)
@panda_type_to_serial
@panda_connect_and_init
def test_gmlan(p):
if p.legacy:
return
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# send heartbeat
p.send_heartbeat()
# enable CAN loopback mode
p.set_can_loopback(True)
@ -169,9 +136,6 @@ def test_gmlan(serial=None):
# set gmlan on CAN2
for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
# send heartbeat
p.send_heartbeat()
p.set_gmlan(bus)
comp_kbps_gmlan = time_many_sends(p, 3)
assert_greater(comp_kbps_gmlan, 0.8 * SPEED_GMLAN)
@ -185,27 +149,20 @@ def test_gmlan(serial=None):
print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal))
@test_white_and_grey
@panda_color_to_serial
def test_gmlan_bad_toggle(serial=None):
p = connect_wo_esp(serial)
@panda_type_to_serial
@panda_connect_and_init
def test_gmlan_bad_toggle(p):
if p.legacy:
return
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# send heartbeat
p.send_heartbeat()
# enable CAN loopback mode
p.set_can_loopback(True)
# GMLAN_CAN2
for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
# send heartbeat
p.send_heartbeat()
p.set_gmlan(bus)
comp_kbps_gmlan = time_many_sends(p, 3)
assert_greater(comp_kbps_gmlan, 0.6 * SPEED_GMLAN)
@ -213,9 +170,6 @@ def test_gmlan_bad_toggle(serial=None):
# normal
for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
# send heartbeat
p.send_heartbeat()
p.set_gmlan(None)
comp_kbps_normal = time_many_sends(p, bus)
assert_greater(comp_kbps_normal, 0.6 * SPEED_NORMAL)
@ -223,10 +177,9 @@ def test_gmlan_bad_toggle(serial=None):
# this will fail if you have hardware serial connected
@test_white_and_grey
@panda_color_to_serial
def test_serial_debug(serial=None):
p = connect_wo_esp(serial)
@test_all_pandas
@panda_connect_and_init
def test_serial_debug(p):
junk = p.serial_read(Panda.SERIAL_DEBUG)
p.call_control_api(0xc0)
assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can "))

View File

@ -2,46 +2,44 @@ from __future__ import print_function
import os
import time
from panda import Panda
from helpers import connect_wifi, test_white, test_white_and_grey, panda_color_to_serial
from helpers import connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init
import requests
@test_white_and_grey
@panda_color_to_serial
def test_get_serial(serial=None):
p = Panda(serial)
@test_all_pandas
@panda_connect_and_init
def test_get_serial(p):
print(p.get_serial())
@test_white_and_grey
@panda_color_to_serial
def test_get_serial_in_flash_mode(serial=None):
p = Panda(serial)
@test_all_pandas
@panda_connect_and_init
def test_get_serial_in_flash_mode(p):
p.reset(enter_bootstub=True)
assert(p.bootstub)
print(p.get_serial())
p.reset()
@test_white
@panda_color_to_serial
def test_connect_wifi(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_connect_wifi(serials=None):
connect_wifi(serials[0])
@test_white
@panda_color_to_serial
def test_flash_wifi(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_flash_wifi(serials=None):
connect_wifi(serials[0])
assert Panda.flash_ota_wifi(release=False), "OTA Wifi Flash Failed"
connect_wifi(serial)
connect_wifi(serials[0])
@test_white
@panda_color_to_serial
def test_wifi_flash_st(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_wifi_flash_st(serials=None):
connect_wifi(serials[0])
assert Panda.flash_ota_st(), "OTA ST Flash Failed"
connected = False
st = time.time()
while not connected and (time.time() - st) < 20:
try:
p = Panda(serial=serial)
p = Panda(serial=serials[0])
p.get_serial()
connected = True
except:
@ -51,9 +49,9 @@ def test_wifi_flash_st(serial=None):
assert False, "Panda failed to connect on USB after flashing"
@test_white
@panda_color_to_serial
def test_webpage_fetch(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_webpage_fetch(serials=None):
connect_wifi(serials[0])
r = requests.get("http://192.168.0.10/")
print(r.text)

View File

@ -1,38 +1,32 @@
from __future__ import print_function
import time
from panda import Panda
from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial
from helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
from nose.tools import timed, assert_equal, assert_less, assert_greater
@test_white
@panda_color_to_serial
def test_get_serial_wifi(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_get_serial_wifi(serials=None):
connect_wifi(serials[0])
p = Panda("WIFI")
print(p.get_serial())
@test_white
@panda_color_to_serial
def test_throughput(serial=None):
connect_wifi(serial)
p = Panda(serial)
@panda_type_to_serial
def test_throughput(serials=None):
connect_wifi(serials[0])
p = Panda(serials[0])
# enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# send heartbeat
p.send_heartbeat()
# enable CAN loopback mode
p.set_can_loopback(True)
p = Panda("WIFI")
for speed in [100,250,500,750,1000]:
# send heartbeat
p.send_heartbeat()
# set bus 0 speed to speed
p.set_can_speed_kbps(0, speed)
time.sleep(0.1)
@ -47,23 +41,17 @@ def test_throughput(serial=None):
print("WIFI loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
@test_white
@panda_color_to_serial
def test_recv_only(serial=None):
connect_wifi(serial)
p = Panda(serial)
@panda_type_to_serial
def test_recv_only(serials=None):
connect_wifi(serials[0])
p = Panda(serials[0])
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# send heartbeat
p.send_heartbeat()
p.set_can_loopback(True)
pwifi = Panda("WIFI")
# TODO: msg_count=1000 drops packets, is this fixable?
for msg_count in [10,100,200]:
# send heartbeat
p.send_heartbeat()
speed = 500
p.set_can_speed_kbps(0, speed)
comp_kbps = time_many_sends(p, 0, pwifi, msg_count)

View File

@ -1,16 +1,16 @@
from __future__ import print_function
import sys
import time
from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial
from helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
from panda import Panda, PandaWifiStreaming
from nose.tools import timed, assert_equal, assert_less, assert_greater
@test_white
@panda_color_to_serial
def test_udp_doesnt_drop(serial=None):
connect_wifi(serial)
@panda_type_to_serial
def test_udp_doesnt_drop(serials=None):
connect_wifi(serials[0])
p = Panda(serial)
p = Panda(serials[0])
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)

View File

@ -1,21 +1,18 @@
from __future__ import print_function
import os
import time
import random
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from helpers import time_many_sends, test_two_panda, panda_color_to_serial
from helpers import time_many_sends, test_two_panda, test_two_black_panda, panda_type_to_serial, clear_can_buffers, panda_connect_and_init
@test_two_panda
@panda_color_to_serial
def test_send_recv(serial_sender=None, serial_reciever=None):
p_send = Panda(serial_sender)
p_recv = Panda(serial_reciever)
@panda_type_to_serial
@panda_connect_and_init
def test_send_recv(p_send, p_recv):
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
# send heartbeat
p_send.send_heartbeat()
p_recv.set_can_loopback(False)
assert not p_send.legacy
@ -30,9 +27,6 @@ def test_send_recv(serial_sender=None, serial_reciever=None):
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
# send heartbeat
p_send.send_heartbeat()
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.05)
@ -46,18 +40,12 @@ def test_send_recv(serial_sender=None, serial_reciever=None):
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
@test_two_panda
@panda_color_to_serial
def test_latency(serial_sender=None, serial_reciever=None):
p_send = Panda(serial_sender)
p_recv = Panda(serial_reciever)
# send heartbeat
p_send.send_heartbeat()
p_recv.send_heartbeat()
@panda_type_to_serial
@panda_connect_and_init
def test_latency(p_send, p_recv):
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
assert not p_send.legacy
@ -72,29 +60,17 @@ def test_latency(serial_sender=None, serial_reciever=None):
p_recv.can_recv()
p_send.can_recv()
# send heartbeat
p_send.send_heartbeat()
p_recv.send_heartbeat()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
# send heartbeat
p_send.send_heartbeat()
p_recv.send_heartbeat()
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
#clear can buffers
r = [1]
while len(r) > 0:
r = p_send.can_recv()
r = [1]
while len(r) > 0:
r = p_recv.can_recv()
time.sleep(0.05)
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
latencies = []
comp_kbps_list = []
@ -137,3 +113,83 @@ def test_latency(serial_sender=None, serial_reciever=None):
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
@test_two_black_panda
@panda_type_to_serial
@panda_connect_and_init
def test_black_loopback(panda0, panda1):
# disable safety modes
panda0.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
panda1.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# disable loopback
panda0.set_can_loopback(False)
panda1.set_can_loopback(False)
# clear stuff
panda0.can_send_many([(0x1ba, 0, "testmsg", 0)]*10)
time.sleep(0.05)
panda0.can_recv()
panda1.can_recv()
# test array (send bus, sender obd, reciever obd, expected busses)
test_array = [
(0, False, False, [0]),
(1, False, False, [1]),
(2, False, False, [2]),
(0, False, True, [0, 1]),
(1, False, True, []),
(2, False, True, [2]),
(0, True, False, [0]),
(1, True, False, [0]),
(2, True, False, [2]),
(0, True, True, [0, 1]),
(1, True, True, [0, 1]),
(2, True, True, [2])
]
# test functions
def get_test_string():
return b"test"+os.urandom(10)
def _test_buses(send_panda, recv_panda, _test_array):
for send_bus, send_obd, recv_obd, recv_buses in _test_array:
print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd)
# set OBD on pandas
send_panda.set_gmlan(True if send_obd else None)
recv_panda.set_gmlan(True if recv_obd else None)
# clear buffers
clear_can_buffers(send_panda)
clear_can_buffers(recv_panda)
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
send_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
cans_echo = send_panda.can_recv()
cans_loop = recv_panda.can_recv()
loop_buses = []
for loop in cans_loop:
print(" Loop on bus", str(loop[3]))
loop_buses.append(loop[3])
if len(cans_loop) == 0:
print(" No loop")
# test loop buses
recv_buses.sort()
loop_buses.sort()
assert recv_buses == loop_buses
print(" TEST PASSED")
print("\n")
# test both orientations
print("***************** TESTING (0 --> 1) *****************")
_test_buses(panda0, panda1, test_array)
print("***************** TESTING (1 --> 0) *****************")
_test_buses(panda1, panda0, test_array)

View File

@ -4,43 +4,41 @@ import time
import random
import subprocess
import requests
import thread
from functools import wraps
from panda import Panda
from nose.tools import timed, assert_equal, assert_less, assert_greater
from parameterized import parameterized, param
test_white_and_grey = parameterized([param(panda_color="White"),
param(panda_color="Grey")])
test_white = parameterized([param(panda_color="White")])
test_grey = parameterized([param(panda_color="Grey")])
test_two_panda = parameterized([param(panda_color=["Grey", "White"]),
param(panda_color=["White", "Grey"])])
SPEED_NORMAL = 500
SPEED_GMLAN = 33.3
_serials = {}
def get_panda_serial(is_grey=None):
global _serials
if is_grey not in _serials:
for serial in Panda.list():
p = Panda(serial=serial)
if is_grey is None or p.is_grey() == is_grey:
_serials[is_grey] = serial
return serial
raise IOError("Panda not found. is_grey: {}".format(is_grey))
else:
return _serials[is_grey]
def connect_wo_esp(serial=None):
# connect to the panda
p = Panda(serial=serial)
# power down the ESP
p.set_esp_power(False)
# clear old junk
while len(p.can_recv()) > 0:
pass
return p
test_all_types = parameterized([
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
param(panda_type=Panda.HW_TYPE_GREY_PANDA),
param(panda_type=Panda.HW_TYPE_BLACK_PANDA)
])
test_all_pandas = parameterized(
Panda.list()
)
test_white_and_grey = parameterized([
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
param(panda_type=Panda.HW_TYPE_GREY_PANDA)
])
test_white = parameterized([
param(panda_type=Panda.HW_TYPE_WHITE_PANDA)
])
test_grey = parameterized([
param(panda_type=Panda.HW_TYPE_GREY_PANDA)
])
test_two_panda = parameterized([
param(panda_type=[Panda.HW_TYPE_GREY_PANDA, Panda.HW_TYPE_WHITE_PANDA]),
param(panda_type=[Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA]),
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
])
test_two_black_panda = parameterized([
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
])
def connect_wifi(serial=None):
p = Panda(serial=serial)
@ -170,23 +168,93 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F
return comp_kbps
def panda_color_to_serial(fn):
_panda_serials = None
def panda_type_to_serial(fn):
@wraps(fn)
def wrapper(panda_color=None, **kwargs):
pandas_is_grey = []
if panda_color is not None:
if not isinstance(panda_color, list):
panda_color = [panda_color]
panda_color = [s.lower() for s in panda_color]
for p in panda_color:
if p is None:
pandas_is_grey.append(None)
elif p in ["grey", "gray"]:
pandas_is_grey.append(True)
elif p in ["white"]:
pandas_is_grey.append(False)
else:
raise ValueError("Invalid Panda Color {}".format(p))
return fn(*[get_panda_serial(is_grey) for is_grey in pandas_is_grey], **kwargs)
def wrapper(panda_type=None, **kwargs):
# Change panda_types to a list
if panda_type is not None:
if not isinstance(panda_type, list):
panda_type = [panda_type]
# If not done already, get panda serials and their type
global _panda_serials
if _panda_serials == None:
_panda_serials = []
for serial in Panda.list():
p = Panda(serial=serial)
_panda_serials.append((serial, p.get_type()))
p.close()
# Find a panda with the correct types and add the corresponding serial
serials = []
for p_type in panda_type:
found = False
for serial, pt in _panda_serials:
# Never take the same panda twice
if (pt == p_type) and (serial not in serials):
serials.append(serial)
found = True
break
if not found:
raise IOError("No unused panda found for type: {}".format(p_type))
return fn(serials, **kwargs)
return wrapper
def heartbeat_thread(p):
while True:
try:
p.send_heartbeat()
time.sleep(1)
except:
break
def panda_connect_and_init(fn):
@wraps(fn)
def wrapper(panda_serials=None, **kwargs):
# Change panda_serials to a list
if panda_serials is not None:
if not isinstance(panda_serials, list):
panda_serials = [panda_serials]
# Connect to pandas
pandas = []
for panda_serial in panda_serials:
pandas.append(Panda(serial=panda_serial))
# Initialize pandas
for panda in pandas:
panda.set_can_loopback(False)
panda.set_gmlan(None)
panda.set_esp_power(False)
for bus, speed in [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]:
panda.set_can_speed_kbps(bus, speed)
clear_can_buffers(panda)
thread.start_new_thread(heartbeat_thread, (panda,))
# Run test function
ret = fn(*pandas, **kwargs)
# Close all connections
for panda in pandas:
panda.close()
# Return test function result
return ret
return wrapper
def clear_can_buffers(panda):
# clear tx buffers
for i in range(4):
panda.can_clear(i)
# clear rx buffers
panda.can_clear(0xFFFF)
r = [1]
st = time.time()
while len(r) > 0:
r = panda.can_recv()
time.sleep(0.05)
if (time.time() - st) > 10:
print("Unable to clear can buffers for panda ", panda.get_serial())
assert False

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
# Loopback test between black panda (+ harness and power) and white/grey panda
# Loopback test between two black pandas (+ harness and power)
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
@ -33,81 +33,70 @@ def run_test(sleep_duration):
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
# find out the hardware types
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
print("Connect white/grey and black panda to run this test!")
if type0 != "\x03" or type1 != "\x03":
print("Connect two black pandas to run this test!")
assert False
# disable safety modes
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
for panda in pandas:
# disable safety modes
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test health packet
print("panda health", panda.health())
# test black -> other
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
# setup test array (send bus, sender obd, reciever obd, expected busses)
test_array = [
(0, False, False, [0]),
(1, False, False, [1]),
(2, False, False, [2]),
(0, False, True, [0, 1]),
(1, False, True, []),
(2, False, True, [2]),
(0, True, False, [0]),
(1, True, False, [0]),
(2, True, False, [2]),
(0, True, True, [0, 1]),
(1, True, True, [0, 1]),
(2, True, True, [2])
]
# test both orientations
print("***************** TESTING (0 --> 1) *****************")
test_buses(pandas[0], pandas[1], test_array, sleep_duration)
print("***************** TESTING (1 --> 0) *****************")
test_buses(pandas[1], pandas[0], test_array, sleep_duration)
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
def test_buses(send_panda, recv_panda, test_array, sleep_duration):
for send_bus, send_obd, recv_obd, recv_buses in test_array:
send_panda.send_heartbeat()
recv_panda.send_heartbeat()
print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd)
# set OBD on black panda
black_panda.set_gmlan(True if obd else None)
# set OBD on pandas
send_panda.set_gmlan(True if send_obd else None)
recv_panda.set_gmlan(True if recv_obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
send_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
recv_panda.can_clear(recv_bus)
send_panda.can_recv()
recv_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
send_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
cans_echo = black_panda.can_recv()
cans_loop = other_panda.can_recv()
else:
cans_echo = other_panda.can_recv()
cans_loop = black_panda.can_recv()
cans_echo = send_panda.can_recv()
cans_loop = recv_panda.can_recv()
loop_buses = []
for loop in cans_loop:

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
from __future__ import print_function
import os
import sys
import time
import random
import argparse
from hexdump import hexdump
from itertools import permutations
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
def get_test_string():
return b"test"+os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter, nonzero_bus_errors, zero_bus_errors, content_errors
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
print("Connect white/grey and black panda to run this test!")
assert False
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
print("Connect white/grey and black panda to run this test!")
assert False
# disable safety modes
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors)
# Toggle relay
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
time.sleep(1)
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
time.sleep(1)
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_gmlan(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
cans_echo = black_panda.can_recv()
cans_loop = other_panda.can_recv()
else:
cans_echo = other_panda.can_recv()
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[2] != st):
content_errors += 1
print(" Loop on bus", str(loop[3]))
loop_buses.append(loop[3])
if len(cans_loop) == 0:
print(" No loop")
if not os.getenv("NOASSERT"):
assert False
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
if not os.getenv("NOASSERT"):
assert False
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for i in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@ -0,0 +1,175 @@
#!/usr/bin/env python
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
from __future__ import print_function
import os
import sys
import time
import random
import argparse
from hexdump import hexdump
from itertools import permutations
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
def get_test_string():
return b"test"+os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter, nonzero_bus_errors, zero_bus_errors, content_errors
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
print("Connect white/grey and black panda to run this test!")
assert False
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
print("Connect white/grey and black panda to run this test!")
assert False
# disable safety modes
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
start_time = time.time()
temp_start_time = start_time
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
runtime = time.time() - start_time
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime)
if (time.time() - temp_start_time) > 3600*6:
# Toggle relay
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
time.sleep(1)
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
time.sleep(1)
temp_start_time = time.time()
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_gmlan(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
cans_echo = black_panda.can_recv()
cans_loop = other_panda.can_recv()
else:
cans_echo = other_panda.can_recv()
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[2] != st):
content_errors += 1
print(" Loop on bus", str(loop[3]))
loop_buses.append(loop[3])
if len(cans_loop) == 0:
print(" No loop")
if not os.getenv("NOASSERT"):
assert False
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
if not os.getenv("NOASSERT"):
assert False
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for i in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@ -0,0 +1,145 @@
#!/usr/bin/env python
# Relay test with loopback between black panda (+ harness and power) and white/grey panda
# Tests the relay switching multiple times / second by looking at the buses on which loop occurs.
from __future__ import print_function
import os
import sys
import time
import random
import argparse
from hexdump import hexdump
from itertools import permutations
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
def get_test_string():
return b"test"+os.urandom(10)
counter = 0
open_errors = 0
closed_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter, open_errors, closed_errors, content_errors
pandas = Panda.list()
#pandas = ["540046000c51363338383037", "07801b800f51363038363036"]
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
print("Connect white/grey and black panda to run this test!")
assert False
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
print("Connect white/grey and black panda to run this test!")
assert False
# disable safety modes
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
# Switch on relay
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0])):
open_errors += 1
print("Open error")
assert False
# Switch off relay
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0, 2])):
closed_errors += 1
print("Close error")
assert False
counter += 1
print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors)
def test_buses(black_panda, other_panda, test_obj):
global content_errors
send_bus, obd, recv_buses = test_obj
black_panda.send_heartbeat()
other_panda.send_heartbeat()
# Set OBD on send panda
other_panda.set_gmlan(True if obd else None)
# clear and flush
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
other_panda.can_send(at, st, send_bus)
time.sleep(0.05)
# check for receive
cans_echo = other_panda.can_recv()
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[2] != st):
content_errors += 1
loop_buses.append(loop[3])
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
return False
else:
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for i in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@ -12,33 +12,38 @@ setcolor = ["\033[1;32;40m", "\033[1;31;40m"]
unsetcolor = "\033[00m"
if __name__ == "__main__":
port_number = int(os.getenv("PORT", 0))
claim = os.getenv("CLAIM") is not None
serials = Panda.list()
if os.getenv("SERIAL"):
serials = filter(lambda x: x==os.getenv("SERIAL"), serials)
pandas = list(map(lambda x: Panda(x, claim=claim), serials))
if not len(pandas):
sys.exit("no pandas found")
if os.getenv("BAUD") is not None:
for panda in pandas:
panda.set_uart_baud(port_number, int(os.getenv("BAUD")))
while True:
for i, panda in enumerate(pandas):
try:
port_number = int(os.getenv("PORT", 0))
claim = os.getenv("CLAIM") is not None
serials = Panda.list()
if os.getenv("SERIAL"):
serials = filter(lambda x: x==os.getenv("SERIAL"), serials)
pandas = list(map(lambda x: Panda(x, claim=claim), serials))
if not len(pandas):
sys.exit("no pandas found")
if os.getenv("BAUD") is not None:
for panda in pandas:
panda.set_uart_baud(port_number, int(os.getenv("BAUD")))
while True:
ret = panda.serial_read(port_number)
if len(ret) > 0:
sys.stdout.write(setcolor[i] + str(ret) + unsetcolor)
sys.stdout.flush()
else:
break
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
ln = sys.stdin.readline()
if claim:
panda.serial_write(port_number, ln)
time.sleep(0.01)
for i, panda in enumerate(pandas):
while True:
ret = panda.serial_read(port_number)
if len(ret) > 0:
sys.stdout.write(setcolor[i] + str(ret) + unsetcolor)
sys.stdout.flush()
else:
break
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
ln = sys.stdin.readline()
if claim:
panda.serial_write(port_number, ln)
time.sleep(0.01)
except:
print("panda disconnected!")
time.sleep(0.5);

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
import time
from panda import Panda
if __name__ == "__main__":
panda_serials = Panda.list()
pandas = []
for ps in panda_serials:
pandas.append(Panda(serial=ps))
if len(pandas) == 0:
print("No pandas connected")
assert False
while True:
for panda in pandas:
print(panda.health())
print("\n")
time.sleep(0.5)