diff --git a/release/files_common b/release/files_common index fe12037f..05152663 100644 --- a/release/files_common +++ b/release/files_common @@ -193,7 +193,7 @@ selfdrive/common/swaglog.h selfdrive/common/swaglog.cc selfdrive/common/util.cc selfdrive/common/util.h -selfdrive/common/cqueue.[c,h] +selfdrive/common/queue.h selfdrive/common/clutil.cc selfdrive/common/clutil.h selfdrive/common/params.h diff --git a/selfdrive/camerad/cameras/camera_common.cc b/selfdrive/camerad/cameras/camera_common.cc index 8d63921f..b3b44aad 100644 --- a/selfdrive/camerad/cameras/camera_common.cc +++ b/selfdrive/camerad/cameras/camera_common.cc @@ -121,14 +121,7 @@ CameraBuf::~CameraBuf() { } bool CameraBuf::acquire() { - { - std::unique_lock lk(frame_queue_mutex); - bool got_frame = frame_queue_cv.wait_for(lk, std::chrono::milliseconds(1), [this]{ return !frame_queue.empty(); }); - if (!got_frame) return false; - - cur_buf_idx = frame_queue.front(); - frame_queue.pop(); - } + if (!safe_queue.try_pop(cur_buf_idx, 1)) return false; const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx]; if (frame_data.frame_id == -1) { @@ -193,12 +186,8 @@ void CameraBuf::release() { } } -void CameraBuf::queue(size_t buf_idx){ - { - std::lock_guard lk(frame_queue_mutex); - frame_queue.push(buf_idx); - } - frame_queue_cv.notify_one(); +void CameraBuf::queue(size_t buf_idx) { + safe_queue.push(buf_idx); } // common functions diff --git a/selfdrive/camerad/cameras/camera_common.h b/selfdrive/camerad/cameras/camera_common.h index 9582ca53..fb7018a2 100644 --- a/selfdrive/camerad/cameras/camera_common.h +++ b/selfdrive/camerad/cameras/camera_common.h @@ -1,7 +1,4 @@ #pragma once -#include -#include -#include #include #include @@ -10,6 +7,7 @@ #include #include "common/mat.h" #include "common/swaglog.h" +#include "common/queue.h" #include "visionbuf.h" #include "common/visionimg.h" #include "imgproc/utils.h" @@ -103,9 +101,7 @@ private: int cur_buf_idx; - std::mutex frame_queue_mutex; - std::condition_variable frame_queue_cv; - std::queue frame_queue; + SafeQueue safe_queue; int frame_buf_count; release_cb release_callback; diff --git a/selfdrive/common/SConscript b/selfdrive/common/SConscript index 567706f6..ec3603d3 100644 --- a/selfdrive/common/SConscript +++ b/selfdrive/common/SConscript @@ -5,7 +5,7 @@ if SHARED: else: fxn = env.Library -common_libs = ['params.cc', 'swaglog.cc', 'cqueue.c', 'util.cc', 'gpio.cc', 'i2c.cc'] +common_libs = ['params.cc', 'swaglog.cc', 'util.cc', 'gpio.cc', 'i2c.cc'] _common = fxn('common', common_libs, LIBS="json11") diff --git a/selfdrive/common/cqueue.c b/selfdrive/common/cqueue.c deleted file mode 100644 index 582a55fa..00000000 --- a/selfdrive/common/cqueue.c +++ /dev/null @@ -1,54 +0,0 @@ -#include -#include -#include - -#include "cqueue.h" - -// TODO: replace by C++ queue and CV. See camerad - -void queue_init(Queue *q) { - memset(q, 0, sizeof(*q)); - TAILQ_INIT(&q->q); - pthread_mutex_init(&q->lock, NULL); - pthread_cond_init(&q->cv, NULL); -} - -void* queue_pop(Queue *q) { - pthread_mutex_lock(&q->lock); - while (TAILQ_EMPTY(&q->q)) { - pthread_cond_wait(&q->cv, &q->lock); - } - QueueEntry *entry = TAILQ_FIRST(&q->q); - TAILQ_REMOVE(&q->q, entry, entries); - pthread_mutex_unlock(&q->lock); - - void* r = entry->data; - free(entry); - return r; -} - -void* queue_try_pop(Queue *q) { - pthread_mutex_lock(&q->lock); - - void* r = NULL; - if (!TAILQ_EMPTY(&q->q)) { - QueueEntry *entry = TAILQ_FIRST(&q->q); - TAILQ_REMOVE(&q->q, entry, entries); - r = entry->data; - free(entry); - } - - pthread_mutex_unlock(&q->lock); - return r; -} - -void queue_push(Queue *q, void *data) { - QueueEntry *entry = calloc(1, sizeof(QueueEntry)); - assert(entry); - entry->data = data; - - pthread_mutex_lock(&q->lock); - TAILQ_INSERT_TAIL(&q->q, entry, entries); - pthread_cond_signal(&q->cv); - pthread_mutex_unlock(&q->lock); -} diff --git a/selfdrive/common/cqueue.h b/selfdrive/common/cqueue.h deleted file mode 100644 index f2613660..00000000 --- a/selfdrive/common/cqueue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef COMMON_CQUEUE_H -#define COMMON_CQUEUE_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -// a blocking queue - -typedef struct QueueEntry { - TAILQ_ENTRY(QueueEntry) entries; - void *data; -} QueueEntry; - -typedef struct Queue { - TAILQ_HEAD(queue, QueueEntry) q; - pthread_mutex_t lock; - pthread_cond_t cv; -} Queue; - -void queue_init(Queue *q); -void* queue_pop(Queue *q); -void* queue_try_pop(Queue *q); -void queue_push(Queue *q, void *data); - -#ifdef __cplusplus -} // extern "C" -#endif - -#endif diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h new file mode 100644 index 00000000..b3558b11 --- /dev/null +++ b/selfdrive/common/queue.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include + +template +class SafeQueue { +public: + SafeQueue() = default; + + void push(const T& v) { + { + std::unique_lock lk(m); + q.push(v); + } + cv.notify_one(); + } + + T pop() { + std::unique_lock lk(m); + cv.wait(lk, [this] { return !q.empty(); }); + T v = q.front(); + q.pop(); + return v; + } + + bool try_pop(T& v, int timeout_ms = 0) { + std::unique_lock lk(m); + if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { + return false; + } + v = q.front(); + q.pop(); + return true; + } + + bool empty() const { + std::scoped_lock lk(m); + return q.empty(); + } + + size_t size() const { + std::scoped_lock lk(m); + return q.size(); + } + +private: + mutable std::mutex m; + std::condition_variable cv; + std::queue q; +}; diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 1d1daa4b..648b2137 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -266,38 +266,32 @@ void encoder_thread(int cam_idx) { rotate_state.setStreamFrameId(extra.frame_id); // encode a frame - { + for (int i = 0; i < encoders.size(); ++i) { int out_segment = -1; - int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v, + int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, &out_segment, extra.timestamp_eof); - if (encoders.size() > 1) { - int out_segment_alt = -1; - encoders[1]->encode_frame(buf->y, buf->u, buf->v, - buf->width, buf->height, - &out_segment_alt, extra.timestamp_eof); - } - - // publish encode index - MessageBuilder msg; - // this is really ugly - auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() : - (cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx()); - eidx.setFrameId(extra.frame_id); - eidx.setTimestampSof(extra.timestamp_sof); - eidx.setTimestampEof(extra.timestamp_eof); - #ifdef QCOM2 - eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); - #else - eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); - #endif - eidx.setEncodeId(cnt); - eidx.setSegmentNum(out_segment); - eidx.setSegmentId(out_id); - - if (lh) { - auto bytes = msg.toBytes(); - lh_log(lh, bytes.begin(), bytes.size(), false); + if (i == 0 && out_id != -1) { + // publish encode index + MessageBuilder msg; + // this is really ugly + auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() : + (cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx()); + eidx.setFrameId(extra.frame_id); + eidx.setTimestampSof(extra.timestamp_sof); + eidx.setTimestampEof(extra.timestamp_eof); + #ifdef QCOM2 + eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); + #else + eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); + #endif + eidx.setEncodeId(cnt); + eidx.setSegmentNum(out_segment); + eidx.setSegmentId(out_id); + if (lh) { + auto bytes = msg.toBytes(); + lh_log(lh, bytes.begin(), bytes.size(), false); + } } } diff --git a/selfdrive/loggerd/omx_encoder.cc b/selfdrive/loggerd/omx_encoder.cc index 40b16739..b96d340f 100644 --- a/selfdrive/loggerd/omx_encoder.cc +++ b/selfdrive/loggerd/omx_encoder.cc @@ -19,6 +19,7 @@ #include #include "common/mutex.h" +#include "common/util.h" #include "common/swaglog.h" #include "omx_encoder.h" @@ -29,6 +30,8 @@ assert(OMX_ErrorNone == _expr); \ } while (0) +extern ExitHandler do_exit; + // ***** OMX callback functions ***** void OmxEncoder::wait_for_state(OMX_STATETYPE state) { @@ -75,7 +78,7 @@ OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR ap OMX_BUFFERHEADERTYPE *buffer) { // printf("empty_buffer_done\n"); OmxEncoder *e = (OmxEncoder*)app_data; - queue_push(&e->free_in, (void*)buffer); + e->free_in.push(buffer); return OMX_ErrorNone; } @@ -83,7 +86,7 @@ OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app OMX_BUFFERHEADERTYPE *buffer) { // printf("fill_buffer_done\n"); OmxEncoder *e = (OmxEncoder*)app_data; - queue_push(&e->done_out, (void*)buffer); + e->done_out.push(buffer); return OMX_ErrorNone; } @@ -170,9 +173,6 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int this->fps = fps; this->remuxing = !h265; - queue_init(&this->free_in); - queue_init(&this->done_out); - mutex_init_reentrant(&this->lock); pthread_mutex_init(&this->state_lock, NULL); pthread_cond_init(&this->state_cv, NULL); @@ -330,7 +330,7 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int // fill the input free queue for (auto &buf : this->in_buf_headers) { - queue_push(&this->free_in, (void*)buf); + this->free_in.push(buf); } } @@ -418,7 +418,14 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u // this sometimes freezes... put it outside the encoder lock so we can still trigger rotates... // THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this //pthread_mutex_unlock(&this->lock); - OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in); + OMX_BUFFERHEADERTYPE* in_buf = nullptr; + while (!this->free_in.try_pop(in_buf, 20)) { + if (do_exit) { + pthread_mutex_unlock(&this->lock); + return -1; + } + } + //pthread_mutex_lock(&this->lock); int ret = this->counter; @@ -465,8 +472,8 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u // pump output while (true) { - OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_try_pop(&this->done_out); - if (!out_buf) { + OMX_BUFFERHEADERTYPE *out_buf; + if (!this->done_out.try_pop(out_buf)) { break; } handle_out_buf(this, out_buf); @@ -547,7 +554,7 @@ void OmxEncoder::encoder_close() { if (this->dirty) { // drain output only if there could be frames in the encoder - OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in); + OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop(); in_buf->nFilledLen = 0; in_buf->nOffset = 0; in_buf->nFlags = OMX_BUFFERFLAG_EOS; @@ -556,7 +563,7 @@ void OmxEncoder::encoder_close() { OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf)); while (true) { - OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->done_out); + OMX_BUFFERHEADERTYPE *out_buf = this->done_out.pop(); handle_out_buf(this, out_buf); @@ -604,8 +611,9 @@ OmxEncoder::~OmxEncoder() { OMX_CHECK(OMX_FreeHandle(this->handle)); - while (queue_try_pop(&this->free_in)); - while (queue_try_pop(&this->done_out)); + OMX_BUFFERHEADERTYPE *out_buf; + while (this->free_in.try_pop(out_buf)); + while (this->done_out.try_pop(out_buf)); if (this->codec_config) { free(this->codec_config); diff --git a/selfdrive/loggerd/omx_encoder.h b/selfdrive/loggerd/omx_encoder.h index 62912a65..c9f43352 100644 --- a/selfdrive/loggerd/omx_encoder.h +++ b/selfdrive/loggerd/omx_encoder.h @@ -65,8 +65,8 @@ private: uint64_t last_t; - Queue free_in; - Queue done_out; + SafeQueue free_in; + SafeQueue done_out; AVFormatContext *ofmt_ctx; AVCodecContext *codec_ctx;