remove cqueue, use class SafeQueue (#19774)

* class SafeQueue

* cleanup includes

* space

* add timeout

use try_poll

* add function empty() & size()

* class SafeQueue

* add timeout

use try_poll

* add function empty() & size()

* rebase master

* rebase master

* for loop

* fix bug
pull/19904/head
Dean Lee 2021-01-28 22:12:45 +08:00 committed by GitHub
parent 6583206ed4
commit e6783f4d9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 105 additions and 153 deletions

View File

@ -193,7 +193,7 @@ selfdrive/common/swaglog.h
selfdrive/common/swaglog.cc
selfdrive/common/util.cc
selfdrive/common/util.h
selfdrive/common/cqueue.[c,h]
selfdrive/common/queue.h
selfdrive/common/clutil.cc
selfdrive/common/clutil.h
selfdrive/common/params.h

View File

@ -121,14 +121,7 @@ CameraBuf::~CameraBuf() {
}
bool CameraBuf::acquire() {
{
std::unique_lock<std::mutex> lk(frame_queue_mutex);
bool got_frame = frame_queue_cv.wait_for(lk, std::chrono::milliseconds(1), [this]{ return !frame_queue.empty(); });
if (!got_frame) return false;
cur_buf_idx = frame_queue.front();
frame_queue.pop();
}
if (!safe_queue.try_pop(cur_buf_idx, 1)) return false;
const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx];
if (frame_data.frame_id == -1) {
@ -193,12 +186,8 @@ void CameraBuf::release() {
}
}
void CameraBuf::queue(size_t buf_idx){
{
std::lock_guard<std::mutex> lk(frame_queue_mutex);
frame_queue.push(buf_idx);
}
frame_queue_cv.notify_one();
void CameraBuf::queue(size_t buf_idx) {
safe_queue.push(buf_idx);
}
// common functions

View File

@ -1,7 +1,4 @@
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdlib.h>
#include <stdbool.h>
@ -10,6 +7,7 @@
#include <thread>
#include "common/mat.h"
#include "common/swaglog.h"
#include "common/queue.h"
#include "visionbuf.h"
#include "common/visionimg.h"
#include "imgproc/utils.h"
@ -103,9 +101,7 @@ private:
int cur_buf_idx;
std::mutex frame_queue_mutex;
std::condition_variable frame_queue_cv;
std::queue<size_t> frame_queue;
SafeQueue<int> safe_queue;
int frame_buf_count;
release_cb release_callback;

View File

@ -5,7 +5,7 @@ if SHARED:
else:
fxn = env.Library
common_libs = ['params.cc', 'swaglog.cc', 'cqueue.c', 'util.cc', 'gpio.cc', 'i2c.cc']
common_libs = ['params.cc', 'swaglog.cc', 'util.cc', 'gpio.cc', 'i2c.cc']
_common = fxn('common', common_libs, LIBS="json11")

View File

@ -1,54 +0,0 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "cqueue.h"
// TODO: replace by C++ queue and CV. See camerad
void queue_init(Queue *q) {
memset(q, 0, sizeof(*q));
TAILQ_INIT(&q->q);
pthread_mutex_init(&q->lock, NULL);
pthread_cond_init(&q->cv, NULL);
}
void* queue_pop(Queue *q) {
pthread_mutex_lock(&q->lock);
while (TAILQ_EMPTY(&q->q)) {
pthread_cond_wait(&q->cv, &q->lock);
}
QueueEntry *entry = TAILQ_FIRST(&q->q);
TAILQ_REMOVE(&q->q, entry, entries);
pthread_mutex_unlock(&q->lock);
void* r = entry->data;
free(entry);
return r;
}
void* queue_try_pop(Queue *q) {
pthread_mutex_lock(&q->lock);
void* r = NULL;
if (!TAILQ_EMPTY(&q->q)) {
QueueEntry *entry = TAILQ_FIRST(&q->q);
TAILQ_REMOVE(&q->q, entry, entries);
r = entry->data;
free(entry);
}
pthread_mutex_unlock(&q->lock);
return r;
}
void queue_push(Queue *q, void *data) {
QueueEntry *entry = calloc(1, sizeof(QueueEntry));
assert(entry);
entry->data = data;
pthread_mutex_lock(&q->lock);
TAILQ_INSERT_TAIL(&q->q, entry, entries);
pthread_cond_signal(&q->cv);
pthread_mutex_unlock(&q->lock);
}

View File

@ -1,33 +0,0 @@
#ifndef COMMON_CQUEUE_H
#define COMMON_CQUEUE_H
#include <sys/queue.h>
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
// a blocking queue
typedef struct QueueEntry {
TAILQ_ENTRY(QueueEntry) entries;
void *data;
} QueueEntry;
typedef struct Queue {
TAILQ_HEAD(queue, QueueEntry) q;
pthread_mutex_t lock;
pthread_cond_t cv;
} Queue;
void queue_init(Queue *q);
void* queue_pop(Queue *q);
void* queue_try_pop(Queue *q);
void queue_push(Queue *q, void *data);
#ifdef __cplusplus
} // extern "C"
#endif
#endif

View File

@ -0,0 +1,52 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
template <class T>
class SafeQueue {
public:
SafeQueue() = default;
void push(const T& v) {
{
std::unique_lock lk(m);
q.push(v);
}
cv.notify_one();
}
T pop() {
std::unique_lock lk(m);
cv.wait(lk, [this] { return !q.empty(); });
T v = q.front();
q.pop();
return v;
}
bool try_pop(T& v, int timeout_ms = 0) {
std::unique_lock lk(m);
if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) {
return false;
}
v = q.front();
q.pop();
return true;
}
bool empty() const {
std::scoped_lock lk(m);
return q.empty();
}
size_t size() const {
std::scoped_lock lk(m);
return q.size();
}
private:
mutable std::mutex m;
std::condition_variable cv;
std::queue<T> q;
};

View File

@ -266,38 +266,32 @@ void encoder_thread(int cam_idx) {
rotate_state.setStreamFrameId(extra.frame_id);
// encode a frame
{
for (int i = 0; i < encoders.size(); ++i) {
int out_segment = -1;
int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v,
int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v,
buf->width, buf->height,
&out_segment, extra.timestamp_eof);
if (encoders.size() > 1) {
int out_segment_alt = -1;
encoders[1]->encode_frame(buf->y, buf->u, buf->v,
buf->width, buf->height,
&out_segment_alt, extra.timestamp_eof);
}
// publish encode index
MessageBuilder msg;
// this is really ugly
auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() :
(cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx());
eidx.setFrameId(extra.frame_id);
eidx.setTimestampSof(extra.timestamp_sof);
eidx.setTimestampEof(extra.timestamp_eof);
#ifdef QCOM2
eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C);
#else
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C);
#endif
eidx.setEncodeId(cnt);
eidx.setSegmentNum(out_segment);
eidx.setSegmentId(out_id);
if (lh) {
auto bytes = msg.toBytes();
lh_log(lh, bytes.begin(), bytes.size(), false);
if (i == 0 && out_id != -1) {
// publish encode index
MessageBuilder msg;
// this is really ugly
auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() :
(cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx());
eidx.setFrameId(extra.frame_id);
eidx.setTimestampSof(extra.timestamp_sof);
eidx.setTimestampEof(extra.timestamp_eof);
#ifdef QCOM2
eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C);
#else
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C);
#endif
eidx.setEncodeId(cnt);
eidx.setSegmentNum(out_segment);
eidx.setSegmentId(out_id);
if (lh) {
auto bytes = msg.toBytes();
lh_log(lh, bytes.begin(), bytes.size(), false);
}
}
}

View File

@ -19,6 +19,7 @@
#include <msm_media_info.h>
#include "common/mutex.h"
#include "common/util.h"
#include "common/swaglog.h"
#include "omx_encoder.h"
@ -29,6 +30,8 @@
assert(OMX_ErrorNone == _expr); \
} while (0)
extern ExitHandler do_exit;
// ***** OMX callback functions *****
void OmxEncoder::wait_for_state(OMX_STATETYPE state) {
@ -75,7 +78,7 @@ OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR ap
OMX_BUFFERHEADERTYPE *buffer) {
// printf("empty_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data;
queue_push(&e->free_in, (void*)buffer);
e->free_in.push(buffer);
return OMX_ErrorNone;
}
@ -83,7 +86,7 @@ OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app
OMX_BUFFERHEADERTYPE *buffer) {
// printf("fill_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data;
queue_push(&e->done_out, (void*)buffer);
e->done_out.push(buffer);
return OMX_ErrorNone;
}
@ -170,9 +173,6 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int
this->fps = fps;
this->remuxing = !h265;
queue_init(&this->free_in);
queue_init(&this->done_out);
mutex_init_reentrant(&this->lock);
pthread_mutex_init(&this->state_lock, NULL);
pthread_cond_init(&this->state_cv, NULL);
@ -330,7 +330,7 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int
// fill the input free queue
for (auto &buf : this->in_buf_headers) {
queue_push(&this->free_in, (void*)buf);
this->free_in.push(buf);
}
}
@ -418,7 +418,14 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u
// this sometimes freezes... put it outside the encoder lock so we can still trigger rotates...
// THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this
//pthread_mutex_unlock(&this->lock);
OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in);
OMX_BUFFERHEADERTYPE* in_buf = nullptr;
while (!this->free_in.try_pop(in_buf, 20)) {
if (do_exit) {
pthread_mutex_unlock(&this->lock);
return -1;
}
}
//pthread_mutex_lock(&this->lock);
int ret = this->counter;
@ -465,8 +472,8 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u
// pump output
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_try_pop(&this->done_out);
if (!out_buf) {
OMX_BUFFERHEADERTYPE *out_buf;
if (!this->done_out.try_pop(out_buf)) {
break;
}
handle_out_buf(this, out_buf);
@ -547,7 +554,7 @@ void OmxEncoder::encoder_close() {
if (this->dirty) {
// drain output only if there could be frames in the encoder
OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in);
OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop();
in_buf->nFilledLen = 0;
in_buf->nOffset = 0;
in_buf->nFlags = OMX_BUFFERFLAG_EOS;
@ -556,7 +563,7 @@ void OmxEncoder::encoder_close() {
OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf));
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->done_out);
OMX_BUFFERHEADERTYPE *out_buf = this->done_out.pop();
handle_out_buf(this, out_buf);
@ -604,8 +611,9 @@ OmxEncoder::~OmxEncoder() {
OMX_CHECK(OMX_FreeHandle(this->handle));
while (queue_try_pop(&this->free_in));
while (queue_try_pop(&this->done_out));
OMX_BUFFERHEADERTYPE *out_buf;
while (this->free_in.try_pop(out_buf));
while (this->done_out.try_pop(out_buf));
if (this->codec_config) {
free(this->codec_config);

View File

@ -65,8 +65,8 @@ private:
uint64_t last_t;
Queue free_in;
Queue done_out;
SafeQueue<OMX_BUFFERHEADERTYPE *> free_in;
SafeQueue<OMX_BUFFERHEADERTYPE *> done_out;
AVFormatContext *ofmt_ctx;
AVCodecContext *codec_ctx;