c++ replay: publish all frames in CameraServer (#22378)

* cameraserver

* support yuv

* init camera server in start()

* trigger ci
pull/22430/head
Dean Lee 2021-10-04 22:45:28 +08:00 committed by GitHub
parent fd801c454a
commit 1eb79d7a59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 180 additions and 76 deletions

View File

@ -53,11 +53,11 @@ void run_camera(CameraState *s) {
// loop stream
stream_frame_id = 0;
}
uint8_t *dat = s->frame->get(stream_frame_id++);
if (dat) {
if (auto dat = s->frame->get(stream_frame_id++)) {
auto [rgb_buf, yuv_buf] = *dat;
s->buf.camera_bufs_metadata[buf_idx] = {.frame_id = frame_id};
auto &buf = s->buf.camera_bufs[buf_idx];
CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), dat, 0, NULL, NULL));
CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), rgb_buf, 0, NULL, NULL));
s->buf.queue(buf_idx);
++frame_id;
buf_idx = (buf_idx + 1) % FRAME_BUF_COUNT;

View File

@ -108,7 +108,7 @@ if GetOption('setup'):
if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()):
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs

View File

@ -0,0 +1,67 @@
#include "selfdrive/ui/replay/camera.h"
#include <cassert>
#include <iostream>
const int YUV_BUF_COUNT = 50;
// Acquire the default OpenCL device and a context for the VisionIpc buffers,
// then spawn the worker thread that publishes queued frames.
CameraServer::CameraServer() {
  device_id_ = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
  // CL_CHECK_ERR provides `err` and aborts on failure.
  context_ = CL_CHECK_ERR(clCreateContext(nullptr, 1, &device_id_, nullptr, nullptr, &err));
  camera_thread_ = std::thread(&CameraServer::thread, this);
}
// Shut down in strict order: unblock the worker with a sentinel, join it,
// destroy the vipc server, and only then release the CL context the server
// (and its buffers) were created from.
CameraServer::~CameraServer() {
  queue_.push({});  // default tuple: FrameReader* == nullptr, tells thread() to exit
  camera_thread_.join();
  vipc_server_.reset(nullptr);  // must go away before clReleaseContext below
  CL_CHECK(clReleaseContext(context_));
}
// (Re)create the VisionIpcServer and register RGB + YUV buffers for every
// camera whose frame size is already known (width/height > 0). Called again
// from thread() whenever a camera's frame size changes.
void CameraServer::startVipcServer() {
  // Fix: a non-null vipc_server_ means one is already running, so this is a
  // *restart*; the original message had the two labels swapped.
  std::cout << (vipc_server_ ? "restart" : "start") << " vipc server" << std::endl;
  vipc_server_.reset(new VisionIpcServer("camerad", device_id_, context_));
  for (auto &cam : cameras_) {
    if (cam.width > 0 && cam.height > 0) {
      vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height);
      vipc_server_->create_buffers(cam.yuv_type, YUV_BUF_COUNT, false, cam.width, cam.height);
    }
  }
  vipc_server_->start_listener();
}
// Worker loop: pops (camera type, frame reader, encode index) tuples and
// publishes the decoded RGB + YUV frames over VisionIpc. Exits when a tuple
// with a null FrameReader is pushed (see ~CameraServer).
void CameraServer::thread() {
  while (true) {
    const auto [type, fr, eidx] = queue_.pop();
    if (!fr) break;  // shutdown sentinel from the destructor

    auto &cam = cameras_[type];
    // start|restart the vipc server if frame size changed
    if (cam.width != fr->width || cam.height != fr->height) {
      cam.width = fr->width;
      cam.height = fr->height;
      std::cout << "camera[" << type << "] frame size " << cam.width << "x" << cam.height << std::endl;
      startVipcServer();
    }

    // send frame; fr->get() returns nullopt if the frame failed to decode
    if (auto dat = fr->get(eidx.getSegmentId())) {
      auto [rgb_dat, yuv_dat] = *dat;
      VisionIpcBufExtra extra = {
          .frame_id = eidx.getFrameId(),
          .timestamp_sof = eidx.getTimestampSof(),
          .timestamp_eof = eidx.getTimestampEof(),
      };
      // copy into server-owned buffers, then publish both streams with the
      // same metadata
      VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
      memcpy(rgb_buf->addr, rgb_dat, fr->getRGBSize());
      VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
      memcpy(yuv_buf->addr, yuv_dat, fr->getYUVSize());
      vipc_server_->send(rgb_buf, &extra, false);
      vipc_server_->send(yuv_buf, &extra, false);
    } else {
      std::cout << "camera[" << type << "] failed to get frame:" << eidx.getSegmentId() << std::endl;
    }
  }
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <unistd.h>
#include "cereal/visionipc/visionipc_server.h"
#include "selfdrive/common/queue.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/logreader.h"
// Publishes replayed camera frames over VisionIpc, standing in for camerad.
// Frames are pushed from the replay stream thread and sent asynchronously by
// an internal worker thread.
class CameraServer {
public:
  CameraServer();
  ~CameraServer();

  // Queue one frame for publication. `fr` must outlive the queued entry;
  // callers use waitFinish() before releasing segments/FrameReaders.
  inline void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx) {
    queue_.push({type, fr, eidx});
  }

  // Busy-wait until the worker has popped every queued frame.
  // NOTE(review): the last popped frame may still be mid-send when this
  // returns — confirm that is acceptable for callers.
  inline void waitFinish() {
    while (!queue_.empty()) usleep(0);
  }

protected:
  void startVipcServer();
  void thread();

  // Per-camera stream config; width/height of 0 means "size not yet known",
  // so no buffers are created for that camera until its first frame arrives.
  struct Camera {
    VisionStreamType rgb_type;
    VisionStreamType yuv_type;
    int width;
    int height;
  };
  Camera cameras_[MAX_CAMERAS] = {
      {.rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK},
      {.rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT},
      {.rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE},
  };

  cl_device_id device_id_;
  cl_context context_;
  std::thread camera_thread_;
  std::unique_ptr<VisionIpcServer> vipc_server_;
  SafeQueue<std::tuple<CameraType, FrameReader*, const cereal::EncodeIndex::Reader>> queue_;
};

View File

@ -48,13 +48,6 @@ FrameReader::~FrameReader() {
// free all.
for (auto &f : frames_) {
av_free_packet(&f.pkt);
if (f.data) {
delete[] f.data;
}
}
while (!buffer_pool.empty()) {
delete[] buffer_pool.front();
buffer_pool.pop();
}
if (frmRgb_) {
av_frame_free(&frmRgb_);
@ -78,7 +71,7 @@ bool FrameReader::load(const std::string &url) {
return false;
}
avformat_find_stream_info(pFormatCtx_, NULL);
av_dump_format(pFormatCtx_, 0, url.c_str(), 0);
// av_dump_format(pFormatCtx_, 0, url.c_str(), 0);
auto pCodecCtxOrig = pFormatCtx_->streams[0]->codec;
auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id);
@ -119,15 +112,18 @@ bool FrameReader::load(const std::string &url) {
return valid_;
}
uint8_t *FrameReader::get(int idx) {
std::optional<std::pair<uint8_t *, uint8_t *>> FrameReader::get(int idx) {
if (!valid_ || idx < 0 || idx >= frames_.size()) {
return nullptr;
return std::nullopt;
}
std::unique_lock lk(mutex_);
decode_idx_ = idx;
cv_decode_.notify_one();
cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].data || frames_[idx].failed; });
return frames_[idx].data;
cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].rgb_data || frames_[idx].failed; });
if (!frames_[idx].rgb_data) {
return std::nullopt;
}
return std::make_pair(frames_[idx].rgb_data.get(), frames_[idx].yuv_data.get());
}
void FrameReader::decodeThread() {
@ -138,16 +134,17 @@ void FrameReader::decodeThread() {
for (int i = 0; i < frames_.size() && !exit_; ++i) {
Frame &frame = frames_[i];
if (i >= from && i < to) {
if (frame.data || frame.failed) continue;
if (frame.rgb_data || frame.failed) continue;
uint8_t *dat = decodeFrame(&frame.pkt);
auto [rgb_data, yuv_data] = decodeFrame(&frame.pkt);
std::unique_lock lk(mutex_);
frame.data = dat;
frame.failed = !dat;
frame.rgb_data.reset(rgb_data);
frame.yuv_data.reset(yuv_data);
frame.failed = !rgb_data;
cv_frame_.notify_all();
} else if (frame.data) {
buffer_pool.push(frame.data);
frame.data = nullptr;
} else {
frame.rgb_data.reset(nullptr);
frame.yuv_data.reset(nullptr);
frame.failed = false;
}
}
@ -160,28 +157,33 @@ void FrameReader::decodeThread() {
}
}
uint8_t *FrameReader::decodeFrame(AVPacket *pkt) {
int gotFrame;
std::pair<uint8_t *, uint8_t *> FrameReader::decodeFrame(AVPacket *pkt) {
uint8_t *rgb_data = nullptr, *yuv_data = nullptr;
int gotFrame = 0;
AVFrame *f = av_frame_alloc();
avcodec_decode_video2(pCodecCtx_, f, &gotFrame, pkt);
uint8_t *dat = nullptr;
if (gotFrame) {
if (!buffer_pool.empty()) {
dat = buffer_pool.front();
buffer_pool.pop();
} else {
dat = new uint8_t[getRGBSize()];
rgb_data = new uint8_t[getRGBSize()];
yuv_data = new uint8_t[getYUVSize()];
int i, j, k;
for (i = 0; i < f->height; i++) {
memcpy(yuv_data + f->width * i, f->data[0] + f->linesize[0] * i, f->width);
}
for (j = 0; j < f->height / 2; j++) {
memcpy(yuv_data + f->width * i + f->width / 2 * j, f->data[1] + f->linesize[1] * j, f->width / 2);
}
for (k = 0; k < f->height / 2; k++) {
memcpy(yuv_data + f->width * i + f->width / 2 * j + f->width / 2 * k, f->data[2] + f->linesize[2] * k, f->width / 2);
}
int ret = avpicture_fill((AVPicture *)frmRgb_, dat, AV_PIX_FMT_BGR24, f->width, f->height);
int ret = avpicture_fill((AVPicture *)frmRgb_, rgb_data, AV_PIX_FMT_BGR24, f->width, f->height);
assert(ret > 0);
if (sws_scale(sws_ctx_, (const uint8_t **)f->data, f->linesize, 0,
f->height, frmRgb_->data, frmRgb_->linesize) <= 0) {
delete[] dat;
dat = nullptr;
if (sws_scale(sws_ctx_, (const uint8_t **)f->data, f->linesize, 0, f->height, frmRgb_->data, frmRgb_->linesize) <= 0) {
delete[] rgb_data;
delete[] yuv_data;
rgb_data = yuv_data = nullptr;
}
}
av_frame_free(&f);
return dat;
return {rgb_data, yuv_data};
}

View File

@ -1,11 +1,9 @@
#pragma once
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <optional>
#include <string>
#include <thread>
#include <vector>
@ -22,8 +20,9 @@ public:
FrameReader();
~FrameReader();
bool load(const std::string &url);
uint8_t *get(int idx);
std::optional<std::pair<uint8_t *, uint8_t*>> get(int idx);
int getRGBSize() const { return width * height * 3; }
int getYUVSize() const { return width * height * 3 / 2; }
size_t getFrameCount() const { return frames_.size(); }
bool valid() const { return valid_; }
@ -31,10 +30,11 @@ public:
private:
void decodeThread();
uint8_t *decodeFrame(AVPacket *pkt);
std::pair<uint8_t *, uint8_t *> decodeFrame(AVPacket *pkt);
struct Frame {
AVPacket pkt = {};
uint8_t *data = nullptr;
std::unique_ptr<uint8_t[]> rgb_data = nullptr;
std::unique_ptr<uint8_t[]> yuv_data = nullptr;
bool failed = false;
};
std::vector<Frame> frames_;
@ -42,7 +42,6 @@ private:
AVFormatContext *pFormatCtx_ = nullptr;
AVCodecContext *pCodecCtx_ = nullptr;
AVFrame *frmRgb_ = nullptr;
std::queue<uint8_t *> buffer_pool;
struct SwsContext *sws_ctx_ = nullptr;
std::mutex mutex_;

View File

@ -51,6 +51,8 @@ bool Replay::load() {
void Replay::start(int seconds) {
seekTo(seconds);
camera_server_ = std::make_unique<CameraServer>();
// start stream thread
thread = new QThread;
QObject::connect(thread, &QThread::started, [=]() { stream(); });
@ -184,6 +186,22 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
}
}
void Replay::publishFrame(const Event *e) {
auto publish = [=](CameraType cam_type, const cereal::EncodeIndex::Reader &eidx) {
auto &seg = segments_[eidx.getSegmentNum()];
if (seg && seg->isLoaded() && seg->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
camera_server_->pushFrame(cam_type, seg->frames[cam_type].get(), eidx);
}
};
if (e->which == cereal::Event::ROAD_ENCODE_IDX) {
publish(RoadCam, e->event.getRoadEncodeIdx());
} else if (e->which == cereal::Event::DRIVER_ENCODE_IDX) {
publish(DriverCam, e->event.getDriverEncodeIdx());
} else if (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
publish(WideRoadCam, e->event.getWideRoadEncodeIdx());
}
}
void Replay::stream() {
float last_print = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
@ -227,38 +245,10 @@ void Replay::stream() {
precise_nano_sleep(behind_ns);
}
// publish frame
if (evt->frame) {
// TODO: publish all frames
if (evt->which == cereal::Event::ROAD_ENCODE_IDX) {
auto idx = evt->event.getRoadEncodeIdx();
auto &seg = segments_[idx.getSegmentNum()];
if (seg && seg->isLoaded() && idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
auto &frm = seg->frames[RoadCam];
if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
vipc_server = new VisionIpcServer("camerad", device_id, context);
vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, UI_BUF_COUNT,
true, frm->width, frm->height);
vipc_server->start_listener();
}
uint8_t *dat = frm->get(idx.getSegmentId());
if (dat) {
VisionIpcBufExtra extra = {};
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
memcpy(buf->addr, dat, frm->getRGBSize());
vipc_server->send(buf, &extra, false);
}
}
}
// publish msg
publishFrame(evt);
} else {
// publish msg
if (sm == nullptr) {
auto bytes = evt->bytes();
pm->send(sockets_[cur_which], (capnp::byte *)bytes.begin(), bytes.size());
@ -268,5 +258,8 @@ void Replay::stream() {
}
}
}
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
camera_server_->waitFinish();
}
}

View File

@ -4,6 +4,7 @@
#include <capnp/dynamic.h>
#include "cereal/visionipc/visionipc_server.h"
#include "selfdrive/ui/replay/camera.h"
#include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2;
@ -33,6 +34,7 @@ protected:
void setCurrentSegment(int n);
void mergeSegments(int begin_idx, int end_idx);
void updateEvents(const std::function<bool()>& lambda);
void publishFrame(const Event *e);
QThread *thread;
@ -55,7 +57,7 @@ protected:
SubMaster *sm;
PubMaster *pm;
std::vector<const char*> sockets_;
VisionIpcServer *vipc_server = nullptr;
std::unique_ptr<Route> route_;
bool load_dcam = false, load_ecam = false;
std::unique_ptr<CameraServer> camera_server_;
};