nopenpilot/selfdrive/debug/internal/toyota/sonar.py

160 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import zmq
import os
import time
import random
from collections import defaultdict, OrderedDict
from selfdrive.boardd.boardd import can_list_to_can_capnp
from selfdrive.car.toyota.toyotacan import make_can_msg
import cereal.messaging as messaging
from cereal.services import service_list
changing = []
fields = range(0, 256)
# fields = [225, 50, 39, 40]
fields = [50]
field_results = defaultdict(lambda: "\x00\x00")
cur_field = 97
def send(sendcan, addr, m):
packet = make_can_msg(addr, m, 0, False)
packets = can_list_to_can_capnp([packet], msgtype='sendcan')
sendcan.send(packets.to_bytes())
def recv(can, addr):
received = False
r = []
while not received:
c = messaging.recv_one(can)
for msg in c.can:
if msg.address == addr:
r.append(msg)
received = True
return r
def recv_timeout(can, addr):
received = False
r = []
t = time.time()
while not received:
c = messaging.recv_one_or_none(can)
if c is not None:
for msg in c.can:
if msg.address == addr:
r.append(msg)
received = True
if time.time() - t > 0.05:
received = True
return r
def print_hex(d):
s = map(ord, d)
s = "".join(["{:02X}".format(b) for b in s])
print(s)
TYPES = {
0: 'single',
1: 'first',
2: 'consecutive',
3: 'flow'
}
CONTINUE = "\x67\x30\x01\x00\x00\x00\x00\x00"
TEST_ON = "\x67\x02\x10\x74\x00\x00\x00\x00"
POLL = "\x67\x02\x21\x69\x00\x00\x00\x00"
# POLL = "\x67\x02\x10\x69\x00\x00\x00\x00"
prev_rcv_t = ""
recv_data = []
l = 0
index = 0
can = messaging.sub_sock('can')
sendcan = messaging.pub_sock('sendcan')
time.sleep(0.5)
results = []
test_mode = False
while True:
# Send flow control if necessary
if prev_rcv_t == 'first' or prev_rcv_t == 'consecutive':
send(sendcan, 1872, CONTINUE)
received = recv_timeout(can, 1880)
if len(received) == 0:
sys.stdout.flush()
print(chr(27) + "[2J")
print(time.time())
print(changing)
if len(results):
if results[0] != "\x7F\x21\x31":
old = field_results[cur_field]
if old != '\x00\x00' and old != results[0] and cur_field not in changing:
changing.append(cur_field)
field_results[cur_field] = results[0]
else:
fields.remove(cur_field)
for k in fields:
# if field_results[k] == "\x00\x00":
# continue
print(k, end=' ')
print_hex(field_results[k])
results = []
if not test_mode:
send(sendcan, 1872, TEST_ON)
test_mode = True
else:
cur_field = random.choice(fields)
send(sendcan, 1872, POLL.replace('\x69', chr(cur_field)))
for r in received:
data = r.dat
# Check message type
t = TYPES[ord(data[1]) >> 4]
if t == 'single':
l = ord(data[1]) & 0x0F
elif t == 'first':
a = ord(data[1]) & 0x0F
b = ord(data[2])
l = b + (a << 8)
recv_data = []
prev_rcv_t = t
if t == 'single':
recv_data = data[2: 2 + l]
results.append(recv_data)
if t == 'first':
index = 0
recv_data += data[3: min(8, 3 + l)]
if t == 'consecutive':
index += 1
assert index == ord(data[1]) & 0x0F
pending_l = l - len(recv_data)
recv_data += data[2: min(8, 2 + pending_l)]
if len(recv_data) == l:
prev_rcv_t = ""
results.append(recv_data)