add wifi tests
parent
4b0b82cd14
commit
991ad5be20
|
@ -13,6 +13,7 @@
|
|||
#endif
|
||||
|
||||
#include "libc.h"
|
||||
#include "provision.h"
|
||||
|
||||
#include "drivers/drivers.h"
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ endif
|
|||
obj/cert.h: ../crypto/getcertheader.py
|
||||
../crypto/getcertheader.py ../certs/debug.pub ../certs/release.pub > $@
|
||||
|
||||
obj/%.$(PROJ_NAME).o: %.c obj/cert.h obj/gitversion.h config.h drivers/*.h gpio.h
|
||||
obj/%.$(PROJ_NAME).o: %.c obj/cert.h obj/gitversion.h config.h drivers/*.h gpio.h provision.h
|
||||
$(CC) $(CFLAGS) -o $@ -c $<
|
||||
|
||||
obj/%.$(PROJ_NAME).o: ../crypto/%.c
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
#include "libc.h"
|
||||
#include "safety.h"
|
||||
#include "provision.h"
|
||||
|
||||
#include "drivers/drivers.h"
|
||||
|
||||
|
@ -168,8 +169,8 @@ int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, int hardwired) {
|
|||
memcpy(resp, (void *)0x1fff79c0, 0x10);
|
||||
resp_len = 0x10;
|
||||
} else {
|
||||
memcpy(resp, (void *)0x1fff79e0, 0x20);
|
||||
resp_len = 0x20;
|
||||
get_provision_chunk(resp);
|
||||
resp_len = PROVISION_CHUNK_LEN;
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
#define PROVISION_CHUNK_LEN 0x20
|
||||
|
||||
// WiFi SSID = 0x0 - 0x10
|
||||
// WiFi password = 0x10 - 0x1C
|
||||
// SHA1 checksum = 0x1C - 0x20
|
||||
|
||||
void get_provision_chunk(char *resp) {
|
||||
memcpy(resp, (void *)0x1fff79e0, PROVISION_CHUNK_LEN);
|
||||
if (memcmp(resp, "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", 0x20) == 0) {
|
||||
memcpy(resp, "unprovisioned\x00\x00\x00testing123\x00\x00\xa3\xa6\x99\xec", 0x20);
|
||||
}
|
||||
}
|
||||
|
|
@ -52,8 +52,8 @@ int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, int hardwired) {
|
|||
memcpy(resp, (void *)0x1fff79c0, 0x10);
|
||||
resp_len = 0x10;
|
||||
} else {
|
||||
memcpy(resp, (void *)0x1fff79e0, 0x20);
|
||||
resp_len = 0x20;
|
||||
get_provision_chunk(resp);
|
||||
resp_len = PROVISION_CHUNK_LEN;
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
|
|
|
@ -107,9 +107,11 @@ class Panda(object):
|
|||
if self._serial == "WIFI":
|
||||
self._handle = WifiHandle()
|
||||
print("opening WIFI device")
|
||||
self.wifi = True
|
||||
else:
|
||||
context = usb1.USBContext()
|
||||
self._handle = None
|
||||
self.wifi = False
|
||||
|
||||
while 1:
|
||||
try:
|
||||
|
@ -287,7 +289,11 @@ class Panda(object):
|
|||
while True:
|
||||
try:
|
||||
#print("DAT: %s"%b''.join(snds).__repr__())
|
||||
self._handle.bulkWrite(3, b''.join(snds))
|
||||
if self.wifi:
|
||||
for s in snds:
|
||||
self._handle.bulkWrite(3, s)
|
||||
else:
|
||||
self._handle.bulkWrite(3, b''.join(snds))
|
||||
break
|
||||
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
||||
print("CAN: BAD SEND MANY, RETRYING")
|
||||
|
|
|
@ -4,18 +4,7 @@ import sys
|
|||
import time
|
||||
from panda import Panda
|
||||
from nose.tools import timed, assert_equal, assert_less, assert_greater
|
||||
|
||||
def connect_wo_esp():
|
||||
# connect to the panda
|
||||
p = Panda()
|
||||
|
||||
# power down the ESP
|
||||
p.set_esp_power(False)
|
||||
return p
|
||||
|
||||
# clear old junk
|
||||
while len(p.can_recv()) > 0:
|
||||
pass
|
||||
from helpers import time_many_sends, connect_wo_esp
|
||||
|
||||
def test_can_loopback():
|
||||
p = connect_wo_esp()
|
||||
|
@ -105,27 +94,6 @@ def test_reliability():
|
|||
sys.stdout.write("P")
|
||||
sys.stdout.flush()
|
||||
|
||||
def time_many_sends(p, bus):
|
||||
MSG_COUNT = 100
|
||||
|
||||
st = time.time()
|
||||
p.can_send_many([(0x1aa, 0, "\xaa"*8, bus)]*MSG_COUNT)
|
||||
r = []
|
||||
|
||||
while len(r) < 200 and (time.time() - st) < 3:
|
||||
r.extend(p.can_recv())
|
||||
|
||||
sent_echo = filter(lambda x: x[3] == 0x80 | bus, r)
|
||||
loopback_resp = filter(lambda x: x[3] == bus, r)
|
||||
|
||||
assert_equal(len(sent_echo), 100)
|
||||
assert_equal(len(loopback_resp), 100)
|
||||
|
||||
et = (time.time()-st)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*MSG_COUNT / et
|
||||
|
||||
return comp_kbps
|
||||
|
||||
def test_throughput():
|
||||
p = connect_wo_esp()
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
from __future__ import print_function
|
||||
import os
|
||||
from panda import Panda
|
||||
|
||||
def test_get_serial():
|
||||
p = Panda()
|
||||
print(p.get_serial())
|
||||
|
||||
def test_get_serial_in_flash_mode():
|
||||
p = Panda()
|
||||
p.reset(enter_bootstub=True)
|
||||
assert(p.bootstub)
|
||||
print(p.get_serial())
|
||||
p.reset()
|
||||
|
||||
def test_connect_wifi():
|
||||
p = Panda()
|
||||
ssid, pw = p.get_serial()
|
||||
ssid = ssid.strip("\x00")
|
||||
assert(ssid.isalnum())
|
||||
assert(pw.isalnum())
|
||||
ssid = "panda-" + ssid
|
||||
|
||||
# Mac OS X only
|
||||
# TODO: Ubuntu
|
||||
os.system("networksetup -setairportnetwork en0 %s %s" % (ssid, pw))
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
from __future__ import print_function
|
||||
import requests
|
||||
import time
|
||||
from panda import Panda
|
||||
from helpers import time_many_sends
|
||||
from nose.tools import timed, assert_equal, assert_less, assert_greater
|
||||
|
||||
def test_webpage_fetch():
|
||||
r = requests.get("http://192.168.0.10/")
|
||||
print(r.text)
|
||||
|
||||
assert "This is your comma.ai panda" in r.text
|
||||
|
||||
def test_get_serial_wifi():
|
||||
p = Panda("WIFI")
|
||||
print(p.get_serial())
|
||||
|
||||
def test_throughput():
|
||||
p = Panda()
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
# enable CAN loopback mode
|
||||
p.set_can_loopback(True)
|
||||
|
||||
p = Panda("WIFI")
|
||||
|
||||
for speed in [100,250,500,750,1000]:
|
||||
# set bus 0 speed to speed
|
||||
p.set_can_speed_kbps(0, speed)
|
||||
time.sleep(0.05)
|
||||
|
||||
comp_kbps = time_many_sends(p, 0)
|
||||
|
||||
# bit count from https://en.wikipedia.org/wiki/CAN_bus
|
||||
saturation_pct = (comp_kbps/speed) * 100.0
|
||||
#assert_greater(saturation_pct, 80)
|
||||
#assert_less(saturation_pct, 100)
|
||||
|
||||
print("WIFI loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct))
|
||||
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
from panda import Panda
|
||||
from nose.tools import timed, assert_equal, assert_less, assert_greater
|
||||
import time
|
||||
|
||||
def connect_wo_esp():
|
||||
# connect to the panda
|
||||
p = Panda()
|
||||
|
||||
# power down the ESP
|
||||
p.set_esp_power(False)
|
||||
return p
|
||||
|
||||
# clear old junk
|
||||
while len(p.can_recv()) > 0:
|
||||
pass
|
||||
|
||||
def time_many_sends(p, bus):
|
||||
MSG_COUNT = 100
|
||||
|
||||
st = time.time()
|
||||
p.can_send_many([(0x1aa, 0, "\xaa"*8, bus)]*MSG_COUNT)
|
||||
r = []
|
||||
|
||||
while len(r) < 200 and (time.time() - st) < 3:
|
||||
r.extend(p.can_recv())
|
||||
|
||||
sent_echo = filter(lambda x: x[3] == 0x80 | bus, r)
|
||||
loopback_resp = filter(lambda x: x[3] == bus, r)
|
||||
|
||||
assert_equal(len(sent_echo), 100)
|
||||
assert_equal(len(loopback_resp), 100)
|
||||
|
||||
et = (time.time()-st)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*MSG_COUNT / et
|
||||
|
||||
return comp_kbps
|
Loading…
Reference in New Issue