process replay: add timeout to prevent hanging when tested process crashes (#1007)

albatross
Adeeb 2020-01-25 18:45:51 -08:00 committed by George Hotz
parent 9a05283566
commit 361be2630f
1 changed files with 15 additions and 11 deletions

View File

@ -19,6 +19,10 @@ from collections import namedtuple
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback'])
def wait_for_event(evt):
if not evt.wait(15):
raise Exception("Timeout reached. Thread likely crashed.")
class FakeSocket:
def __init__(self, wait=True):
self.data = []
@ -32,13 +36,13 @@ class FakeSocket:
if self.wait:
self.recv_called.set()
self.recv_ready.wait()
wait_for_event(self.recv_ready)
self.recv_ready.clear()
return self.data.pop()
def send(self, data):
if self.wait:
self.recv_called.wait()
wait_for_event(self.recv_called)
self.recv_called.clear()
self.data.append(data)
@ -47,7 +51,7 @@ class FakeSocket:
self.recv_ready.set()
def wait_for_recv(self):
self.recv_called.wait()
wait_for_event(self.recv_called)
class DumbSocket:
def __init__(self, s=None):
@ -75,23 +79,23 @@ class FakeSubMaster(messaging.SubMaster):
# hack to know when fingerprinting is done
if self.wait_on_getitem:
self.update_called.set()
self.update_ready.wait()
wait_for_event(self.update_ready)
self.update_ready.clear()
return self.data[s]
def update(self, timeout=-1):
self.update_called.set()
self.update_ready.wait()
wait_for_event(self.update_ready)
self.update_ready.clear()
def update_msgs(self, cur_time, msgs):
self.update_called.wait()
wait_for_event(self.update_called)
self.update_called.clear()
super(FakeSubMaster, self).update_msgs(cur_time, msgs)
self.update_ready.set()
def wait_for_update(self):
self.update_called.wait()
wait_for_event(self.update_called)
class FakePubMaster(messaging.PubMaster):
def __init__(self, services):
@ -116,11 +120,11 @@ class FakePubMaster(messaging.PubMaster):
else:
self.data[s] = dat.as_reader()
self.send_called.set()
self.get_called.wait()
wait_for_event(self.get_called)
self.get_called.clear()
def wait_for_msg(self):
self.send_called.wait()
wait_for_event(self.send_called)
self.send_called.clear()
dat = self.data[self.last_updated]
self.get_called.set()
@ -132,14 +136,14 @@ def fingerprint(msgs, fsm, can_sock):
# populate fake socket with data for fingerprinting
canmsgs = [msg for msg in msgs if msg.which() == "can"]
can_sock.recv_called.wait()
wait_for_event(can_sock.recv_called)
can_sock.recv_called.clear()
can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]]
can_sock.recv_ready.set()
can_sock.wait = False
# we know fingerprinting is done when controlsd sets sm['pathPlan'].sensorValid
fsm.update_called.wait()
wait_for_event(fsm.update_called)
fsm.update_called.clear()
fsm.wait_on_getitem = False