loggerd: fix concurrency issues in sync encoders (#22590)
* sync_encoders * add variable camera_synced * cleanuppull/22160/head^2
parent
921600b95c
commit
18caf3535b
|
@ -118,13 +118,34 @@ struct LoggerdState {
|
|||
double last_rotate_tms = 0.;
|
||||
|
||||
// Sync logic for startup
|
||||
std::atomic<bool> encoders_synced;
|
||||
std::atomic<int> encoders_ready;
|
||||
std::atomic<uint32_t> start_frame_id;
|
||||
std::atomic<uint32_t> latest_frame_id;
|
||||
std::atomic<int> encoders_ready = 0;
|
||||
std::atomic<uint32_t> latest_frame_id = 0;
|
||||
bool camera_ready[WideRoadCam + 1] = {};
|
||||
bool camera_synced[WideRoadCam + 1] = {};
|
||||
};
|
||||
LoggerdState s;
|
||||
|
||||
// Wait for all encoders to reach the same frame id
|
||||
bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) {
|
||||
if (s->camera_synced[cam_type]) return true;
|
||||
|
||||
if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) {
|
||||
update_max_atomic(s->latest_frame_id, frame_id);
|
||||
if (std::exchange(s->camera_ready[cam_type], true) == false) {
|
||||
++s->encoders_ready;
|
||||
LOGE("camera %d encoder ready", cam_type);
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
// Small margin in case one of the encoders already dropped the next frame
|
||||
uint32_t start_frame_id = s->latest_frame_id + 2;
|
||||
bool synced = frame_id >= start_frame_id;
|
||||
s->camera_synced[cam_type] = synced;
|
||||
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id);
|
||||
return synced;
|
||||
}
|
||||
}
|
||||
|
||||
void encoder_thread(const LogCameraInfo &cam_info) {
|
||||
set_thread_name(cam_info.filename);
|
||||
|
||||
|
@ -134,8 +155,6 @@ void encoder_thread(const LogCameraInfo &cam_info) {
|
|||
std::vector<Encoder *> encoders;
|
||||
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
|
||||
|
||||
bool ready = false;
|
||||
|
||||
while (!do_exit) {
|
||||
if (!vipc_client.connect(false)) {
|
||||
util::sleep_for(5);
|
||||
|
@ -163,27 +182,11 @@ void encoder_thread(const LogCameraInfo &cam_info) {
|
|||
VisionBuf* buf = vipc_client.recv(&extra);
|
||||
if (buf == nullptr) continue;
|
||||
|
||||
if (cam_info.trigger_rotate && (s.max_waiting > 1)) {
|
||||
if (!ready) {
|
||||
LOGE("%s encoder ready", cam_info.filename);
|
||||
++s.encoders_ready;
|
||||
ready = true;
|
||||
}
|
||||
|
||||
if (!s.encoders_synced) {
|
||||
update_max_atomic(s.latest_frame_id, extra.frame_id);
|
||||
continue;
|
||||
} else {
|
||||
// Wait for all encoders to reach the same frame id
|
||||
if (extra.frame_id < s.start_frame_id) {
|
||||
LOGE("%s waiting for frame %d, cur %d", cam_info.filename, s.start_frame_id.load(), extra.frame_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (cam_info.trigger_rotate) {
|
||||
s.last_camera_seen_tms = millis_since_boot();
|
||||
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) {
|
||||
|
@ -354,15 +357,6 @@ int main(int argc, char** argv) {
|
|||
uint64_t msg_count = 0, bytes_count = 0;
|
||||
double start_ts = millis_since_boot();
|
||||
while (!do_exit) {
|
||||
// Check if all encoders are ready and start encoding at the same time
|
||||
if ((s.max_waiting > 1) && !s.encoders_synced && (s.encoders_ready == s.max_waiting)) {
|
||||
// Small margin in case one of the encoders already dropped the next frame
|
||||
s.start_frame_id = s.latest_frame_id + 2;
|
||||
s.encoders_synced = true;
|
||||
LOGE("starting encoders at frame id %d", s.start_frame_id.load());
|
||||
}
|
||||
|
||||
|
||||
// poll for new messages on all sockets
|
||||
for (auto sock : poller->poll(1000)) {
|
||||
// drain socket
|
||||
|
|
Loading…
Reference in New Issue