loggerd: trigger rotate on frame id instead of frame count (#22848)

Co-authored-by: Comma Device <device@comma.ai>
pull/22902/head
Adeeb Shihadeh 2021-11-13 15:38:10 -08:00 committed by GitHub
parent 45ced84acf
commit ea761cbbd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 40 deletions

View File

@ -113,35 +113,35 @@ struct LoggerdState {
std::condition_variable rotate_cv;
std::atomic<int> rotate_segment;
std::atomic<double> last_camera_seen_tms;
std::atomic<int> waiting_rotate;
std::atomic<int> ready_to_rotate; // count of encoders ready to rotate
int max_waiting = 0;
double last_rotate_tms = 0.;
double last_rotate_tms = 0.; // last rotate time in ms
// Sync logic for startup
std::atomic<int> encoders_ready = 0;
std::atomic<uint32_t> latest_frame_id = 0;
std::atomic<uint32_t> start_frame_id = 0;
bool camera_ready[WideRoadCam + 1] = {};
bool camera_synced[WideRoadCam + 1] = {};
};
LoggerdState s;
// Wait for all encoders to reach the same frame id
// Handle initial encoder syncing by waiting for all encoders to reach the same frame id
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) {
if (state->camera_synced[cam_type]) return true;
if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) {
update_max_atomic(state->latest_frame_id, frame_id);
// add a small margin to the start frame id in case one of the encoders already dropped the next frame
update_max_atomic(state->start_frame_id, frame_id + 2);
if (std::exchange(state->camera_ready[cam_type], true) == false) {
++state->encoders_ready;
LOGE("camera %d encoder ready", cam_type);
}
return false;
} else {
// Small margin in case one of the encoders already dropped the next frame
uint32_t start_frame_id = state->latest_frame_id + 2;
bool synced = frame_id >= start_frame_id;
if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id);
bool synced = frame_id >= state->start_frame_id;
state->camera_synced[cam_type] = synced;
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id);
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id);
return synced;
}
}
@ -149,7 +149,7 @@ bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id)
void encoder_thread(const LogCameraInfo &cam_info) {
set_thread_name(cam_info.filename);
int cnt = 0, cur_seg = -1;
int cur_seg = -1;
int encode_idx = 0;
LoggerHandle *lh = NULL;
std::vector<Encoder *> encoders;
@ -187,20 +187,23 @@ void encoder_thread(const LogCameraInfo &cam_info) {
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) {
continue;
}
}
if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) {
// trigger rotate and wait logger rotated to new segment
++s.waiting_rotate;
std::unique_lock lk(s.rotate_lock);
s.rotate_cv.wait(lk, [&] { return s.rotate_segment > cur_seg || do_exit; });
// check if we're ready to rotate
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) {
// trigger rotate and wait until the main logger has rotated to the new segment
++s.ready_to_rotate;
std::unique_lock lk(s.rotate_lock);
s.rotate_cv.wait(lk, [&] {
return s.rotate_segment > cur_seg || do_exit;
});
if (do_exit) break;
}
}
if (do_exit) break;
// rotate the encoder if the logger is on a newer segment
if (s.rotate_segment > cur_seg) {
cur_seg = s.rotate_segment;
cnt = 0;
LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path);
for (auto &e : encoders) {
@ -247,7 +250,6 @@ void encoder_thread(const LogCameraInfo &cam_info) {
}
}
cnt++;
encode_idx++;
}
@ -283,7 +285,7 @@ void logger_rotate() {
int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment);
assert(err == 0);
s.rotate_segment = segment;
s.waiting_rotate = 0;
s.ready_to_rotate = 0;
s.last_rotate_tms = millis_since_boot();
}
s.rotate_cv.notify_all();
@ -291,7 +293,7 @@ void logger_rotate() {
}
void rotate_if_needed() {
if (s.waiting_rotate == s.max_waiting) {
if (s.ready_to_rotate == s.max_waiting) {
logger_rotate();
}
@ -347,10 +349,10 @@ int main(int argc, char** argv) {
// init encoders
s.last_camera_seen_tms = millis_since_boot();
std::vector<std::thread> encoder_threads;
for (const auto &ci : cameras_logged) {
if (ci.enable) {
encoder_threads.push_back(std::thread(encoder_thread, ci));
if (ci.trigger_rotate) s.max_waiting++;
for (const auto &cam : cameras_logged) {
if (cam.enable) {
encoder_threads.push_back(std::thread(encoder_thread, cam));
if (cam.trigger_rotate) s.max_waiting++;
}
}

View File

@ -97,10 +97,8 @@ class TestEncoder(unittest.TestCase):
file_path = f"{route_prefix_path}--{i}/{camera}"
# check file size
self.assertTrue(os.path.exists(file_path))
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE))
# check file exists
self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing")
# TODO: this ffprobe call is really slow
# check frame count
@ -116,7 +114,11 @@ class TestEncoder(unittest.TestCase):
counts.append(frame_count)
self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}")
f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}")
# sanity check file size
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE))
# Check encodeIdx
if encode_idx_name is not None:
@ -151,15 +153,16 @@ class TestEncoder(unittest.TestCase):
self.assertEqual(min(counts), expected_frames)
shutil.rmtree(f"{route_prefix_path}--{i}")
for i in trange(num_segments + 1):
# poll for next segment
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir():
time.sleep(0.1)
managed_processes['loggerd'].stop()
managed_processes['camerad'].stop()
managed_processes['sensord'].stop()
try:
for i in trange(num_segments + 1):
# poll for next segment
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir():
time.sleep(0.1)
finally:
managed_processes['loggerd'].stop()
managed_processes['camerad'].stop()
managed_processes['sensord'].stop()
for i in trange(num_segments):
check_seg(i)

View File

@ -133,7 +133,7 @@ class TestLoggerd(unittest.TestCase):
p = Path(f"{route_path}--{n}")
logged = set([f.name for f in p.iterdir() if f.is_file()])
diff = logged ^ expected_files
self.assertEqual(len(diff), 0, f"{_=} {route_path=} {n=}, {logged=} {expected_files=}")
self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}")
def test_bootlog(self):
# generate bootlog with fake launch log