Process replay for C++ daemons (#2288)

* more like a stash

* mem

* comment cpp

* remove some tests

* WORKS: python test_processes.py

* all tests work, time to clean up

* initialize cleanup

* fix spaces

* refactor

* Update selfdrive/test/process_replay/process_replay.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* revert debug change

* mini refactor

* remove submaster

* update README

* add delay

* check for misaligned start

* Update compare_logs.py

* should finally work now, I hope...

* update comment

* real fix

* real fix now

* fix styling

* fix styling

* Update process_replay.py

* fix code review

* speed up tests

* Update selfdrive/test/process_replay/process_replay.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* Update selfdrive/test/process_replay/process_replay.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* fix reviews

* nicer code

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
grekiki 2020-10-12 11:18:53 +02:00 committed by GitHub
parent 8fc9cfea38
commit 870644b59e
4 changed files with 61 additions and 5 deletions

selfdrive/test/process_replay/README.md

@@ -12,6 +12,7 @@ Currently the following processes are tested:
* radard
* plannerd
* calibrationd
* ubloxd
## Forks
@@ -19,6 +20,6 @@ openpilot forks can use this test with their own reference logs
To generate new logs:
`./update-refs.py --no-upload`
`./update_refs.py --no-upload`
Then, check in the new logs using git-lfs. Make sure to also include the updated `ref_commit` file.

selfdrive/test/process_replay/compare_logs.py

@@ -3,7 +3,6 @@ import bz2
import os
import sys
import numbers
import dictdiffer
if "CI" in os.environ:
@@ -59,6 +58,7 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=None):
    ignore_msgs = []
  log1, log2 = [list(filter(lambda m: m.which() not in ignore_msgs, log)) for log in (log1, log2)]
  if len(log1) != len(log2):
    raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}")
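
For orientation, a minimal usage sketch of `compare_logs` with the new length check. The log file names and the `LogReader` import are assumptions for illustration, not part of this change, and it assumes `compare_logs` returns a list of field-level differences:

```python
# Minimal sketch (assumed file names; only the compare_logs signature
# comes from this diff).
from tools.lib.logreader import LogReader
from selfdrive.test.process_replay.compare_logs import compare_logs

ref_log = list(LogReader("ubloxd_ref.bz2"))  # hypothetical reference run
new_log = list(LogReader("ubloxd_new.bz2"))  # hypothetical current run

# logMonoTime always differs between runs, so it is ignored. With the check
# added above, a mismatch in message counts now fails loudly up front.
diff = compare_logs(ref_log, new_log, ignore_fields=["logMonoTime"])
print(f"{len(diff)} differing fields")
```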

selfdrive/test/process_replay/process_replay.py

@@ -4,6 +4,7 @@ import os
import sys
import threading
import importlib
import time
if "CI" in os.environ:
  def tqdm(x):
@@ -18,13 +19,12 @@ import cereal.messaging as messaging
from common.params import Params
from cereal.services import service_list
from collections import namedtuple
from selfdrive.manager import managed_processes
# Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance'])
def wait_for_event(evt):
  if not evt.wait(15):
    if threading.currentThread().getName() == "MainThread":
@@ -206,6 +206,15 @@ def calibration_rcv_callback(msg, CP, cfg, fsm):
    recv_socks = ["liveCalibration"]
  return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry'

def ublox_rcv_callback(msg):
  msg_class, msg_id = msg.ubloxRaw[2:4]
  if (msg_class, msg_id) in {(1, 7 * 16)}:
    return ["gpsLocationExternal"]
  elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}:
    return ["ubloxGnss"]
  else:
    return []

CONFIGS = [
  ProcessConfig(
    proc_name="controlsd",
@@ -285,9 +294,27 @@ CONFIGS = [
    should_recv_callback=None,
    tolerance=NUMPY_TOLERANCE,
  ),
  ProcessConfig(
    proc_name="ubloxd",
    pub_sub={
      "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
    },
    ignore=["logMonoTime"],
    init_callback=None,
    should_recv_callback=ublox_rcv_callback,
    tolerance=None,
  ),
]

def replay_process(cfg, lr):
  proc = managed_processes[cfg.proc_name]
  if isinstance(proc, str):
    return python_replay_process(cfg, lr)
  else:
    return cpp_replay_process(cfg, lr)

def python_replay_process(cfg, lr):
  sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
  pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can']
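
The `isinstance(proc, str)` branch above leans on how `manager` registers daemons. A rough sketch of the idea, with illustrative entries that only approximate the real `managed_processes` table:

```python
# Illustrative only: the exact entries and tuple layout are assumptions.
# Python daemons are registered as importable module strings, while compiled
# (C++) daemons are registered as something non-string (here: a working
# directory plus a command), which is what replay_process branches on.
managed_processes = {
  "plannerd": "selfdrive.controls.plannerd",        # Python -> python_replay_process
  "ubloxd": ("selfdrive/locationd", ["./ubloxd"]),  # native -> cpp_replay_process
}
```

A Python daemon can be imported and driven in-process, while a C++ daemon has to be started through `manager` and exercised over messaging sockets, which is what the new `cpp_replay_process` in the next hunk does.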
@@ -353,3 +380,31 @@ def replay_process(cfg, lr):
        recv_cnt -= m.which() in recv_socks
  return log_msgs

def cpp_replay_process(cfg, lr):
  sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]  # We get responses here
  pm = messaging.PubMaster(cfg.pub_sub.keys())
  sockets = {s : messaging.sub_sock(s, timeout=1000) for s in sub_sockets}

  all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
  pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]

  manager.prepare_managed_process(cfg.proc_name)
  manager.start_managed_process(cfg.proc_name)
  time.sleep(1)  # We give the process time to start

  log_msgs = []
  for s in sub_sockets:
    messaging.recv_one_or_none(sockets[s])

  for msg in tqdm(pub_msgs):
    pm.send(msg.which(), msg.as_builder())
    resp_sockets = sub_sockets if cfg.should_recv_callback is None else cfg.should_recv_callback(msg)
    for s in resp_sockets:
      response = messaging.recv_one(sockets[s])
      if response is not None:
        log_msgs.append(response)

  manager.kill_managed_process(cfg.proc_name)
  return log_msgs
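
Taken together, a hedged end-to-end sketch of how the new path might be exercised for `ubloxd`. The segment and reference file names are placeholders, the `LogReader` import path is assumed, and the assumption that `compare_logs` returns an empty list when the logs match is not stated in this diff. Note that `ublox_rcv_callback` decides, per `ubloxRaw` frame, which output sockets `cpp_replay_process` waits on:

```python
# Usage sketch (placeholder paths; assumes compare_logs returns a list of diffs).
from tools.lib.logreader import LogReader
from selfdrive.test.process_replay.process_replay import CONFIGS, replay_process
from selfdrive.test.process_replay.compare_logs import compare_logs

cfg = next(c for c in CONFIGS if c.proc_name == "ubloxd")

# Feed the logged ubloxRaw messages through the compiled ubloxd (started via
# manager) and collect the ubloxGnss / gpsLocationExternal responses.
lr = list(LogReader("some_segment--0--rlog.bz2"))  # hypothetical input segment
log_msgs = replay_process(cfg, lr)

# Diff against a previously generated reference, ignoring logMonoTime.
ref_msgs = list(LogReader("ubloxd_ref.bz2"))  # hypothetical reference log
diff = compare_logs(ref_msgs, log_msgs, ignore_fields=cfg.ignore, tolerance=cfg.tolerance)
assert len(diff) == 0, diff
```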

selfdrive/test/process_replay/ref_commit

@@ -1 +1 @@
d6364d7b9c574790642c273d87ee2e8e46664aab
1ca0db37c9fab1328fe1f86cc884b9bc7123aaee