From 6366d1303f8f13278422c73078d624e58df3e1a8 Mon Sep 17 00:00:00 2001 From: Robbe Derks Date: Thu, 27 Jan 2022 17:14:38 +0100 Subject: [PATCH] Statlog in C (#23596) * statlog in C * fix swaglog and the test * add missing files * fix context initialization todo * speed up test --- release/files_common | 2 + selfdrive/common/SConscript | 1 + selfdrive/common/statlog.cc | 43 ++++++++++ selfdrive/common/statlog.h | 10 +++ selfdrive/common/swaglog.cc | 107 +++++++++++-------------- selfdrive/common/swaglog.h | 2 + selfdrive/common/tests/test_swaglog.cc | 31 ++++--- selfdrive/common/util.h | 25 ++++++ 8 files changed, 142 insertions(+), 79 deletions(-) create mode 100644 selfdrive/common/statlog.cc create mode 100644 selfdrive/common/statlog.h diff --git a/release/files_common b/release/files_common index 2f38d2387..04dddbfe7 100644 --- a/release/files_common +++ b/release/files_common @@ -200,6 +200,8 @@ selfdrive/common/version.h selfdrive/common/swaglog.h selfdrive/common/swaglog.cc +selfdrive/common/statlog.h +selfdrive/common/statlog.cc selfdrive/common/util.cc selfdrive/common/util.h selfdrive/common/queue.h diff --git a/selfdrive/common/SConscript b/selfdrive/common/SConscript index 21f609865..8dfeecbdd 100644 --- a/selfdrive/common/SConscript +++ b/selfdrive/common/SConscript @@ -7,6 +7,7 @@ else: common_libs = [ 'params.cc', + 'statlog.cc', 'swaglog.cc', 'util.cc', 'gpio.cc', diff --git a/selfdrive/common/statlog.cc b/selfdrive/common/statlog.cc new file mode 100644 index 000000000..27dfc2ca9 --- /dev/null +++ b/selfdrive/common/statlog.cc @@ -0,0 +1,43 @@ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include "selfdrive/common/statlog.h" +#include "selfdrive/common/util.h" + +#include +#include +#include + +class StatlogState : public LogState { + public: + StatlogState() : LogState("ipc:///tmp/stats") {} +}; + +static StatlogState s = {}; + +static void log(const char* metric_type, const char* metric, const char* fmt, ...) { + char* value_buf = nullptr; + va_list args; + va_start(args, fmt); + int ret = vasprintf(&value_buf, fmt, args); + va_end(args); + + if (ret > 0 && value_buf) { + char* line_buf = nullptr; + ret = asprintf(&line_buf, "%s:%s|%s", metric, value_buf, metric_type); + if (ret > 0 && line_buf) { + zmq_send(s.sock, line_buf, ret, ZMQ_NOBLOCK); + free(line_buf); + } + free(value_buf); + } +} + +void statlog_log(const char* metric_type, const char* metric, int value) { + log(metric_type, metric, "%d", value); +} + +void statlog_log(const char* metric_type, const char* metric, float value) { + log(metric_type, metric, "%f", value); +} diff --git a/selfdrive/common/statlog.h b/selfdrive/common/statlog.h new file mode 100644 index 000000000..5d223bb66 --- /dev/null +++ b/selfdrive/common/statlog.h @@ -0,0 +1,10 @@ +#pragma once + +#define STATLOG_GAUGE "g" +#define STATLOG_SAMPLE "sa" + +void statlog_log(const char* metric_type, const char* metric, int value); +void statlog_log(const char* metric_type, const char* metric, float value); + +#define statlog_gauge(metric, value) statlog_log(STATLOG_GAUGE, metric, value) +#define statlog_sample(metric, value) statlog_log(STATLOG_SAMPLE, metric, value) diff --git a/selfdrive/common/swaglog.cc b/selfdrive/common/swaglog.cc index aae4fb6e7..1fe700415 100644 --- a/selfdrive/common/swaglog.cc +++ b/selfdrive/common/swaglog.cc @@ -16,71 +16,53 @@ #include "selfdrive/common/version.h" #include "selfdrive/hardware/hw.h" -class LogState { +class SwaglogState : public LogState { public: - LogState() = default; - ~LogState(); - std::mutex lock; - bool inited; + SwaglogState() : LogState("ipc:///tmp/logmessage") {} + + bool initialized = false; json11::Json::object ctx_j; - void *zctx; - void *sock; - int print_level; + + inline void initialize() { + ctx_j = json11::Json::object {}; + print_level = CLOUDLOG_WARNING; + const char* print_lvl = getenv("LOGPRINT"); + if (print_lvl) { + if (strcmp(print_lvl, "debug") == 0) { + print_level = CLOUDLOG_DEBUG; + } else if (strcmp(print_lvl, "info") == 0) { + print_level = CLOUDLOG_INFO; + } else if (strcmp(print_lvl, "warning") == 0) { + print_level = CLOUDLOG_WARNING; + } + } + + // openpilot bindings + char* dongle_id = getenv("DONGLE_ID"); + if (dongle_id) { + ctx_j["dongle_id"] = dongle_id; + } + char* daemon_name = getenv("MANAGER_DAEMON"); + if (daemon_name) { + ctx_j["daemon"] = daemon_name; + } + ctx_j["version"] = COMMA_VERSION; + ctx_j["dirty"] = !getenv("CLEAN"); + + // device type + if (Hardware::EON()) { + ctx_j["device"] = "eon"; + } else if (Hardware::TICI()) { + ctx_j["device"] = "tici"; + } else { + ctx_j["device"] = "pc"; + } + + initialized = true; + } }; -LogState::~LogState() { - zmq_close(sock); - zmq_ctx_destroy(zctx); -} - -static LogState s = {}; - -static void cloudlog_init() { - if (s.inited) return; - s.ctx_j = json11::Json::object {}; - s.zctx = zmq_ctx_new(); - s.sock = zmq_socket(s.zctx, ZMQ_PUSH); - - int timeout = 100; // 100 ms timeout on shutdown for messages to be received by logmessaged - zmq_setsockopt(s.sock, ZMQ_LINGER, &timeout, sizeof(timeout)); - - zmq_connect(s.sock, "ipc:///tmp/logmessage"); - - s.print_level = CLOUDLOG_WARNING; - const char* print_level = getenv("LOGPRINT"); - if (print_level) { - if (strcmp(print_level, "debug") == 0) { - s.print_level = CLOUDLOG_DEBUG; - } else if (strcmp(print_level, "info") == 0) { - s.print_level = CLOUDLOG_INFO; - } else if (strcmp(print_level, "warning") == 0) { - s.print_level = CLOUDLOG_WARNING; - } - } - - // openpilot bindings - char* dongle_id = getenv("DONGLE_ID"); - if (dongle_id) { - s.ctx_j["dongle_id"] = dongle_id; - } - char* daemon_name = getenv("MANAGER_DAEMON"); - if (daemon_name) { - s.ctx_j["daemon"] = daemon_name; - } - s.ctx_j["version"] = COMMA_VERSION; - s.ctx_j["dirty"] = !getenv("CLEAN"); - - // device type - if (Hardware::EON()) { - s.ctx_j["device"] = "eon"; - } else if (Hardware::TICI()) { - s.ctx_j["device"] = "tici"; - } else { - s.ctx_j["device"] = "pc"; - } - - s.inited = true; -} +static SwaglogState s = {}; static void log(int levelnum, const char* filename, int lineno, const char* func, const char* msg, const std::string& log_s) { if (levelnum >= s.print_level) { @@ -101,7 +83,8 @@ void cloudlog_e(int levelnum, const char* filename, int lineno, const char* func if (ret <= 0 || !msg_buf) return; std::lock_guard lk(s.lock); - cloudlog_init(); + + if (!s.initialized) s.initialize(); json11::Json log_j = json11::Json::object { {"msg", msg_buf}, diff --git a/selfdrive/common/swaglog.h b/selfdrive/common/swaglog.h index a508bcac1..6403820ac 100644 --- a/selfdrive/common/swaglog.h +++ b/selfdrive/common/swaglog.h @@ -8,6 +8,8 @@ #define CLOUDLOG_ERROR 40 #define CLOUDLOG_CRITICAL 50 + + void cloudlog_e(int levelnum, const char* filename, int lineno, const char* func, const char* fmt, ...) /*__attribute__ ((format (printf, 6, 7)))*/; diff --git a/selfdrive/common/tests/test_swaglog.cc b/selfdrive/common/tests/test_swaglog.cc index 08a5ed6ea..1d00def63 100644 --- a/selfdrive/common/tests/test_swaglog.cc +++ b/selfdrive/common/tests/test_swaglog.cc @@ -22,25 +22,21 @@ void log_thread(int thread_id, int msg_cnt) { } } -void send_stop_msg(void *zctx) { - void *sock = zmq_socket(zctx, ZMQ_PUSH); - zmq_connect(sock, SWAGLOG_ADDR); - zmq_send(sock, "stop", 4, ZMQ_NOBLOCK); - zmq_close(sock); -} - -void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) { +void recv_log(int thread_cnt, int thread_msg_cnt) { + void *zctx = zmq_ctx_new(); void *sock = zmq_socket(zctx, ZMQ_PULL); zmq_bind(sock, SWAGLOG_ADDR); std::vector thread_msgs(thread_cnt); + int total_count = 0; - while (true) { + for (auto start = std::chrono::steady_clock::now(), now = start; + now < start + std::chrono::seconds{1} && total_count < (thread_cnt * thread_msg_cnt); + now = std::chrono::steady_clock::now()) { char buf[4096] = {}; - if (zmq_recv(sock, buf, sizeof(buf), 0) <= 0) { + if (zmq_recv(sock, buf, sizeof(buf), ZMQ_DONTWAIT) <= 0) { if (errno == EAGAIN || errno == EINTR || errno == EFSM) continue; break; } - if (strcmp(buf, "stop") == 0) break; REQUIRE(buf[0] == CLOUDLOG_DEBUG); std::string err; @@ -53,10 +49,13 @@ void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) { REQUIRE(msg["lineno"].int_value() == LINE_NO); auto ctx = msg["ctx"]; + REQUIRE(ctx["daemon"].string_value() == daemon_name); REQUIRE(ctx["dongle_id"].string_value() == dongle_id); - REQUIRE(ctx["version"].string_value() == COMMA_VERSION); REQUIRE(ctx["dirty"].bool_value() == true); + + REQUIRE(ctx["version"].string_value() == COMMA_VERSION); + std::string device = "pc"; if (Hardware::EON()) { device = "eon"; @@ -68,12 +67,14 @@ void recv_log(void *zctx, int thread_cnt, int thread_msg_cnt) { int thread_id = atoi(msg["msg"].string_value().c_str()); REQUIRE((thread_id >= 0 && thread_id < thread_cnt)); thread_msgs[thread_id]++; + total_count++; } for (int i = 0; i < thread_cnt; ++i) { INFO("thread :" << i); REQUIRE(thread_msgs[i] == thread_msg_cnt); } zmq_close(sock); + zmq_ctx_destroy(zctx); } TEST_CASE("swaglog") { @@ -83,15 +84,11 @@ TEST_CASE("swaglog") { const int thread_cnt = 5; const int thread_msg_cnt = 100; - void *zctx = zmq_ctx_new(); - send_stop_msg(zctx); - std::vector log_threads; for (int i = 0; i < thread_cnt; ++i) { log_threads.push_back(std::thread(log_thread, i, thread_msg_cnt)); } for (auto &t : log_threads) t.join(); - recv_log(zctx, thread_cnt, thread_msg_cnt); - zmq_ctx_destroy(zctx); + recv_log(thread_cnt, thread_msg_cnt); } diff --git a/selfdrive/common/util.h b/selfdrive/common/util.h index 89eacf69a..bf0df3bca 100644 --- a/selfdrive/common/util.h +++ b/selfdrive/common/util.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -11,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -163,3 +165,26 @@ void update_max_atomic(std::atomic& max, T const& value) { T prev = max; while(prev < value && !max.compare_exchange_weak(prev, value)) {} } + +class LogState { + public: + std::mutex lock; + void *zctx; + void *sock; + int print_level; + + LogState(const char* endpoint) { + zctx = zmq_ctx_new(); + sock = zmq_socket(zctx, ZMQ_PUSH); + + // Timeout on shutdown for messages to be received by the logging process + int timeout = 100; + zmq_setsockopt(sock, ZMQ_LINGER, &timeout, sizeof(timeout)); + + zmq_connect(sock, endpoint); + }; + ~LogState() { + zmq_close(sock); + zmq_ctx_destroy(zctx); + } +};