Statlog in C (#23596)

* statlog in C

* fix swaglog and the test

* add missing files

* fix context initialization todo

* speed up test
pull/23641/head
Robbe Derks 2022-01-27 17:14:38 +01:00 committed by GitHub
parent ee71fd2fcc
commit 6366d1303f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 142 additions and 79 deletions

View File

@ -200,6 +200,8 @@ selfdrive/common/version.h
selfdrive/common/swaglog.h
selfdrive/common/swaglog.cc
selfdrive/common/statlog.h
selfdrive/common/statlog.cc
selfdrive/common/util.cc
selfdrive/common/util.h
selfdrive/common/queue.h

View File

@ -7,6 +7,7 @@ else:
common_libs = [
'params.cc',
'statlog.cc',
'swaglog.cc',
'util.cc',
'gpio.cc',

View File

@ -0,0 +1,43 @@
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "selfdrive/common/statlog.h"
#include "selfdrive/common/util.h"
#include <stdio.h>
#include <mutex>
#include <zmq.h>
class StatlogState : public LogState {
public:
StatlogState() : LogState("ipc:///tmp/stats") {}
};
static StatlogState s = {};
static void log(const char* metric_type, const char* metric, const char* fmt, ...) {
char* value_buf = nullptr;
va_list args;
va_start(args, fmt);
int ret = vasprintf(&value_buf, fmt, args);
va_end(args);
if (ret > 0 && value_buf) {
char* line_buf = nullptr;
ret = asprintf(&line_buf, "%s:%s|%s", metric, value_buf, metric_type);
if (ret > 0 && line_buf) {
zmq_send(s.sock, line_buf, ret, ZMQ_NOBLOCK);
free(line_buf);
}
free(value_buf);
}
}
void statlog_log(const char* metric_type, const char* metric, int value) {
log(metric_type, metric, "%d", value);
}
void statlog_log(const char* metric_type, const char* metric, float value) {
log(metric_type, metric, "%f", value);
}

View File

@ -0,0 +1,10 @@
#pragma once
#define STATLOG_GAUGE "g"
#define STATLOG_SAMPLE "sa"
void statlog_log(const char* metric_type, const char* metric, int value);
void statlog_log(const char* metric_type, const char* metric, float value);
#define statlog_gauge(metric, value) statlog_log(STATLOG_GAUGE, metric, value)
#define statlog_sample(metric, value) statlog_log(STATLOG_SAMPLE, metric, value)

View File

@ -16,71 +16,53 @@
#include "selfdrive/common/version.h"
#include "selfdrive/hardware/hw.h"
class LogState {
class SwaglogState : public LogState {
public:
LogState() = default;
~LogState();
std::mutex lock;
bool inited;
SwaglogState() : LogState("ipc:///tmp/logmessage") {}
bool initialized = false;
json11::Json::object ctx_j;
void *zctx;
void *sock;
int print_level;
inline void initialize() {
ctx_j = json11::Json::object {};
print_level = CLOUDLOG_WARNING;
const char* print_lvl = getenv("LOGPRINT");
if (print_lvl) {
if (strcmp(print_lvl, "debug") == 0) {
print_level = CLOUDLOG_DEBUG;
} else if (strcmp(print_lvl, "info") == 0) {
print_level = CLOUDLOG_INFO;
} else if (strcmp(print_lvl, "warning") == 0) {
print_level = CLOUDLOG_WARNING;
}
}
// openpilot bindings
char* dongle_id = getenv("DONGLE_ID");
if (dongle_id) {
ctx_j["dongle_id"] = dongle_id;
}
char* daemon_name = getenv("MANAGER_DAEMON");
if (daemon_name) {
ctx_j["daemon"] = daemon_name;
}
ctx_j["version"] = COMMA_VERSION;
ctx_j["dirty"] = !getenv("CLEAN");
// device type
if (Hardware::EON()) {
ctx_j["device"] = "eon";
} else if (Hardware::TICI()) {
ctx_j["device"] = "tici";
} else {
ctx_j["device"] = "pc";
}
initialized = true;
}
};
LogState::~LogState() {
zmq_close(sock);
zmq_ctx_destroy(zctx);
}
static LogState s = {};
static void cloudlog_init() {
if (s.inited) return;
s.ctx_j = json11::Json::object {};
s.zctx = zmq_ctx_new();
s.sock = zmq_socket(s.zctx, ZMQ_PUSH);
int timeout = 100; // 100 ms timeout on shutdown for messages to be received by logmessaged
zmq_setsockopt(s.sock, ZMQ_LINGER, &timeout, sizeof(timeout));
zmq_connect(s.sock, "ipc:///tmp/logmessage");
s.print_level = CLOUDLOG_WARNING;
const char* print_level = getenv("LOGPRINT");
if (print_level) {
if (strcmp(print_level, "debug") == 0) {
s.print_level = CLOUDLOG_DEBUG;
} else if (strcmp(print_level, "info") == 0) {
s.print_level = CLOUDLOG_INFO;
} else if (strcmp(print_level, "warning") == 0) {
s.print_level = CLOUDLOG_WARNING;
}
}
// openpilot bindings
char* dongle_id = getenv("DONGLE_ID");
if (dongle_id) {
s.ctx_j["dongle_id"] = dongle_id;
}
char* daemon_name = getenv("MANAGER_DAEMON");
if (daemon_name) {
s.ctx_j["daemon"] = daemon_name;
}
s.ctx_j["version"] = COMMA_VERSION;
s.ctx_j["dirty"] = !getenv("CLEAN");
// device type
if (Hardware::EON()) {
s.ctx_j["device"] = "eon";
} else if (Hardware::TICI()) {
s.ctx_j["device"] = "tici";
} else {
s.ctx_j["device"] = "pc";
}
s.inited = true;
}
static SwaglogState s = {};
static void log(int levelnum, const char* filename, int lineno, const char* func, const char* msg, const std::string& log_s) {
if (levelnum >= s.print_level) {
@ -101,7 +83,8 @@ void cloudlog_e(int levelnum, const char* filename, int lineno, const char* func
if (ret <= 0 || !msg_buf) return;
std::lock_guard lk(s.lock);
cloudlog_init();
if (!s.initialized) s.initialize();
json11::Json log_j = json11::Json::object {
{"msg", msg_buf},

View File

@ -8,6 +8,8 @@
#define CLOUDLOG_ERROR 40
#define CLOUDLOG_CRITICAL 50
void cloudlog_e(int levelnum, const char* filename, int lineno, const char* func,
const char* fmt, ...) /*__attribute__ ((format (printf, 6, 7)))*/;

View File

@ -22,25 +22,21 @@ void log_thread(int thread_id, int msg_cnt) {
}
}
void send_stop_msg(void *zctx) {
void *sock = zmq_socket(zctx, ZMQ_PUSH);
zmq_connect(sock, SWAGLOG_ADDR);
zmq_send(sock, "stop", 4, ZMQ_NOBLOCK);
zmq_close(sock);
}
void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) {
void recv_log(int thread_cnt, int thread_msg_cnt) {
void *zctx = zmq_ctx_new();
void *sock = zmq_socket(zctx, ZMQ_PULL);
zmq_bind(sock, SWAGLOG_ADDR);
std::vector<int> thread_msgs(thread_cnt);
int total_count = 0;
while (true) {
for (auto start = std::chrono::steady_clock::now(), now = start;
now < start + std::chrono::seconds{1} && total_count < (thread_cnt * thread_msg_cnt);
now = std::chrono::steady_clock::now()) {
char buf[4096] = {};
if (zmq_recv(sock, buf, sizeof(buf), 0) <= 0) {
if (zmq_recv(sock, buf, sizeof(buf), ZMQ_DONTWAIT) <= 0) {
if (errno == EAGAIN || errno == EINTR || errno == EFSM) continue;
break;
}
if (strcmp(buf, "stop") == 0) break;
REQUIRE(buf[0] == CLOUDLOG_DEBUG);
std::string err;
@ -53,10 +49,13 @@ void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) {
REQUIRE(msg["lineno"].int_value() == LINE_NO);
auto ctx = msg["ctx"];
REQUIRE(ctx["daemon"].string_value() == daemon_name);
REQUIRE(ctx["dongle_id"].string_value() == dongle_id);
REQUIRE(ctx["version"].string_value() == COMMA_VERSION);
REQUIRE(ctx["dirty"].bool_value() == true);
REQUIRE(ctx["version"].string_value() == COMMA_VERSION);
std::string device = "pc";
if (Hardware::EON()) {
device = "eon";
@ -68,12 +67,14 @@ void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) {
int thread_id = atoi(msg["msg"].string_value().c_str());
REQUIRE((thread_id >= 0 && thread_id < thread_cnt));
thread_msgs[thread_id]++;
total_count++;
}
for (int i = 0; i < thread_cnt; ++i) {
INFO("thread :" << i);
REQUIRE(thread_msgs[i] == thread_msg_cnt);
}
zmq_close(sock);
zmq_ctx_destroy(zctx);
}
TEST_CASE("swaglog") {
@ -83,15 +84,11 @@ TEST_CASE("swaglog") {
const int thread_cnt = 5;
const int thread_msg_cnt = 100;
void *zctx = zmq_ctx_new();
send_stop_msg(zctx);
std::vector<std::thread> log_threads;
for (int i = 0; i < thread_cnt; ++i) {
log_threads.push_back(std::thread(log_thread, i, thread_msg_cnt));
}
for (auto &t : log_threads) t.join();
recv_log(zctx, thread_cnt, thread_msg_cnt);
zmq_ctx_destroy(zctx);
recv_log(thread_cnt, thread_msg_cnt);
}

View File

@ -3,6 +3,7 @@
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.h>
#include <algorithm>
#include <atomic>
@ -11,6 +12,7 @@
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
@ -163,3 +165,26 @@ void update_max_atomic(std::atomic<T>& max, T const& value) {
T prev = max;
while(prev < value && !max.compare_exchange_weak(prev, value)) {}
}
class LogState {
public:
std::mutex lock;
void *zctx;
void *sock;
int print_level;
LogState(const char* endpoint) {
zctx = zmq_ctx_new();
sock = zmq_socket(zctx, ZMQ_PUSH);
// Timeout on shutdown for messages to be received by the logging process
int timeout = 100;
zmq_setsockopt(sock, ZMQ_LINGER, &timeout, sizeof(timeout));
zmq_connect(sock, endpoint);
};
~LogState() {
zmq_close(sock);
zmq_ctx_destroy(zctx);
}
};