* Improve panda automated testing

 * begin testing both white and grey panda
 * Improve wifi reliability

* First commit for docker

* Fix usb devices not showing up on reconnect

* Add tests for two pandas, latency and throughput

* Add Jenkinsfile

* Allow flashing release on wifi

* Fix Jenkins stuck in DFU mode and docker container running

* Add pandaextra from xx to docker

* Need more time for ST to restart, sometimes

* Add xml output to tests

* Try making wifi more reliable

* Fix infinite loop in ping

* Check connected after flash

* Cleanup two panda tests

* Try fixing failing test with check after udp

* Try to fix with sleep

* Temporarily run just 5 and 6

* Cleanup

* Desperate times call for desperate measurse

* BUGFIX: power saving when rx only

* Fix failing when white panda is first after udp

* Test both Dev and EON build

* Jenkins test results for both builds

* Better test case naming
master
Nigel Armstrong 2019-04-09 14:09:18 -07:00 committed by GitHub
parent f383eee968
commit 1d2f8f0abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 515 additions and 77 deletions

3
.dockerignore 100644
View File

@ -0,0 +1,3 @@
.git
.DS_Store
boardesp/esp-open-sdk

64
Dockerfile 100644
View File

@ -0,0 +1,64 @@
FROM ubuntu:16.04
ENV PYTHONUNBUFFERED 1
RUN apt-get update && apt-get install -y \
autoconf \
automake \
bash \
bison \
bzip2 \
curl \
dfu-util \
flex \
g++ \
gawk \
gcc \
git \
gperf \
help2man \
iputils-ping \
libexpat-dev \
libstdc++-arm-none-eabi-newlib \
libtool \
libtool-bin \
libusb-1.0-0 \
make \
ncurses-dev \
network-manager \
python-dev \
python-serial \
sed \
texinfo \
unrar-free \
unzip \
wget \
build-essential \
python-dev \
python-pip \
screen \
vim \
wget \
wireless-tools
RUN pip install --upgrade pip==18.0
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt
RUN mkdir -p /home/batman
ENV HOME /home/batman
ENV PYTHONPATH /tmp:$PYTHONPATH
COPY ./boardesp/get_sdk_ci.sh /tmp/panda/boardesp/
RUN useradd --system -s /sbin/nologin pandauser
RUN mkdir -p /tmp/panda/boardesp/esp-open-sdk
RUN chown pandauser /tmp/panda/boardesp/esp-open-sdk
USER pandauser
RUN cd /tmp/panda/boardesp && ./get_sdk_ci.sh
USER root
COPY ./xx/pandaextra /tmp/pandaextra
ADD ./panda.tar.gz /tmp/panda

55
Jenkinsfile vendored 100644
View File

@ -0,0 +1,55 @@
pipeline {
agent any
environment {
AUTHOR = """${sh(
returnStdout: true,
script: "git --no-pager show -s --format='%an' ${GIT_COMMIT}"
).trim()}"""
DOCKER_IMAGE_TAG = "panda:build-${env.BUILD_ID}"
}
stages {
stage('Build Docker Image') {
steps {
timeout(time: 60, unit: 'MINUTES') {
script {
sh 'git clone --no-checkout --depth 1 git@github.com:commaai/xx.git || true'
sh 'cd xx && git fetch origin && git checkout origin/master -- pandaextra && cd ..' // Needed for certs for panda flashing
sh 'git archive -v -o panda.tar.gz --format=tar.gz HEAD'
dockerImage = docker.build("${env.DOCKER_IMAGE_TAG}")
}
}
}
}
stage('Test Dev Build') {
steps {
lock(resource: "Pandas", inversePrecedence: true, quantity:1){
timeout(time: 60, unit: 'MINUTES') {
sh "docker stop panda-test || true && docker rm panda-test || true"
sh "docker run --name panda-test --privileged --volume /dev/bus/usb:/dev/bus/usb --volume /var/run/dbus:/var/run/dbus --net host ${env.DOCKER_IMAGE_TAG} bash -c 'cd /tmp/panda; ./run_automated_tests.sh '"
}
}
}
}
stage('Test EON Build') {
steps {
lock(resource: "Pandas", inversePrecedence: true, quantity:1){
timeout(time: 60, unit: 'MINUTES') {
sh "docker cp panda-test:/tmp/panda/nosetests.xml test_results_dev.xml"
sh "touch EON && docker cp EON panda-test:/EON"
sh "docker start -a panda-test"
}
}
}
}
}
post {
always {
script {
sh "docker cp panda-test:/tmp/panda/nosetests.xml test_results_EON.xml"
sh "docker rm panda-test"
}
junit "test_results*.xml"
}
}
}

View File

@ -0,0 +1,5 @@
#!/bin/bash
git clone --recursive https://github.com/pfalcon/esp-open-sdk.git
cd esp-open-sdk
git checkout 03f5e898a059451ec5f3de30e7feff30455f7cec
LD_LIBRARY_PATH="" make STANDALONE=y

View File

@ -182,6 +182,7 @@ class Panda(object):
traceback.print_exc() traceback.print_exc()
if wait == False or self._handle != None: if wait == False or self._handle != None:
break break
context = usb1.USBContext() #New context needed so new devices show up
assert(self._handle != None) assert(self._handle != None)
print("connected") print("connected")
@ -280,11 +281,14 @@ class Panda(object):
if reconnect: if reconnect:
self.reconnect() self.reconnect()
def recover(self): def recover(self, timeout=None):
self.reset(enter_bootloader=True) self.reset(enter_bootloader=True)
t_start = time.time()
while len(PandaDFU.list()) == 0: while len(PandaDFU.list()) == 0:
print("waiting for DFU...") print("waiting for DFU...")
time.sleep(0.1) time.sleep(0.1)
if timeout is not None and (time.time() - t_start) > timeout:
return False
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial)) dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
dfu.recover() dfu.recover()
@ -292,6 +296,7 @@ class Panda(object):
# reflash after recover # reflash after recover
self.connect(True, True) self.connect(True, True)
self.flash() self.flash()
return True
@staticmethod @staticmethod
def flash_ota_st(): def flash_ota_st():
@ -300,8 +305,9 @@ class Panda(object):
return ret==0 return ret==0
@staticmethod @staticmethod
def flash_ota_wifi(): def flash_ota_wifi(release=False):
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "boardesp"))) release_str = "RELEASE=1" if release else ""
ret = os.system("cd {} && make clean && {} make ota".format(os.path.join(BASEDIR, "boardesp"),release_str))
time.sleep(1) time.sleep(1)
return ret==0 return ret==0

View File

@ -1,4 +1,7 @@
libusb1 libusb1 == 1.6.6
hexdump hexdump
pycrypto pycrypto
tqdm tqdm
nose
parameterized
requests

View File

@ -1,3 +1,9 @@
#!/bin/bash #!/bin/bash
PYTHONPATH="." nosetests -x -s tests/automated/$1*.py TEST_FILENAME=${TEST_FILENAME:-nosetests.xml}
if [ ! -f "/EON" ]; then
TESTSUITE_NAME="Panda_Test-EON"
else
TESTSUITE_NAME="Panda_Test-DEV"
fi
PYTHONPATH="." nosetests -v --with-xunit --xunit-file=./$TEST_FILENAME --xunit-testsuite-name=$TESTSUITE_NAME -s tests/automated/$1*.py

View File

@ -1,11 +1,15 @@
import os import os
from panda import Panda from panda import Panda
from helpers import panda_color_to_serial, test_white_and_grey
def test_recover(): @test_white_and_grey
p = Panda() @panda_color_to_serial
p.recover() def test_recover(serial=None):
p = Panda(serial=serial)
assert p.recover(timeout=30)
def test_flash(): @test_white_and_grey
p = Panda() @panda_color_to_serial
def test_flash(serial=None):
p = Panda(serial=serial)
p.flash() p.flash()

View File

@ -3,14 +3,16 @@ import os
import sys import sys
import time import time
from panda import Panda from panda import Panda
from nose.tools import timed, assert_equal, assert_less, assert_greater from nose.tools import assert_equal, assert_less, assert_greater
from helpers import time_many_sends, connect_wo_esp from helpers import time_many_sends, connect_wo_esp, test_white_and_grey, panda_color_to_serial
SPEED_NORMAL = 500 SPEED_NORMAL = 500
SPEED_GMLAN = 33.3 SPEED_GMLAN = 33.3
def test_can_loopback(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_can_loopback(serial=None):
p = connect_wo_esp(serial)
# enable output mode # enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -42,8 +44,10 @@ def test_can_loopback():
assert 0x1aa == sr[0][0] == lb[0][0] assert 0x1aa == sr[0][0] == lb[0][0]
assert "message" == sr[0][2] == lb[0][2] assert "message" == sr[0][2] == lb[0][2]
def test_safety_nooutput(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_safety_nooutput(serial=None):
p = connect_wo_esp(serial)
# enable output mode # enable output mode
p.set_safety_mode(Panda.SAFETY_NOOUTPUT) p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
@ -59,8 +63,10 @@ def test_safety_nooutput():
r = p.can_recv() r = p.can_recv()
assert len(r) == 0 assert len(r) == 0
def test_reliability(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_reliability(serial=None):
p = connect_wo_esp(serial)
LOOP_COUNT = 100 LOOP_COUNT = 100
MSG_COUNT = 100 MSG_COUNT = 100
@ -97,8 +103,10 @@ def test_reliability():
sys.stdout.write("P") sys.stdout.write("P")
sys.stdout.flush() sys.stdout.flush()
def test_throughput(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_throughput(serial=None):
p = connect_wo_esp(serial)
# enable output mode # enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -120,8 +128,10 @@ def test_throughput():
print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct)) print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
def test_gmlan(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_gmlan(serial=None):
p = connect_wo_esp(serial)
if p.legacy: if p.legacy:
return return
@ -135,7 +145,7 @@ def test_gmlan():
p.set_can_speed_kbps(1, SPEED_NORMAL) p.set_can_speed_kbps(1, SPEED_NORMAL)
p.set_can_speed_kbps(2, SPEED_NORMAL) p.set_can_speed_kbps(2, SPEED_NORMAL)
p.set_can_speed_kbps(3, SPEED_GMLAN) p.set_can_speed_kbps(3, SPEED_GMLAN)
# set gmlan on CAN2 # set gmlan on CAN2
for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
p.set_gmlan(bus) p.set_gmlan(bus)
@ -150,8 +160,10 @@ def test_gmlan():
print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal)) print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal))
def test_gmlan_bad_toggle(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_gmlan_bad_toggle(serial=None):
p = connect_wo_esp(serial)
if p.legacy: if p.legacy:
return return
@ -178,9 +190,10 @@ def test_gmlan_bad_toggle():
# this will fail if you have hardware serial connected # this will fail if you have hardware serial connected
def test_serial_debug(): @test_white_and_grey
p = connect_wo_esp() @panda_color_to_serial
def test_serial_debug(serial=None):
p = connect_wo_esp(serial)
junk = p.serial_read(Panda.SERIAL_DEBUG) junk = p.serial_read(Panda.SERIAL_DEBUG)
p.call_control_api(0xc0) p.call_control_api(0xc0)
assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can ")) assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can "))

View File

@ -1,33 +1,60 @@
from __future__ import print_function from __future__ import print_function
import os import os
import time
from panda import Panda from panda import Panda
from helpers import connect_wifi from helpers import connect_wifi, test_white, test_white_and_grey, panda_color_to_serial
import requests import requests
def test_get_serial(): @test_white_and_grey
p = Panda() @panda_color_to_serial
def test_get_serial(serial=None):
p = Panda(serial)
print(p.get_serial()) print(p.get_serial())
def test_get_serial_in_flash_mode(): @test_white_and_grey
p = Panda() @panda_color_to_serial
def test_get_serial_in_flash_mode(serial=None):
p = Panda(serial)
p.reset(enter_bootstub=True) p.reset(enter_bootstub=True)
assert(p.bootstub) assert(p.bootstub)
print(p.get_serial()) print(p.get_serial())
p.reset() p.reset()
def test_connect_wifi(): @test_white
connect_wifi() @panda_color_to_serial
def test_connect_wifi(serial=None):
connect_wifi(serial)
def test_flash_wifi(): @test_white
Panda.flash_ota_wifi() @panda_color_to_serial
connect_wifi() def test_flash_wifi(serial=None):
connect_wifi(serial)
assert Panda.flash_ota_wifi(release=True), "OTA Wifi Flash Failed"
connect_wifi(serial)
def test_wifi_flash_st(): @test_white
Panda.flash_ota_st() @panda_color_to_serial
def test_wifi_flash_st(serial=None):
connect_wifi(serial)
assert Panda.flash_ota_st(), "OTA ST Flash Failed"
connected = False
st = time.time()
while not connected and (time.time() - st) < 20:
try:
p = Panda(serial=serial)
p.get_serial()
connected = True
except:
time.sleep(1)
def test_webpage_fetch(): if not connected:
assert False, "Panda failed to connect on USB after flashing"
@test_white
@panda_color_to_serial
def test_webpage_fetch(serial=None):
connect_wifi(serial)
r = requests.get("http://192.168.0.10/") r = requests.get("http://192.168.0.10/")
print(r.text) print(r.text)
assert "This is your comma.ai panda" in r.text assert "This is your comma.ai panda" in r.text

View File

@ -1,17 +1,22 @@
from __future__ import print_function from __future__ import print_function
import time import time
from panda import Panda from panda import Panda
from helpers import time_many_sends, connect_wifi from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial
from nose.tools import timed, assert_equal, assert_less, assert_greater from nose.tools import timed, assert_equal, assert_less, assert_greater
def test_get_serial_wifi(): @test_white
connect_wifi() @panda_color_to_serial
def test_get_serial_wifi(serial=None):
connect_wifi(serial)
p = Panda("WIFI") p = Panda("WIFI")
print(p.get_serial()) print(p.get_serial())
def test_throughput(): @test_white
p = Panda() @panda_color_to_serial
def test_throughput(serial=None):
connect_wifi(serial)
p = Panda(serial)
# enable output mode # enable output mode
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@ -24,7 +29,7 @@ def test_throughput():
for speed in [100,250,500,750,1000]: for speed in [100,250,500,750,1000]:
# set bus 0 speed to speed # set bus 0 speed to speed
p.set_can_speed_kbps(0, speed) p.set_can_speed_kbps(0, speed)
time.sleep(0.05) time.sleep(0.1)
comp_kbps = time_many_sends(p, 0) comp_kbps = time_many_sends(p, 0)
@ -35,8 +40,11 @@ def test_throughput():
print("WIFI loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct)) print("WIFI loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
def test_recv_only(): @test_white
p = Panda() @panda_color_to_serial
def test_recv_only(serial=None):
connect_wifi(serial)
p = Panda(serial)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True) p.set_can_loopback(True)
pwifi = Panda("WIFI") pwifi = Panda("WIFI")
@ -49,4 +57,3 @@ def test_recv_only():
saturation_pct = (comp_kbps/speed) * 100.0 saturation_pct = (comp_kbps/speed) * 100.0
print("HT WIFI loopback %d messages at speed %d, comp speed is %.2f, percent %.2f" % (msg_count, speed, comp_kbps, saturation_pct)) print("HT WIFI loopback %d messages at speed %d, comp speed is %.2f, percent %.2f" % (msg_count, speed, comp_kbps, saturation_pct))

View File

@ -1,14 +1,16 @@
from __future__ import print_function from __future__ import print_function
import sys import sys
import time import time
from helpers import time_many_sends, connect_wifi from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial
from panda import Panda, PandaWifiStreaming from panda import Panda, PandaWifiStreaming
from nose.tools import timed, assert_equal, assert_less, assert_greater from nose.tools import timed, assert_equal, assert_less, assert_greater
def test_udp_doesnt_drop(): @test_white
connect_wifi() @panda_color_to_serial
def test_udp_doesnt_drop(serial=None):
connect_wifi(serial)
p = Panda() p = Panda(serial)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True) p.set_can_loopback(True)
@ -18,6 +20,7 @@ def test_udp_doesnt_drop():
break break
for msg_count in [1, 100]: for msg_count in [1, 100]:
saturation_pcts = []
for i in range({1: 0x80, 100: 0x20}[msg_count]): for i in range({1: 0x80, 100: 0x20}[msg_count]):
pwifi.kick() pwifi.kick()
@ -31,9 +34,33 @@ def test_udp_doesnt_drop():
sys.stdout.flush() sys.stdout.flush()
else: else:
print("UDP WIFI loopback %d messages at speed %d, comp speed is %.2f, percent %.2f" % (msg_count, speed, comp_kbps, saturation_pct)) print("UDP WIFI loopback %d messages at speed %d, comp speed is %.2f, percent %.2f" % (msg_count, speed, comp_kbps, saturation_pct))
assert_greater(saturation_pct, 40) assert_greater(saturation_pct, 20) #sometimes the wifi can be slow...
assert_less(saturation_pct, 100) assert_less(saturation_pct, 100)
print("") saturation_pcts.append(saturation_pct)
if len(saturation_pcts) > 0:
assert_greater(sum(saturation_pcts)/len(saturation_pcts), 60)
time.sleep(5)
usb_ok_cnt = 0
REQ_USB_OK_CNT = 500
st = time.time()
msg_id = 0x1bb
bus = 0
last_missing_msg = 0
while usb_ok_cnt < REQ_USB_OK_CNT and (time.time() - st) < 40:
p.can_send(msg_id, "message", bus)
time.sleep(0.01)
r = [1]
missing = True
while len(r) > 0:
r = p.can_recv()
r = filter(lambda x: x[3] == bus and x[0] == msg_id, r)
if len(r) > 0:
missing = False
usb_ok_cnt += len(r)
if missing:
last_missing_msg = time.time()
et = time.time() - st
last_missing_msg = last_missing_msg - st
print("waited {} for panda to recv can on usb, {} msgs, last missing at {}".format(et, usb_ok_cnt, last_missing_msg))
assert usb_ok_cnt >= REQ_USB_OK_CNT, "Unable to recv can on USB after UDP"

View File

@ -0,0 +1,121 @@
from __future__ import print_function
import time
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from helpers import time_many_sends, test_two_panda, panda_color_to_serial
@test_two_panda
@panda_color_to_serial
def test_send_recv(serial_sender=None, serial_reciever=None):
p_send = Panda(serial_sender)
p_recv = Panda(serial_reciever)
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
assert not p_send.legacy
assert not p_recv.legacy
p_send.can_send_many([(0x1ba, 0, "message", 0)]*2)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.05)
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
saturation_pct = (comp_kbps/speed) * 100.0
assert_greater(saturation_pct, 80)
assert_less(saturation_pct, 100)
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
@test_two_panda
@panda_color_to_serial
def test_latency(serial_sender=None, serial_reciever=None):
p_send = Panda(serial_sender)
p_recv = Panda(serial_reciever)
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
assert not p_send.legacy
assert not p_recv.legacy
p_send.set_can_speed_kbps(0, 100)
p_recv.set_can_speed_kbps(0, 100)
time.sleep(0.05)
p_send.can_send_many([(0x1ba, 0, "testmsg", 0)]*10)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
busses = [0,1,2]
for bus in busses:
for speed in [100, 250, 500, 750, 1000]:
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
#clear can buffers
r = [1]
while len(r) > 0:
r = p_send.can_recv()
r = [1]
while len(r) > 0:
r = p_recv.can_recv()
time.sleep(0.05)
latencies = []
comp_kbps_list = []
saturation_pcts = []
num_messages = 100
for i in range(num_messages):
st = time.time()
p_send.can_send(0x1ab, "message", bus)
r = []
while len(r) < 1 and (time.time() - st) < 5:
r = p_recv.can_recv()
et = time.time()
r_echo = []
while len(r_echo) < 1 and (time.time() - st) < 10:
r_echo = p_send.can_recv()
if len(r) == 0 or len(r_echo) == 0:
print("r: {}, r_echo: {}".format(r, r_echo))
assert_equal(len(r),1)
assert_equal(len(r_echo),1)
et = (et - st)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et
latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed)
assert_less(latency, 5.0)
saturation_pct = (comp_kbps/speed) * 100.0
latencies.append(latency)
comp_kbps_list.append(comp_kbps)
saturation_pcts.append(saturation_pct)
average_latency = sum(latencies)/num_messages
assert_less(average_latency, 1.0)
average_comp_kbps = sum(comp_kbps_list)/num_messages
average_saturation_pct = sum(saturation_pcts)/num_messages
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))

View File

@ -4,12 +4,34 @@ import time
import random import random
import subprocess import subprocess
import requests import requests
from functools import wraps
from panda import Panda from panda import Panda
from nose.tools import timed, assert_equal, assert_less, assert_greater from nose.tools import timed, assert_equal, assert_less, assert_greater
from parameterized import parameterized, param
def connect_wo_esp(): test_white_and_grey = parameterized([param(panda_color="White"),
param(panda_color="Grey")])
test_white = parameterized([param(panda_color="White")])
test_grey = parameterized([param(panda_color="Grey")])
test_two_panda = parameterized([param(panda_color=["Grey", "White"]),
param(panda_color=["White", "Grey"])])
_serials = {}
def get_panda_serial(is_grey=None):
global _serials
if is_grey not in _serials:
for serial in Panda.list():
p = Panda(serial=serial)
if is_grey is None or p.is_grey() == is_grey:
_serials[is_grey] = serial
return serial
raise IOError("Panda not found. is_grey: {}".format(is_grey))
else:
return _serials[is_grey]
def connect_wo_esp(serial=None):
# connect to the panda # connect to the panda
p = Panda() p = Panda(serial=serial)
# power down the ESP # power down the ESP
p.set_esp_power(False) p.set_esp_power(False)
@ -20,15 +42,28 @@ def connect_wo_esp():
return p return p
def connect_wifi(): def connect_wifi(serial=None):
p = Panda() p = Panda(serial=serial)
p.set_esp_power(True)
dongle_id, pw = p.get_serial() dongle_id, pw = p.get_serial()
assert(dongle_id.isalnum()) assert(dongle_id.isalnum())
_connect_wifi(dongle_id, pw) _connect_wifi(dongle_id, pw)
FNULL = open(os.devnull, 'w')
def _connect_wifi(dongle_id, pw, insecure_okay=False): def _connect_wifi(dongle_id, pw, insecure_okay=False):
ssid = str("panda-" + dongle_id) ssid = str("panda-" + dongle_id)
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
if not r:
#Can already ping, try connecting on wifi
try:
p = Panda("WIFI")
p.get_serial()
print("Already connected")
return
except:
pass
print("WIFI: connecting to %s" % ssid) print("WIFI: connecting to %s" % ssid)
while 1: while 1:
@ -39,8 +74,8 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False):
cnt = 0 cnt = 0
MAX_TRIES = 10 MAX_TRIES = 10
while cnt < MAX_TRIES: while cnt < MAX_TRIES:
print "WIFI: scanning %d" % cnt print("WIFI: scanning %d" % cnt)
os.system("sudo iwlist %s scanning > /dev/null" % wlan_interface) os.system("iwlist %s scanning > /dev/null" % wlan_interface)
os.system("nmcli device wifi rescan") os.system("nmcli device wifi rescan")
wifi_scan = filter(lambda x: ssid in x, subprocess.check_output(["nmcli","dev", "wifi", "list"]).split("\n")) wifi_scan = filter(lambda x: ssid in x, subprocess.check_output(["nmcli","dev", "wifi", "list"]).split("\n"))
if len(wifi_scan) != 0: if len(wifi_scan) != 0:
@ -51,45 +86,107 @@ def _connect_wifi(dongle_id, pw, insecure_okay=False):
assert cnt < MAX_TRIES assert cnt < MAX_TRIES
if "-pair" in wifi_scan[0]: if "-pair" in wifi_scan[0]:
os.system("nmcli d wifi connect %s-pair" % (ssid)) os.system("nmcli d wifi connect %s-pair" % (ssid))
connect_cnt = 0
MAX_TRIES = 20
while connect_cnt < MAX_TRIES:
connect_cnt += 1
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
if r:
print("Waiting for panda to ping...")
time.sleep(0.1)
else:
break
if insecure_okay: if insecure_okay:
break break
# fetch webpage # fetch webpage
print "connecting to insecure network to secure" print("connecting to insecure network to secure")
r = requests.get("http://192.168.0.10/") try:
r = requests.get("http://192.168.0.10/")
except requests.ConnectionError:
r = requests.get("http://192.168.0.10/")
assert r.status_code==200 assert r.status_code==200
print "securing" print("securing")
try: try:
r = requests.get("http://192.168.0.10/secure", timeout=0.01) r = requests.get("http://192.168.0.10/secure", timeout=0.01)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
print("timeout http request to secure")
pass pass
else: else:
os.system("nmcli d wifi connect %s password %s" % (ssid, pw)) ret = os.system("nmcli d wifi connect %s password %s" % (ssid, pw))
break if os.WEXITSTATUS(ret) == 0:
#check ping too
ping_ok = False
connect_cnt = 0
MAX_TRIES = 10
while connect_cnt < MAX_TRIES:
connect_cnt += 1
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
if r:
print("Waiting for panda to ping...")
time.sleep(0.1)
else:
ping_ok = True
break
if ping_ok:
break
# TODO: confirm that it's connected to the right panda # TODO: confirm that it's connected to the right panda
def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None): def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=False):
if precv == None: if precv == None:
precv = p precv = p
if msg_id == None: if msg_id == None:
msg_id = random.randint(0x100, 0x200) msg_id = random.randint(0x100, 0x200)
if p == precv and two_pandas:
raise ValueError("Cannot have two pandas that are the same panda")
st = time.time() st = time.time()
p.can_send_many([(msg_id, 0, "\xaa"*8, bus)]*msg_count) p.can_send_many([(msg_id, 0, "\xaa"*8, bus)]*msg_count)
r = [] r = []
r_echo = []
r_len_expected = msg_count if two_pandas else msg_count*2
r_echo_len_exected = msg_count if two_pandas else 0
while len(r) < (msg_count*2) and (time.time() - st) < 3: while len(r) < r_len_expected and (time.time() - st) < 5:
r.extend(precv.can_recv()) r.extend(precv.can_recv())
et = time.time()
if two_pandas:
while len(r_echo) < r_echo_len_exected and (time.time() - st) < 10:
r_echo.extend(p.can_recv())
sent_echo = filter(lambda x: x[3] == 0x80 | bus and x[0] == msg_id, r) sent_echo = filter(lambda x: x[3] == 0x80 | bus and x[0] == msg_id, r)
loopback_resp = filter(lambda x: x[3] == bus and x[0] == msg_id, r) sent_echo.extend(filter(lambda x: x[3] == 0x80 | bus and x[0] == msg_id, r_echo))
resp = filter(lambda x: x[3] == bus and x[0] == msg_id, r)
leftovers = filter(lambda x: (x[3] != 0x80 | bus and x[3] != bus) or x[0] != msg_id, r)
assert_equal(len(leftovers), 0)
assert_equal(len(resp), msg_count)
assert_equal(len(sent_echo), msg_count) assert_equal(len(sent_echo), msg_count)
assert_equal(len(loopback_resp), msg_count)
et = (time.time()-st)*1000.0 et = (et-st)*1000.0
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / et comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / et
return comp_kbps return comp_kbps
def panda_color_to_serial(fn):
@wraps(fn)
def wrapper(panda_color=None, **kwargs):
pandas_is_grey = []
if panda_color is not None:
if not isinstance(panda_color, list):
panda_color = [panda_color]
panda_color = [s.lower() for s in panda_color]
for p in panda_color:
if p is None:
pandas_is_grey.append(None)
elif p in ["grey", "gray"]:
pandas_is_grey.append(True)
elif p in ["white"]:
pandas_is_grey.append(False)
else:
raise ValueError("Invalid Panda Color {}".format(p))
return fn(*[get_panda_serial(is_grey) for is_grey in pandas_is_grey], **kwargs)
return wrapper