Replay: seeking functionality (#20763)

albatross
iejMac 2021-05-16 02:06:30 +02:00 committed by GitHub
parent 533bc30c2e
commit f0d0b82b8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 299 additions and 298 deletions

2
cereal

@ -1 +1 @@
Subproject commit a6f4b6351d70e74220ffa7d9af917c3dea08a2ce
Subproject commit 3c895e7b33a06a4c087c7728a3e44986b360f3ab

View File

@ -63,8 +63,7 @@ if arch == 'x86_64' and os.path.exists(Dir("#tools/").get_abspath()):
qt_env['CPPPATH'] += ["#tools/clib"]
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/unlogger.cc",
"replay/filereader.cc", "#tools/clib/framereader.cc"]
replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "#tools/clib/framereader.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs

View File

@ -119,8 +119,6 @@ void LogReader::mergeEvents(int dled) {
*events += events_local;
eidx->unite(eidx_local);
events_lock->unlock();
printf("parsed %d into %d events with offset %d\n", dled, events->size(), event_offset);
}
void LogReader::readyRead() {
@ -135,7 +133,6 @@ void LogReader::readyRead() {
qWarning() << "bz2 decompress failed";
break;
}
qDebug() << "got" << dat.size() << "with" << bStream.avail_out << "size" << raw.size();
}
int dled = raw.size() - bStream.avail_out;

View File

@ -11,8 +11,8 @@ int main(int argc, char *argv[]){
return 1;
}
Replay *replay = new Replay(route, 0);
replay->stream(0);
Replay *replay = new Replay(route);
replay->start();
return a.exec();
}

View File

@ -1,17 +1,57 @@
#include "replay.h"
#include "selfdrive/ui/replay/replay.h"
#include <QJsonDocument>
#include <QJsonObject>
#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/hardware/hw.h"
Replay::Replay(QString route_, int seek) : route(route_) {
unlogger = new Unlogger(&events, &events_lock, &frs, seek);
current_segment = 0;
bool create_jwt = !Hardware::PC();
http = new HttpRequest(this, "https://api.commadotai.com/v1/route/" + route + "/files", "", create_jwt);
int getch() {
int ch;
struct termios oldt;
struct termios newt;
tcgetattr(STDIN_FILENO, &oldt);
newt = oldt;
newt.c_lflag &= ~(ICANON | ECHO);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
ch = getchar();
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
return ch;
}
Replay::Replay(QString route, SubMaster *sm_, QObject *parent) : sm(sm_), QObject(parent) {
QStringList block = QString(getenv("BLOCK")).split(",");
qDebug() << "blocklist" << block;
QStringList allow = QString(getenv("ALLOW")).split(",");
qDebug() << "allowlist" << allow;
std::vector<const char*> s;
for (const auto &it : services) {
if ((allow[0].size() == 0 || allow.contains(it.name)) &&
!block.contains(it.name)) {
s.push_back(it.name);
socks.append(std::string(it.name));
}
}
qDebug() << "services " << s;
if (sm == nullptr) {
pm = new PubMaster(s);
}
const QString url = "https://api.commadotai.com/v1/route/" + route + "/files";
http = new HttpRequest(this, url, "", Hardware::PC());
QObject::connect(http, &HttpRequest::receivedResponse, this, &Replay::parseResponse);
}
void Replay::parseResponse(const QString &response){
void Replay::parseResponse(const QString &response) {
QJsonDocument doc = QJsonDocument::fromJson(response.trimmed().toUtf8());
if (doc.isNull()) {
qDebug() << "JSON Parse failed";
return;
@ -20,37 +60,228 @@ void Replay::parseResponse(const QString &response){
camera_paths = doc["cameras"].toArray();
log_paths = doc["logs"].toArray();
// add first segment
addSegment(0);
seekTime(0);
}
void Replay::addSegment(int i){
if (lrs.find(i) != lrs.end()) {
void Replay::addSegment(int n) {
assert((n >= 0) && (n < log_paths.size()) && (n < camera_paths.size()));
if (lrs.find(n) != lrs.end()) {
return;
}
QThread* thread = new QThread;
QThread *t = new QThread;
lrs.insert(n, new LogReader(log_paths.at(n).toString(), &events, &events_lock, &eidx));
QString log_fn = this->log_paths.at(i).toString();
lrs.insert(i, new LogReader(log_fn, &events, &events_lock, &unlogger->eidx));
lrs[n]->moveToThread(t);
QObject::connect(t, &QThread::started, lrs[n], &LogReader::process);
t->start();
lrs[i]->moveToThread(thread);
QObject::connect(thread, &QThread::started, lrs[i], &LogReader::process);
thread->start();
QString camera_fn = this->camera_paths.at(i).toString();
frs.insert(i, new FrameReader(qPrintable(camera_fn)));
frs.insert(n, new FrameReader(qPrintable(camera_paths.at(n).toString())));
}
void Replay::stream(SubMaster *sm){
QThread* thread = new QThread;
unlogger->moveToThread(thread);
void Replay::removeSegment(int n) {
// TODO: fix FrameReader and LogReader destructors
/*
if (lrs.contains(n)) {
auto lr = lrs.take(n);
delete lr;
}
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
}
events_lock.lockForWrite();
auto eit = events.begin();
while (eit != events.end()) {
if(std::abs(eit.key()/1e9 - getCurrentTime()/1e9) > 60.0){
eit = events.erase(eit);
continue;
}
eit++;
}
events_lock.unlock();
*/
}
void Replay::start(){
thread = new QThread;
QObject::connect(thread, &QThread::started, [=](){
unlogger->process(sm);
stream();
});
thread->start();
QObject::connect(unlogger, &Unlogger::loadSegment, [=](){
addSegment(++current_segment);
kb_thread = new QThread;
QObject::connect(kb_thread, &QThread::started, [=](){
keyboardThread();
});
kb_thread->start();
queue_thread = new QThread;
QObject::connect(queue_thread, &QThread::started, [=](){
segmentQueueThread();
});
queue_thread->start();
}
void Replay::seekTime(int ts) {
ts = std::clamp(ts, 0, log_paths.size() * 60);
qInfo() << "seeking to " << ts;
seek_ts = ts;
current_segment = ts/60;
}
void Replay::segmentQueueThread() {
// maintain the segment window
while (true) {
for (int i = 0; i < log_paths.size(); i++) {
int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
if (i >= start_idx && i <= end_idx) {
addSegment(i);
} else {
removeSegment(i);
}
}
QThread::msleep(100);
}
}
void Replay::keyboardThread() {
char c;
while (true) {
c = getch();
if(c == '\n'){
printf("Enter seek request: ");
std::string r;
std::cin >> r;
try {
if(r[0] == '#') {
r.erase(0, 1);
seekTime(std::stoi(r)*60);
} else {
seekTime(std::stoi(r));
}
} catch (std::invalid_argument) {
qDebug() << "invalid argument";
}
getch(); // remove \n from entering seek
} else if (c == 'm') {
seekTime(current_ts + 60);
} else if (c == 'M') {
seekTime(current_ts - 60);
} else if (c == 's') {
seekTime(current_ts + 10);
} else if (c == 'S') {
seekTime(current_ts - 10);
} else if (c == 'G') {
seekTime(0);
}
}
}
void Replay::stream() {
QElapsedTimer timer;
timer.start();
route_start_ts = 0;
while (true) {
if (events.size() == 0) {
qDebug() << "waiting for events";
QThread::msleep(100);
continue;
}
// TODO: use initData's logMonoTime
if (route_start_ts == 0) {
route_start_ts = events.firstKey();
}
uint64_t t0 = route_start_ts + (seek_ts * 1e9);
seek_ts = -1;
qDebug() << "unlogging at" << (t0 - route_start_ts) / 1e9;
// wait until we have events within 1s of the current time
auto eit = events.lowerBound(t0);
while (eit.key() - t0 > 1e9) {
eit = events.lowerBound(t0);
QThread::msleep(10);
}
uint64_t t0r = timer.nsecsElapsed();
while ((eit != events.end()) && seek_ts < 0) {
cereal::Event::Reader e = (*eit);
std::string type;
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) {
type = e_->getProto().getName();
}
uint64_t tm = e.getLogMonoTime();
current_ts = std::max(tm - route_start_ts, (unsigned long)0) / 1e9;
if (socks.contains(type)) {
float timestamp = (tm - route_start_ts)/1e9;
if (std::abs(timestamp - last_print) > 5.0) {
last_print = timestamp;
qInfo() << "at " << last_print;
}
// keep time
long etime = tm-t0;
long rtime = timer.nsecsElapsed() - t0r;
long us_behind = ((etime-rtime)*1e-3)+0.5;
if (us_behind > 0 && us_behind < 1e6) {
QThread::usleep(us_behind);
//qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed();
}
// publish frame
// TODO: publish all frames
if (type == "roadCameraState") {
auto fr = e.getRoadCameraState();
auto it_ = eidx.find(fr.getFrameId());
if (it_ != eidx.end()) {
auto pp = *it_;
if (frs.find(pp.first) != frs.end()) {
auto frm = frs[pp.first];
auto data = frm->get(pp.second);
if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
vipc_server = new VisionIpcServer("camerad", device_id, context);
vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, UI_BUF_COUNT,
true, frm->width, frm->height);
vipc_server->start_listener();
}
VisionIpcBufExtra extra = {};
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
memcpy(buf->addr, data, frm->getRGBSize());
vipc_server->send(buf, &extra, false);
}
}
}
// publish msg
if (sm == nullptr) {
capnp::MallocMessageBuilder msg;
msg.setRoot(e);
auto words = capnp::messageToFlatArray(msg);
auto bytes = words.asBytes();
pm->send(type.c_str(), (unsigned char*)bytes.begin(), bytes.size());
} else {
std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
messages.push_back({type, e});
sm->update_msgs(nanos_since_boot(), messages);
}
}
++eit;
}
}
}

View File

@ -1,10 +1,10 @@
#pragma once
#include <QFile>
#include <iostream>
#include <termios.h>
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QQueue>
#include <QThread>
#include <capnp/dynamic.h>
@ -12,37 +12,55 @@
#include "selfdrive/common/util.h"
#include "selfdrive/ui/qt/api.h"
#include "selfdrive/ui/replay/filereader.h"
#include "selfdrive/ui/replay/unlogger.h"
#include "tools/clib/framereader.h"
constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 2;
class Replay : public QObject {
Q_OBJECT
public:
Replay(QString route_, int seek);
void stream(SubMaster *sm = nullptr);
void addSegment(int i);
QJsonArray camera_paths;
QJsonArray log_paths;
Replay(QString route, SubMaster *sm = nullptr, QObject *parent = 0);
QQueue<int> event_sizes;
void start();
void addSegment(int n);
void removeSegment(int n);
void seekTime(int ts);
public slots:
void stream();
void keyboardThread();
void segmentQueueThread();
void parseResponse(const QString &response);
protected:
Unlogger *unlogger;
private:
QString route;
float last_print = 0;
uint64_t route_start_ts;
std::atomic<int> seek_ts = 0;
std::atomic<int> current_ts = 0;
std::atomic<int> current_segment;
QReadWriteLock events_lock;
QThread *thread;
QThread *kb_thread;
QThread *queue_thread;
// logs
Events events;
QReadWriteLock events_lock;
QMap<int, QPair<int, int>> eidx;
HttpRequest *http;
QJsonArray camera_paths;
QJsonArray log_paths;
QMap<int, LogReader*> lrs;
QMap<int, FrameReader*> frs;
HttpRequest *http;
int current_segment;
// messaging
SubMaster *sm;
PubMaster *pm;
QVector<std::string> socks;
VisionIpcServer *vipc_server = nullptr;
};

View File

@ -1,202 +0,0 @@
#include "selfdrive/ui/replay/unlogger.h"
#include <cmath>
#include <string>
#include <vector>
#include <stdint.h>
#include <time.h>
// include the dynamic struct
#include <capnp/dynamic.h>
#include <capnp/schema.h>
#include "cereal/gen/cpp/car.capnp.c++"
#include "cereal/gen/cpp/legacy.capnp.c++"
#include "cereal/gen/cpp/log.capnp.c++"
#include "cereal/services.h"
#include "selfdrive/common/timing.h"
Unlogger::Unlogger(Events *events_, QReadWriteLock* events_lock_, QMap<int, FrameReader*> *frs_, int seek)
: events(events_), events_lock(events_lock_), frs(frs_) {
ctx = Context::create();
seek_request = seek*1e9;
QStringList block = QString(getenv("BLOCK")).split(",");
qDebug() << "blocklist" << block;
QStringList allow = QString(getenv("ALLOW")).split(",");
qDebug() << "allowlist" << allow;
for (const auto& it : services) {
std::string name = it.name;
if (allow[0].size() > 0 && !allow.contains(name.c_str())) {
qDebug() << "not allowing" << name.c_str();
continue;
}
if (block.contains(name.c_str())) {
qDebug() << "blocking" << name.c_str();
continue;
}
PubSocket *sock = PubSocket::create(ctx, name);
if (sock == NULL) {
qDebug() << "FAILED" << name.c_str();
continue;
}
qDebug() << name.c_str();
socks.insert(name, sock);
}
}
void Unlogger::process(SubMaster *sm) {
qDebug() << "hello from unlogger thread";
while (events->size() == 0) {
qDebug() << "waiting for events";
QThread::sleep(1);
}
qDebug() << "got events";
// TODO: hack
if (seek_request != 0) {
seek_request += events->begin().key();
while (events->lowerBound(seek_request) == events->end()) {
qDebug() << "waiting for desired time";
QThread::sleep(1);
}
}
QElapsedTimer timer;
timer.start();
uint64_t last_elapsed = 0;
// loops
while (1) {
uint64_t t0 = (events->begin()+1).key();
uint64_t t0r = timer.nsecsElapsed();
qDebug() << "unlogging at" << t0;
auto eit = events->lowerBound(t0);
while (eit != events->end()) {
float time_to_end = ((events->lastKey() - eit.key())/1e9);
if (loading_segment && (time_to_end > 20.0)){
loading_segment = false;
}
while (paused) {
QThread::usleep(1000);
t0 = eit->getLogMonoTime();
t0r = timer.nsecsElapsed();
}
if (seek_request != 0) {
t0 = seek_request;
qDebug() << "seeking to" << t0;
t0r = timer.nsecsElapsed();
eit = events->lowerBound(t0);
seek_request = 0;
if (eit == events->end()) {
qWarning() << "seek off end";
break;
}
}
if (abs(((long long)tc-(long long)last_elapsed)) > 50e6) {
//qDebug() << "elapsed";
emit elapsed();
last_elapsed = tc;
}
cereal::Event::Reader e = *eit;
capnp::DynamicStruct::Reader e_ds = static_cast<capnp::DynamicStruct::Reader>(e);
std::string type;
KJ_IF_MAYBE(e_, e_ds.which()){
type = e_->getProto().getName();
}
uint64_t tm = e.getLogMonoTime();
auto it = socks.find(type);
tc = tm;
if (it != socks.end()) {
long etime = tm-t0;
float timestamp = etime/1e9;
if(std::abs(timestamp-last_print) > 5.0){
last_print = timestamp;
printf("at %f\n", last_print);
}
long rtime = timer.nsecsElapsed() - t0r;
long us_behind = ((etime-rtime)*1e-3)+0.5;
if (us_behind > 0) {
if (us_behind > 1e6) {
qWarning() << "OVER ONE SECOND BEHIND, HACKING" << us_behind;
us_behind = 0;
t0 = tm;
t0r = timer.nsecsElapsed();
}
QThread::usleep(us_behind);
//qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed();
}
if (type == "roadCameraState") {
auto fr = e.getRoadCameraState();
auto it_ = eidx.find(fr.getFrameId());
if (it_ != eidx.end()) {
auto pp = *it_;
//qDebug() << fr.getRoadCameraStateId() << pp;
if (frs->find(pp.first) != frs->end()) {
auto frm = (*frs)[pp.first];
auto data = frm->get(pp.second);
if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
vipc_server = new VisionIpcServer("camerad", device_id, context);
vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, 4, true, frm->width, frm->height);
vipc_server->start_listener();
}
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
memcpy(buf->addr, data, frm->getRGBSize());
VisionIpcBufExtra extra = {};
vipc_server->send(buf, &extra, false);
}
}
}
if (sm == nullptr){
capnp::MallocMessageBuilder msg;
msg.setRoot(e);
auto words = capnp::messageToFlatArray(msg);
auto bytes = words.asBytes();
(*it)->send((char*)bytes.begin(), bytes.size());
} else{
std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
messages.push_back({type, e});
sm->update_msgs(nanos_since_boot(), messages);
}
}
++eit;
if (time_to_end < 10.0 && !loading_segment){
loading_segment = true;
emit loadSegment();
}
}
}
}

View File

@ -1,42 +0,0 @@
#pragma once
#include <QReadWriteLock>
#include <QThread>
#include "cereal/messaging/messaging.h"
#include "cereal/visionipc/visionipc_server.h"
#include "selfdrive/common/clutil.h"
#include "selfdrive/ui/replay/filereader.h"
#include "tools/clib/framereader.h"
class Unlogger : public QObject {
Q_OBJECT
public:
Unlogger(Events *events_, QReadWriteLock* events_lock_, QMap<int, FrameReader*> *frs_, int seek);
uint64_t getCurrentTime() { return tc; }
void setSeekRequest(uint64_t seek_request_) { seek_request = seek_request_; }
void setPause(bool pause) { paused = pause; }
void togglePause() { paused = !paused; }
QMap<int, QPair<int, int> > eidx;
public slots:
void process(SubMaster *sm = nullptr);
signals:
void elapsed();
void finished();
void loadSegment();
private:
Events *events;
QReadWriteLock *events_lock;
QMap<int, FrameReader*> *frs;
QMap<std::string, PubSocket*> socks;
Context *ctx;
uint64_t tc = 0;
float last_print = 0;
uint64_t seek_request = 0;
bool paused = false;
bool loading_segment = false;
VisionIpcServer *vipc_server = nullptr;
};