replay: refactor FrameReader (#22438)

* decode from the previous keyframe after seek

* less memory (decode into caller-provided buffers; see the sketch below)

* some streams seem to contain no keyframes

* test random seek

* merge master

* continue

* update test_cases

* use fr

* merge master
Dean Lee 2021-10-19 05:19:23 +08:00 committed by GitHub
parent 84de248fa7
commit 0189a19b8e
5 changed files with 89 additions and 107 deletions
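For orientation, a minimal caller-side sketch of the refactored API (not part of the diff; the stream path is a placeholder): get() now writes into buffers the caller allocates once and reuses, which is where the "less memory" bullet comes from, and it accepts arbitrary indices because decoding restarts from the previous keyframe after a seek.

// sketch: drive the refactored FrameReader with caller-owned buffers
#include <cstdint>
#include <cstdio>
#include <memory>

#include "selfdrive/ui/replay/framereader.h"

int main() {
  FrameReader fr;
  if (!fr.load("fcamera.hevc")) return 1;  // placeholder path

  // allocate once, reuse for every frame (no per-frame copies cached inside FrameReader)
  auto rgb = std::make_unique<uint8_t[]>(fr.getRGBSize());
  auto yuv = std::make_unique<uint8_t[]>(fr.getYUVSize());

  // out-of-order indices exercise the decode-from-previous-keyframe path
  for (int idx : {0, 100, 7, 500}) {
    if (fr.get(idx, rgb.get(), yuv.get())) {
      printf("decoded frame %d of %zu\n", idx, fr.getFrameCount());
    }
  }
  return 0;
}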


@@ -48,16 +48,16 @@ void camera_close(CameraState *s) {
void run_camera(CameraState *s) {
uint32_t stream_frame_id = 0, frame_id = 0;
size_t buf_idx = 0;
std::unique_ptr<uint8_t[]> rgb_buf = std::make_unique<uint8_t[]>(s->frame->getRGBSize());
while (!do_exit) {
if (stream_frame_id == s->frame->getFrameCount()) {
// loop stream
stream_frame_id = 0;
}
if (auto dat = s->frame->get(stream_frame_id++)) {
auto [rgb_buf, yuv_buf] = *dat;
if (s->frame->get(stream_frame_id++, rgb_buf.get(), nullptr)) {
s->buf.camera_bufs_metadata[buf_idx] = {.frame_id = frame_id};
auto &buf = s->buf.camera_bufs[buf_idx];
CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), rgb_buf, 0, NULL, NULL));
CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), rgb_buf.get(), 0, NULL, NULL));
s->buf.queue(buf_idx);
++frame_id;
buf_idx = (buf_idx + 1) % FRAME_BUF_COUNT;


@@ -48,19 +48,14 @@ void CameraServer::thread() {
}
// send frame
if (auto dat = fr->get(eidx.getSegmentId())) {
auto [rgb_dat, yuv_dat] = *dat;
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
if (fr->get(eidx.getSegmentId(), (uint8_t *)rgb_buf->addr, (uint8_t *)yuv_buf->addr)) {
VisionIpcBufExtra extra = {
.frame_id = eidx.getFrameId(),
.timestamp_sof = eidx.getTimestampSof(),
.timestamp_eof = eidx.getTimestampEof(),
};
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
memcpy(rgb_buf->addr, rgb_dat, fr->getRGBSize());
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
memcpy(yuv_buf->addr, yuv_dat, fr->getYUVSize());
vipc_server_->send(rgb_buf, &extra, false);
vipc_server_->send(yuv_buf, &extra, false);
} else {


@@ -1,6 +1,8 @@
#include "selfdrive/ui/replay/framereader.h"
#include <unistd.h>
#include <cassert>
#include <mutex>
#include "libyuv.h"
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
@@ -21,15 +23,13 @@ static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
return 0;
}
class AVInitializer {
public:
struct AVInitializer {
AVInitializer() {
int ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
assert(ret >= 0);
av_register_all();
avformat_network_init();
}
~AVInitializer() { avformat_network_deinit(); }
};
@@ -38,15 +38,6 @@ FrameReader::FrameReader() {
}
FrameReader::~FrameReader() {
// wait until thread is finished.
exit_ = true;
cv_decode_.notify_all();
cv_frame_.notify_all();
if (decode_thread_.joinable()) {
decode_thread_.join();
}
// free all.
for (auto &f : frames_) {
av_free_packet(&f.pkt);
}
@@ -57,6 +48,9 @@ FrameReader::~FrameReader() {
if (pFormatCtx_) {
avformat_close_input(&pFormatCtx_);
}
if (av_frame_) {
av_frame_free(&av_frame_);
}
}
bool FrameReader::load(const std::string &url) {
@@ -77,14 +71,18 @@ bool FrameReader::load(const std::string &url) {
int ret = avcodec_copy_context(pCodecCtx_, pCodecCtxOrig);
if (ret != 0) return false;
pCodecCtx_->thread_count = 0;
pCodecCtx_->thread_type = FF_THREAD_FRAME;
ret = avcodec_open2(pCodecCtx_, pCodec, NULL);
if (ret < 0) return false;
av_frame_ = av_frame_alloc();
width = pCodecCtxOrig->width;
height = pCodecCtxOrig->height;
frames_.reserve(60 * 20); // 20fps, one minute
do {
while (true) {
Frame &frame = frames_.emplace_back();
int err = av_read_frame(pFormatCtx_, &frame.pkt);
if (err < 0) {
@@ -92,81 +90,77 @@ bool FrameReader::load(const std::string &url) {
valid_ = (err == AVERROR_EOF);
break;
}
} while (!exit_);
if (valid_) {
decode_thread_ = std::thread(&FrameReader::decodeThread, this);
// some streams seem to contain no keyframes
key_frames_count_ += frame.pkt.flags & AV_PKT_FLAG_KEY;
}
return valid_;
}
std::optional<std::pair<uint8_t *, uint8_t *>> FrameReader::get(int idx) {
bool FrameReader::get(int idx, uint8_t *rgb, uint8_t *yuv) {
assert(rgb != nullptr);
if (!valid_ || idx < 0 || idx >= frames_.size()) {
return std::nullopt;
return false;
}
std::unique_lock lk(mutex_);
decode_idx_ = idx;
cv_decode_.notify_one();
cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].rgb_data || frames_[idx].failed; });
if (!frames_[idx].rgb_data) {
return std::nullopt;
}
return std::make_pair(frames_[idx].rgb_data.get(), frames_[idx].yuv_data.get());
return decode(idx, rgb, yuv);
}
void FrameReader::decodeThread() {
int idx = 0;
while (!exit_) {
const int from = std::max(idx - 15, 0);
const int to = std::min(idx + 20, (int)frames_.size());
for (int i = 0; i < frames_.size() && !exit_; ++i) {
Frame &frame = frames_[i];
if (i >= from && i < to) {
if (frame.rgb_data || frame.failed) continue;
bool FrameReader::decode(int idx, uint8_t *rgb, uint8_t *yuv) {
auto get_keyframe = [=](int idx) {
for (int i = idx; i >= 0 && key_frames_count_ > 1; --i) {
if (frames_[i].pkt.flags & AV_PKT_FLAG_KEY) return i;
}
return idx;
};
auto [rgb_data, yuv_data] = decodeFrame(&frame.pkt);
std::unique_lock lk(mutex_);
frame.rgb_data.reset(rgb_data);
frame.yuv_data.reset(yuv_data);
frame.failed = !rgb_data;
cv_frame_.notify_all();
} else {
frame.rgb_data.reset(nullptr);
frame.yuv_data.reset(nullptr);
frame.failed = false;
int from_idx = idx;
if (idx > 0 && !frames_[idx].decoded && !frames_[idx - 1].decoded) {
// find the previous keyframe
from_idx = get_keyframe(idx);
}
for (int i = from_idx; i <= idx; ++i) {
Frame &frame = frames_[i];
if ((!frame.decoded || i == idx) && !frame.failed) {
while (true) {
int ret = avcodec_decode_video2(pCodecCtx_, av_frame_, &frame.decoded, &(frame.pkt));
if (ret > 0 && !frame.decoded) {
// ffmpeg's frame-threaded decoder is still consuming the initial packets
usleep(0);
} else {
break;
}
}
frame.failed = !frame.decoded;
if (frame.decoded && i == idx) {
return decodeFrame(av_frame_, rgb, yuv);
}
}
// sleep & wait
std::unique_lock lk(mutex_);
cv_decode_.wait(lk, [=] { return exit_ || decode_idx_ != -1; });
idx = decode_idx_;
decode_idx_ = -1;
}
return false;
}
std::pair<uint8_t *, uint8_t *> FrameReader::decodeFrame(AVPacket *pkt) {
uint8_t *rgb_data = nullptr, *yuv_data = nullptr;
int gotFrame = 0;
AVFrame *f = av_frame_alloc();
avcodec_decode_video2(pCodecCtx_, f, &gotFrame, pkt);
if (gotFrame) {
rgb_data = new uint8_t[getRGBSize()];
yuv_data = new uint8_t[getYUVSize()];
int i, j, k;
for (i = 0; i < f->height; i++) {
memcpy(yuv_data + f->width * i, f->data[0] + f->linesize[0] * i, f->width);
bool FrameReader::decodeFrame(AVFrame *f, uint8_t *rgb, uint8_t *yuv) {
uint8_t *y = yuv;
if (!yuv) {
if (yuv_buf_.empty()) {
yuv_buf_.resize(getYUVSize());
}
for (j = 0; j < f->height / 2; j++) {
memcpy(yuv_data + f->width * i + f->width / 2 * j, f->data[1] + f->linesize[1] * j, f->width / 2);
}
for (k = 0; k < f->height / 2; k++) {
memcpy(yuv_data + f->width * i + f->width / 2 * j + f->width / 2 * k, f->data[2] + f->linesize[2] * k, f->width / 2);
}
uint8_t *u = yuv_data + f->width * f->height;
uint8_t *v = u + (f->width / 2) * (f->height / 2);
libyuv::I420ToRGB24(yuv_data, f->width, u, f->width / 2, v, f->width / 2, rgb_data, f->width * 3, f->width, f->height);
y = yuv_buf_.data();
}
av_frame_free(&f);
return {rgb_data, yuv_data};
int i, j, k;
for (i = 0; i < f->height; i++) {
memcpy(y + f->width * i, f->data[0] + f->linesize[0] * i, f->width);
}
for (j = 0; j < f->height / 2; j++) {
memcpy(y + f->width * i + f->width / 2 * j, f->data[1] + f->linesize[1] * j, f->width / 2);
}
for (k = 0; k < f->height / 2; k++) {
memcpy(y + f->width * i + f->width / 2 * j + f->width / 2 * k, f->data[2] + f->linesize[2] * k, f->width / 2);
}
uint8_t *u = yuv + f->width * f->height;
uint8_t *v = u + (f->width / 2) * (f->height / 2);
libyuv::I420ToRGB24(yuv, f->width, u, f->width / 2, v, f->width / 2, rgb, f->width * 3, f->width, f->height);
return true;
}


@@ -1,14 +1,8 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>
// independent of QT, needs ffmpeg
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
@@ -19,7 +13,7 @@ public:
FrameReader();
~FrameReader();
bool load(const std::string &url);
std::optional<std::pair<uint8_t *, uint8_t*>> get(int idx);
bool get(int idx, uint8_t *rgb, uint8_t *yuv);
int getRGBSize() const { return width * height * 3; }
int getYUVSize() const { return width * height * 3 / 2; }
size_t getFrameCount() const { return frames_.size(); }
@@ -28,24 +22,19 @@ public:
int width = 0, height = 0;
private:
void decodeThread();
std::pair<uint8_t *, uint8_t *> decodeFrame(AVPacket *pkt);
bool decode(int idx, uint8_t *rgb, uint8_t *yuv);
bool decodeFrame(AVFrame *f, uint8_t *rgb, uint8_t *yuv);
struct Frame {
AVPacket pkt = {};
std::unique_ptr<uint8_t[]> rgb_data = nullptr;
std::unique_ptr<uint8_t[]> yuv_data = nullptr;
int decoded = false;
bool failed = false;
};
std::vector<Frame> frames_;
AVFrame *av_frame_ = nullptr;
AVFormatContext *pFormatCtx_ = nullptr;
AVCodecContext *pCodecCtx_ = nullptr;
std::mutex mutex_;
std::condition_variable cv_decode_;
std::condition_variable cv_frame_;
int decode_idx_ = 0;
std::atomic<bool> exit_ = false;
int key_frames_count_ = 0;
std::vector<uint8_t> yuv_buf_;
bool valid_ = false;
std::thread decode_thread_;
};


@@ -53,10 +53,14 @@ TEST_CASE("Segment") {
// LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0);
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan()));
// sequence get 50 frames
REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200);
auto &fr = segment.frames[RoadCam];
REQUIRE(fr->getFrameCount() == 1200);
std::unique_ptr<uint8_t[]> rgb_buf = std::make_unique<uint8_t[]>(fr->getRGBSize());
std::unique_ptr<uint8_t[]> yuv_buf = std::make_unique<uint8_t[]>(fr->getYUVSize());
// sequence get 50 frames
for (int i = 0; i < 50; ++i) {
REQUIRE(segment.frames[RoadCam]->get(i));
REQUIRE(fr->get(i, rgb_buf.get(), yuv_buf.get()));
}
loop.quit();
});
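Per the "test random seek" bullet, a random-access check in the same style as the sequential loop above could look like this (a sketch only; the seed, index choice, and placement are assumptions, and <cstdlib> would be needed for rand):

// random seek: out-of-order indices force a decode from the previous keyframe
std::srand(0);  // keep the test deterministic
for (int i = 0; i < 10; ++i) {
  const int idx = std::rand() % (int)fr->getFrameCount();
  REQUIRE(fr->get(idx, rgb_buf.get(), yuv_buf.get()));
}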