From f0d0b82b8d6f5f450952113e234d0a5a49e80c48 Mon Sep 17 00:00:00 2001 From: iejMac <61431446+iejMac@users.noreply.github.com> Date: Sun, 16 May 2021 02:06:30 +0200 Subject: [PATCH] Replay: seeking functionality (#20763) --- cereal | 2 +- selfdrive/ui/SConscript | 3 +- selfdrive/ui/replay/filereader.cc | 3 - selfdrive/ui/replay/main.cc | 4 +- selfdrive/ui/replay/replay.cc | 285 +++++++++++++++++++++++++++--- selfdrive/ui/replay/replay.h | 56 ++++-- selfdrive/ui/replay/unlogger.cc | 202 --------------------- selfdrive/ui/replay/unlogger.h | 42 ----- 8 files changed, 299 insertions(+), 298 deletions(-) delete mode 100644 selfdrive/ui/replay/unlogger.cc delete mode 100644 selfdrive/ui/replay/unlogger.h diff --git a/cereal b/cereal index a6f4b635..3c895e7b 160000 --- a/cereal +++ b/cereal @@ -1 +1 @@ -Subproject commit a6f4b6351d70e74220ffa7d9af917c3dea08a2ce +Subproject commit 3c895e7b33a06a4c087c7728a3e44986b360f3ab diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 325c50b7..5be9e433 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -63,8 +63,7 @@ if arch == 'x86_64' and os.path.exists(Dir("#tools/").get_abspath()): qt_env['CPPPATH'] += ["#tools/clib"] qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/unlogger.cc", - "replay/filereader.cc", "#tools/clib/framereader.cc"] + replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "#tools/clib/framereader.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs diff --git a/selfdrive/ui/replay/filereader.cc b/selfdrive/ui/replay/filereader.cc index 9b564bfe..dd0f8c24 100644 --- a/selfdrive/ui/replay/filereader.cc +++ b/selfdrive/ui/replay/filereader.cc @@ -119,8 +119,6 @@ void LogReader::mergeEvents(int dled) { *events += events_local; eidx->unite(eidx_local); events_lock->unlock(); - - printf("parsed %d into %d events with offset %d\n", dled, events->size(), event_offset); } void LogReader::readyRead() { @@ -135,7 +133,6 @@ void LogReader::readyRead() { qWarning() << "bz2 decompress failed"; break; } - qDebug() << "got" << dat.size() << "with" << bStream.avail_out << "size" << raw.size(); } int dled = raw.size() - bStream.avail_out; diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 4a4f76d3..1b1efd2a 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -11,8 +11,8 @@ int main(int argc, char *argv[]){ return 1; } - Replay *replay = new Replay(route, 0); - replay->stream(0); + Replay *replay = new Replay(route); + replay->start(); return a.exec(); } diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 8c563105..0773600f 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -1,17 +1,57 @@ -#include "replay.h" +#include "selfdrive/ui/replay/replay.h" + +#include +#include + +#include "cereal/services.h" +#include "selfdrive/camerad/cameras/camera_common.h" +#include "selfdrive/common/timing.h" #include "selfdrive/hardware/hw.h" -Replay::Replay(QString route_, int seek) : route(route_) { - unlogger = new Unlogger(&events, &events_lock, &frs, seek); - current_segment = 0; - bool create_jwt = !Hardware::PC(); - http = new HttpRequest(this, "https://api.commadotai.com/v1/route/" + route + "/files", "", create_jwt); +int getch() { + int ch; + struct termios oldt; + struct termios newt; + + tcgetattr(STDIN_FILENO, &oldt); + newt = oldt; + newt.c_lflag &= ~(ICANON | ECHO); + + tcsetattr(STDIN_FILENO, TCSANOW, &newt); + ch = getchar(); + tcsetattr(STDIN_FILENO, TCSANOW, &oldt); + + return ch; +} + +Replay::Replay(QString route, SubMaster *sm_, QObject *parent) : sm(sm_), QObject(parent) { + QStringList block = QString(getenv("BLOCK")).split(","); + qDebug() << "blocklist" << block; + + QStringList allow = QString(getenv("ALLOW")).split(","); + qDebug() << "allowlist" << allow; + + std::vector s; + for (const auto &it : services) { + if ((allow[0].size() == 0 || allow.contains(it.name)) && + !block.contains(it.name)) { + s.push_back(it.name); + socks.append(std::string(it.name)); + } + } + qDebug() << "services " << s; + + if (sm == nullptr) { + pm = new PubMaster(s); + } + + const QString url = "https://api.commadotai.com/v1/route/" + route + "/files"; + http = new HttpRequest(this, url, "", Hardware::PC()); QObject::connect(http, &HttpRequest::receivedResponse, this, &Replay::parseResponse); } -void Replay::parseResponse(const QString &response){ +void Replay::parseResponse(const QString &response) { QJsonDocument doc = QJsonDocument::fromJson(response.trimmed().toUtf8()); - if (doc.isNull()) { qDebug() << "JSON Parse failed"; return; @@ -20,37 +60,228 @@ void Replay::parseResponse(const QString &response){ camera_paths = doc["cameras"].toArray(); log_paths = doc["logs"].toArray(); - // add first segment - addSegment(0); + seekTime(0); } -void Replay::addSegment(int i){ - if (lrs.find(i) != lrs.end()) { +void Replay::addSegment(int n) { + assert((n >= 0) && (n < log_paths.size()) && (n < camera_paths.size())); + if (lrs.find(n) != lrs.end()) { return; } - QThread* thread = new QThread; + QThread *t = new QThread; + lrs.insert(n, new LogReader(log_paths.at(n).toString(), &events, &events_lock, &eidx)); - QString log_fn = this->log_paths.at(i).toString(); - lrs.insert(i, new LogReader(log_fn, &events, &events_lock, &unlogger->eidx)); + lrs[n]->moveToThread(t); + QObject::connect(t, &QThread::started, lrs[n], &LogReader::process); + t->start(); - lrs[i]->moveToThread(thread); - QObject::connect(thread, &QThread::started, lrs[i], &LogReader::process); - thread->start(); - - QString camera_fn = this->camera_paths.at(i).toString(); - frs.insert(i, new FrameReader(qPrintable(camera_fn))); + frs.insert(n, new FrameReader(qPrintable(camera_paths.at(n).toString()))); } -void Replay::stream(SubMaster *sm){ - QThread* thread = new QThread; - unlogger->moveToThread(thread); +void Replay::removeSegment(int n) { + // TODO: fix FrameReader and LogReader destructors + /* + if (lrs.contains(n)) { + auto lr = lrs.take(n); + delete lr; + } + if (frs.contains(n)) { + auto fr = frs.take(n); + delete fr; + } + + events_lock.lockForWrite(); + auto eit = events.begin(); + while (eit != events.end()) { + if(std::abs(eit.key()/1e9 - getCurrentTime()/1e9) > 60.0){ + eit = events.erase(eit); + continue; + } + eit++; + } + events_lock.unlock(); + */ +} + +void Replay::start(){ + thread = new QThread; QObject::connect(thread, &QThread::started, [=](){ - unlogger->process(sm); + stream(); }); thread->start(); - QObject::connect(unlogger, &Unlogger::loadSegment, [=](){ - addSegment(++current_segment); + kb_thread = new QThread; + QObject::connect(kb_thread, &QThread::started, [=](){ + keyboardThread(); }); + kb_thread->start(); + + queue_thread = new QThread; + QObject::connect(queue_thread, &QThread::started, [=](){ + segmentQueueThread(); + }); + queue_thread->start(); +} + +void Replay::seekTime(int ts) { + ts = std::clamp(ts, 0, log_paths.size() * 60); + qInfo() << "seeking to " << ts; + + seek_ts = ts; + current_segment = ts/60; +} + +void Replay::segmentQueueThread() { + // maintain the segment window + while (true) { + for (int i = 0; i < log_paths.size(); i++) { + int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); + int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); + if (i >= start_idx && i <= end_idx) { + addSegment(i); + } else { + removeSegment(i); + } + } + QThread::msleep(100); + } +} + +void Replay::keyboardThread() { + char c; + while (true) { + c = getch(); + if(c == '\n'){ + printf("Enter seek request: "); + std::string r; + std::cin >> r; + + try { + if(r[0] == '#') { + r.erase(0, 1); + seekTime(std::stoi(r)*60); + } else { + seekTime(std::stoi(r)); + } + } catch (std::invalid_argument) { + qDebug() << "invalid argument"; + } + getch(); // remove \n from entering seek + } else if (c == 'm') { + seekTime(current_ts + 60); + } else if (c == 'M') { + seekTime(current_ts - 60); + } else if (c == 's') { + seekTime(current_ts + 10); + } else if (c == 'S') { + seekTime(current_ts - 10); + } else if (c == 'G') { + seekTime(0); + } + } +} + +void Replay::stream() { + QElapsedTimer timer; + timer.start(); + + route_start_ts = 0; + while (true) { + if (events.size() == 0) { + qDebug() << "waiting for events"; + QThread::msleep(100); + continue; + } + + // TODO: use initData's logMonoTime + if (route_start_ts == 0) { + route_start_ts = events.firstKey(); + } + + uint64_t t0 = route_start_ts + (seek_ts * 1e9); + seek_ts = -1; + qDebug() << "unlogging at" << (t0 - route_start_ts) / 1e9; + + // wait until we have events within 1s of the current time + auto eit = events.lowerBound(t0); + while (eit.key() - t0 > 1e9) { + eit = events.lowerBound(t0); + QThread::msleep(10); + } + + uint64_t t0r = timer.nsecsElapsed(); + while ((eit != events.end()) && seek_ts < 0) { + cereal::Event::Reader e = (*eit); + std::string type; + KJ_IF_MAYBE(e_, static_cast(e).which()) { + type = e_->getProto().getName(); + } + + uint64_t tm = e.getLogMonoTime(); + current_ts = std::max(tm - route_start_ts, (unsigned long)0) / 1e9; + + if (socks.contains(type)) { + float timestamp = (tm - route_start_ts)/1e9; + if (std::abs(timestamp - last_print) > 5.0) { + last_print = timestamp; + qInfo() << "at " << last_print; + } + + // keep time + long etime = tm-t0; + long rtime = timer.nsecsElapsed() - t0r; + long us_behind = ((etime-rtime)*1e-3)+0.5; + if (us_behind > 0 && us_behind < 1e6) { + QThread::usleep(us_behind); + //qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed(); + } + + // publish frame + // TODO: publish all frames + if (type == "roadCameraState") { + auto fr = e.getRoadCameraState(); + + auto it_ = eidx.find(fr.getFrameId()); + if (it_ != eidx.end()) { + auto pp = *it_; + if (frs.find(pp.first) != frs.end()) { + auto frm = frs[pp.first]; + auto data = frm->get(pp.second); + + if (vipc_server == nullptr) { + cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); + cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); + + vipc_server = new VisionIpcServer("camerad", device_id, context); + vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, UI_BUF_COUNT, + true, frm->width, frm->height); + vipc_server->start_listener(); + } + + VisionIpcBufExtra extra = {}; + VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); + memcpy(buf->addr, data, frm->getRGBSize()); + vipc_server->send(buf, &extra, false); + } + } + } + + // publish msg + if (sm == nullptr) { + capnp::MallocMessageBuilder msg; + msg.setRoot(e); + auto words = capnp::messageToFlatArray(msg); + auto bytes = words.asBytes(); + pm->send(type.c_str(), (unsigned char*)bytes.begin(), bytes.size()); + } else { + std::vector> messages; + messages.push_back({type, e}); + sm->update_msgs(nanos_since_boot(), messages); + } + } + + ++eit; + } + } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 5fd88f0b..e7c81cbd 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -1,10 +1,10 @@ #pragma once -#include +#include +#include + #include -#include -#include -#include +#include #include @@ -12,37 +12,55 @@ #include "selfdrive/common/util.h" #include "selfdrive/ui/qt/api.h" #include "selfdrive/ui/replay/filereader.h" -#include "selfdrive/ui/replay/unlogger.h" #include "tools/clib/framereader.h" + +constexpr int FORWARD_SEGS = 2; +constexpr int BACKWARD_SEGS = 2; + + class Replay : public QObject { Q_OBJECT public: - Replay(QString route_, int seek); - void stream(SubMaster *sm = nullptr); - void addSegment(int i); - QJsonArray camera_paths; - QJsonArray log_paths; + Replay(QString route, SubMaster *sm = nullptr, QObject *parent = 0); - QQueue event_sizes; + void start(); + void addSegment(int n); + void removeSegment(int n); + void seekTime(int ts); public slots: + void stream(); + void keyboardThread(); + void segmentQueueThread(); void parseResponse(const QString &response); -protected: - Unlogger *unlogger; - private: - QString route; + float last_print = 0; + uint64_t route_start_ts; + std::atomic seek_ts = 0; + std::atomic current_ts = 0; + std::atomic current_segment; - QReadWriteLock events_lock; + QThread *thread; + QThread *kb_thread; + QThread *queue_thread; + + // logs Events events; + QReadWriteLock events_lock; + QMap> eidx; + HttpRequest *http; + QJsonArray camera_paths; + QJsonArray log_paths; QMap lrs; QMap frs; - HttpRequest *http; - int current_segment; + // messaging + SubMaster *sm; + PubMaster *pm; + QVector socks; + VisionIpcServer *vipc_server = nullptr; }; - diff --git a/selfdrive/ui/replay/unlogger.cc b/selfdrive/ui/replay/unlogger.cc deleted file mode 100644 index 01ce43b2..00000000 --- a/selfdrive/ui/replay/unlogger.cc +++ /dev/null @@ -1,202 +0,0 @@ -#include "selfdrive/ui/replay/unlogger.h" - -#include -#include -#include - -#include -#include - -// include the dynamic struct -#include -#include - -#include "cereal/gen/cpp/car.capnp.c++" -#include "cereal/gen/cpp/legacy.capnp.c++" -#include "cereal/gen/cpp/log.capnp.c++" -#include "cereal/services.h" -#include "selfdrive/common/timing.h" - -Unlogger::Unlogger(Events *events_, QReadWriteLock* events_lock_, QMap *frs_, int seek) - : events(events_), events_lock(events_lock_), frs(frs_) { - ctx = Context::create(); - - seek_request = seek*1e9; - - QStringList block = QString(getenv("BLOCK")).split(","); - qDebug() << "blocklist" << block; - - QStringList allow = QString(getenv("ALLOW")).split(","); - qDebug() << "allowlist" << allow; - - for (const auto& it : services) { - std::string name = it.name; - if (allow[0].size() > 0 && !allow.contains(name.c_str())) { - qDebug() << "not allowing" << name.c_str(); - continue; - } - - if (block.contains(name.c_str())) { - qDebug() << "blocking" << name.c_str(); - continue; - } - - PubSocket *sock = PubSocket::create(ctx, name); - if (sock == NULL) { - qDebug() << "FAILED" << name.c_str(); - continue; - } - - qDebug() << name.c_str(); - - socks.insert(name, sock); - } -} - -void Unlogger::process(SubMaster *sm) { - - qDebug() << "hello from unlogger thread"; - while (events->size() == 0) { - qDebug() << "waiting for events"; - QThread::sleep(1); - } - qDebug() << "got events"; - - // TODO: hack - if (seek_request != 0) { - seek_request += events->begin().key(); - while (events->lowerBound(seek_request) == events->end()) { - qDebug() << "waiting for desired time"; - QThread::sleep(1); - } - } - - QElapsedTimer timer; - timer.start(); - - uint64_t last_elapsed = 0; - - // loops - while (1) { - uint64_t t0 = (events->begin()+1).key(); - uint64_t t0r = timer.nsecsElapsed(); - qDebug() << "unlogging at" << t0; - - auto eit = events->lowerBound(t0); - while (eit != events->end()) { - - float time_to_end = ((events->lastKey() - eit.key())/1e9); - if (loading_segment && (time_to_end > 20.0)){ - loading_segment = false; - } - - while (paused) { - QThread::usleep(1000); - t0 = eit->getLogMonoTime(); - t0r = timer.nsecsElapsed(); - } - - if (seek_request != 0) { - t0 = seek_request; - qDebug() << "seeking to" << t0; - t0r = timer.nsecsElapsed(); - eit = events->lowerBound(t0); - seek_request = 0; - if (eit == events->end()) { - qWarning() << "seek off end"; - break; - } - } - - if (abs(((long long)tc-(long long)last_elapsed)) > 50e6) { - //qDebug() << "elapsed"; - emit elapsed(); - last_elapsed = tc; - } - - cereal::Event::Reader e = *eit; - - capnp::DynamicStruct::Reader e_ds = static_cast(e); - std::string type; - KJ_IF_MAYBE(e_, e_ds.which()){ - type = e_->getProto().getName(); - } - - uint64_t tm = e.getLogMonoTime(); - auto it = socks.find(type); - tc = tm; - if (it != socks.end()) { - long etime = tm-t0; - - float timestamp = etime/1e9; - if(std::abs(timestamp-last_print) > 5.0){ - last_print = timestamp; - printf("at %f\n", last_print); - } - - long rtime = timer.nsecsElapsed() - t0r; - long us_behind = ((etime-rtime)*1e-3)+0.5; - if (us_behind > 0) { - if (us_behind > 1e6) { - qWarning() << "OVER ONE SECOND BEHIND, HACKING" << us_behind; - us_behind = 0; - t0 = tm; - t0r = timer.nsecsElapsed(); - } - QThread::usleep(us_behind); - //qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed(); - } - - if (type == "roadCameraState") { - auto fr = e.getRoadCameraState(); - - auto it_ = eidx.find(fr.getFrameId()); - if (it_ != eidx.end()) { - auto pp = *it_; - //qDebug() << fr.getRoadCameraStateId() << pp; - - if (frs->find(pp.first) != frs->end()) { - auto frm = (*frs)[pp.first]; - auto data = frm->get(pp.second); - - if (vipc_server == nullptr) { - cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); - cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); - - vipc_server = new VisionIpcServer("camerad", device_id, context); - vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, 4, true, frm->width, frm->height); - - vipc_server->start_listener(); - } - - VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); - memcpy(buf->addr, data, frm->getRGBSize()); - VisionIpcBufExtra extra = {}; - - vipc_server->send(buf, &extra, false); - } - } - } - - if (sm == nullptr){ - capnp::MallocMessageBuilder msg; - msg.setRoot(e); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - - (*it)->send((char*)bytes.begin(), bytes.size()); - } else{ - std::vector> messages; - messages.push_back({type, e}); - sm->update_msgs(nanos_since_boot(), messages); - } - } - ++eit; - - if (time_to_end < 10.0 && !loading_segment){ - loading_segment = true; - emit loadSegment(); - } - } - } -} diff --git a/selfdrive/ui/replay/unlogger.h b/selfdrive/ui/replay/unlogger.h deleted file mode 100644 index d3e98076..00000000 --- a/selfdrive/ui/replay/unlogger.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include - -#include "cereal/messaging/messaging.h" -#include "cereal/visionipc/visionipc_server.h" -#include "selfdrive/common/clutil.h" -#include "selfdrive/ui/replay/filereader.h" -#include "tools/clib/framereader.h" - -class Unlogger : public QObject { -Q_OBJECT - public: - Unlogger(Events *events_, QReadWriteLock* events_lock_, QMap *frs_, int seek); - uint64_t getCurrentTime() { return tc; } - void setSeekRequest(uint64_t seek_request_) { seek_request = seek_request_; } - void setPause(bool pause) { paused = pause; } - void togglePause() { paused = !paused; } - QMap > eidx; - - public slots: - void process(SubMaster *sm = nullptr); - signals: - void elapsed(); - void finished(); - void loadSegment(); - private: - Events *events; - QReadWriteLock *events_lock; - QMap *frs; - QMap socks; - Context *ctx; - uint64_t tc = 0; - float last_print = 0; - uint64_t seek_request = 0; - bool paused = false; - bool loading_segment = false; - - VisionIpcServer *vipc_server = nullptr; -}; -