c++ replay: chunking and concurrent downloads (#22308)

* download segment files by chunks in multiple threads

* remove easy_handl on aborting

* add test cases

* better error handling

* update test

* cleanup

* add CURLGlobalInitializer

* check http code

* finish
pull/22362/head
Dean Lee 2021-09-28 18:24:48 +08:00 committed by GitHub
parent 8096da1dcd
commit 4e6ff308a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 168 additions and 41 deletions

View File

@ -111,8 +111,8 @@ if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath())
replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs
qt_env.Program("replay/replay", ["replay/main.cc"], LIBS=replay_libs)
if GetOption('test'):
qt_env.Program('replay/tests/test_replay', ['replay/tests/test_replay.cc'], LIBS=[replay_libs])
qt_env.Program('replay/tests/test_replay', ['replay/tests/test_runner.cc', 'replay/tests/test_replay.cc'], LIBS=[replay_libs])

View File

@ -5,6 +5,7 @@
#include <QApplication>
#include <QCommandLineParser>
#include <QDebug>
#include <QThread>
const QString DEMO_ROUTE = "3533c53bb29502d1|2019-12-10--01-13-27";

View File

@ -1,7 +1,7 @@
#include "selfdrive/ui/replay/replay.h"
#include <QApplication>
#include <QDebug>
#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"

View File

@ -1,16 +1,121 @@
#include "selfdrive/ui/replay/route.h"
#include <curl/curl.h>
#include <QDir>
#include <QEventLoop>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QRegExp>
#include <QThread>
#include <future>
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h"
struct CURLGlobalInitializer {
CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); }
~CURLGlobalInitializer() { curl_global_cleanup(); }
};
struct MultiPartWriter {
int64_t offset;
int64_t end;
FILE *fp;
};
static size_t write_cb(char *data, size_t n, size_t l, void *userp) {
MultiPartWriter *w = (MultiPartWriter *)userp;
fseek(w->fp, w->offset, SEEK_SET);
fwrite(data, l, n, w->fp);
w->offset += n * l;
return n * l;
}
static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) { return n * l; }
int64_t getDownloadContentLength(const std::string &url) {
CURL *curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb);
curl_easy_setopt(curl, CURLOPT_HEADER, 1);
curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
CURLcode res = curl_easy_perform(curl);
double content_length = -1;
if (res == CURLE_OK) {
res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length);
}
curl_easy_cleanup(curl);
return res == CURLE_OK ? (int64_t)content_length : -1;
}
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
int64_t content_length = getDownloadContentLength(url);
if (content_length == -1) return false;
std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb");
// create a sparse file
fseek(fp, content_length, SEEK_SET);
CURLM *cm = curl_multi_init();
std::map<CURL *, MultiPartWriter> writers;
const int part_size = content_length / parts;
for (int i = 0; i < parts; ++i) {
CURL *eh = curl_easy_init();
writers[eh] = {
.fp = fp,
.offset = i * part_size,
.end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1,
};
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh]));
curl_easy_setopt(eh, CURLOPT_URL, url.c_str());
curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str());
curl_easy_setopt(eh, CURLOPT_HTTPGET, 1);
curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1);
curl_multi_add_handle(cm, eh);
}
int running = 1, success_cnt = 0;
while (!(abort && abort->load())){
CURLMcode ret = curl_multi_perform(cm, &running);
if (!running) {
CURLMsg *msg;
int msgs_left = -1;
while ((msg = curl_multi_info_read(cm, &msgs_left))) {
if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) {
int http_status_code = 0;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code);
success_cnt += (http_status_code == 206);
}
}
break;
}
if (ret == CURLM_OK) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
}
};
fclose(fp);
bool success = success_cnt == parts;
if (success) {
success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
}
// cleanup curl
for (auto &[e, w] : writers) {
curl_multi_remove_handle(cm, e);
curl_easy_cleanup(e);
}
curl_multi_cleanup(cm);
return success;
}
Route::Route(const QString &route) : route_(route) {}
bool Route::load() {
@ -71,6 +176,7 @@ bool Route::loadFromJson(const QString &json) {
// class Segment
Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) {
static CURLGlobalInitializer curl_initializer;
static std::once_flag once_flag;
std::call_once(once_flag, [=]() {
if (!QDir(CACHE_DIR).exists()) QDir().mkdir(CACHE_DIR);
@ -91,7 +197,6 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l
if (!QUrl(files_.rlog).isLocalFile()) {
for (auto &url : {files_.rlog, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) {
if (!url.isEmpty() && !QFile::exists(localPath(url))) {
qDebug() << "download" << url;
downloadFile(url);
++downloading_;
}
@ -103,30 +208,20 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l
}
Segment::~Segment() {
// cancel download, qnam will not abort requests, need to abort them manually
aborting_ = true;
for (QNetworkReply *replay : replies_) {
if (replay->isRunning()) {
replay->abort();
}
replay->deleteLater();
for (auto &t : download_threads_) {
if (t->isRunning()) t->wait();
}
}
void Segment::downloadFile(const QString &url) {
QNetworkReply *reply = qnam_.get(QNetworkRequest(url));
replies_.insert(reply);
connect(reply, &QNetworkReply::finished, [=]() {
if (reply->error() == QNetworkReply::NoError) {
QFile file(localPath(url));
if (file.open(QIODevice::WriteOnly)) {
file.write(reply->readAll());
}
}
qDebug() << "download" << url;
download_threads_.emplace_back(QThread::create([=]() {
httpMultiPartDownload(url.toStdString(), localPath(url).toStdString(), connections_per_file, &aborting_);
if (--downloading_ == 0 && !aborting_) {
load();
}
});
}))->start();
}
// load concurrency

View File

@ -1,6 +1,6 @@
#pragma once
#include <QNetworkAccessManager>
#include <QObject>
#include <QString>
#include <vector>
@ -9,6 +9,7 @@
#include "selfdrive/ui/replay/logreader.h"
const QString CACHE_DIR = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str();
const int connections_per_file = 3;
struct SegmentFile {
QString rlog;
@ -55,11 +56,12 @@ protected:
QString localPath(const QUrl &url);
bool loaded_ = false, valid_ = false;
bool aborting_ = false;
std::atomic<bool> aborting_ = false;
int downloading_ = 0;
int seg_num_ = 0;
SegmentFile files_;
QString road_cam_path_;
QSet<QNetworkReply *> replies_;
QNetworkAccessManager qnam_;
std::vector<QThread*> download_threads_;
};
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);

View File

@ -1,24 +1,51 @@
#define CATCH_CONFIG_MAIN
#include <QCryptographicHash>
#include <QString>
#include <future>
#include "catch2/catch.hpp"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/route.h"
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
TEST_CASE("FrameReader") {
SECTION("process&get") {
FrameReader fr;
REQUIRE(fr.load(stream_url) == true);
REQUIRE(fr.valid() == true);
REQUIRE(fr.getFrameCount() == 1200);
// random get 50 frames
// srand(time(NULL));
// for (int i = 0; i < 50; ++i) {
// int idx = rand() % (fr.getFrameCount() - 1);
// REQUIRE(fr.get(idx) != nullptr);
// }
// sequence get 50 frames {
for (int i = 0; i < 50; ++i) {
REQUIRE(fr.get(i) != nullptr);
}
// TEST_CASE("FrameReader") {
// SECTION("process&get") {
// FrameReader fr;
// REQUIRE(fr.load(stream_url) == true);
// REQUIRE(fr.valid() == true);
// REQUIRE(fr.getFrameCount() == 1200);
// // random get 50 frames
// // srand(time(NULL));
// // for (int i = 0; i < 50; ++i) {
// // int idx = rand() % (fr.getFrameCount() - 1);
// // REQUIRE(fr.get(idx) != nullptr);
// // }
// // sequence get 50 frames {
// for (int i = 0; i < 50; ++i) {
// REQUIRE(fr.get(i) != nullptr);
// }
// }
// }
std::string sha_256(const QString &dat) {
return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString();
}
TEST_CASE("httpMultiPartDownload") {
char filename[] = "/tmp/XXXXXX";
int fd = mkstemp(filename);
REQUIRE(fd != -1);
close(fd);
SECTION("http 200") {
REQUIRE(httpMultiPartDownload(stream_url, filename, 5));
std::string content = util::read_file(filename);
REQUIRE(content.size() == 37495242);
std::string checksum = sha_256(QString::fromStdString(content));
REQUIRE(checksum == "d8ff81560ce7ed6f16d5fb5a6d6dd13aba06c8080c62cfe768327914318744c4");
}
SECTION("http 404") {
REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false);
}
}

View File

@ -0,0 +1,2 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"