this is pretty nice now
parent
5c5a56c5e6
commit
bcf66c1271
|
@ -7,16 +7,19 @@ from collections import defaultdict
|
|||
import cereal.messaging as messaging
|
||||
from tools.lib.logreader import logreader_from_route_or_segment
|
||||
|
||||
RED = '\033[91m'
|
||||
CLEAR = '\033[0m'
|
||||
|
||||
def update(msgs, bus, low_to_high, high_to_low, quiet=False):
|
||||
def update(msgs, bus, dat, low_to_high, high_to_low, quiet=False):
|
||||
for x in msgs:
|
||||
if x.which() != 'can':
|
||||
continue
|
||||
|
||||
for y in x.can:
|
||||
if y.src == bus:
|
||||
i = int.from_bytes(y.dat, byteorder='big')
|
||||
dat[y.address] = y.dat
|
||||
|
||||
i = int.from_bytes(y.dat, byteorder='big')
|
||||
l_h = low_to_high[y.address]
|
||||
h_l = high_to_low[y.address]
|
||||
|
||||
|
@ -34,35 +37,51 @@ def update(msgs, bus, low_to_high, high_to_low, quiet=False):
|
|||
|
||||
|
||||
def can_printer(bus=0, init_msgs=None, new_msgs=None):
|
||||
logcan = messaging.sub_sock('can')
|
||||
logcan = messaging.sub_sock('can', timeout=10)
|
||||
|
||||
dat = defaultdict(int)
|
||||
low_to_high = defaultdict(int)
|
||||
high_to_low = defaultdict(int)
|
||||
|
||||
if init_msgs is not None:
|
||||
update(init_msgs, bus, low_to_high, high_to_low, quiet=True)
|
||||
update(init_msgs, bus, dat, low_to_high, high_to_low, quiet=True)
|
||||
|
||||
low_to_high_init = low_to_high.copy()
|
||||
high_to_low_init = high_to_low.copy()
|
||||
|
||||
if new_msgs is not None:
|
||||
update(new_msgs, bus, low_to_high, high_to_low)
|
||||
update(new_msgs, bus, dat, low_to_high, high_to_low)
|
||||
else:
|
||||
# Live mode
|
||||
while 1:
|
||||
can_recv = messaging.drain_sock(logcan, wait_for_one=True)
|
||||
update(can_recv, bus, low_to_high, high_to_low)
|
||||
try:
|
||||
while 1:
|
||||
can_recv = messaging.drain_sock(logcan)
|
||||
update(can_recv, bus, dat, low_to_high, high_to_low)
|
||||
time.sleep(0.02)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
print("\n\n")
|
||||
for addr in sorted(dat.keys()):
|
||||
init = low_to_high_init[addr] & high_to_low_init[addr]
|
||||
now = low_to_high[addr] & high_to_low[addr]
|
||||
d = now & ~init
|
||||
if d == 0:
|
||||
continue
|
||||
b = d.to_bytes(len(dat[addr]), byteorder='big')
|
||||
byts = ''.join([(c if c == '0' else f'{RED}{c}{CLEAR}') for c in str(binascii.hexlify(b))[2:-1]])
|
||||
print(f"{hex(addr).ljust(6)}({str(addr).ljust(4)})", byts)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
desc = """Collects messages and prints when a new bit transition is observed.
|
||||
This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt.
|
||||
Leave the script running until no new transitions are seen, then perform the action."""
|
||||
|
||||
parser = argparse.ArgumentParser(description=desc,
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0)
|
||||
|
||||
parser.add_argument("init", type=str, help="Route or segment to initialize with")
|
||||
parser.add_argument("comp", type=str, help="Route or segment to compare against init")
|
||||
parser.add_argument("init", type=str, nargs='?', help="Route or segment to initialize with")
|
||||
parser.add_argument("comp", type=str, nargs='?', help="Route or segment to compare against init")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
|
Loading…
Reference in New Issue