From 2b974150c4478f6df69b7fbbc7037548e9e0bbbf Mon Sep 17 00:00:00 2001 From: ZwX1616 Date: Sat, 3 Oct 2020 07:56:16 -0700 Subject: [PATCH] Refactor loggerd rotations (#2247) * loggerd:refactor rotate encoder * change notify_all to notify_one * finish * cancelWait * start front_encoder_thread on need * rebase master to resolve conflict * tici qcam works, but * ifs are ugly * synced rotate off frame ids * only start counting frames after modeld alive * typo * fix some bugs * never over rotate * works * missing space * zero frame count tolerance * lol * add back omx race protection * not need that * not to confuse different locks * playing with locks is playing with fire * tici now has qcam * size is overrated * make cppcheck happy * fall back if no camera * warn * missing condition * first is freestyle * no stream id before connecting * wait for increment * first will be imperfect * wait is futile * block frames when writing state * just log write time * exit loops when exit * tici qcamera * fixed * add atomics * incl Co-authored-by: deanlee Co-authored-by: Comma Device --- selfdrive/camerad/cameras/camera_common.h | 21 ++ selfdrive/loggerd/encoder.c | 3 + selfdrive/loggerd/loggerd.cc | 438 ++++++++++++++-------- selfdrive/loggerd/tests/test_loggerd.py | 8 +- 4 files changed, 313 insertions(+), 157 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.h b/selfdrive/camerad/cameras/camera_common.h index 1bd8ac6c7..bd3d9d281 100644 --- a/selfdrive/camerad/cameras/camera_common.h +++ b/selfdrive/camerad/cameras/camera_common.h @@ -4,6 +4,8 @@ #include #include +#include "common/visionipc.h" + #define CAMERA_ID_IMX298 0 #define CAMERA_ID_IMX179 1 #define CAMERA_ID_S5K3P8SP 2 @@ -15,6 +17,12 @@ #define CAMERA_ID_AR0231 8 #define CAMERA_ID_MAX 9 +#define LOG_CAMERA_ID_FCAMERA 0 +#define LOG_CAMERA_ID_DCAMERA 1 +#define LOG_CAMERA_ID_ECAMERA 2 +#define LOG_CAMERA_ID_QCAMERA 3 +#define LOG_CAMERA_ID_MAX 4 + #ifdef __cplusplus extern "C" { #endif @@ -28,6 +36,19 @@ typedef struct CameraInfo { bool hdr; } CameraInfo; +typedef struct LogCameraInfo { + const char* filename; + const char* frame_packet_name; + const char* encode_idx_name; + VisionStreamType stream_type; + int frame_width, frame_height; + int fps; + int bitrate; + bool is_h265; + bool downscale; + bool has_qcamera; +} LogCameraInfo; + typedef struct FrameMetadata { uint32_t frame_id; uint64_t timestamp_eof; diff --git a/selfdrive/loggerd/encoder.c b/selfdrive/loggerd/encoder.c index 4939a4853..721c6901e 100644 --- a/selfdrive/loggerd/encoder.c +++ b/selfdrive/loggerd/encoder.c @@ -564,6 +564,9 @@ void encoder_open(EncoderState *s, const char* path) { avformat_alloc_output_context2(&s->ofmt_ctx, NULL, NULL, s->vid_path); assert(s->ofmt_ctx); +#ifdef QCOM2 + s->ofmt_ctx->oformat->flags = AVFMT_TS_NONSTRICT; +#endif s->out_stream = avformat_new_stream(s->ofmt_ctx, NULL); assert(s->out_stream); diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index c7487677a..687c9cfb8 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -19,6 +20,7 @@ #include #include #include +#include #include #include @@ -34,7 +36,7 @@ #include "common/visionipc.h" #include "common/utilpp.h" #include "common/util.h" - +#include "camerad/cameras/camera_common.h" #include "logger.h" #include "messaging.hpp" #include "services.h" @@ -44,28 +46,73 @@ #define DISABLE_ENCODER #endif - #ifndef DISABLE_ENCODER #include "encoder.h" #include "raw_logger.h" #endif -#include "cereal/gen/cpp/log.capnp.h" - -#define CAM_IDX_FCAM 0 -#define CAM_IDX_DCAM 1 -#define CAM_IDX_ECAM 2 - -#define CAMERA_FPS 20 -#define SEGMENT_LENGTH 60 - #define MAIN_BITRATE 5000000 +#define QCAM_BITRATE 128000 +#define MAIN_FPS 20 #ifndef QCOM2 +#define MAX_CAM_IDX LOG_CAMERA_ID_DCAMERA #define DCAM_BITRATE 2500000 #else +#define MAX_CAM_IDX LOG_CAMERA_ID_ECAMERA #define DCAM_BITRATE MAIN_BITRATE #endif +#define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead + +LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { + [LOG_CAMERA_ID_FCAMERA] = { + .stream_type = VISION_STREAM_YUV, + .filename = "fcamera.hevc", + .frame_packet_name = "frame", + .encode_idx_name = "encodeIdx", + .fps = MAIN_FPS, + .bitrate = MAIN_BITRATE, + .is_h265 = true, + .downscale = false, + .has_qcamera = true + }, + [LOG_CAMERA_ID_DCAMERA] = { + .stream_type = VISION_STREAM_YUV_FRONT, + .filename = "dcamera.hevc", + .frame_packet_name = "frontFrame", + .encode_idx_name = "frontEncodeIdx", + .fps = MAIN_FPS, // on EONs, more compressed this way + .bitrate = DCAM_BITRATE, + .is_h265 = true, + .downscale = false, + .has_qcamera = false + }, + [LOG_CAMERA_ID_ECAMERA] = { + .stream_type = VISION_STREAM_YUV_WIDE, + .filename = "ecamera.hevc", + .frame_packet_name = "wideFrame", + .encode_idx_name = "wideEncodeIdx", + .fps = MAIN_FPS, + .bitrate = MAIN_BITRATE, + .is_h265 = true, + .downscale = false, + .has_qcamera = false + }, + [LOG_CAMERA_ID_QCAMERA] = { + .filename = "qcamera.ts", + .fps = MAIN_FPS, + .bitrate = QCAM_BITRATE, + .is_h265 = false, + .downscale = true, +#ifndef QCOM2 + .frame_width = 480, .frame_height = 360 +#else + .frame_width = 526, .frame_height = 330 // keep pixel count the same? +#endif + }, +}; +#define SEGMENT_LENGTH 60 + #define LOG_ROOT "/data/media/0/realdata" #define RAW_CLIP_LENGTH 100 // 5 seconds at 20fps @@ -86,44 +133,94 @@ volatile sig_atomic_t do_exit = 0; static void set_do_exit(int sig) { do_exit = 1; } + +class RotateState { +public: + SubSocket* fpkt_sock; + uint32_t stream_frame_id, log_frame_id, last_rotate_frame_id; + bool enabled, should_rotate; + + RotateState() : fpkt_sock(nullptr), stream_frame_id(0), log_frame_id(0), + last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false) {}; + + void waitLogThread() { + std::unique_lock lk(fid_lock); + while (stream_frame_id > log_frame_id //if the log camera is older, wait for it to catch up. + && (stream_frame_id - log_frame_id) < 8 // but if its too old then there probably was a discontinuity (visiond restarted) + && !do_exit) { + cv.wait(lk); + } + } + + void cancelWait() { cv.notify_one(); } + + void setStreamFrameId(uint32_t frame_id) { + fid_lock.lock(); + stream_frame_id = frame_id; + fid_lock.unlock(); + cv.notify_one(); + } + + void setLogFrameId(uint32_t frame_id) { + fid_lock.lock(); + log_frame_id = frame_id; + fid_lock.unlock(); + cv.notify_one(); + } + + void rotate() { + if (!enabled) { return; } + std::unique_lock lk(fid_lock); + should_rotate = true; + last_rotate_frame_id = stream_frame_id; + } + + void finish_rotate() { + std::unique_lock lk(fid_lock); + should_rotate = false; + } + +private: + std::mutex fid_lock; + std::condition_variable cv; +}; + struct LoggerdState { Context *ctx; LoggerState logger; - - std::mutex lock; - std::condition_variable cv; char segment_path[4096]; - uint32_t last_frame_id; - uint32_t rotate_last_frame_id; int rotate_segment; - int rotate_seq_id; - pthread_mutex_t rotate_lock; int num_encoder; - int should_close; - int finish_close; + std::atomic rotate_seq_id; + std::atomic should_close; + std::atomic finish_close; + + RotateState rotate_state[LOG_CAMERA_ID_MAX-1]; }; LoggerdState s; #ifndef DISABLE_ENCODER -void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { - int err; +void encoder_thread(RotateState *rotate_state, bool is_streaming, bool raw_clips, int cam_idx) { - // 0:f, 1:d, 2:e - if (cam_idx == CAM_IDX_DCAM) { - // TODO: add this back - #ifndef QCOM2 - if (!read_db_bool("RecordFront")) return; - LOGW("recording front camera"); - #endif - set_thread_name("FrontCameraEncoder"); - } else if (cam_idx == CAM_IDX_FCAM) { - set_thread_name("RearCameraEncoder"); - } else if (cam_idx == CAM_IDX_ECAM) { - set_thread_name("WideCameraEncoder"); - } else { - LOGE("unexpected camera index provided"); - assert(false); + switch (cam_idx) { + case LOG_CAMERA_ID_DCAMERA: { + LOGW("recording front camera"); + set_thread_name("FrontCameraEncoder"); + break; + } + case LOG_CAMERA_ID_FCAMERA: { + set_thread_name("RearCameraEncoder"); + break; + } + case LOG_CAMERA_ID_ECAMERA: { + set_thread_name("WideCameraEncoder"); + break; + } + default: { + LOGE("unexpected camera index provided"); + assert(false); + } } VisionStream stream; @@ -131,30 +228,24 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { bool encoder_inited = false; EncoderState encoder; EncoderState encoder_alt; - bool has_encoder_alt = false; + bool has_encoder_alt = cameras_logged[cam_idx].has_qcamera; + int encoder_segment = -1; + int cnt = 0; + rotate_state->enabled = true; pthread_mutex_lock(&s.rotate_lock); int my_idx = s.num_encoder; s.num_encoder += 1; pthread_mutex_unlock(&s.rotate_lock); - int encoder_segment = -1; - int cnt = 0; - - PubSocket *idx_sock = PubSocket::create(s.ctx, cam_idx == CAM_IDX_DCAM ? "frontEncodeIdx" : (cam_idx == CAM_IDX_ECAM ? "wideEncodeIdx" : "encodeIdx")); + PubSocket *idx_sock = PubSocket::create(s.ctx, cameras_logged[cam_idx].encode_idx_name); assert(idx_sock != NULL); LoggerHandle *lh = NULL; while (!do_exit) { VisionStreamBufs buf_info; - if (cam_idx == CAM_IDX_DCAM) { - err = visionstream_init(&stream, VISION_STREAM_YUV_FRONT, false, &buf_info); - } else if (cam_idx == CAM_IDX_FCAM) { - err = visionstream_init(&stream, VISION_STREAM_YUV, false, &buf_info); - } else if (cam_idx == CAM_IDX_ECAM) { - err = visionstream_init(&stream, VISION_STREAM_YUV_WIDE, false, &buf_info); - } + int err = visionstream_init(&stream, cameras_logged[cam_idx].stream_type, false, &buf_info); if (err != 0) { LOGD("visionstream connect fail"); usleep(100000); @@ -163,15 +254,24 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { if (!encoder_inited) { LOGD("encoder init %dx%d", buf_info.width, buf_info.height); - encoder_init(&encoder, cam_idx == CAM_IDX_DCAM ? "dcamera.hevc" : (cam_idx == CAM_IDX_ECAM ? "ecamera.hevc" : "fcamera.hevc"), buf_info.width, buf_info.height, CAMERA_FPS, cam_idx == CAM_IDX_DCAM ? DCAM_BITRATE:MAIN_BITRATE, true, false); + encoder_init(&encoder, cameras_logged[cam_idx].filename, + buf_info.width, + buf_info.height, + cameras_logged[cam_idx].fps, + cameras_logged[cam_idx].bitrate, + cameras_logged[cam_idx].is_h265, + cameras_logged[cam_idx].downscale); - #ifndef QCOM2 - // TODO: fix qcamera on tici - if (cam_idx == CAM_IDX_FCAM) { - encoder_init(&encoder_alt, "qcamera.ts", 480, 360, CAMERA_FPS, 128000, false, true); - has_encoder_alt = true; + if (has_encoder_alt) { + encoder_init(&encoder_alt, cameras_logged[LOG_CAMERA_ID_QCAMERA].filename, + cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_width, + cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_height, + cameras_logged[LOG_CAMERA_ID_QCAMERA].fps, + cameras_logged[LOG_CAMERA_ID_QCAMERA].bitrate, + cameras_logged[LOG_CAMERA_ID_QCAMERA].is_h265, + cameras_logged[LOG_CAMERA_ID_QCAMERA].downscale); } - #endif + encoder_inited = true; if (is_streaming) { encoder.zmq_ctx = zmq_ctx_new(); @@ -187,7 +287,7 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { RawLogger *rawlogger = NULL; if (raw_clips) { - rawlogger = new RawLogger("prcamera", buf_info.width, buf_info.height, CAMERA_FPS); + rawlogger = new RawLogger("prcamera", buf_info.width, buf_info.height, MAIN_FPS); } while (!do_exit) { @@ -203,66 +303,46 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { //double msdiff = (double) diff / 1000000.0; // printf("logger latency to tsEof: %f\n", msdiff); - uint8_t *y = (uint8_t*)buf->addr; - uint8_t *u = y + (buf_info.width*buf_info.height); - uint8_t *v = u + (buf_info.width/2)*(buf_info.height/2); + { // all the rotation stuff + + pthread_mutex_lock(&s.rotate_lock); + pthread_mutex_unlock(&s.rotate_lock); + + // wait if camera pkt id is older than stream + rotate_state->waitLogThread(); - { - // all the rotation stuff - bool should_rotate = false; - std::unique_lock lk(s.lock); - if (cam_idx == CAM_IDX_FCAM) { // TODO: should wait for three cameras on tici? - // wait if log camera is older on back camera - while ( extra.frame_id > s.last_frame_id //if the log camera is older, wait for it to catch up. - && (extra.frame_id-s.last_frame_id) < 8 // but if its too old then there probably was a discontinuity (visiond restarted) - && !do_exit) { - s.cv.wait(lk); - } - should_rotate = extra.frame_id > s.rotate_last_frame_id && encoder_segment < s.rotate_segment && s.rotate_seq_id == my_idx; - } else { - // front camera is best effort - should_rotate = encoder_segment < s.rotate_segment && s.rotate_seq_id == my_idx; - } if (do_exit) break; // rotate the encoder if the logger is on a newer segment - if (should_rotate) { - LOG("rotate encoder to %s", s.segment_path); - + if (rotate_state->should_rotate) { + if (rotate_state->last_rotate_frame_id == 0) { + rotate_state->last_rotate_frame_id = extra.frame_id - 1; + } + while (s.rotate_seq_id != my_idx && !do_exit) { usleep(1000); } + LOGW("camera %d rotate encoder to %s.", cam_idx, s.segment_path); encoder_rotate(&encoder, s.segment_path, s.rotate_segment); s.rotate_seq_id = (my_idx + 1) % s.num_encoder; - if (has_encoder_alt) { encoder_rotate(&encoder_alt, s.segment_path, s.rotate_segment); } - if (raw_clips) { rawlogger->Rotate(s.segment_path, s.rotate_segment); } - encoder_segment = s.rotate_segment; if (lh) { lh_close(lh); } lh = logger_get_handle(&s.logger); - } - if (encoder.rotating) { pthread_mutex_lock(&s.rotate_lock); s.should_close += 1; pthread_mutex_unlock(&s.rotate_lock); - while(s.should_close > 0 && s.should_close < s.num_encoder) { - // printf("%d waiting for others to reach close, %d/%d \n", my_idx, s.should_close, s.num_encoder); - s.cv.wait(lk); - } + while(s.should_close > 0 && s.should_close < s.num_encoder && !do_exit) { usleep(1000); } pthread_mutex_lock(&s.rotate_lock); - if (s.should_close == s.num_encoder) { - s.should_close = 1 - s.num_encoder; - } else { - s.should_close += 1; - } + s.should_close = s.should_close == s.num_encoder ? 1 - s.num_encoder : s.should_close + 1; + encoder_close(&encoder); encoder_open(&encoder, encoder.next_path); encoder.segment = encoder.next_segment; @@ -276,13 +356,18 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { s.finish_close += 1; pthread_mutex_unlock(&s.rotate_lock); - while(s.finish_close > 0 && s.finish_close < s.num_encoder) { - // printf("%d waiting for others to actually close, %d/%d \n", my_idx, s.finish_close, s.num_encoder); - s.cv.wait(lk); - } + while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) { usleep(1000); } s.finish_close = 0; + + rotate_state->finish_rotate(); } } + + rotate_state->setStreamFrameId(extra.frame_id); + + uint8_t *y = (uint8_t*)buf->addr; + uint8_t *u = y + (buf_info.width*buf_info.height); + uint8_t *v = u + (buf_info.width/2)*(buf_info.height/2); { // encode hevc int out_segment = -1; @@ -302,11 +387,12 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) { MessageBuilder msg; auto eidx = msg.initEvent().initEncodeIdx(); eidx.setFrameId(extra.frame_id); -#ifdef QCOM2 + #ifdef QCOM2 eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); -#else - eidx.setType(cam_idx == CAM_IDX_DCAM ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); -#endif + #else + eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT:cereal::EncodeIndex::Type::FULL_H_E_V_C); + #endif + eidx.setEncodeId(cnt); eidx.setSegmentNum(out_segment); eidx.setSegmentId(out_id); @@ -533,6 +619,10 @@ int main(int argc, char** argv) { if (getenv("LOGGERD_TEST")) { segment_length = atoi(getenv("LOGGERD_SEGMENT_LENGTH")); } + bool record_front = true; +#ifndef QCOM2 + record_front = read_db_bool("RecordFront"); +#endif setpriority(PRIO_PROCESS, 0, -12); @@ -546,7 +636,6 @@ int main(int argc, char** argv) { // subscribe to all services - SubSocket *frame_sock = NULL; std::vector socks; std::map qlog_counter; @@ -558,12 +647,11 @@ int main(int argc, char** argv) { if (it.should_log) { SubSocket * sock = SubSocket::create(s.ctx, name); assert(sock != NULL); - poller->registerSocket(sock); socks.push_back(sock); - if (name == "frame") { - frame_sock = sock; + for (int cid=0;cid<=MAX_CAM_IDX;cid++) { + if (name == cameras_logged[cid].frame_packet_name) { s.rotate_state[cid].fpkt_sock = sock; } } qlog_counter[sock] = (it.decimation == -1) ? -1 : 0; @@ -571,7 +659,6 @@ int main(int argc, char** argv) { } } - { auto words = gen_init_data(); auto bytes = words.asBytes(); @@ -588,14 +675,6 @@ int main(int argc, char** argv) { is_logging = false; } - if (is_logging) { - err = logger_next(&s.logger, LOG_ROOT, s.segment_path, sizeof(s.segment_path), &s.rotate_segment); - assert(err == 0); - LOGW("logging to %s", s.segment_path); - } - - double start_ts = seconds_since_boot(); - double last_rotate_ts = start_ts; s.rotate_seq_id = 0; s.should_close = 0; s.finish_close = 0; @@ -603,74 +682,124 @@ int main(int argc, char** argv) { pthread_mutex_init(&s.rotate_lock, NULL); #ifndef DISABLE_ENCODER // rear camera - std::thread encoder_thread_handle(encoder_thread, is_streaming, false, CAM_IDX_FCAM); - + std::thread encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_FCAMERA], is_streaming, false, LOG_CAMERA_ID_FCAMERA); // front camera - std::thread front_encoder_thread_handle(encoder_thread, false, false, CAM_IDX_DCAM); - + std::thread front_encoder_thread_handle; + if (record_front) { + front_encoder_thread_handle = std::thread(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_DCAMERA], false, false, LOG_CAMERA_ID_DCAMERA); + } #ifdef QCOM2 // wide camera - std::thread wide_encoder_thread_handle(encoder_thread, false, false, CAM_IDX_ECAM); + std::thread wide_encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_ECAMERA], false, false, LOG_CAMERA_ID_ECAMERA); #endif #endif uint64_t msg_count = 0; uint64_t bytes_count = 0; + kj::Array buf = kj::heapArray(1024); + double start_ts = seconds_since_boot(); + double last_rotate_tms = millis_since_boot(); + double last_camera_seen_tms = millis_since_boot(); + uint32_t last_seen_log_frame_id[LOG_CAMERA_ID_MAX-1] = {0}; + uint32_t last_seen_log_frame_id_max = 0; while (!do_exit) { - for (auto sock : poller->poll(100 * 1000)) { + for (auto sock : poller->poll(100 * 1000)) { + Message * last_msg = nullptr; while (true) { Message * msg = sock->receive(true); if (msg == NULL){ break; } + delete last_msg; + last_msg = msg; - uint8_t* data = (uint8_t*)msg->getData(); - size_t len = msg->getSize(); - - if (sock == frame_sock) { - // track camera frames to sync to encoder - auto amsg = kj::heapArray((len / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), data, len); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - if (event.isFrame()) { - std::unique_lock lk(s.lock); - s.last_frame_id = event.getFrame().getFrameId(); - lk.unlock(); - s.cv.notify_all(); - } - } - - logger_log(&s.logger, data, len, qlog_counter[sock] == 0); - delete msg; + logger_log(&s.logger, (uint8_t*)msg->getData(), msg->getSize(), qlog_counter[sock] == 0); if (qlog_counter[sock] != -1) { //printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]); qlog_counter[sock]++; qlog_counter[sock] %= qlog_freqs[sock]; } - - bytes_count += len; + bytes_count += msg->getSize(); msg_count++; } + + if (last_msg) { + int fpkt_id = -1; + for (int cid=0;cid<=MAX_CAM_IDX;cid++) { + if (sock == s.rotate_state[cid].fpkt_sock) {fpkt_id=cid; break;} + } + if (fpkt_id>=0) { + // track camera frames to sync to encoder + // only process last frame + const uint8_t* data = (uint8_t*)last_msg->getData(); + const size_t len = last_msg->getSize(); + const size_t size = len / sizeof(capnp::word) + 1; + if (buf.size() < size) { + buf = kj::heapArray(size); + } + memcpy(buf.begin(), data, len); + + capnp::FlatArrayMessageReader cmsg(buf); + cereal::Event::Reader event = cmsg.getRoot(); + + if (fpkt_id == LOG_CAMERA_ID_FCAMERA) { + s.rotate_state[fpkt_id].setLogFrameId(event.getFrame().getFrameId()); + } else if (fpkt_id == LOG_CAMERA_ID_DCAMERA) { + s.rotate_state[fpkt_id].setLogFrameId(event.getFrontFrame().getFrameId()); + } else if (fpkt_id == LOG_CAMERA_ID_ECAMERA) { + s.rotate_state[fpkt_id].setLogFrameId(event.getWideFrame().getFrameId()); + } + last_seen_log_frame_id[fpkt_id] = s.rotate_state[fpkt_id].log_frame_id; + last_seen_log_frame_id_max = fmax(last_seen_log_frame_id_max, s.rotate_state[fpkt_id].log_frame_id); + last_camera_seen_tms = millis_since_boot(); + } + delete last_msg; + } } double ts = seconds_since_boot(); - if (ts - last_rotate_ts > segment_length) { + double tms = millis_since_boot(); + + bool new_segment = false; + + if (s.logger.part > -1) { + new_segment = true; + if (tms - last_camera_seen_tms <= NO_CAMERA_PATIENCE) { + for (int cid=0;cid<=MAX_CAM_IDX;cid++) { + // this *should* be redundant on tici since all camera frames are synced + new_segment &= (((s.rotate_state[cid].stream_frame_id >= s.rotate_state[cid].last_rotate_frame_id + segment_length * MAIN_FPS) && + (!s.rotate_state[cid].should_rotate)) || + (!s.rotate_state[cid].enabled)); + if (last_seen_log_frame_id[cid] + 2 < last_seen_log_frame_id_max) { LOGW("camera %d lags behind", cid); } +#ifndef QCOM2 + break; // only look at fcamera frame id if not QCOM2 +#endif + } + } else { + new_segment &= tms - last_rotate_tms > segment_length * 1000; + if (new_segment) { LOGW("no camera packet seen. auto rotated"); } + } + } else if (s.logger.part == -1) { + // always starts first segment immediately + new_segment = true; + } + + if (new_segment) { + pthread_mutex_lock(&s.rotate_lock); + last_rotate_tms = millis_since_boot(); + // rotate the log - - last_rotate_ts += segment_length; - - std::lock_guard guard(s.lock); - s.rotate_last_frame_id = s.last_frame_id; - if (is_logging) { err = logger_next(&s.logger, LOG_ROOT, s.segment_path, sizeof(s.segment_path), &s.rotate_segment); assert(err == 0); + if (s.logger.part == 0) { LOGW("logging to %s", s.segment_path); } LOGW("rotated to %s", s.segment_path); } + // rotate the encoders + for (int cid=0;cid<=MAX_CAM_IDX;cid++) { s.rotate_state[cid].rotate(); } + pthread_mutex_unlock(&s.rotate_lock); } if ((msg_count%1000) == 0) { @@ -679,14 +808,15 @@ int main(int argc, char** argv) { } LOGW("joining threads"); - s.cv.notify_all(); - + for (int cid=0;cid<=MAX_CAM_IDX;cid++) { s.rotate_state[cid].cancelWait(); } #ifndef DISABLE_ENCODER - #ifdef QCOM2 +#ifdef QCOM2 wide_encoder_thread_handle.join(); - #endif - front_encoder_thread_handle.join(); +#endif + if (record_front) { + front_encoder_thread_handle.join(); + } encoder_thread_handle.join(); LOGW("encoder joined"); #endif diff --git a/selfdrive/loggerd/tests/test_loggerd.py b/selfdrive/loggerd/tests/test_loggerd.py index 76398102c..78cd742d3 100755 --- a/selfdrive/loggerd/tests/test_loggerd.py +++ b/selfdrive/loggerd/tests/test_loggerd.py @@ -26,13 +26,13 @@ if EON: "qcamera": 38533, } elif TICI: - CAMERAS = {f"{c}camera": FULL_SIZE for c in ["f", "e", "d"]} + CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]} else: CAMERAS = {} ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]] -FRAME_TOLERANCE = 2 +FRAME_TOLERANCE = 0 FILE_SIZE_TOLERANCE = 0.5 class TestLoggerd(unittest.TestCase): @@ -113,8 +113,10 @@ class TestLoggerd(unittest.TestCase): cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \ -of default=nokey=1:noprint_wrappers=1 {file_path}" expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS + frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip()) - self.assertTrue(abs(expected_frames - frame_count) <= FRAME_TOLERANCE, + + self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") shutil.rmtree(f"{route_prefix_path}--{i}")