replay improvements (#22203)

* refactor replay

* cleanup

small cleanup

* merge 22239

* cleanup

* add optional argument for start time

* small cleaup
pull/22218/head
Dean Lee 2021-09-20 03:24:28 +08:00 committed by GitHub
parent 8eb92a98ba
commit 50ae7dd6a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 420 additions and 317 deletions

View File

@ -108,7 +108,7 @@ if GetOption('setup'):
if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()):
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "replay/framereader.cc"]
replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "replay/framereader.cc", "replay/route.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs

View File

@ -1,7 +1,8 @@
#include "selfdrive/ui/replay/filereader.h"
#include <cassert>
#include <bzlib.h>
#include <QtNetwork>
#include "selfdrive/common/util.h"
static bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size_t srcSize,
size_t outputSizeIncrement = 0x100000U) {
@ -25,85 +26,24 @@ static bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size
return ret == BZ_STREAM_END;
}
// class FileReader
FileReader::FileReader(const QString &fn, QObject *parent) : url_(fn), QObject(parent) {}
void FileReader::read() {
if (url_.isLocalFile()) {
QFile file(url_.toLocalFile());
if (file.open(QIODevice::ReadOnly)) {
emit finished(file.readAll());
} else {
emit failed(QString("Failed to read file %1").arg(url_.toString()));
}
} else {
startHttpRequest();
}
}
void FileReader::startHttpRequest() {
QNetworkAccessManager *qnam = new QNetworkAccessManager(this);
QNetworkRequest request(url_);
request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true);
reply_ = qnam->get(request);
connect(reply_, &QNetworkReply::finished, [=]() {
if (!reply_->error()) {
emit finished(reply_->readAll());
} else {
emit failed(reply_->errorString());
}
reply_->deleteLater();
reply_ = nullptr;
});
}
void FileReader::abort() {
if (reply_) reply_->abort();
}
// class LogReader
LogReader::LogReader(const QString &file, QObject *parent) : QObject(parent) {
file_reader_ = new FileReader(file);
file_reader_->moveToThread(&thread_);
connect(&thread_, &QThread::started, file_reader_, &FileReader::read);
connect(&thread_, &QThread::finished, file_reader_, &FileReader::deleteLater);
connect(file_reader_, &FileReader::finished, [=](const QByteArray &dat) {
parseEvents(dat);
});
connect(file_reader_, &FileReader::failed, [=](const QString &err) {
qDebug() << err;
});
thread_.start();
}
LogReader::~LogReader() {
// wait until thread is finished.
exit_ = true;
file_reader_->abort();
thread_.quit();
thread_.wait();
// clear events
for (auto e : events) {
delete e;
}
for (auto e : events) delete e;
}
void LogReader::parseEvents(const QByteArray &dat) {
bool LogReader::load(const std::string &file) {
raw_.resize(1024 * 1024 * 64);
if (!decompressBZ2(raw_, dat.data(), dat.size())) {
qWarning() << "bz2 decompress failed";
std::string dat = util::read_file(file);
if (dat.empty() || !decompressBZ2(raw_, dat.data(), dat.size())) {
LOGW("bz2 decompress failed");
return false;
}
auto insertEidx = [&](CameraType type, const cereal::EncodeIndex::Reader &e) {
eidx[type][e.getFrameId()] = {e.getSegmentNum(), e.getSegmentId()};
};
valid_ = true;
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (!exit_ && words.size() > 0) {
while (words.size() > 0) {
try {
std::unique_ptr<Event> evt = std::make_unique<Event>(words);
switch (evt->which) {
@ -120,13 +60,11 @@ void LogReader::parseEvents(const QByteArray &dat) {
break;
}
words = kj::arrayPtr(evt->reader.getEnd(), words.end());
events.insert(evt->mono_time, evt.release());
events.push_back(evt.release());
} catch (const kj::Exception &e) {
valid_ = false;
break;
return false;
}
}
if (!exit_) {
emit finished(valid_);
}
std::sort(events.begin(), events.end(), Event::lessThan());
return true;
}

View File

@ -3,46 +3,23 @@
#include <unordered_map>
#include <vector>
#include <QElapsedTimer>
#include <QMultiMap>
#include <QNetworkAccessManager>
#include <QString>
#include <QThread>
#include <capnp/serialize.h>
#include "cereal/gen/cpp/log.capnp.h"
#include "selfdrive/camerad/cameras/camera_common.h"
class FileReader : public QObject {
Q_OBJECT
public:
FileReader(const QString &fn, QObject *parent = nullptr);
void read();
void abort();
signals:
void finished(const QByteArray &dat);
void failed(const QString &err);
private:
void startHttpRequest();
QNetworkReply *reply_ = nullptr;
QUrl url_;
};
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
struct EncodeIdx {
int segmentNum;
uint32_t frameEncodeId;
};
class Event {
public:
Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr<capnp::word>{}) {
// construct a dummy Event for binary search, e.g std::upper_bound
this->which = which;
this->mono_time = mono_time;
}
Event(const kj::ArrayPtr<const capnp::word> &amsg) : reader(amsg) {
words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
event = reader.getRoot<cereal::Event>();
@ -51,6 +28,12 @@ public:
}
inline kj::ArrayPtr<const capnp::byte> bytes() const { return words.asBytes(); }
struct lessThan {
inline bool operator()(const Event *l, const Event *r) {
return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which);
}
};
uint64_t mono_time;
cereal::Event::Which which;
cereal::Event::Reader event;
@ -58,27 +41,15 @@ public:
kj::ArrayPtr<const capnp::word> words;
};
class LogReader : public QObject {
Q_OBJECT
class LogReader {
public:
LogReader(const QString &file, QObject *parent = nullptr);
LogReader() = default;
~LogReader();
inline bool valid() const { return valid_; }
bool load(const std::string &file);
QMultiMap<uint64_t, Event*> events;
std::vector<Event*> events;
std::unordered_map<uint32_t, EncodeIdx> eidx[MAX_CAMERAS] = {};
signals:
void finished(bool success);
private:
void parseEvents(const QByteArray &dat);
std::atomic<bool> exit_ = false;
std::atomic<bool> valid_ = false;
std::vector<uint8_t> raw_;
FileReader *file_reader_ = nullptr;
QThread thread_;
};

View File

@ -1,9 +1,6 @@
#include "selfdrive/ui/replay/framereader.h"
#include <unistd.h>
#include <cassert>
#include "selfdrive/common/timing.h"
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
std::mutex *mutex = (std::mutex *)*arg;
@ -35,7 +32,7 @@ public:
~AVInitializer() { avformat_network_deinit(); }
};
FrameReader::FrameReader(const std::string &url, int timeout_sec) : url_(url), timeout_(timeout_sec) {
FrameReader::FrameReader() {
static AVInitializer av_initializer;
}
@ -74,24 +71,14 @@ FrameReader::~FrameReader() {
}
}
int FrameReader::check_interrupt(void *p) {
FrameReader *fr = static_cast<FrameReader*>(p);
return fr->exit_ || (fr->timeout_ > 0 && millis_since_boot() > fr->timeout_ms_);
}
bool FrameReader::process() {
bool FrameReader::load(const std::string &url) {
pFormatCtx_ = avformat_alloc_context();
pFormatCtx_->interrupt_callback.callback = &FrameReader::check_interrupt;
pFormatCtx_->interrupt_callback.opaque = (void *)this;
if (timeout_ > 0) {
timeout_ms_ = millis_since_boot() + timeout_ * 1000;
}
if (avformat_open_input(&pFormatCtx_, url_.c_str(), NULL, NULL) != 0) {
printf("error loading %s\n", url_.c_str());
if (avformat_open_input(&pFormatCtx_, url.c_str(), NULL, NULL) != 0) {
printf("error loading %s\n", url.c_str());
return false;
}
avformat_find_stream_info(pFormatCtx_, NULL);
av_dump_format(pFormatCtx_, 0, url_.c_str(), 0);
av_dump_format(pFormatCtx_, 0, url.c_str(), 0);
auto pCodecCtxOrig = pFormatCtx_->streams[0]->codec;
auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id);

View File

@ -19,9 +19,9 @@ extern "C" {
class FrameReader {
public:
FrameReader(const std::string &url, int timeout_sec = 0);
FrameReader();
~FrameReader();
bool process();
bool load(const std::string &url);
uint8_t *get(int idx);
int getRGBSize() const { return width * height * 3; }
size_t getFrameCount() const { return frames_.size(); }
@ -32,7 +32,6 @@ public:
private:
void decodeThread();
uint8_t *decodeFrame(AVPacket *pkt);
static int check_interrupt(void *p);
struct Frame {
AVPacket pkt = {};
uint8_t *data = nullptr;
@ -52,8 +51,5 @@ private:
int decode_idx_ = 0;
std::atomic<bool> exit_ = false;
bool valid_ = false;
std::string url_;
std::thread decode_thread_;
int timeout_ = 0;
double timeout_ms_ = 0;
};

View File

@ -1,5 +1,6 @@
#include "selfdrive/ui/replay/replay.h"
#include <iostream>
#include <termios.h>
#include <QApplication>
@ -67,6 +68,7 @@ int main(int argc, char *argv[]){
parser.addPositionalArgument("route", "the drive to replay. find your drives at connect.comma.ai");
parser.addOption({{"a", "allow"}, "whitelist of services to send", "allow"});
parser.addOption({{"b", "block"}, "blacklist of services to send", "block"});
parser.addOption({{"s", "start"}, "start from <seconds>", "seconds"});
parser.addOption({"demo", "use a demo route instead of providing your own"});
parser.process(a);
@ -79,7 +81,7 @@ int main(int argc, char *argv[]){
QStringList allow = parser.value("allow").isEmpty() ? QStringList{} : parser.value("allow").split(",");
QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(",");
Replay *replay = new Replay(route, allow, block);
replay->start();
replay->start(parser.value("start").toInt());
// start keyboard control thread
QThread *t = QThread::create(keyboardThread, replay);

View File

@ -1,7 +1,6 @@
#include "selfdrive/ui/replay/replay.h"
#include <QJsonDocument>
#include <QJsonObject>
#include <QApplication>
#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
@ -14,7 +13,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
if ((allow.size() == 0 || allow.contains(it.name)) &&
!block.contains(it.name)) {
s.push_back(it.name);
socks.append(std::string(it.name));
socks.insert(it.name);
}
}
qDebug() << "services " << s;
@ -23,184 +22,196 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
pm = new PubMaster(s);
}
const QString url = CommaApi::BASE_URL + "/v1/route/" + route + "/files";
http = new HttpRequest(this, !Hardware::PC());
QObject::connect(http, &HttpRequest::receivedResponse, this, &Replay::parseResponse);
http->sendRequest(url);
route_ = std::make_unique<Route>(route);
events = new std::vector<Event *>();
// queueSegment is always executed in the main thread
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment);
}
void Replay::parseResponse(const QString &response) {
QJsonDocument doc = QJsonDocument::fromJson(response.trimmed().toUtf8());
if (doc.isNull()) {
qDebug() << "JSON Parse failed";
Replay::~Replay() {
// TODO: quit stream thread and free resources.
}
void Replay::start(int seconds){
// load route
if (!route_->load() || route_->size() == 0) {
qDebug() << "failed load route" << route_->name() << "from server";
return;
}
camera_paths = doc["cameras"].toArray();
log_paths = doc["logs"].toArray();
qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds;
segments.resize(route_->size());
seekTo(seconds);
seekTo(0);
}
void Replay::addSegment(int n) {
assert((n >= 0) && (n < log_paths.size()) && (n < camera_paths.size()));
if (lrs.find(n) != lrs.end()) {
return;
}
lrs[n] = new LogReader(log_paths.at(n).toString());
// this is a queued connection, mergeEvents is executed in the main thread.
QObject::connect(lrs[n], &LogReader::finished, this, &Replay::mergeEvents);
frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString()));
QThread * t = QThread::create([=]() { frs[n]->process(); });
QObject::connect(t, &QThread::finished, t, &QThread::deleteLater);
t->start();
}
void Replay::mergeEvents() {
const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
// merge logs
QMultiMap<uint64_t, Event *> *new_events = new QMultiMap<uint64_t, Event *>();
std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
for (int i = start_idx; i <= end_idx; ++i) {
if (auto it = lrs.find(i); it != lrs.end()) {
*new_events += (*it)->events;
for (CameraType cam_type : ALL_CAMERAS) {
new_eidx[cam_type].insert((*it)->eidx[cam_type].begin(), (*it)->eidx[cam_type].end());
}
}
}
// update logs
updating_events = true; // set updating_events to true to force stream thread relase the lock
lock.lock();
auto prev_events = std::exchange(events, new_events);
auto prev_eidx = std::exchange(eidx, new_eidx);
updating_events = false;
lock.unlock();
// free logs
delete prev_events;
delete[] prev_eidx;
for (int i = 0; i < log_paths.size(); i++) {
if (i < start_idx || i > end_idx) {
delete lrs.take(i);
delete frs.take(i);
}
}
}
void Replay::start(){
// start stream thread
thread = new QThread;
QObject::connect(thread, &QThread::started, [=](){
stream();
});
QObject::connect(thread, &QThread::started, [=]() { stream(); });
thread->start();
queue_thread = new QThread;
QObject::connect(queue_thread, &QThread::started, [=](){
segmentQueueThread();
});
queue_thread->start();
}
void Replay::seekTo(int seconds) {
if (segments.empty()) return;
updating_events = true;
std::unique_lock lk(lock);
seconds = std::clamp(seconds, 0, log_paths.size() * 60);
seconds = std::clamp(seconds, 0, (int)segments.size() * 60);
qInfo() << "seeking to " << seconds;
seek_ts = seconds;
current_segment = seconds / 60;
setCurrentSegment(std::clamp(seconds / 60, 0, (int)segments.size() - 1));
updating_events = false;
}
void Replay::relativeSeek(int seconds) {
if (current_ts > 0) {
seekTo(current_ts + seconds);
seekTo(current_ts + seconds);
}
void Replay::setCurrentSegment(int n) {
if (current_segment.exchange(n) != n) {
emit segmentChanged(n);
}
}
void Replay::segmentQueueThread() {
// maintain the segment window
while (true) {
int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
for (int i = 0; i < log_paths.size(); i++) {
if (i >= start_idx && i <= end_idx) {
addSegment(i);
// maintain the segment window
void Replay::queueSegment() {
assert(QThread::currentThreadId() == qApp->thread()->currentThreadId());
// fetch segments forward
int cur_seg = current_segment.load();
int end_idx = cur_seg;
for (int i = cur_seg, fwd = 0; i < segments.size() && fwd <= FORWARD_SEGS; ++i) {
if (!segments[i]) {
segments[i] = std::make_unique<Segment>(i, route_->at(i));
QObject::connect(segments[i].get(), &Segment::loadFinished, this, &Replay::queueSegment);
}
end_idx = i;
// skip invalid segment
fwd += segments[i]->isValid();
}
// merge segments
mergeSegments(cur_seg, end_idx);
}
void Replay::mergeSegments(int cur_seg, int end_idx) {
// segments must be merged in sequence.
std::vector<int> segments_need_merge;
const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0);
for (int i = begin_idx; i <= end_idx; ++i) {
if (segments[i] && segments[i]->isLoaded()) {
segments_need_merge.push_back(i);
} else if (i >= cur_seg) {
// segment is valid,but still loading. can't skip it to merge the next one.
// otherwise the stream thread may jump to the next segment.
break;
}
}
if (segments_need_merge != segments_merged) {
qDebug() << "merge segments" << segments_need_merge;
segments_merged = segments_need_merge;
std::vector<Event *> *new_events = new std::vector<Event *>();
std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
for (int n : segments_need_merge) {
auto &log = segments[n]->log;
// merge & sort events
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
for (CameraType cam_type : ALL_CAMERAS) {
new_eidx[cam_type].insert(log->eidx[cam_type].begin(), log->eidx[cam_type].end());
}
}
// update logs
// set updating_events to true to force stream thread relase the lock
updating_events = true;
lock.lock();
if (route_start_ts == 0) {
// get route start time from initData
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts = (*it)->mono_time;
}
}
auto prev_events = std::exchange(events, new_events);
auto prev_eidx = std::exchange(eidx, new_eidx);
updating_events = false;
lock.unlock();
// free segments
delete prev_events;
delete[] prev_eidx;
for (int i = 0; i < segments.size(); i++) {
if ((i < begin_idx || i > end_idx) && segments[i]) {
segments[i].reset(nullptr);
}
}
QThread::msleep(100);
}
}
void Replay::stream() {
QElapsedTimer timer;
timer.start();
route_start_ts = 0;
bool waiting_printed = false;
uint64_t cur_mono_time = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
while (true) {
std::unique_lock lk(lock);
if (!events || events->size() == 0) {
lk.unlock();
qDebug() << "waiting for events";
QThread::msleep(100);
uint64_t evt_start_ts = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time;
Event cur_event(cur_which, evt_start_ts);
auto eit = std::upper_bound(events->begin(), events->end(), &cur_event, Event::lessThan());
if (eit == events->end()) {
lock.unlock();
if (std::exchange(waiting_printed, true) == false) {
qDebug() << "waiting for events...";
}
QThread::msleep(50);
continue;
}
// TODO: use initData's logMonoTime
if (route_start_ts == 0) {
route_start_ts = events->firstKey();
}
uint64_t t0 = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time;
waiting_printed = false;
seek_ts = -1;
qDebug() << "unlogging at" << int((t0 - route_start_ts) / 1e9);
uint64_t t0r = timer.nsecsElapsed();
uint64_t loop_start_ts = nanos_since_boot();
qDebug() << "unlogging at" << int((evt_start_ts - route_start_ts) / 1e9);
for (/**/; !updating_events && eit != events->end(); ++eit) {
const Event *evt = (*eit);
cur_which = evt->which;
cur_mono_time = evt->mono_time;
current_ts = (cur_mono_time - route_start_ts) / 1e9;
for (auto eit = events->lowerBound(t0); !updating_events && eit != events->end(); ++eit) {
cereal::Event::Reader e = (*eit)->event;
cur_mono_time = (*eit)->mono_time;
current_segment = (cur_mono_time - route_start_ts) / 1e9 / 60;
std::string type;
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) {
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(evt->event).which()) {
type = e_->getProto().getName();
}
current_ts = std::max(cur_mono_time - route_start_ts, (uint64_t)0) / 1e9;
if (socks.contains(type)) {
float timestamp = (cur_mono_time - route_start_ts)/1e9;
if (std::abs(timestamp - last_print) > 5.0) {
last_print = timestamp;
if (socks.find(type) != socks.end()) {
if (std::abs(current_ts - last_print) > 5.0) {
last_print = current_ts;
qInfo() << "at " << int(last_print) << "s";
}
setCurrentSegment(current_ts / 60);
// keep time
long etime = cur_mono_time-t0;
long rtime = timer.nsecsElapsed() - t0r;
long us_behind = ((etime-rtime)*1e-3)+0.5;
long etime = cur_mono_time - evt_start_ts;
long rtime = nanos_since_boot() - loop_start_ts;
long us_behind = ((etime - rtime) * 1e-3) + 0.5;
if (us_behind > 0 && us_behind < 1e6) {
QThread::usleep(us_behind);
//qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed();
}
// publish frame
// TODO: publish all frames
if (type == "roadCameraState") {
auto fr = e.getRoadCameraState();
auto it_ = eidx[RoadCam].find(fr.getFrameId());
if (evt->which == cereal::Event::ROAD_CAMERA_STATE) {
auto it_ = eidx[RoadCam].find(evt->event.getRoadCameraState().getFrameId());
if (it_ != eidx[RoadCam].end()) {
EncodeIdx &e = it_->second;
if (frs.find(e.segmentNum) != frs.end()) {
auto frm = frs[e.segmentNum];
auto &seg = segments[e.segmentNum];
if (seg && seg->isLoaded()) {
auto &frm = seg->frames[RoadCam];
if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
@ -224,12 +235,10 @@ void Replay::stream() {
// publish msg
if (sm == nullptr) {
auto bytes = (*eit)->bytes();
auto bytes = evt->bytes();
pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size());
} else {
std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
messages.push_back({type, e});
sm->update_msgs(nanos_since_boot(), messages);
sm->update_msgs(nanos_since_boot(), {{type, evt->event}});
}
}
}

View File

@ -1,17 +1,11 @@
#pragma once
#include <iostream>
#include <QJsonArray>
#include <QThread>
#include <set>
#include <capnp/dynamic.h>
#include "cereal/visionipc/visionipc_server.h"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/qt/api.h"
#include "selfdrive/ui/replay/filereader.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 2;
@ -21,45 +15,43 @@ class Replay : public QObject {
public:
Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, QObject *parent = 0);
~Replay();
void start();
void addSegment(int n);
void start(int seconds = 0);
void relativeSeek(int seconds);
void seekTo(int seconds);
signals:
void segmentChanged(int);
protected slots:
void queueSegment();
protected:
void stream();
void segmentQueueThread();
void setCurrentSegment(int n);
void mergeSegments(int begin_idx, int end_idx);
public slots:
void parseResponse(const QString &response);
void mergeEvents();
private:
float last_print = 0;
uint64_t route_start_ts;
uint64_t route_start_ts = 0;
std::atomic<int> seek_ts = 0;
std::atomic<int> current_ts = 0;
std::atomic<int> current_segment = 0;
std::atomic<int> current_segment = -1;
QThread *thread;
QThread *kb_thread;
QThread *queue_thread;
// logs
std::mutex lock;
std::atomic<bool> updating_events = false;
QMultiMap<uint64_t, Event *> *events = nullptr;
std::vector<Event *> *events = nullptr;
std::unordered_map<uint32_t, EncodeIdx> *eidx = nullptr;
HttpRequest *http;
QJsonArray camera_paths;
QJsonArray log_paths;
QMap<int, LogReader*> lrs;
QMap<int, FrameReader*> frs;
std::vector<std::unique_ptr<Segment>> segments;
std::vector<int> segments_merged;
// messaging
SubMaster *sm;
PubMaster *pm;
QVector<std::string> socks;
std::set<std::string> socks;
VisionIpcServer *vipc_server = nullptr;
std::unique_ptr<Route> route_;
};

View File

@ -0,0 +1,153 @@
#include "selfdrive/ui/replay/route.h"
#include <QDir>
#include <QEventLoop>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QRegExp>
#include <future>
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h"
Route::Route(const QString &route) : route_(route) {}
bool Route::load() {
QEventLoop loop;
auto onError = [&loop](const QString &err) {
qInfo() << err;
loop.quit();
};
bool ret = false;
HttpRequest http(nullptr, !Hardware::PC());
QObject::connect(&http, &HttpRequest::failedResponse, onError);
QObject::connect(&http, &HttpRequest::timeoutResponse, onError);
QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) {
ret = loadFromJson(json);
loop.quit();
});
http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files");
loop.exec();
return ret;
}
bool Route::loadFromJson(const QString &json) {
QJsonObject route_files = QJsonDocument::fromJson(json.trimmed().toUtf8()).object();
if (route_files.empty()) {
qInfo() << "JSON Parse failed";
return false;
}
QRegExp rx(R"(\/(\d+)\/)");
for (const QString &key : route_files.keys()) {
for (const auto &url : route_files[key].toArray()) {
QString url_str = url.toString();
if (rx.indexIn(url_str) != -1) {
const int seg_num = rx.cap(1).toInt();
if (segments_.size() <= seg_num) {
segments_.resize(seg_num + 1);
}
if (key == "logs") {
segments_[seg_num].rlog = url_str;
} else if (key == "qlogs") {
segments_[seg_num].qlog = url_str;
} else if (key == "cameras") {
segments_[seg_num].road_cam = url_str;
} else if (key == "dcameras") {
segments_[seg_num].driver_cam = url_str;
} else if (key == "ecameras") {
segments_[seg_num].wide_road_cam = url_str;
} else if (key == "qcameras") {
segments_[seg_num].qcamera = url_str;
}
}
}
}
return true;
}
// class Segment
Segment::Segment(int n, const SegmentFile &segment_files) : seg_num_(n), files_(segment_files) {
static std::once_flag once_flag;
std::call_once(once_flag, [=]() {
if (!QDir(CACHE_DIR).exists()) QDir().mkdir(CACHE_DIR);
});
// fallback to qcamera
road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam;
valid_ = !files_.rlog.isEmpty() && !road_cam_path_.isEmpty();
if (!valid_) return;
if (!QUrl(files_.rlog).isLocalFile()) {
for (auto &url : {files_.rlog, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) {
if (!url.isEmpty() && !QFile::exists(localPath(url))) {
qDebug() << "download" << url;
downloadFile(url);
++downloading_;
}
}
}
if (downloading_ == 0) {
QTimer::singleShot(0, this, &Segment::load);
}
}
Segment::~Segment() {
// cancel download, qnam will not abort requests, need to abort them manually
aborting_ = true;
for (QNetworkReply *replay : replies_) {
if (replay->isRunning()) {
replay->abort();
}
replay->deleteLater();
}
}
void Segment::downloadFile(const QString &url) {
QNetworkReply *reply = qnam_.get(QNetworkRequest(url));
replies_.insert(reply);
connect(reply, &QNetworkReply::finished, [=]() {
if (reply->error() == QNetworkReply::NoError) {
QFile file(localPath(url));
if (file.open(QIODevice::WriteOnly)) {
file.write(reply->readAll());
}
}
if (--downloading_ == 0 && !aborting_) {
load();
}
});
}
// load concurrency
void Segment::load() {
std::vector<std::future<bool>> futures;
futures.emplace_back(std::async(std::launch::async, [=]() {
log = std::make_unique<LogReader>();
return log->load(localPath(files_.rlog).toStdString());
}));
QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam};
for (int i = 0; i < std::size(camera_files); ++i) {
if (!camera_files[i].isEmpty()) {
futures.emplace_back(std::async(std::launch::async, [=]() {
frames[i] = std::make_unique<FrameReader>();
return frames[i]->load(localPath(camera_files[i]).toStdString());
}));
}
}
int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; });
loaded_ = valid_ = (success_cnt == futures.size());
emit loadFinished();
}
QString Segment::localPath(const QUrl &url) {
if (url.isLocalFile()) return url.toString();
QByteArray url_no_query = url.toString(QUrl::RemoveQuery).toUtf8();
return CACHE_DIR + QString(QCryptographicHash::hash(url_no_query, QCryptographicHash::Sha256).toHex());
}

View File

@ -0,0 +1,65 @@
#pragma once
#include <QNetworkAccessManager>
#include <QString>
#include <vector>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/filereader.h"
#include "selfdrive/ui/replay/framereader.h"
const QString CACHE_DIR = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str();
struct SegmentFile {
QString rlog;
QString qlog;
QString road_cam;
QString driver_cam;
QString wide_road_cam;
QString qcamera;
};
class Route {
public:
Route(const QString &route);
bool load();
inline const QString &name() const { return route_; };
inline int size() const { return segments_.size(); }
inline SegmentFile &at(int n) { return segments_[n]; }
protected:
bool loadFromJson(const QString &json);
QString route_;
std::vector<SegmentFile> segments_;
};
class Segment : public QObject {
Q_OBJECT
public:
Segment(int n, const SegmentFile &segment_files);
~Segment();
inline bool isValid() const { return valid_; };
inline bool isLoaded() const { return loaded_; }
std::unique_ptr<LogReader> log;
std::unique_ptr<FrameReader> frames[MAX_CAMERAS] = {};
signals:
void loadFinished();
protected:
void load();
void downloadFile(const QString &url);
QString localPath(const QUrl &url);
bool loaded_ = false, valid_ = false;
bool aborting_ = false;
int downloading_ = 0;
int seg_num_ = 0;
SegmentFile files_;
QString road_cam_path_;
QSet<QNetworkReply *> replies_;
QNetworkAccessManager qnam_;
};

View File

@ -1,18 +1,15 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"
#include "selfdrive/ui/replay/framereader.h"
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
TEST_CASE("FrameReader") {
SECTION("process&get") {
FrameReader fr(stream_url);
bool ret = fr.process();
REQUIRE(ret == true);
FrameReader fr;
REQUIRE(fr.load(stream_url) == true);
REQUIRE(fr.valid() == true);
REQUIRE(fr.getFrameCount() == 1200);
// random get 50 frames
// srand(time(NULL));
// for (int i = 0; i < 50; ++i) {
@ -24,11 +21,4 @@ TEST_CASE("FrameReader") {
REQUIRE(fr.get(i) != nullptr);
}
}
SECTION("process with timeout") {
FrameReader fr(stream_url, 1);
bool ret = fr.process();
REQUIRE(ret == false);
REQUIRE(fr.valid() == false);
REQUIRE(fr.getFrameCount() < 1200);
}
}