#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cereal/messaging/messaging.h" #include "cereal/services.h" #include "cereal/visionipc/visionipc.h" #include "cereal/visionipc/visionipc_client.h" #include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/common/params.h" #include "selfdrive/common/swaglog.h" #include "selfdrive/common/timing.h" #include "selfdrive/common/util.h" #include "selfdrive/hardware/hw.h" #include "selfdrive/loggerd/encoder.h" #include "selfdrive/loggerd/logger.h" #if defined(QCOM) || defined(QCOM2) #include "selfdrive/loggerd/omx_encoder.h" #define Encoder OmxEncoder #else #include "selfdrive/loggerd/raw_logger.h" #define Encoder RawLogger #endif namespace { constexpr int MAIN_FPS = 20; const int MAIN_BITRATE = Hardware::TICI() ? 10000000 : 5000000; const int DCAM_BITRATE = Hardware::TICI() ? MAIN_BITRATE : 2500000; #define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead const bool LOGGERD_TEST = getenv("LOGGERD_TEST"); const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; ExitHandler do_exit; const LogCameraInfo cameras_logged[] = { { .type = RoadCam, .stream_type = VISION_STREAM_YUV_BACK, .filename = "fcamera.hevc", .frame_packet_name = "roadCameraState", .fps = MAIN_FPS, .bitrate = MAIN_BITRATE, .is_h265 = true, .downscale = false, .has_qcamera = true, .trigger_rotate = true, .enable = true, .record = true, }, { .type = DriverCam, .stream_type = VISION_STREAM_YUV_FRONT, .filename = "dcamera.hevc", .frame_packet_name = "driverCameraState", .fps = MAIN_FPS, // on EONs, more compressed this way .bitrate = DCAM_BITRATE, .is_h265 = true, .downscale = false, .has_qcamera = false, .trigger_rotate = Hardware::TICI(), .enable = !Hardware::PC(), .record = Params().getBool("RecordFront"), }, { .type = WideRoadCam, .stream_type = VISION_STREAM_YUV_WIDE, .filename = "ecamera.hevc", .frame_packet_name = "wideRoadCameraState", .fps = MAIN_FPS, .bitrate = MAIN_BITRATE, .is_h265 = true, .downscale = false, .has_qcamera = false, .trigger_rotate = true, .enable = Hardware::TICI(), .record = Hardware::TICI(), }, }; const LogCameraInfo qcam_info = { .filename = "qcamera.ts", .fps = MAIN_FPS, .bitrate = 256000, .is_h265 = false, .downscale = true, .frame_width = Hardware::TICI() ? 526 : 480, .frame_height = Hardware::TICI() ? 330 : 360 // keep pixel count the same? }; struct LoggerdState { Context *ctx; LoggerState logger = {}; char segment_path[4096]; std::mutex rotate_lock; std::condition_variable rotate_cv; std::atomic rotate_segment; std::atomic last_camera_seen_tms; std::atomic waiting_rotate; int max_waiting = 0; double last_rotate_tms = 0.; // Sync logic for startup std::atomic encoders_ready = 0; std::atomic latest_frame_id = 0; bool camera_ready[WideRoadCam + 1] = {}; bool camera_synced[WideRoadCam + 1] = {}; }; LoggerdState s; // Wait for all encoders to reach the same frame id bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) { if (state->camera_synced[cam_type]) return true; if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) { update_max_atomic(state->latest_frame_id, frame_id); if (std::exchange(state->camera_ready[cam_type], true) == false) { ++state->encoders_ready; LOGE("camera %d encoder ready", cam_type); } return false; } else { // Small margin in case one of the encoders already dropped the next frame uint32_t start_frame_id = state->latest_frame_id + 2; bool synced = frame_id >= start_frame_id; state->camera_synced[cam_type] = synced; if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id); return synced; } } void encoder_thread(const LogCameraInfo &cam_info) { set_thread_name(cam_info.filename); int cnt = 0, cur_seg = -1; int encode_idx = 0; LoggerHandle *lh = NULL; std::vector encoders; VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); while (!do_exit) { if (!vipc_client.connect(false)) { util::sleep_for(5); continue; } // init encoders if (encoders.empty()) { VisionBuf buf_info = vipc_client.buffers[0]; LOGD("encoder init %dx%d", buf_info.width, buf_info.height); // main encoder encoders.push_back(new Encoder(cam_info.filename, buf_info.width, buf_info.height, cam_info.fps, cam_info.bitrate, cam_info.is_h265, cam_info.downscale, cam_info.record)); // qcamera encoder if (cam_info.has_qcamera) { encoders.push_back(new Encoder(qcam_info.filename, qcam_info.frame_width, qcam_info.frame_height, qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale)); } } while (!do_exit) { VisionIpcBufExtra extra; VisionBuf* buf = vipc_client.recv(&extra); if (buf == nullptr) continue; if (cam_info.trigger_rotate) { s.last_camera_seen_tms = millis_since_boot(); if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { continue; } } if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { // trigger rotate and wait logger rotated to new segment ++s.waiting_rotate; std::unique_lock lk(s.rotate_lock); s.rotate_cv.wait(lk, [&] { return s.rotate_segment > cur_seg || do_exit; }); } if (do_exit) break; // rotate the encoder if the logger is on a newer segment if (s.rotate_segment > cur_seg) { cur_seg = s.rotate_segment; cnt = 0; LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path); for (auto &e : encoders) { e->encoder_close(); e->encoder_open(s.segment_path); } if (lh) { lh_close(lh); } lh = logger_get_handle(&s.logger); } // encode a frame for (int i = 0; i < encoders.size(); ++i) { int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, extra.timestamp_eof); if (out_id == -1) { LOGE("Failed to encode frame. frame_id: %d encode_id: %d", extra.frame_id, encode_idx); } // publish encode index if (i == 0 && out_id != -1) { MessageBuilder msg; // this is really ugly bool valid = (buf->get_frame_id() == extra.frame_id); auto eidx = cam_info.type == DriverCam ? msg.initEvent(valid).initDriverEncodeIdx() : (cam_info.type == WideRoadCam ? msg.initEvent(valid).initWideRoadEncodeIdx() : msg.initEvent(valid).initRoadEncodeIdx()); eidx.setFrameId(extra.frame_id); eidx.setTimestampSof(extra.timestamp_sof); eidx.setTimestampEof(extra.timestamp_eof); if (Hardware::TICI()) { eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); } else { eidx.setType(cam_info.type == DriverCam ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); } eidx.setEncodeId(encode_idx); eidx.setSegmentNum(cur_seg); eidx.setSegmentId(out_id); if (lh) { auto bytes = msg.toBytes(); lh_log(lh, bytes.begin(), bytes.size(), true); } } } cnt++; encode_idx++; } if (lh) { lh_close(lh); lh = NULL; } } LOG("encoder destroy"); for(auto &e : encoders) { e->encoder_close(); delete e; } } int clear_locks_fn(const char* fpath, const struct stat *sb, int tyupeflag) { const char* dot = strrchr(fpath, '.'); if (dot && strcmp(dot, ".lock") == 0) { unlink(fpath); } return 0; } void clear_locks() { ftw(LOG_ROOT.c_str(), clear_locks_fn, 16); } void logger_rotate() { { std::unique_lock lk(s.rotate_lock); int segment = -1; int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); assert(err == 0); s.rotate_segment = segment; s.waiting_rotate = 0; s.last_rotate_tms = millis_since_boot(); } s.rotate_cv.notify_all(); LOGW((s.logger.part == 0) ? "logging to %s" : "rotated to %s", s.segment_path); } void rotate_if_needed() { if (s.waiting_rotate == s.max_waiting) { logger_rotate(); } double tms = millis_since_boot(); if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE && !LOGGERD_TEST) { LOGW("no camera packet seen. auto rotating"); logger_rotate(); } } } // namespace int main(int argc, char** argv) { if (Hardware::EON()) { setpriority(PRIO_PROCESS, 0, -20); } else if (Hardware::TICI()) { int ret; ret = set_core_affinity({0, 1, 2, 3}); assert(ret == 0); // TODO: why does this impact camerad timings? //ret = set_realtime_priority(1); //assert(ret == 0); } clear_locks(); // setup messaging typedef struct QlogState { int counter, freq; } QlogState; std::unordered_map qlog_states; s.ctx = Context::create(); Poller * poller = Poller::create(); // subscribe to all socks for (const auto& it : services) { if (!it.should_log) continue; SubSocket * sock = SubSocket::create(s.ctx, it.name); assert(sock != NULL); poller->registerSocket(sock); qlog_states[sock] = {.counter = 0, .freq = it.decimation}; } // init logger logger_init(&s.logger, "rlog", true); logger_rotate(); Params().put("CurrentRoute", s.logger.route_name); // init encoders s.last_camera_seen_tms = millis_since_boot(); std::vector encoder_threads; for (const auto &ci : cameras_logged) { if (ci.enable) { encoder_threads.push_back(std::thread(encoder_thread, ci)); if (ci.trigger_rotate) s.max_waiting++; } } uint64_t msg_count = 0, bytes_count = 0; double start_ts = millis_since_boot(); while (!do_exit) { // poll for new messages on all sockets for (auto sock : poller->poll(1000)) { // drain socket QlogState &qs = qlog_states[sock]; Message *msg = nullptr; while (!do_exit && (msg = sock->receive(true))) { const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0); logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); bytes_count += msg->getSize(); delete msg; rotate_if_needed(); if ((++msg_count % 1000) == 0) { double seconds = (millis_since_boot() - start_ts) / 1000.0; LOGD("%lu messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count / seconds, bytes_count * 0.001 / seconds); } } } } LOGW("closing encoders"); s.rotate_cv.notify_all(); for (auto &t : encoder_threads) t.join(); LOGW("closing logger"); logger_close(&s.logger, &do_exit); if (do_exit.power_failure) { LOGE("power failure"); sync(); LOGE("sync done"); } // messaging cleanup for (auto &[sock, qs] : qlog_states) delete sock; delete poller; delete s.ctx; return 0; }