replay: ncurses GUI (#23608)

* initial version

* print all message's in ncurses window

* show download progress bar

* move all to class ConsoleUI

* timeline

* improve timeline&stats

* fix logMessage

* add warning indicator

* continue

* cleanup

* cast type to int

* simplify seekToFlag

* more

* <=

* handle enter

* add box to logging window

* fix multiple threads problem

* fix concurrency issues

* draw indicator

* many improvements

* more

* fix multipe threads logging

* stop replay before exit

* use lambda instead of std::bind

* cleanup

* small cleanup

* use carFingerPrint

* don't emit signal in replay::stream

* merge car_events into timeline

* cleanup DonloadStats

* cleanup

* rename carname to carFingerprint

* improve alert

* add comments

* add help functions

templete function

* handle term resize

* display replaying status

* rename to INSTANT

* helper function pauseReplay

* more

* cleanup

use rDebug

* no template

* less colors

* remove function mv_add_str

* use BORDER_SIZE

* tune colors

* add spaces

* apply reviews

use /
pull/23640/head^2
Dean Lee 2022-01-28 05:17:41 +08:00 committed by GitHub
parent 036c1799d4
commit 3ca8e3653b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 634 additions and 220 deletions

View File

@ -115,10 +115,10 @@ if GetOption('extras'):
if arch in ['x86_64', 'Darwin'] or GetOption('extras'):
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib_src = ["replay/replay.cc", "replay/consoleui.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv'] + qt_libs
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + qt_libs
qt_env.Program("replay/replay", ["replay/main.cc"], LIBS=replay_libs)
qt_env.Program("watch3", ["watch3.cc"], LIBS=qt_libs + ['common', 'json11', 'zmq', 'visionipc', 'messaging'])

View File

@ -1,7 +1,7 @@
#include "selfdrive/ui/replay/camera.h"
#include "selfdrive/ui/replay/util.h"
#include <cassert>
#include <iostream>
CameraServer::CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS], bool send_yuv) : send_yuv(send_yuv) {
for (int i = 0; i < MAX_CAMERAS; ++i) {
@ -24,7 +24,7 @@ void CameraServer::startVipcServer() {
vipc_server_.reset(new VisionIpcServer("camerad"));
for (auto &cam : cameras_) {
if (cam.width > 0 && cam.height > 0) {
std::cout << "camera[" << cam.type << "] frame size " << cam.width << "x" << cam.height << std::endl;
rInfo("camera[%d] frame size %dx%d", cam.type, cam.width, cam.height);
vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height);
if (send_yuv) {
vipc_server_->create_buffers(cam.yuv_type, YUV_BUFFER_COUNT, false, cam.width, cam.height);
@ -61,7 +61,7 @@ void CameraServer::cameraThread(Camera &cam) {
if (rgb) vipc_server_->send(rgb, &extra, false);
if (yuv) vipc_server_->send(yuv, &extra, false);
} else {
std::cout << "camera[" << cam.type << "] failed to get frame:" << eidx.getSegmentId() << std::endl;
rError("camera[%d] failed to get frame:", cam.type, eidx.getSegmentId());
}
cam.cached_id = id + 1;

View File

@ -0,0 +1,353 @@
#include "selfdrive/ui/replay/consoleui.h"
#include <QApplication>
#include <initializer_list>
#include "selfdrive/common/version.h"
namespace {
const int BORDER_SIZE = 3;
const std::initializer_list<std::pair<std::string, std::string>> keyboard_shortcuts[] = {
{
{"s", "+10s"},
{"shift+s", "-10s"},
{"m", "+60s"},
{"shift+m", "-60s"},
{"p", "Pause/Resume"},
{"e", "Next Engagement"},
{"d", "Next Disengagement"},
},
{
{"enter", "Enter seek request"},
{"x", "+/-Replay speed"},
{"q", "Exit"},
},
};
enum Color {
Default,
Debug,
Yellow,
Green,
Red,
BrightWhite,
Engaged,
Disengaged,
};
void add_str(WINDOW *w, const char *str, Color color = Color::Default, bool bold = false) {
if (color != Color::Default) wattron(w, COLOR_PAIR(color));
if (bold) wattron(w, A_BOLD);
waddstr(w, str);
if (bold) wattroff(w, A_BOLD);
if (color != Color::Default) wattroff(w, COLOR_PAIR(color));
}
std::string format_seconds(int s) {
int total_minutes = s / 60;
int seconds = s % 60;
int hours = total_minutes / 60;
int minutes = total_minutes % 60;
return util::string_format("%02d:%02d:%02d", hours, minutes, seconds);
}
} // namespace
ConsoleUI::ConsoleUI(Replay *replay, QObject *parent) : replay(replay), sm({"carState", "liveParameters"}), QObject(parent) {
// Initialize curses
initscr();
clear();
curs_set(false);
cbreak(); // Line buffering disabled. pass on everything
noecho();
keypad(stdscr, true);
nodelay(stdscr, true); // non-blocking getchar()
// Initialize all the colors. https://www.ditig.com/256-colors-cheat-sheet
start_color();
init_pair(Color::Debug, 246, COLOR_BLACK); // #949494
init_pair(Color::Yellow, 184, COLOR_BLACK);
init_pair(Color::Red, COLOR_RED, COLOR_BLACK);
init_pair(Color::BrightWhite, 15, COLOR_BLACK);
init_pair(Color::Disengaged, COLOR_BLUE, COLOR_BLUE);
init_pair(Color::Engaged, 28, 28);
init_pair(Color::Green, 34, COLOR_BLACK);
initWindows();
qRegisterMetaType<uint64_t>("uint64_t");
qRegisterMetaType<ReplyMsgType>("ReplyMsgType");
installMessageHandler([this](ReplyMsgType type, const std::string msg) {
emit logMessageSignal(type, QString::fromStdString(msg));
});
installDownloadProgressHandler([this](uint64_t cur, uint64_t total, bool success) {
emit updateProgressBarSignal(cur, total, success);
});
QObject::connect(replay, &Replay::streamStarted, this, &ConsoleUI::updateSummary);
QObject::connect(&notifier, SIGNAL(activated(int)), SLOT(readyRead()));
QObject::connect(this, &ConsoleUI::updateProgressBarSignal, this, &ConsoleUI::updateProgressBar);
QObject::connect(this, &ConsoleUI::logMessageSignal, this, &ConsoleUI::logMessage);
sm_timer.callOnTimeout(this, &ConsoleUI::updateStatus);
sm_timer.start(100);
getch_timer.start(1000, this);
readyRead();
}
ConsoleUI::~ConsoleUI() {
endwin();
}
void ConsoleUI::initWindows() {
getmaxyx(stdscr, max_height, max_width);
w.fill(nullptr);
w[Win::Title] = newwin(1, max_width, 0, 0);
w[Win::Stats] = newwin(2, max_width - 2 * BORDER_SIZE, 2, BORDER_SIZE);
w[Win::Timeline] = newwin(4, max_width - 2 * BORDER_SIZE, 5, BORDER_SIZE);
w[Win::TimelineDesc] = newwin(1, 100, 10, BORDER_SIZE);
w[Win::CarState] = newwin(3, 100, 12, BORDER_SIZE);
w[Win::DownloadBar] = newwin(1, 100, 16, BORDER_SIZE);
if (int log_height = max_height - 27; log_height > 4) {
w[Win::LogBorder] = newwin(log_height, max_width - 2 * (BORDER_SIZE - 1), 17, BORDER_SIZE - 1);
box(w[Win::LogBorder], 0, 0);
w[Win::Log] = newwin(log_height - 2, max_width - 2 * BORDER_SIZE, 18, BORDER_SIZE);
scrollok(w[Win::Log], true);
}
w[Win::Help] = newwin(5, max_width - (2 * BORDER_SIZE), max_height - 6, BORDER_SIZE);
// set the title bar
wbkgd(w[Win::Title], A_REVERSE);
mvwprintw(w[Win::Title], 0, 3, "openpilot replay %s", COMMA_VERSION);
// show windows on the real screen
refresh();
displayTimelineDesc();
displayHelp();
updateSummary();
updateTimeline();
for (auto win : w) {
if (win) wrefresh(win);
}
}
void ConsoleUI::timerEvent(QTimerEvent *ev) {
if (ev->timerId() != getch_timer.timerId()) return;
if (is_term_resized(max_height, max_width)) {
for (auto win : w) {
if (win) delwin(win);
}
endwin();
clear();
refresh();
initWindows();
rWarning("resize term %dx%d", max_height, max_width);
}
updateTimeline();
}
void ConsoleUI::updateStatus() {
auto write_item = [this](int y, int x, const char *key, const std::string &value, const char *unit,
bool bold = false, Color color = Color::BrightWhite) {
auto win = w[Win::CarState];
wmove(win, y, x);
add_str(win, key);
add_str(win, value.c_str(), color, bold);
add_str(win, unit);
};
static const std::pair<const char *, Color> status_text[] = {
{"loading...", Color::Red},
{"playing", Color::Green},
{"paused...", Color::Yellow},
};
sm.update(0);
if (status != Status::Paused) {
status = (sm.updated("carState") || sm.updated("liveParameters")) ? Status::Playing : Status::Waiting;
}
auto [status_str, status_color] = status_text[status];
write_item(0, 0, "STATUS: ", status_str, " ", false, status_color);
std::string suffix = util::string_format(" / %s [%d/%d] ", format_seconds(replay->totalSeconds()).c_str(),
replay->currentSeconds() / 60, replay->route()->segments().size());
write_item(0, 25, "TIME: ", format_seconds(replay->currentSeconds()), suffix.c_str(), true);
auto p = sm["liveParameters"].getLiveParameters();
write_item(1, 0, "STIFFNESS: ", util::string_format("%.2f %%", p.getStiffnessFactor() * 100), " ");
write_item(1, 25, "SPEED: ", util::string_format("%.2f", sm["carState"].getCarState().getVEgo()), " m/s");
write_item(2, 0, "STEER RATIO: ", util::string_format("%.2f", p.getSteerRatio()), "");
auto angle_offsets = util::string_format("%.2f|%.2f", p.getAngleOffsetAverageDeg(), p.getAngleOffsetDeg());
write_item(2, 25, "ANGLE OFFSET(AVG|INSTANT): ", angle_offsets, " deg");
wrefresh(w[Win::CarState]);
}
void ConsoleUI::displayHelp() {
for (int i = 0; i < std::size(keyboard_shortcuts); ++i) {
wmove(w[Win::Help], i * 2, 0);
for (auto &[key, desc] : keyboard_shortcuts[i]) {
wattron(w[Win::Help], A_REVERSE);
waddstr(w[Win::Help], (' ' + key + ' ').c_str());
wattroff(w[Win::Help], A_REVERSE);
waddstr(w[Win::Help], (' ' + desc + ' ').c_str());
}
}
wrefresh(w[Win::Help]);
}
void ConsoleUI::displayTimelineDesc() {
std::tuple<Color, const char *, bool> indicators[]{
{Color::Engaged, " Engaged ", false},
{Color::Disengaged, " Disengaged ", false},
{Color::Green, " Info ", true},
{Color::Yellow, " Warning ", true},
{Color::Red, " Critical ", true},
};
for (auto [color, name, bold] : indicators) {
add_str(w[Win::TimelineDesc], "__", color, bold);
add_str(w[Win::TimelineDesc], name);
}
}
void ConsoleUI::logMessage(ReplyMsgType type, const QString &msg) {
if (auto win = w[Win::Log]) {
Color color = Color::Default;
if (type == ReplyMsgType::Debug) {
color = Color::Debug;
} else if (type == ReplyMsgType::Warning) {
color = Color::Yellow;
} else if (type == ReplyMsgType::Critical) {
color = Color::Red;
}
add_str(win, qPrintable(msg + "\n"), color);
wrefresh(win);
}
}
void ConsoleUI::updateProgressBar(uint64_t cur, uint64_t total, bool success) {
werase(w[Win::DownloadBar]);
if (success && cur < total) {
const int width = 35;
const float progress = cur / (double)total;
const int pos = width * progress;
wprintw(w[Win::DownloadBar], "Downloading [%s>%s] %d%% %s", std::string(pos, '=').c_str(),
std::string(width - pos, ' ').c_str(), int(progress * 100.0), formattedDataSize(total).c_str());
}
wrefresh(w[Win::DownloadBar]);
}
void ConsoleUI::updateSummary() {
const auto &route = replay->route();
mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %d segments", qPrintable(route->name()), route->segments().size());
mvwprintw(w[Win::Stats], 1, 0, "Car Fingerprint: %s", replay->carFingerprint().c_str());
wrefresh(w[Win::Stats]);
}
void ConsoleUI::updateTimeline() {
auto win = w[Win::Timeline];
int width = getmaxx(win);
werase(win);
wattron(win, COLOR_PAIR(Color::Disengaged));
mvwhline(win, 1, 0, ' ', width);
mvwhline(win, 2, 0, ' ', width);
wattroff(win, COLOR_PAIR(Color::Disengaged));
const int total_sec = replay->totalSeconds();
for (auto [begin, end, type] : replay->getTimeline()) {
int start_pos = ((double)begin / total_sec) * width;
int end_pos = ((double)end / total_sec) * width;
if (type == TimelineType::Engaged) {
mvwchgat(win, 1, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL);
mvwchgat(win, 2, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL);
} else {
auto color_id = Color::Green;
if (type != TimelineType::AlertInfo) {
color_id = type == TimelineType::AlertWarning ? Color::Yellow : Color::Red;
}
mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, color_id, NULL);
}
}
int cur_pos = ((double)replay->currentSeconds() / total_sec) * width;
wattron(win, COLOR_PAIR(Color::BrightWhite));
mvwaddch(win, 0, cur_pos, ACS_VLINE);
mvwaddch(win, 3, cur_pos, ACS_VLINE);
wattroff(win, COLOR_PAIR(Color::BrightWhite));
wrefresh(win);
}
void ConsoleUI::readyRead() {
int c;
while ((c = getch()) != ERR) {
handleKey(c);
}
}
void ConsoleUI::pauseReplay(bool pause) {
replay->pause(pause);
status = pause ? Status::Paused : Status::Waiting;
}
void ConsoleUI::handleKey(char c) {
if (c == '\n') {
// pause the replay and blocking getchar()
pauseReplay(true);
updateStatus();
getch_timer.stop();
curs_set(true);
nodelay(stdscr, false);
// Wait for user input
rWarning("Waiting for input...");
int y = getmaxy(stdscr) - 9;
move(y, BORDER_SIZE);
add_str(stdscr, "Enter seek request: ", Color::BrightWhite, true);
refresh();
// Seek to choice
echo();
int choice = 0;
scanw((char *)"%d", &choice);
noecho();
pauseReplay(false);
replay->seekTo(choice, false);
// Clean up and turn off the blocking mode
move(y, 0);
clrtoeol();
nodelay(stdscr, true);
curs_set(false);
refresh();
getch_timer.start(1000, this);
} else if (c == 'x') {
if (replay->hasFlag(REPLAY_FLAG_FULL_SPEED)) {
replay->removeFlag(REPLAY_FLAG_FULL_SPEED);
rWarning("replay at normal speed");
} else {
replay->addFlag(REPLAY_FLAG_FULL_SPEED);
rWarning("replay at full speed");
}
} else if (c == 'e') {
replay->seekToFlag(FindFlag::nextEngagement);
} else if (c == 'd') {
replay->seekToFlag(FindFlag::nextDisEngagement);
} else if (c == 'm') {
replay->seekTo(+60, true);
} else if (c == 'M') {
replay->seekTo(-60, true);
} else if (c == 's') {
replay->seekTo(+10, true);
} else if (c == 'S') {
replay->seekTo(-10, true);
} else if (c == ' ') {
pauseReplay(!replay->isPaused());
} else if (c == 'q' || c == 'Q') {
replay->stop();
qApp->exit();
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <array>
#include <QBasicTimer>
#include <QObject>
#include <QSocketNotifier>
#include <QTimer>
#include <QTimerEvent>
#include "selfdrive/ui/replay/replay.h"
#include <ncurses.h>
class ConsoleUI : public QObject {
Q_OBJECT
public:
ConsoleUI(Replay *replay, QObject *parent = 0);
~ConsoleUI();
private:
void initWindows();
void handleKey(char c);
void displayHelp();
void displayTimelineDesc();
void updateTimeline();
void updateSummary();
void updateStatus();
void pauseReplay(bool pause);
enum Status { Waiting, Playing, Paused };
enum Win { Title, Stats, Log, LogBorder, DownloadBar, Timeline, TimelineDesc, Help, CarState, Max};
std::array<WINDOW*, Win::Max> w{};
SubMaster sm;
Replay *replay;
QBasicTimer getch_timer;
QTimer sm_timer;
QSocketNotifier notifier{0, QSocketNotifier::Read, this};
int max_width, max_height;
Status status = Status::Waiting;
signals:
void updateProgressBarSignal(uint64_t cur, uint64_t total, bool success);
void logMessageSignal(ReplyMsgType type, const QString &msg);
private slots:
void readyRead();
void timerEvent(QTimerEvent *ev);
void updateProgressBar(uint64_t cur, uint64_t total, bool success);
void logMessage(ReplyMsgType type, const QString &msg);
};

View File

@ -1,7 +1,6 @@
#include "selfdrive/ui/replay/filereader.h"
#include <fstream>
#include <iostream>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/util.h"
@ -35,13 +34,12 @@ std::string FileReader::read(const std::string &file, std::atomic<bool> *abort)
std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
if (i > 0) rWarning("download failed, retrying %d", i);
std::string result = httpGet(url, chunk_size_, abort);
if (!result.empty()) {
return result;
}
if (i != max_retries_ && !(abort && *abort)) {
std::cout << "download failed, retrying " << i + 1 << std::endl;
}
}
return {};
}

View File

@ -1,4 +1,5 @@
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/util.h"
#include <cassert>
#include "libyuv.h"
@ -29,7 +30,7 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat *
for (const enum AVPixelFormat *p = pix_fmts; *p != -1; p++) {
if (*p == *hw_pix_fmt) return *p;
}
printf("Please run replay with the --no-cuda flag!\n");
rWarning("Please run replay with the --no-cuda flag!");
// fallback to YUV420p
*hw_pix_fmt = AV_PIX_FMT_NONE;
return AV_PIX_FMT_YUV420P;
@ -37,7 +38,9 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat *
} // namespace
FrameReader::FrameReader() {}
FrameReader::FrameReader() {
av_log_set_level(AV_LOG_QUIET);
}
FrameReader::~FrameReader() {
for (AVPacket *pkt : packets) {
@ -81,13 +84,13 @@ bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::at
if (ret != 0) {
char err_str[1024] = {0};
av_strerror(ret, err_str, std::size(err_str));
printf("Error loading video - %s\n", err_str);
rWarning("Error loading video - %s", err_str);
return false;
}
ret = avformat_find_stream_info(input_ctx, nullptr);
if (ret < 0) {
printf("cannot find a video stream in the input file\n");
rWarning("cannot find a video stream in the input file");
return false;
}
@ -105,7 +108,7 @@ bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::at
if (has_cuda_device && !no_cuda) {
if (!initHardwareDecoder(AV_HWDEVICE_TYPE_CUDA)) {
printf("No CUDA capable device was found. fallback to CPU decoding.\n");
rWarning("No CUDA capable device was found. fallback to CPU decoding.");
} else {
nv12toyuv_buffer.resize(getYUVSize());
}
@ -135,8 +138,8 @@ bool FrameReader::initHardwareDecoder(AVHWDeviceType hw_device_type) {
for (int i = 0;; i++) {
const AVCodecHWConfig *config = avcodec_get_hw_config(decoder_ctx->codec, i);
if (!config) {
printf("decoder %s does not support hw device type %s.\n",
decoder_ctx->codec->name, av_hwdevice_get_type_name(hw_device_type));
rWarning("decoder %s does not support hw device type %s.", decoder_ctx->codec->name,
av_hwdevice_get_type_name(hw_device_type));
return false;
}
if (config->methods & AV_CODEC_HW_CONFIG_METHOD_HW_DEVICE_CTX && config->device_type == hw_device_type) {
@ -149,7 +152,7 @@ bool FrameReader::initHardwareDecoder(AVHWDeviceType hw_device_type) {
if (ret < 0) {
hw_pix_fmt = AV_PIX_FMT_NONE;
has_cuda_device = false;
printf("Failed to create specified HW device %d.\n", ret);
rWarning("Failed to create specified HW device %d.", ret);
return false;
}
@ -192,7 +195,7 @@ bool FrameReader::decode(int idx, uint8_t *rgb, uint8_t *yuv) {
AVFrame *FrameReader::decodeFrame(AVPacket *pkt) {
int ret = avcodec_send_packet(decoder_ctx, pkt);
if (ret < 0) {
printf("Error sending a packet for decoding\n");
rWarning("Error sending a packet for decoding");
return nullptr;
}
@ -205,7 +208,7 @@ AVFrame *FrameReader::decodeFrame(AVPacket *pkt) {
if (av_frame_->format == hw_pix_fmt) {
hw_frame.reset(av_frame_alloc());
if ((ret = av_hwframe_transfer_data(hw_frame.get(), av_frame_.get(), 0)) < 0) {
printf("error transferring the data from GPU to CPU\n");
rWarning("error transferring the data from GPU to CPU");
return nullptr;
}
return hw_frame.get();

View File

@ -1,7 +1,6 @@
#include "selfdrive/ui/replay/logreader.h"
#include <algorithm>
#include <iostream>
#include "selfdrive/ui/replay/util.h"
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
@ -59,7 +58,7 @@ bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abor
raw_ = decompressBZ2(data, size, abort);
if (raw_.empty()) {
if (!(abort && *abort)) {
std::cout << "failed to decompress log" << std::endl;
rWarning("failed to decompress log");
}
return false;
}
@ -92,9 +91,9 @@ bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abor
events.push_back(evt);
}
} catch (const kj::Exception &e) {
std::cout << "failed to parse log : " << e.getDescription().cStr() << std::endl;
rWarning("failed to parse log : %s", e.getDescription().cStr());
if (!events.empty()) {
std::cout << "read " << events.size() << " events from corrupt log" << std::endl;
rWarning("read %zu events from corrupt log", events.size());
}
}

View File

@ -1,109 +1,13 @@
#include <termios.h>
#include <QApplication>
#include <QCommandLineParser>
#include <QDebug>
#include <QThread>
#include <csignal>
#include <iostream>
#include "selfdrive/ui/replay/consoleui.h"
#include "selfdrive/ui/replay/replay.h"
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
struct termios oldt = {};
Replay *replay = nullptr;
void sigHandler(int s) {
std::signal(s, SIG_DFL);
if (oldt.c_lflag) {
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
}
replay->stop();
qApp->quit();
}
int getch() {
int ch;
struct termios newt;
tcgetattr(STDIN_FILENO, &oldt);
newt = oldt;
newt.c_lflag &= ~(ICANON | ECHO);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
ch = getchar();
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
return ch;
}
void keyboardThread(Replay *replay_) {
char c;
while (true) {
c = getch();
if (c == '\n') {
printf("Enter seek request: ");
std::string r;
std::cin >> r;
try {
if (r[0] == '#') {
r.erase(0, 1);
replay_->seekTo(std::stoi(r) * 60, false);
} else {
replay_->seekTo(std::stoi(r), false);
}
} catch (std::invalid_argument) {
qDebug() << "invalid argument";
}
getch(); // remove \n from entering seek
} else if (c == 'e') {
replay_->seekToFlag(FindFlag::nextEngagement);
} else if (c == 'd') {
replay_->seekToFlag(FindFlag::nextDisEngagement);
} else if (c == 'm') {
replay_->seekTo(+60, true);
} else if (c == 'M') {
replay_->seekTo(-60, true);
} else if (c == 's') {
replay_->seekTo(+10, true);
} else if (c == 'S') {
replay_->seekTo(-10, true);
} else if (c == 'G') {
replay_->seekTo(0, true);
} else if (c == 'x') {
if (replay_->hasFlag(REPLAY_FLAG_FULL_SPEED)) {
replay_->removeFlag(REPLAY_FLAG_FULL_SPEED);
qInfo() << "replay at normal speed";
} else {
replay_->addFlag(REPLAY_FLAG_FULL_SPEED);
qInfo() << "replay at full speed";
}
} else if (c == ' ') {
replay_->pause(!replay_->isPaused());
}
}
}
void replayMessageOutput(QtMsgType type, const QMessageLogContext &context, const QString &msg) {
QByteArray localMsg = msg.toLocal8Bit();
if (type == QtDebugMsg) {
std::cout << "\033[38;5;248m" << localMsg.constData() << "\033[00m" << std::endl;
} else if (type == QtWarningMsg) {
std::cout << "\033[38;5;227m" << localMsg.constData() << "\033[00m" << std::endl;
} else if (type == QtCriticalMsg) {
std::cout << "\033[38;5;196m" << localMsg.constData() << "\033[00m" << std::endl;
} else {
std::cout << localMsg.constData() << std::endl;
}
}
int main(int argc, char *argv[]) {
qInstallMessageHandler(replayMessageOutput);
QApplication app(argc, argv);
std::signal(SIGINT, sigHandler);
std::signal(SIGTERM, sigHandler);
const std::tuple<QString, REPLAY_FLAGS, QString> flags[] = {
{"dcam", REPLAY_FLAG_DCAM, "load driver camera"},
@ -145,16 +49,12 @@ int main(int argc, char *argv[]) {
replay_flags |= flag;
}
}
replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
if (!replay->load()) {
return 0;
}
replay->start(parser.value("start").toInt());
// start keyboard control thread
QThread *t = new QThread();
QObject::connect(t, &QThread::started, [=]() { keyboardThread(replay); });
QObject::connect(t, &QThread::finished, t, &QThread::deleteLater);
t->start();
ConsoleUI console_ui(replay);
replay->start(parser.value("start").toInt());
return app.exec();
}

View File

@ -1,7 +1,7 @@
#include "selfdrive/ui/replay/replay.h"
#include <QApplication>
#include <QDebug>
#include <QtConcurrent>
#include <capnp/dynamic.h>
#include "cereal/services.h"
@ -23,6 +23,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
}
}
qDebug() << "services " << s;
qDebug() << "loading route " << route;
if (sm == nullptr) {
pm = std::make_unique<PubMaster>(s);
@ -33,19 +34,17 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
qRegisterMetaType<FindFlag>("FindFlag");
connect(this, &Replay::seekTo, this, &Replay::doSeek);
connect(this, &Replay::seekToFlag, this, &Replay::doSeekToFlag);
connect(this, &Replay::stop, this, &Replay::doStop);
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment);
}
Replay::~Replay() {
doStop();
stop();
}
void Replay::doStop() {
void Replay::stop() {
if (!stream_thread_ && segments_.empty()) return;
qDebug() << "shutdown: in progress...";
rInfo("shutdown: in progress...");
if (stream_thread_ != nullptr) {
exit_ = updating_events_ = true;
stream_cv_.notify_one();
@ -55,7 +54,8 @@ void Replay::doStop() {
}
segments_.clear();
camera_server_.reset(nullptr);
qDebug() << "shutdown: done";
timeline_future.waitForFinished();
rInfo("shutdown: done");
}
bool Replay::load() {
@ -75,7 +75,7 @@ bool Replay::load() {
qCritical() << "no valid segments in route" << route_->name();
return false;
}
qInfo() << "load route" << route_->name() << "with" << segments_.size() << "valid segments";
rInfo("load route %s with %zu valid segments", qPrintable(route_->name()), segments_.size());
return true;
}
@ -104,11 +104,11 @@ void Replay::doSeek(int seconds, bool relative) {
seconds = std::max(0, seconds);
int seg = seconds / 60;
if (segments_.find(seg) == segments_.end()) {
qWarning() << "can't seek to" << seconds << "s, segment" << seg << "is invalid";
rWarning("can't seek to %d s segment %d is invalid", seconds, seg);
return true;
}
qInfo() << "seeking to" << seconds << "s, segment" << seg;
rWarning("seeking to %d s, segment %d", seconds, seg);
current_segment_ = seg;
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
return isSegmentMerged(seg);
@ -116,49 +116,73 @@ void Replay::doSeek(int seconds, bool relative) {
queueSegment();
}
void Replay::doSeekToFlag(FindFlag flag) {
void Replay::seekToFlag(FindFlag flag) {
if (flag == FindFlag::nextEngagement) {
qInfo() << "seeking to the next engagement...";
rWarning("seeking to the next engagement...");
} else if (flag == FindFlag::nextDisEngagement) {
qInfo() << "seeking to the disengagement...";
rWarning("seeking to the disengagement...");
}
updateEvents([&]() {
if (auto next = find(flag)) {
uint64_t tm = *next - 2 * 1e9; // seek to 2 seconds before next
if (tm <= cur_mono_time_) {
return true;
}
cur_mono_time_ = tm;
int tm = *next - 2; // seek to 2 seconds before next
cur_mono_time_ = (route_start_ts_ + tm * 1e9);
current_segment_ = currentSeconds() / 60;
return isSegmentMerged(current_segment_);
} else {
qWarning() << "seeking failed";
return true;
rWarning("seeking failed");
}
return isSegmentMerged(current_segment_);
});
queueSegment();
}
std::optional<uint64_t> Replay::find(FindFlag flag) {
// Search in all segments
for (const auto &[n, _] : segments_) {
if (n < current_segment_) continue;
void Replay::buildTimeline() {
uint64_t engaged_begin = 0;
uint64_t alert_begin = 0;
TimelineType alert_type = TimelineType::None;
for (int i = 0; i < segments_.size() && !exit_; ++i) {
LogReader log;
bool cache_to_local = true; // cache qlog to local for fast seek
if (!log.load(route_->at(n).qlog.toStdString(), nullptr, cache_to_local, 0, 3)) continue;
if (!log.load(route_->at(i).qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
for (const Event *e : log.events) {
if (e->mono_time > cur_mono_time_ && e->which == cereal::Event::Which::CONTROLS_STATE) {
const auto cs = e->event.getControlsState();
if (flag == FindFlag::nextEngagement && cs.getEnabled()) {
return e->mono_time;
} else if (flag == FindFlag::nextDisEngagement && !cs.getEnabled()) {
return e->mono_time;
if (e->which == cereal::Event::Which::CONTROLS_STATE) {
auto cs = e->event.getControlsState();
if (!engaged_begin && cs.getEnabled()) {
engaged_begin = e->mono_time;
} else if (engaged_begin && !cs.getEnabled()) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged});
engaged_begin = 0;
}
if (!alert_begin && cs.getAlertType().size() > 0) {
alert_begin = e->mono_time;
alert_type = TimelineType::AlertInfo;
if (cs.getAlertStatus() != cereal::ControlsState::AlertStatus::NORMAL) {
alert_type = cs.getAlertStatus() == cereal::ControlsState::AlertStatus::USER_PROMPT
? TimelineType::AlertWarning
: TimelineType::AlertCritical;
}
} else if (alert_begin && cs.getAlertType().size() == 0) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), alert_type});
alert_begin = 0;
}
}
}
}
}
std::optional<uint64_t> Replay::find(FindFlag flag) {
int cur_ts = currentSeconds();
for (auto [start_ts, end_ts, type] : getTimeline()) {
if (type == TimelineType::Engaged) {
if (flag == FindFlag::nextEngagement && start_ts > cur_ts) {
return start_ts;
} else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) {
return end_ts;
}
}
}
@ -167,10 +191,7 @@ std::optional<uint64_t> Replay::find(FindFlag flag) {
void Replay::pause(bool pause) {
updateEvents([=]() {
qInfo() << (pause ? "paused..." : "resuming");
if (pause) {
qInfo() << "at " << currentSeconds() << "s";
}
rWarning("%s at %d s", pause ? "paused..." : "resuming", currentSeconds());
paused_ = pause;
return true;
});
@ -185,7 +206,7 @@ void Replay::setCurrentSegment(int n) {
void Replay::segmentLoadFinished(bool success) {
if (!success) {
Segment *seg = qobject_cast<Segment *>(sender());
qWarning() << "failed to load segment " << seg->seg_num << ", removing it from current replay list";
rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
segments_.erase(seg->seg_num);
}
queueSegment();
@ -201,19 +222,18 @@ void Replay::queueSegment() {
}
// load one segment at a time
for (auto it = cur; it != end; ++it) {
if (!it->second) {
if (it == cur || std::prev(it)->second->isLoaded()) {
auto &[n, seg] = *it;
auto &[n, seg] = *it;
if ((seg && !seg->isLoaded()) || !seg) {
if (!seg) {
rDebug("loading segment %d...", n);
seg = std::make_unique<Segment>(n, route_->at(n), flags_);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
qDebug() << "loading segment" << n << "...";
}
break;
}
}
const auto &cur_segment = cur->second;
enableHttpLogging(!cur_segment->isLoaded());
const auto &cur_segment = cur->second;
// merge the previous adjacent segment if it's loaded
auto begin = segments_.find(cur_segment->seg_num - 1);
if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) {
@ -228,6 +248,7 @@ void Replay::queueSegment() {
// start stream thread
if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
startStream(cur_segment.get());
emit streamStarted();
}
}
@ -235,13 +256,18 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
// merge 3 segments in sequence.
std::vector<int> segments_need_merge;
size_t new_events_size = 0;
for (auto it = begin; it != end && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) {
for (auto it = begin; it != end && it->second && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) {
segments_need_merge.push_back(it->first);
new_events_size += it->second->log->events.size();
}
if (segments_need_merge != segments_merged_) {
qDebug() << "merge segments" << segments_need_merge;
std::string s;
for (int i = 0; i < segments_need_merge.size(); ++i) {
s += std::to_string(segments_need_merge[i]);
if (i != segments_need_merge.size() - 1) s += ", ";
}
rDebug("merge segments %s", s.c_str());
new_events_->clear();
new_events_->reserve(new_events_size);
for (int n : segments_need_merge) {
@ -269,10 +295,11 @@ void Replay::startStream(const Segment *cur_segment) {
// write CarParams
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) {
car_fingerprint_ = (*it)->event.getCarParams().getCarFingerprint();
auto bytes = (*it)->bytes();
Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
} else {
qWarning() << "failed to read CarParams from current segment";
rWarning("failed to read CarParams from current segment");
}
// start camera server
@ -291,6 +318,8 @@ void Replay::startStream(const Segment *cur_segment) {
QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); });
QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater);
stream_thread_->start();
timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
}
void Replay::publishMessage(const Event *e) {
@ -298,7 +327,7 @@ void Replay::publishMessage(const Event *e) {
auto bytes = e->bytes();
int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
qDebug() << "stop publishing" << sockets_[e->which] << "due to multiple publishers error";
rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
sockets_[e->which] = nullptr;
}
} else {
@ -324,9 +353,7 @@ void Replay::publishFrame(const Event *e) {
}
void Replay::stream() {
float last_print = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
std::unique_lock lk(stream_lock_);
while (true) {
@ -337,7 +364,7 @@ void Replay::stream() {
Event cur_event(cur_which, cur_mono_time_);
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
qDebug() << "waiting for events...";
rInfo("waiting for events...");
continue;
}
@ -348,12 +375,7 @@ void Replay::stream() {
const Event *evt = (*eit);
cur_which = evt->which;
cur_mono_time_ = evt->mono_time;
const int current_ts = currentSeconds();
if (last_print > current_ts || (current_ts - last_print) > 5.0) {
last_print = current_ts;
qInfo() << "at " << current_ts << "s";
}
setCurrentSegment(current_ts / 60);
setCurrentSegment(toSeconds(cur_mono_time_) / 60);
// migration for pandaState -> pandaStates to keep UI working for old segments
if (cur_which == cereal::Event::Which::PANDA_STATE_D_E_P_R_E_C_A_T_E_D) {
@ -396,7 +418,7 @@ void Replay::stream() {
if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
int last_segment = segments_.rbegin()->first;
if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
qInfo() << "reaches the end of route, restart from beginning";
rInfo("reaches the end of route, restart from beginning");
emit seekTo(0, false);
}
}

View File

@ -28,6 +28,8 @@ enum class FindFlag {
nextDisEngagement
};
enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical };
class Replay : public QObject {
Q_OBJECT
@ -37,23 +39,31 @@ public:
~Replay();
bool load();
void start(int seconds = 0);
void stop();
void pause(bool pause);
bool isPaused() const { return paused_; }
void seekToFlag(FindFlag flag);
inline bool isPaused() const { return paused_; }
inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; }
inline void addFlag(REPLAY_FLAGS flag) { flags_ |= flag; }
inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; }
inline const Route* route() const { return route_.get(); }
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }
inline int toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; }
inline int totalSeconds() const { return segments_.size() * 60; }
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() {
std::lock_guard lk(timeline_lock);
return timeline;
}
signals:
void segmentChanged();
void seekTo(int seconds, bool relative);
void seekToFlag(FindFlag flag);
void stop();
void streamStarted();
protected slots:
void queueSegment();
void doStop();
void doSeek(int seconds, bool relative);
void doSeekToFlag(FindFlag flag);
void segmentLoadFinished(bool sucess);
protected:
@ -66,7 +76,7 @@ protected:
void updateEvents(const std::function<bool()>& lambda);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }
void buildTimeline();
inline bool isSegmentMerged(int n) {
return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
}
@ -80,7 +90,7 @@ protected:
std::atomic<int> current_segment_ = 0;
SegmentMap segments_;
// the following variables must be protected with stream_lock_
bool exit_ = false;
std::atomic<bool> exit_ = false;
bool paused_ = false;
bool events_updated_ = false;
uint64_t route_start_ts_ = 0;
@ -96,4 +106,9 @@ protected:
std::unique_ptr<Route> route_;
std::unique_ptr<CameraServer> camera_server_;
std::atomic<uint32_t> flags_ = REPLAY_FLAG_NONE;
std::mutex timeline_lock;
QFuture<void> timeline_future;
std::vector<std::tuple<int, int, TimelineType>> timeline;
std::string car_fingerprint_;
};

View File

@ -26,7 +26,7 @@ RouteIdentifier Route::parseRoute(const QString &str) {
bool Route::load() {
if (route_.str.isEmpty()) {
qInfo() << "invalid route format";
rInfo("invalid route format");
return false;
}
return data_dir_.isEmpty() ? loadFromServer() : loadFromLocal();

View File

@ -15,6 +15,40 @@
#include "selfdrive/common/timing.h"
#include "selfdrive/common/util.h"
ReplayMessageHandler message_handler = nullptr;
DownloadProgressHandler download_progress_handler = nullptr;
void installMessageHandler(ReplayMessageHandler handler) { message_handler = handler; }
void installDownloadProgressHandler(DownloadProgressHandler handler) { download_progress_handler = handler; }
void logMessage(ReplyMsgType type, const char *fmt, ...) {
static std::mutex lock;
std::lock_guard lk(lock);
char *msg_buf = nullptr;
va_list args;
va_start(args, fmt);
int ret = vasprintf(&msg_buf, fmt, args);
va_end(args);
if (ret <= 0 || !msg_buf) return;
if (message_handler) {
message_handler(type, msg_buf);
} else {
if (type == ReplyMsgType::Debug) {
std::cout << "\033[38;5;248m" << msg_buf << "\033[00m" << std::endl;
} else if (type == ReplyMsgType::Warning) {
std::cout << "\033[38;5;227m" << msg_buf << "\033[00m" << std::endl;
} else if (type == ReplyMsgType::Critical) {
std::cout << "\033[38;5;196m" << msg_buf << "\033[00m" << std::endl;
} else {
std::cout << msg_buf << std::endl;
}
}
free(msg_buf);
}
namespace {
struct CURLGlobalInitializer {
@ -23,7 +57,6 @@ struct CURLGlobalInitializer {
};
static CURLGlobalInitializer curl_initializer;
static std::atomic<bool> enable_http_logging = false;
template <class T>
struct MultiPartWriter {
@ -57,6 +90,38 @@ size_t write_cb(char *data, size_t size, size_t count, void *userp) {
size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; }
struct DownloadStats {
void add(const std::string &url, uint64_t total_bytes) {
std::lock_guard lk(lock);
items[url] = {0, total_bytes};
}
void remove(const std::string &url) {
std::lock_guard lk(lock);
items.erase(url);
}
void update(const std::string &url, uint64_t downloaded, bool success = true) {
std::lock_guard lk(lock);
items[url].first = downloaded;
auto stat = std::accumulate(items.begin(), items.end(), std::pair<int, int>{}, [=](auto &a, auto &b){
return std::pair{a.first + b.second.first, a.second + b.second.second};
});
double tm = millis_since_boot();
if (download_progress_handler && ((tm - prev_tm) > 500 || !success || stat.first >= stat.second)) {
download_progress_handler(stat.first, stat.second, success);
prev_tm = tm;
}
}
std::mutex lock;
std::map<std::string, std::pair<uint64_t, uint64_t>> items;
double prev_tm = 0;
};
} // namespace
std::string formattedDataSize(size_t size) {
if (size < 1024) {
return std::to_string(size) + " B";
@ -67,8 +132,6 @@ std::string formattedDataSize(size_t size) {
}
}
} // namespace
size_t getRemoteFileSize(const std::string &url, std::atomic<bool> *abort) {
CURL *curl = curl_easy_init();
if (!curl) return -1;
@ -99,12 +162,11 @@ std::string getUrlWithoutQuery(const std::string &url) {
return (idx == std::string::npos ? url : url.substr(0, idx));
}
void enableHttpLogging(bool enable) {
enable_http_logging = enable;
}
template <class T>
bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t content_length, std::atomic<bool> *abort) {
static DownloadStats download_stats;
download_stats.add(url, content_length);
int parts = 1;
if (chunk_size > 0 && content_length > 10 * 1024 * 1024) {
parts = std::nearbyint(content_length / (float)chunk_size);
@ -134,23 +196,11 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont
curl_multi_add_handle(cm, eh);
}
size_t prev_written = 0;
double last_print = millis_since_boot();
int still_running = 1;
while (still_running > 0 && !(abort && *abort)) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
curl_multi_perform(cm, &still_running);
if (enable_http_logging) {
if (double ts = millis_since_boot(); (ts - last_print) > 2 * 1000) {
size_t average = (written - prev_written) / ((ts - last_print) / 1000.);
int progress = std::min<int>(100, 100.0 * (double)written / (double)content_length);
std::cout << "downloading " << getUrlWithoutQuery(url) << " - " << progress
<< "% (" << formattedDataSize(average) << "/s)" << std::endl;
last_print = ts;
prev_written = written;
}
}
download_stats.update(url, written);
}
CURLMsg *msg;
@ -164,21 +214,25 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont
if (res_status == 206) {
complete++;
} else {
std::cout << "Download failed: http error code: " << res_status << std::endl;
rWarning("Download failed: http error code: %d", res_status);
}
} else {
std::cout << "Download failed: connection failure: " << msg->data.result << std::endl;
rWarning("Download failed: connection failure: %d", msg->data.result);
}
}
}
bool success = complete == parts;
download_stats.update(url, written, success);
download_stats.remove(url);
for (const auto &[e, w] : writers) {
curl_multi_remove_handle(cm, e);
curl_easy_cleanup(e);
}
curl_multi_cleanup(cm);
return complete == parts;
return success;
}
std::string httpGet(const std::string &url, size_t chunk_size, std::atomic<bool> *abort) {
@ -221,7 +275,7 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
if (bzerror == BZ_OK && prev_write_pos == strm.next_out) {
// content is corrupt
bzerror = BZ_STREAM_END;
std::cout << "decompressBZ2 error : content is corrupt" << std::endl;
rWarning("decompressBZ2 error : content is corrupt");
break;
}

View File

@ -1,14 +1,34 @@
#pragma once
#include <atomic>
#include <functional>
#include <string>
enum class ReplyMsgType {
Info,
Debug,
Warning,
Critical
};
typedef std::function<void(ReplyMsgType type, const std::string msg)> ReplayMessageHandler;
void installMessageHandler(ReplayMessageHandler);
void logMessage(ReplyMsgType type, const char* fmt, ...);
#define rInfo(fmt, ...) ::logMessage(ReplyMsgType::Info, fmt, ## __VA_ARGS__)
#define rDebug(fmt, ...) ::logMessage(ReplyMsgType::Debug, fmt, ## __VA_ARGS__)
#define rWarning(fmt, ...) ::logMessage(ReplyMsgType::Warning, fmt, ## __VA_ARGS__)
#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__)
std::string sha256(const std::string &str);
void precise_nano_sleep(long sleep_ns);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
void enableHttpLogging(bool enable);
std::string getUrlWithoutQuery(const std::string &url);
size_t getRemoteFileSize(const std::string &url, std::atomic<bool> *abort = nullptr);
std::string httpGet(const std::string &url, size_t chunk_size = 0, std::atomic<bool> *abort = nullptr);
typedef std::function<void(uint64_t cur, uint64_t total, bool success)> DownloadProgressHandler;
void installDownloadProgressHandler(DownloadProgressHandler);
bool httpDownload(const std::string &url, const std::string &file, size_t chunk_size = 0, std::atomic<bool> *abort = nullptr);
std::string formattedDataSize(size_t size);