C++ Process replay improvements - Part 1 (#20828)

* C++ Process replay improvements

* revert that change

* create publisher before subscribers

* dont block forever, print warning

* add comment

* create sockets after process init

* try once more to receive the message

* SIMULATION env variable

* print message num when no response
albatross
Willem Melching 2021-05-05 17:14:29 +02:00 committed by GitHub
parent 33edb62967
commit d96edb6817
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 10 deletions

2
cereal

@ -1 +1 @@
Subproject commit 3be0bf50c801f4d01f92ee08cebcf960f09b180d
Subproject commit bab2f2b95e38da4c968efc31369163056e938b30

View File

@ -340,8 +340,8 @@ kj::ArrayPtr<capnp::byte> Localizer::get_message_bytes(MessageBuilder& msg_build
int Localizer::locationd_thread() {
const std::initializer_list<const char *> service_list =
{ "gpsLocationExternal", "sensorEvents", "cameraOdometry", "liveCalibration", "carState" };
SubMaster sm(service_list, nullptr, { "gpsLocationExternal" });
PubMaster pm({ "liveLocationKalman" });
SubMaster sm(service_list, nullptr, { "gpsLocationExternal" });
Params params;

View File

@ -13,12 +13,13 @@ int main() {
AlignedBuffer aligned_buf;
UbloxMsgParser parser;
PubMaster pm({"ubloxGnss", "gpsLocationExternal"});
Context * context = Context::create();
SubSocket * subscriber = SubSocket::create(context, "ubloxRaw");
assert(subscriber != NULL);
subscriber->setTimeout(100);
PubMaster pm({"ubloxGnss", "gpsLocationExternal"});
while (!do_exit) {
Message * msg = subscriber->receive();

View File

@ -432,32 +432,38 @@ def python_replay_process(cfg, lr, fingerprint=None):
def cpp_replay_process(cfg, lr, fingerprint=None):
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here
pm = messaging.PubMaster(cfg.pub_sub.keys())
sockets = {s: messaging.sub_sock(s, timeout=1000) for s in sub_sockets}
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
os.environ["SIMULATION"] = "1" # Disable submaster alive checks
managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start()
time.sleep(1) # We give the process time to start
while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()):
time.sleep(0)
log_msgs = []
# Make sure all subscribers are connected
sockets = {s: messaging.sub_sock(s, timeout=1000) for s in sub_sockets}
for s in sub_sockets:
messaging.recv_one_or_none(sockets[s])
for msg in tqdm(pub_msgs, disable=CI):
log_msgs = []
for i, msg in enumerate(tqdm(pub_msgs, disable=CI)):
pm.send(msg.which(), msg.as_builder())
resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg)
for s in resp_sockets:
response = messaging.recv_one(sockets[s])
if response is not None:
if response is None:
print(f"Warning, no response received {i}")
else:
log_msgs.append(response)
if not len(resp_sockets):
if not len(resp_sockets): # We only need to wait if we didn't already wait for a response
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
time.sleep(0.0001)
managed_processes[cfg.proc_name].stop()
return log_msgs