OMX encoder stability (#19758)

* cleanup test first

* wait for other encoder threads

* fixes initial crash

* no print

* unused

* fix up tests

* need that

Co-authored-by: Comma Device <device@comma.ai>
albatross
Adeeb Shihadeh 2021-01-21 10:40:08 -08:00 committed by GitHub
parent 76cf500669
commit d4489fbf91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 44 deletions

View File

@ -108,9 +108,11 @@ public:
SubSocket* fpkt_sock;
uint32_t stream_frame_id, log_frame_id, last_rotate_frame_id;
bool enabled, should_rotate, initialized;
std::atomic<bool> rotating;
std::atomic<int> cur_seg;
RotateState() : fpkt_sock(nullptr), stream_frame_id(0), log_frame_id(0),
last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false), initialized(false) {};
last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false), initialized(false), rotating(false), cur_seg(-1) {};
void waitLogThread() {
std::unique_lock<std::mutex> lk(fid_lock);
@ -232,17 +234,31 @@ void encoder_thread(int cam_idx) {
rotate_state.initialized = true;
}
// get new logger handle for new segment
if (lh) {
lh_close(lh);
}
lh = logger_get_handle(&s.logger);
// wait for all to start rotating
rotate_state.rotating = true;
for(auto &r : s.rotate_state) {
while(r.enabled && !r.rotating && !do_exit) util::sleep_for(5);
}
pthread_mutex_lock(&s.rotate_lock);
for (auto &e : encoders) {
e->encoder_close();
e->encoder_open(s.segment_path, s.rotate_segment);
}
rotate_state.cur_seg = s.rotate_segment;
pthread_mutex_unlock(&s.rotate_lock);
// wait for all to finish rotating
for(auto &r : s.rotate_state) {
while(r.enabled && r.cur_seg != s.rotate_segment && !do_exit) util::sleep_for(5);
}
rotate_state.rotating = false;
rotate_state.finish_rotate();
}
}

View File

@ -4,7 +4,6 @@ import os
import random
import shutil
import subprocess
import threading
import time
import unittest
from parameterized import parameterized
@ -15,22 +14,24 @@ from common.params import Params
from common.timeout import Timeout
from selfdrive.hardware import EON, TICI
from selfdrive.test.helpers import with_processes
from selfdrive.loggerd.config import ROOT, CAMERA_FPS
from selfdrive.loggerd.config import ROOT
# baseline file sizes for a 2s segment, in bytes
SEGMENT_LENGTH = 2
FULL_SIZE = 1253786
FULL_SIZE = 1253786 # file size for a 2s segment in bytes
if EON:
CAMERAS = {
"fcamera": FULL_SIZE,
"dcamera": 770920,
"qcamera": 38533,
}
CAMERAS = [
("fcamera.hevc", 20, FULL_SIZE),
("dcamera.hevc", 10, 770920),
("qcamera.ts", 20, 38533),
]
else:
CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]}
ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]]
CAMERAS = [
("fcamera.hevc", 20, FULL_SIZE),
("dcamera.hevc", 20, FULL_SIZE),
("ecamera.hevc", 20, FULL_SIZE),
("qcamera.ts", 20, 38533),
]
# we check frame count, so we don't have to be too strict on size
FILE_SIZE_TOLERANCE = 0.5
@ -60,11 +61,10 @@ class TestEncoder(unittest.TestCase):
return os.path.join(ROOT, last_route)
# TODO: this should run faster than real time
@parameterized.expand(ALL_CAMERA_COMBINATIONS)
@with_processes(['camerad', 'sensord', 'loggerd'])
def test_log_rotation(self, cameras):
print("checking targets:", cameras)
Params().put("RecordFront", "1" if 'dcamera' in cameras else "0")
@parameterized.expand([(True, ), (False, )])
@with_processes(['camerad', 'sensord', 'loggerd'], init_time=3)
def test_log_rotation(self, record_front):
Params().put("RecordFront", str(int(record_front)))
num_segments = random.randint(80, 150)
if "CI" in os.environ:
@ -72,58 +72,45 @@ class TestEncoder(unittest.TestCase):
# wait for loggerd to make the dir for first segment
route_prefix_path = None
with Timeout(int(SEGMENT_LENGTH*2)):
with Timeout(int(SEGMENT_LENGTH*3)):
while route_prefix_path is None:
try:
route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
except Exception:
time.sleep(0.1)
continue
def check_seg(i):
# check each camera file size
for camera, size in cameras.items():
ext = "ts" if camera=='qcamera' else "hevc"
file_path = f"{route_prefix_path}--{i}/{camera}.{ext}"
for camera, fps, size in CAMERAS:
if not record_front and "dcamera" in camera:
continue
file_path = f"{route_prefix_path}--{i}/{camera}"
# check file size
self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}")
self.assertTrue(os.path.exists(file_path))
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
f"{camera} failed size check: expected {size}, got {file_size}")
if camera == 'qcamera':
continue
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE))
# TODO: this ffprobe call is really slow
# check frame count
cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \
-of default=nokey=1:noprint_wrappers=1 {file_path}"
expected_frames = SEGMENT_LENGTH * CAMERA_FPS // 2 if (EON and camera=='dcamera') else SEGMENT_LENGTH * CAMERA_FPS
frame_tolerance = 1 if (EON and camera == 'dcamera') else 0
frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip())
expected_frames = fps * SEGMENT_LENGTH
frame_tolerance = 1 if (EON and camera == 'dcamera.hevc') else 0
probe = subprocess.check_output(cmd, shell=True, encoding='utf8')
frame_count = int(probe.split('\n')[0].strip())
self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}")
shutil.rmtree(f"{route_prefix_path}--{i}")
def join(ts, timeout):
for t in ts:
t.join(timeout)
threads = []
for i in trange(num_segments):
# poll for next segment
with Timeout(int(SEGMENT_LENGTH*2), error_msg=f"timed out waiting for segment {i}"):
while int(self._get_latest_segment_path().rsplit("--", 1)[1]) <= i:
time.sleep(0.1)
t = threading.Thread(target=check_seg, args=(i, ))
t.start()
threads.append(t)
join(threads, 0.1)
with Timeout(20):
join(threads, None)
check_seg(i)
if __name__ == "__main__":
unittest.main()