loggerd tests (#19671)
parent
07df6956f0
commit
af81190cfc
|
@ -138,6 +138,7 @@ pipeline {
|
|||
["test sounds", "nosetests -s selfdrive/test/test_sounds.py"],
|
||||
["test boardd loopback", "nosetests -s selfdrive/boardd/tests/test_boardd_loopback.py"],
|
||||
["test loggerd", "CI=1 python selfdrive/loggerd/tests/test_loggerd.py"],
|
||||
["test encoder", "CI=1 python selfdrive/loggerd/tests/test_encoder.py"],
|
||||
//["test camerad", "CI=1 python selfdrive/camerad/test/test_camerad.py"], // wait for shelf refactor
|
||||
//["test updater", "python installer/updater/test_updater.py"],
|
||||
])
|
||||
|
|
|
@ -600,7 +600,7 @@ int main(int argc, char** argv) {
|
|||
double last_rotate_tms = millis_since_boot();
|
||||
double last_camera_seen_tms = millis_since_boot();
|
||||
while (!do_exit) {
|
||||
for (auto sock : poller->poll(100 * 1000)) {
|
||||
for (auto sock : poller->poll(1000)) {
|
||||
Message * last_msg = nullptr;
|
||||
while (!do_exit) {
|
||||
Message * msg = sock->receive(true);
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
#!/usr/bin/env python3
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import unittest
|
||||
from parameterized import parameterized
|
||||
from pathlib import Path
|
||||
from tqdm import trange
|
||||
|
||||
from common.params import Params
|
||||
from common.timeout import Timeout
|
||||
from selfdrive.hardware import EON, TICI
|
||||
from selfdrive.test.helpers import with_processes
|
||||
from selfdrive.loggerd.config import ROOT, CAMERA_FPS
|
||||
|
||||
|
||||
# baseline file sizes for a 2s segment, in bytes
|
||||
FULL_SIZE = 1253786
|
||||
if EON:
|
||||
CAMERAS = {
|
||||
"fcamera": FULL_SIZE,
|
||||
"dcamera": 770920,
|
||||
"qcamera": 38533,
|
||||
}
|
||||
elif TICI:
|
||||
CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]}
|
||||
else:
|
||||
CAMERAS = {}
|
||||
|
||||
ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]]
|
||||
|
||||
FRAME_TOLERANCE = 0
|
||||
FILE_SIZE_TOLERANCE = 0.5
|
||||
|
||||
class TestEncoder(unittest.TestCase):
|
||||
|
||||
# TODO: all of loggerd should work on PC
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if not (EON or TICI):
|
||||
raise unittest.SkipTest
|
||||
|
||||
def setUp(self):
|
||||
self._clear_logs()
|
||||
|
||||
self.segment_length = 2
|
||||
os.environ["LOGGERD_TEST"] = "1"
|
||||
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(self.segment_length)
|
||||
|
||||
def tearDown(self):
|
||||
self._clear_logs()
|
||||
|
||||
def _clear_logs(self):
|
||||
if os.path.exists(ROOT):
|
||||
shutil.rmtree(ROOT)
|
||||
|
||||
def _get_latest_segment_path(self):
|
||||
last_route = sorted(Path(ROOT).iterdir(), key=os.path.getmtime)[-1]
|
||||
return os.path.join(ROOT, last_route)
|
||||
|
||||
# TODO: this should run faster than real time
|
||||
@parameterized.expand(ALL_CAMERA_COMBINATIONS)
|
||||
@with_processes(['camerad', 'sensord', 'loggerd'], init_time=5)
|
||||
def test_log_rotation(self, cameras):
|
||||
print("checking targets:", cameras)
|
||||
Params().put("RecordFront", "1" if 'dcamera' in cameras else "0")
|
||||
|
||||
num_segments = random.randint(80, 150)
|
||||
if "CI" in os.environ:
|
||||
num_segments = random.randint(15, 20) # ffprobe is slow on comma two
|
||||
|
||||
# wait for loggerd to make the dir for first segment
|
||||
time.sleep(10)
|
||||
route_prefix_path = None
|
||||
with Timeout(30):
|
||||
while route_prefix_path is None:
|
||||
try:
|
||||
route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
|
||||
except Exception:
|
||||
time.sleep(2)
|
||||
continue
|
||||
|
||||
for i in trange(num_segments):
|
||||
# poll for next segment
|
||||
if i < num_segments - 1:
|
||||
with Timeout(self.segment_length*3, error_msg=f"timed out waiting for segment {i}"):
|
||||
while True:
|
||||
seg_num = int(self._get_latest_segment_path().rsplit("--", 1)[1])
|
||||
if seg_num > i:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
time.sleep(self.segment_length)
|
||||
|
||||
# check each camera file size
|
||||
for camera, size in cameras.items():
|
||||
ext = "ts" if camera=='qcamera' else "hevc"
|
||||
file_path = f"{route_prefix_path}--{i}/{camera}.{ext}"
|
||||
|
||||
# check file size
|
||||
self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}")
|
||||
file_size = os.path.getsize(file_path)
|
||||
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
|
||||
f"{camera} failed size check: expected {size}, got {file_size}")
|
||||
|
||||
if camera == 'qcamera':
|
||||
continue
|
||||
|
||||
# check frame count
|
||||
cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \
|
||||
-of default=nokey=1:noprint_wrappers=1 {file_path}"
|
||||
expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS
|
||||
frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE
|
||||
frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip())
|
||||
|
||||
self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
|
||||
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}")
|
||||
shutil.rmtree(f"{route_prefix_path}--{i}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -1,124 +1,211 @@
|
|||
#!/usr/bin/env python3
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import string
|
||||
import subprocess
|
||||
import time
|
||||
import unittest
|
||||
from parameterized import parameterized
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from tqdm import trange
|
||||
|
||||
from common.params import Params
|
||||
from cereal import log
|
||||
import cereal.messaging as messaging
|
||||
from cereal.services import service_list
|
||||
from common.basedir import BASEDIR
|
||||
from common.timeout import Timeout
|
||||
from selfdrive.hardware import EON, TICI
|
||||
from selfdrive.test.helpers import with_processes
|
||||
from selfdrive.loggerd.config import ROOT, CAMERA_FPS
|
||||
from common.params import Params
|
||||
import selfdrive.manager as manager
|
||||
from selfdrive.loggerd.config import ROOT
|
||||
from selfdrive.version import version as VERSION
|
||||
from tools.lib.logreader import LogReader
|
||||
|
||||
SentinelType = log.Sentinel.SentinelType
|
||||
|
||||
# baseline file sizes for a 2s segment, in bytes
|
||||
FULL_SIZE = 1253786
|
||||
if EON:
|
||||
CAMERAS = {
|
||||
"fcamera": FULL_SIZE,
|
||||
"dcamera": 770920,
|
||||
"qcamera": 38533,
|
||||
}
|
||||
elif TICI:
|
||||
CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]}
|
||||
else:
|
||||
CAMERAS = {}
|
||||
|
||||
ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]]
|
||||
|
||||
FRAME_TOLERANCE = 0
|
||||
FILE_SIZE_TOLERANCE = 0.5
|
||||
CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in service_list
|
||||
and service_list[f].should_log and "encode" not in f.lower()]
|
||||
|
||||
class TestLoggerd(unittest.TestCase):
|
||||
|
||||
# TODO: all of loggerd should work on PC
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if not (EON or TICI):
|
||||
raise unittest.SkipTest
|
||||
def _get_latest_log_dir(self):
|
||||
log_dirs = sorted(Path(ROOT).iterdir(), key=lambda f: f.stat().st_mtime)
|
||||
return log_dirs[-1]
|
||||
|
||||
def setUp(self):
|
||||
self._clear_logs()
|
||||
def _get_log_dir(self, x):
|
||||
for p in x.split(' '):
|
||||
path = Path(p.strip())
|
||||
if path.is_dir():
|
||||
return path
|
||||
return None
|
||||
|
||||
self.segment_length = 2
|
||||
os.environ["LOGGERD_TEST"] = "1"
|
||||
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(self.segment_length)
|
||||
def _gen_bootlog(self):
|
||||
with Timeout(5):
|
||||
out = subprocess.check_output(["./loggerd", "--bootlog"], cwd=os.path.join(BASEDIR, "selfdrive/loggerd"), encoding='utf-8')
|
||||
|
||||
def tearDown(self):
|
||||
self._clear_logs()
|
||||
# check existence
|
||||
d = self._get_log_dir(out)
|
||||
path = Path(os.path.join(d, "bootlog.bz2"))
|
||||
assert path.is_file(), "failed to create bootlog file"
|
||||
return path
|
||||
|
||||
def _clear_logs(self):
|
||||
if os.path.exists(ROOT):
|
||||
shutil.rmtree(ROOT)
|
||||
def _check_init_data(self, msgs):
|
||||
msg = msgs[0]
|
||||
assert msg.which() == 'initData'
|
||||
|
||||
def _get_latest_segment_path(self):
|
||||
last_route = sorted(Path(ROOT).iterdir(), key=os.path.getmtime)[-1]
|
||||
return os.path.join(ROOT, last_route)
|
||||
def _check_sentinel(self, msgs, route):
|
||||
start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment
|
||||
assert msgs[1].sentinel.type == start_type
|
||||
|
||||
# TODO: this should run faster than real time
|
||||
@parameterized.expand(ALL_CAMERA_COMBINATIONS)
|
||||
@with_processes(['camerad', 'sensord', 'loggerd'], init_time=5)
|
||||
def test_log_rotation(self, cameras):
|
||||
print("checking targets:", cameras)
|
||||
Params().put("RecordFront", "1" if 'dcamera' in cameras else "0")
|
||||
end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
|
||||
assert msgs[-1].sentinel.type == end_type
|
||||
|
||||
num_segments = random.randint(80, 150)
|
||||
if "CI" in os.environ:
|
||||
num_segments = random.randint(15, 20) # ffprobe is slow on comma two
|
||||
def test_init_data_values(self):
|
||||
os.environ["CLEAN"] = random.choice(["0", "1"])
|
||||
os.environ["DONGLE_ID"] = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100)))
|
||||
|
||||
# wait for loggerd to make the dir for first segment
|
||||
time.sleep(10)
|
||||
route_prefix_path = None
|
||||
with Timeout(30):
|
||||
while route_prefix_path is None:
|
||||
fake_params = [
|
||||
("GitCommit", "gitCommit", "commit"),
|
||||
("GitBranch", "gitBranch", "branch"),
|
||||
("GitRemote", "gitRemote", "remote"),
|
||||
]
|
||||
params = Params()
|
||||
for k, _, v in fake_params:
|
||||
params.put(k, v)
|
||||
|
||||
lr = list(LogReader(str(self._gen_bootlog())))
|
||||
initData = lr[0].initData
|
||||
|
||||
assert initData.dirty != bool(os.environ["CLEAN"])
|
||||
assert initData.dongleId == os.environ["DONGLE_ID"]
|
||||
assert initData.version == VERSION
|
||||
|
||||
if os.path.isfile("/proc/cmdline"):
|
||||
with open("/proc/cmdline") as f:
|
||||
assert list(initData.kernelArgs) == f.read().strip().split(" ")
|
||||
|
||||
with open("/proc/version") as f:
|
||||
assert initData.kernelVersion == f.read()
|
||||
|
||||
for _, k, v in fake_params:
|
||||
assert getattr(initData, k) == v
|
||||
|
||||
def test_bootlog(self):
|
||||
# generate bootlog with fake launch log
|
||||
launch_log = ''.join([str(random.choice(string.printable)) for _ in range(100)])
|
||||
with open("/tmp/launch_log", "w") as f:
|
||||
f.write(launch_log)
|
||||
|
||||
bootlog_path = self._gen_bootlog()
|
||||
lr = list(LogReader(str(bootlog_path)))
|
||||
|
||||
# check length
|
||||
assert len(lr) == 4 # boot + initData + 2x sentinel
|
||||
|
||||
# check initData and sentinel
|
||||
self._check_init_data(lr)
|
||||
self._check_sentinel(lr, True)
|
||||
|
||||
# check msgs
|
||||
bootlog_msgs = [m for m in lr if m.which() == 'boot']
|
||||
assert len(bootlog_msgs) == 1
|
||||
|
||||
# sanity check values
|
||||
boot = bootlog_msgs.pop().boot
|
||||
assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s
|
||||
assert boot.launchLog == launch_log
|
||||
|
||||
for field, path in [("lastKmsg", "console-ramoops"), ("lastPmsg", "pmsg-ramoops-0")]:
|
||||
path = Path(os.path.join("/sys/fs/pstore/", path))
|
||||
val = b""
|
||||
if path.is_file():
|
||||
val = open(path, "rb").read()
|
||||
assert getattr(boot, field) == val
|
||||
|
||||
def test_qlog(self):
|
||||
qlog_services = [s for s in CEREAL_SERVICES if service_list[s].decimation is not None]
|
||||
no_qlog_services = [s for s in CEREAL_SERVICES if service_list[s].decimation is None]
|
||||
|
||||
services = random.sample(qlog_services, random.randint(2, 10)) + \
|
||||
random.sample(no_qlog_services, random.randint(2, 10))
|
||||
|
||||
pm = messaging.PubMaster(services)
|
||||
|
||||
# sleep enough for the first poll to time out
|
||||
# TOOD: fix loggerd bug dropping the msgs from the first poll
|
||||
manager.start_managed_process("loggerd")
|
||||
time.sleep(2)
|
||||
|
||||
sent_msgs = defaultdict(list)
|
||||
for _ in range(random.randint(2, 10) * 100):
|
||||
for s in services:
|
||||
try:
|
||||
route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
|
||||
m = messaging.new_message(s)
|
||||
except Exception:
|
||||
time.sleep(2)
|
||||
continue
|
||||
m = messaging.new_message(s, random.randint(2, 10))
|
||||
pm.send(s, m)
|
||||
sent_msgs[s].append(m)
|
||||
time.sleep(0.01)
|
||||
|
||||
for i in trange(num_segments):
|
||||
# poll for next segment
|
||||
if i < num_segments - 1:
|
||||
with Timeout(self.segment_length*3, error_msg=f"timed out waiting for segment {i}"):
|
||||
while True:
|
||||
seg_num = int(self._get_latest_segment_path().rsplit("--", 1)[1])
|
||||
if seg_num > i:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(1)
|
||||
manager.kill_managed_process("loggerd")
|
||||
|
||||
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.bz2")
|
||||
lr = list(LogReader(qlog_path))
|
||||
|
||||
# check initData and sentinel
|
||||
self._check_init_data(lr)
|
||||
self._check_sentinel(lr, True)
|
||||
|
||||
recv_msgs = defaultdict(list)
|
||||
for m in lr:
|
||||
recv_msgs[m.which()].append(m)
|
||||
|
||||
for s, msgs in sent_msgs.items():
|
||||
recv_cnt = len(recv_msgs[s])
|
||||
|
||||
if s in no_qlog_services:
|
||||
# check services with no specific decimation aren't in qlog
|
||||
assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog"
|
||||
else:
|
||||
time.sleep(self.segment_length)
|
||||
# check logged message count matches decimation
|
||||
expected_cnt = len(msgs) // service_list[s].decimation
|
||||
assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}"
|
||||
|
||||
# check each camera file size
|
||||
for camera, size in cameras.items():
|
||||
ext = "ts" if camera=='qcamera' else "hevc"
|
||||
file_path = f"{route_prefix_path}--{i}/{camera}.{ext}"
|
||||
def test_rlog(self):
|
||||
services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
|
||||
pm = messaging.PubMaster(services)
|
||||
|
||||
# check file size
|
||||
self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}")
|
||||
file_size = os.path.getsize(file_path)
|
||||
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
|
||||
f"{camera} failed size check: expected {size}, got {file_size}")
|
||||
# sleep enough for the first poll to time out
|
||||
# TOOD: fix loggerd bug dropping the msgs from the first poll
|
||||
manager.start_managed_process("loggerd")
|
||||
time.sleep(2)
|
||||
|
||||
if camera == 'qcamera':
|
||||
continue
|
||||
sent_msgs = defaultdict(list)
|
||||
for _ in range(random.randint(2, 10) * 100):
|
||||
for s in services:
|
||||
try:
|
||||
m = messaging.new_message(s)
|
||||
except Exception:
|
||||
m = messaging.new_message(s, random.randint(2, 10))
|
||||
pm.send(s, m)
|
||||
sent_msgs[s].append(m)
|
||||
time.sleep(0.01)
|
||||
|
||||
# check frame count
|
||||
cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \
|
||||
-of default=nokey=1:noprint_wrappers=1 {file_path}"
|
||||
expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS
|
||||
frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE
|
||||
frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip())
|
||||
time.sleep(1)
|
||||
manager.kill_managed_process("loggerd")
|
||||
|
||||
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.bz2")))
|
||||
|
||||
# check initData and sentinel
|
||||
self._check_init_data(lr)
|
||||
self._check_sentinel(lr, True)
|
||||
|
||||
# check all messages were logged and in order
|
||||
lr = lr[2:-1] # slice off initData and both sentinels
|
||||
for m in lr:
|
||||
sent = sent_msgs[m.which()].pop(0)
|
||||
sent.clear_write_flag()
|
||||
assert sent.to_bytes() == m.as_builder().to_bytes()
|
||||
|
||||
self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
|
||||
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}")
|
||||
shutil.rmtree(f"{route_prefix_path}--{i}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -19,8 +19,6 @@ def main():
|
|||
dat = b''.join(sock.recv_multipart())
|
||||
dat = dat.decode('utf8')
|
||||
|
||||
# print "RECV", repr(dat)
|
||||
|
||||
levelnum = ord(dat[0])
|
||||
dat = dat[1:]
|
||||
|
||||
|
|
Loading…
Reference in New Issue