loggerd cleanup (#19668)

albatross
Adeeb Shihadeh 2021-01-07 23:08:40 -08:00 committed by GitHub
parent b8aa250efb
commit 206d072bb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 125 additions and 162 deletions

View File

@ -307,7 +307,8 @@ selfdrive/proclogd/SConscript
selfdrive/proclogd/proclogd.cc
selfdrive/loggerd/SConscript
selfdrive/loggerd/encoder.[c,h]
selfdrive/loggerd/encoder.h
selfdrive/loggerd/encoder.cc
selfdrive/loggerd/frame_logger.h
selfdrive/loggerd/logger.cc
selfdrive/loggerd/logger.h

View File

@ -2,11 +2,11 @@ Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
src = ['loggerd.cc', 'logger.cc']
libs = ['zmq', 'capnp', 'kj', 'z',
'avformat', 'avcodec', 'swscale', 'avutil',
'yuv', 'bz2', common, cereal, messaging, visionipc]
'avformat', 'avcodec', 'swscale', 'avutil',
'yuv', 'bz2', common, cereal, messaging, visionipc]
if arch in ["aarch64", "larch64"]:
src += ['encoder.c']
src += ['encoder.cc']
libs += ['OmxVenc', 'OmxCore']
if arch == "aarch64":
libs += ['cutils']

View File

@ -17,8 +17,6 @@
#include <libyuv.h>
//#include <android/log.h>
#include <msm_media_info.h>
#include "common/mutex.h"
@ -27,9 +25,12 @@
#include "encoder.h"
//#define ALOG(...) __android_log_print(ANDROID_LOG_VERBOSE, "omxapp", ##__VA_ARGS__)
// encoder: lossey codec using hardware hevc
// ***** OMX callback functions *****
static void wait_for_state(EncoderState *s, OMX_STATETYPE state) {
pthread_mutex_lock(&s->state_lock);
while (s->state != state) {
@ -40,14 +41,14 @@ static void wait_for_state(EncoderState *s, OMX_STATETYPE state) {
static OMX_ERRORTYPE event_handler(OMX_HANDLETYPE component, OMX_PTR app_data, OMX_EVENTTYPE event,
OMX_U32 data1, OMX_U32 data2, OMX_PTR event_data) {
EncoderState *s = app_data;
EncoderState *s = (EncoderState*)app_data;
switch (event) {
case OMX_EventCmdComplete:
assert(data1 == OMX_CommandStateSet);
LOG("set state event 0x%x", data2);
pthread_mutex_lock(&s->state_lock);
s->state = data2;
s->state = (OMX_STATETYPE)data2;
pthread_cond_broadcast(&s->state_cv);
pthread_mutex_unlock(&s->state_lock);
break;
@ -66,24 +67,17 @@ static OMX_ERRORTYPE event_handler(OMX_HANDLETYPE component, OMX_PTR app_data, O
static OMX_ERRORTYPE empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data,
OMX_BUFFERHEADERTYPE *buffer) {
EncoderState *s = app_data;
// printf("empty_buffer_done\n");
EncoderState *s = (EncoderState*)app_data;
queue_push(&s->free_in, (void*)buffer);
return OMX_ErrorNone;
}
static OMX_ERRORTYPE fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data,
OMX_BUFFERHEADERTYPE *buffer) {
EncoderState *s = app_data;
// printf("fill_buffer_done\n");
EncoderState *s = (EncoderState*)app_data;
queue_push(&s->done_out, (void*)buffer);
return OMX_ErrorNone;
}
@ -166,6 +160,9 @@ static const char* omx_color_fomat_name(uint32_t format) {
}
}
// ***** encoder functions *****
void encoder_init(EncoderState *s, const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale) {
int err;
@ -182,9 +179,9 @@ void encoder_init(EncoderState *s, const char* filename, int width, int height,
if (downscale) {
s->downscale = true;
s->y_ptr2 = malloc(s->width*s->height);
s->u_ptr2 = malloc(s->width*s->height/4);
s->v_ptr2 = malloc(s->width*s->height/4);
s->y_ptr2 = (uint8_t *)malloc(s->width*s->height);
s->u_ptr2 = (uint8_t *)malloc(s->width*s->height/4);
s->v_ptr2 = (uint8_t *)malloc(s->width*s->height/4);
}
s->segment = -1;
@ -242,9 +239,6 @@ void encoder_init(EncoderState *s, const char* filename, int width, int height,
assert(err == OMX_ErrorNone);
s->num_in_bufs = in_port.nBufferCountActual;
// printf("in width: %d, stride: %d\n",
// in_port.format.video.nFrameWidth, in_port.format.video.nStride);
// setup output port
OMX_PARAM_PORTDEFINITIONTYPE out_port = {0};
@ -355,14 +349,14 @@ void encoder_init(EncoderState *s, const char* filename, int width, int height,
err = OMX_SendCommand(s->handle, OMX_CommandStateSet, OMX_StateIdle, NULL);
assert(err == OMX_ErrorNone);
s->in_buf_headers = calloc(s->num_in_bufs, sizeof(OMX_BUFFERHEADERTYPE*));
s->in_buf_headers = (OMX_BUFFERHEADERTYPE **)calloc(s->num_in_bufs, sizeof(OMX_BUFFERHEADERTYPE*));
for (int i=0; i<s->num_in_bufs; i++) {
err = OMX_AllocateBuffer(s->handle, &s->in_buf_headers[i], PORT_INDEX_IN, s,
in_port.nBufferSize);
assert(err == OMX_ErrorNone);
}
s->out_buf_headers = calloc(s->num_out_bufs, sizeof(OMX_BUFFERHEADERTYPE*));
s->out_buf_headers = (OMX_BUFFERHEADERTYPE **)calloc(s->num_out_bufs, sizeof(OMX_BUFFERHEADERTYPE*));
for (int i=0; i<s->num_out_bufs; i++) {
err = OMX_AllocateBuffer(s->handle, &s->out_buf_headers[i], PORT_INDEX_OUT, s,
out_port.nBufferSize);
@ -395,7 +389,7 @@ static void handle_out_buf(EncoderState *s, OMX_BUFFERHEADERTYPE *out_buf) {
if (out_buf->nFlags & OMX_BUFFERFLAG_CODECCONFIG) {
if (s->codec_config_len < out_buf->nFilledLen) {
s->codec_config = realloc(s->codec_config, out_buf->nFilledLen);
s->codec_config = (uint8_t *)realloc(s->codec_config, out_buf->nFilledLen);
}
s->codec_config_len = out_buf->nFilledLen;
memcpy(s->codec_config, buf_data, out_buf->nFilledLen);
@ -412,7 +406,7 @@ static void handle_out_buf(EncoderState *s, OMX_BUFFERHEADERTYPE *out_buf) {
if (s->remuxing) {
if (!s->wrote_codec_config && s->codec_config_len > 0) {
if (s->codec_ctx->extradata_size < s->codec_config_len) {
s->codec_ctx->extradata = realloc(s->codec_ctx->extradata, s->codec_config_len + AV_INPUT_BUFFER_PADDING_SIZE);
s->codec_ctx->extradata = (uint8_t *)realloc(s->codec_ctx->extradata, s->codec_config_len + AV_INPUT_BUFFER_PADDING_SIZE);
}
s->codec_ctx->extradata_size = s->codec_config_len;
memcpy(s->codec_ctx->extradata, s->codec_config, s->codec_config_len);
@ -434,7 +428,9 @@ static void handle_out_buf(EncoderState *s, OMX_BUFFERHEADERTYPE *out_buf) {
av_init_packet(&pkt);
pkt.data = buf_data;
pkt.size = out_buf->nFilledLen;
pkt.pts = pkt.dts = av_rescale_q_rnd(out_buf->nTimeStamp, in_timebase, s->ofmt_ctx->streams[0]->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
enum AVRounding rnd = static_cast<enum AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
pkt.pts = pkt.dts = av_rescale_q_rnd(out_buf->nTimeStamp, in_timebase, s->ofmt_ctx->streams[0]->time_base, rnd);
pkt.duration = av_rescale_q(50*1000, in_timebase, s->ofmt_ctx->streams[0]->time_base);
if (out_buf->nFlags & OMX_BUFFERFLAG_SYNCFRAME) {
@ -478,7 +474,7 @@ int encoder_encode_frame(EncoderState *s,
// this sometimes freezes... put it outside the encoder lock so we can still trigger rotates...
// THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this
//pthread_mutex_unlock(&s->lock);
OMX_BUFFERHEADERTYPE* in_buf = queue_pop(&s->free_in);
OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&s->free_in);
//pthread_mutex_lock(&s->lock);
// if (s->rotating) {
@ -507,12 +503,12 @@ int encoder_encode_frame(EncoderState *s,
s->u_ptr2, s->width/2,
s->v_ptr2, s->width/2,
s->width, s->height,
kFilterNone);
libyuv::kFilterNone);
y_ptr = s->y_ptr2;
u_ptr = s->u_ptr2;
v_ptr = s->v_ptr2;
}
err = I420ToNV12(y_ptr, s->width,
err = libyuv::I420ToNV12(y_ptr, s->width,
u_ptr, s->width/2,
v_ptr, s->width/2,
in_y_ptr, in_y_stride,
@ -532,7 +528,7 @@ int encoder_encode_frame(EncoderState *s,
// pump output
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = queue_try_pop(&s->done_out);
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_try_pop(&s->done_out);
if (!out_buf) {
break;
}
@ -620,7 +616,7 @@ void encoder_close(EncoderState *s) {
if (s->dirty) {
// drain output only if there could be frames in the encoder
OMX_BUFFERHEADERTYPE* in_buf = queue_pop(&s->free_in);
OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&s->free_in);
in_buf->nFilledLen = 0;
in_buf->nOffset = 0;
in_buf->nFlags = OMX_BUFFERFLAG_EOS;
@ -630,7 +626,7 @@ void encoder_close(EncoderState *s) {
assert(err == OMX_ErrorNone);
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = queue_pop(&s->done_out);
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&s->done_out);
handle_out_buf(s, out_buf);
@ -650,8 +646,8 @@ void encoder_close(EncoderState *s) {
fclose(s->of);
}
unlink(s->lock_path);
s->open = false;
}
s->open = false;
pthread_mutex_unlock(&s->lock);
}
@ -712,19 +708,6 @@ void encoder_destroy(EncoderState *s) {
#if 0
// cd one/selfdrive/visiond
// clang
// -fPIC -pie
// -std=gnu11 -O2 -g
// -o encoder
// -I ~/one/selfdrive
// -I ~/one/phonelibs/openmax/include
// -I ~/one/phonelibs/libyuv/include
// -lOmxVenc -lOmxCore -llog
// encoder.c
// buffering.c
// -L ~/one/phonelibs/libyuv/lib -l:libyuv.a
int main() {
int err;

View File

@ -1,23 +1,20 @@
#ifndef ENCODER_H
#define ENCODER_H
#pragma once
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <OMX_Component.h>
#include <libavformat/avformat.h>
extern "C" {
#include <libavformat/avformat.h>
}
#include "common/cqueue.h"
#include "common/visionipc.h"
#include "visionipc.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct EncoderState {
struct EncoderState {
pthread_mutex_t lock;
int width, height, fps;
const char* path;
@ -65,7 +62,7 @@ typedef struct EncoderState {
bool downscale;
uint8_t *y_ptr2, *u_ptr2, *v_ptr2;
} EncoderState;
};
void encoder_init(EncoderState *s, const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale);
int encoder_encode_frame(EncoderState *s,
@ -76,9 +73,3 @@ void encoder_open(EncoderState *s, const char* path);
void encoder_rotate(EncoderState *s, const char* new_path, int new_segment);
void encoder_close(EncoderState *s);
void encoder_destroy(EncoderState *s);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -4,12 +4,10 @@
#include <cassert>
#include <unistd.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <inttypes.h>
#include <libyuv.h>
#include <sys/resource.h>
#include <pthread.h>
#include <sys/resource.h>
#include <string>
#include <iostream>
@ -46,15 +44,14 @@
#include "encoder.h"
#endif
#define MAIN_BITRATE 5000000
#define QCAM_BITRATE 128000
#define MAIN_FPS 20
constexpr int MAIN_BITRATE = 5000000;
constexpr int MAIN_FPS = 20;
#ifndef QCOM2
#define MAX_CAM_IDX LOG_CAMERA_ID_DCAMERA
#define DCAM_BITRATE 2500000
constexpr int MAX_CAM_IDX = LOG_CAMERA_ID_DCAMERA;
constexpr int DCAM_BITRATE = 2500000;
#else
#define MAX_CAM_IDX LOG_CAMERA_ID_ECAMERA
#define DCAM_BITRATE MAIN_BITRATE
constexpr int MAX_CAM_IDX = LOG_CAMERA_ID_ECAMERA;
constexpr int DCAM_BITRATE = MAIN_BITRATE;
#endif
#define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead
@ -93,7 +90,7 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
[LOG_CAMERA_ID_QCAMERA] = {
.filename = "qcamera.ts",
.fps = MAIN_FPS,
.bitrate = QCAM_BITRATE,
.bitrate = 128000,
.is_h265 = false,
.downscale = true,
#ifndef QCOM2
@ -105,12 +102,12 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
};
namespace {
constexpr int SEGMENT_LENGTH = 60;
const char* LOG_ROOT = "/data/media/0/realdata";
namespace {
double randrange(double a, double b) __attribute__((unused));
double randrange(double a, double b) {
static std::mt19937 gen(millis_since_boot());
@ -137,14 +134,16 @@ public:
void waitLogThread() {
std::unique_lock<std::mutex> lk(fid_lock);
while (stream_frame_id > log_frame_id //if the log camera is older, wait for it to catch up.
while (stream_frame_id > log_frame_id // if the log camera is older, wait for it to catch up.
&& (stream_frame_id - log_frame_id) < 8 // but if its too old then there probably was a discontinuity (visiond restarted)
&& !do_exit) {
cv.wait(lk);
}
}
void cancelWait() { cv.notify_one(); }
void cancelWait() {
cv.notify_one();
}
void setStreamFrameId(uint32_t frame_id) {
fid_lock.lock();
@ -161,10 +160,11 @@ public:
}
void rotate() {
if (!enabled) { return; }
std::unique_lock<std::mutex> lk(fid_lock);
should_rotate = true;
last_rotate_frame_id = stream_frame_id;
if (enabled) {
std::unique_lock<std::mutex> lk(fid_lock);
should_rotate = true;
last_rotate_frame_id = stream_frame_id;
}
}
void finish_rotate() {
@ -183,11 +183,12 @@ struct LoggerdState {
char segment_path[4096];
int rotate_segment;
pthread_mutex_t rotate_lock;
// video encders
int num_encoder;
std::atomic<int> rotate_seq_id;
std::atomic<int> should_close;
std::atomic<int> finish_close;
RotateState rotate_state[LOG_CAMERA_ID_MAX-1];
};
LoggerdState s;
@ -204,13 +205,12 @@ void encoder_thread(int cam_idx) {
std::vector<EncoderState*> encoders;
int encoder_segment = -1;
int cnt = 0;
pthread_mutex_lock(&s.rotate_lock);
int my_idx = s.num_encoder;
s.num_encoder += 1;
pthread_mutex_unlock(&s.rotate_lock);
int cnt = 0;
LoggerHandle *lh = NULL;
while (!do_exit) {
@ -222,6 +222,7 @@ void encoder_thread(int cam_idx) {
continue;
}
// init encoders
if (encoders.empty()) {
LOGD("encoder init %dx%d", buf_info.width, buf_info.height);
@ -252,8 +253,8 @@ void encoder_thread(int cam_idx) {
//double msdiff = (double) diff / 1000000.0;
// printf("logger latency to tsEof: %f\n", msdiff);
{ // all the rotation stuff
// all the rotation stuff
{
pthread_mutex_lock(&s.rotate_lock);
pthread_mutex_unlock(&s.rotate_lock);
@ -268,8 +269,9 @@ void encoder_thread(int cam_idx) {
rotate_state.last_rotate_frame_id = extra.frame_id - 1;
rotate_state.initialized = true;
}
while (s.rotate_seq_id != my_idx && !do_exit) { usleep(1000); }
// poll for our turn
while (s.rotate_seq_id != my_idx && !do_exit) util::sleep_for(10);
LOGW("camera %d rotate encoder to %s.", cam_idx, s.segment_path);
for (auto &e : encoders) {
@ -277,7 +279,6 @@ void encoder_thread(int cam_idx) {
}
s.rotate_seq_id = (my_idx + 1) % s.num_encoder;
encoder_segment = s.rotate_segment;
if (lh) {
lh_close(lh);
}
@ -287,11 +288,11 @@ void encoder_thread(int cam_idx) {
s.should_close += 1;
pthread_mutex_unlock(&s.rotate_lock);
while(s.should_close > 0 && s.should_close < s.num_encoder && !do_exit) { usleep(1000); }
while(s.should_close > 0 && s.should_close < s.num_encoder && !do_exit) util::sleep_for(10);
pthread_mutex_lock(&s.rotate_lock);
s.should_close = s.should_close == s.num_encoder ? 1 - s.num_encoder : s.should_close + 1;
for (auto &e : encoders) {
encoder_close(e);
encoder_open(e, e->next_path);
@ -302,7 +303,8 @@ void encoder_thread(int cam_idx) {
s.finish_close += 1;
pthread_mutex_unlock(&s.rotate_lock);
while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) { usleep(1000); }
// wait for all to finish
while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) util::sleep_for(10);
s.finish_close = 0;
rotate_state.finish_rotate();
@ -338,7 +340,7 @@ void encoder_thread(int cam_idx) {
#ifdef QCOM2
eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C);
#else
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT:cereal::EncodeIndex::Type::FULL_H_E_V_C);
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C);
#endif
eidx.setEncodeId(cnt);
@ -384,9 +386,9 @@ kj::Array<capnp::word> gen_init_data() {
MessageBuilder msg;
auto init = msg.initEvent().initInitData();
if (file_exists("/EON"))
if (file_exists("/EON")) {
init.setDeviceType(cereal::InitData::DeviceType::NEO);
else if (file_exists("/TICI")) {
} else if (file_exists("/TICI")) {
init.setDeviceType(cereal::InitData::DeviceType::TICI);
} else {
init.setDeviceType(cereal::InitData::DeviceType::PC);
@ -426,31 +428,15 @@ kj::Array<capnp::word> gen_init_data() {
if (dongle_id) {
init.setDongleId(std::string(dongle_id));
}
init.setDirty(!getenv("CLEAN"));
const char* clean = getenv("CLEAN");
if (!clean) {
init.setDirty(true);
}
// log params
Params params = Params();
std::vector<char> git_commit = params.read_db_bytes("GitCommit");
if (git_commit.size() > 0) {
init.setGitCommit(capnp::Text::Reader(git_commit.data(), git_commit.size()));
}
std::vector<char> git_branch = params.read_db_bytes("GitBranch");
if (git_branch.size() > 0) {
init.setGitBranch(capnp::Text::Reader(git_branch.data(), git_branch.size()));
}
std::vector<char> git_remote = params.read_db_bytes("GitRemote");
if (git_remote.size() > 0) {
init.setGitRemote(capnp::Text::Reader(git_remote.data(), git_remote.size()));
}
init.setGitCommit(params.get("GitCommit"));
init.setGitBranch(params.get("GitBranch"));
init.setGitRemote(params.get("GitRemote"));
init.setPassive(params.read_db_bool("Passive"));
{
// log params
std::map<std::string, std::string> params_map;
params.read_db_all(&params_map);
auto lparams = init.initParams().initEntries(params_map.size());
@ -513,11 +499,8 @@ static void bootlog() {
}
int main(int argc, char** argv) {
int err;
#ifdef QCOM
setpriority(PRIO_PROCESS, 0, -12);
#endif
if (argc > 1 && strcmp(argv[1], "--bootlog") == 0) {
bootlog();
@ -528,6 +511,7 @@ int main(int argc, char** argv) {
if (getenv("LOGGERD_TEST")) {
segment_length = atoi(getenv("LOGGERD_SEGMENT_LENGTH"));
}
bool record_front = true;
#ifndef QCOM2
record_front = Params().read_db_bool("RecordFront");
@ -576,6 +560,7 @@ int main(int argc, char** argv) {
s.num_encoder = 0;
pthread_mutex_init(&s.rotate_lock, NULL);
// TODO: create these threads dynamically on frame packet presence
std::vector<std::thread> encoder_threads;
#ifndef DISABLE_ENCODER
encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_FCAMERA));
@ -600,7 +585,11 @@ int main(int argc, char** argv) {
double last_rotate_tms = millis_since_boot();
double last_camera_seen_tms = millis_since_boot();
while (!do_exit) {
// TODO: fix msgs from the first poll getting dropped
// poll for new messages on all sockets
for (auto sock : poller->poll(1000)) {
// drain socket
Message * last_msg = nullptr;
while (!do_exit) {
Message * msg = sock->receive(true);
@ -612,21 +601,20 @@ int main(int argc, char** argv) {
QlogState& qs = qlog_states[sock];
logger_log(&s.logger, (uint8_t*)msg->getData(), msg->getSize(), qs.counter == 0);
if (qs.counter != -1) {
//printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]);
qs.counter = (qs.counter + 1) % qs.freq;
}
bytes_count += msg->getSize();
msg_count++;
}
if (last_msg) {
int fpkt_id = -1;
for (int cid=0;cid<=MAX_CAM_IDX;cid++) {
for (int cid = 0; cid <=MAX_CAM_IDX; cid++) {
if (sock == s.rotate_state[cid].fpkt_sock) {fpkt_id=cid; break;}
}
if (fpkt_id>=0) {
if (fpkt_id >= 0) {
// track camera frames to sync to encoder
// only process last frame
const uint8_t* data = (uint8_t*)last_msg->getData();
@ -649,51 +637,51 @@ int main(int argc, char** argv) {
}
last_camera_seen_tms = millis_since_boot();
}
delete last_msg;
}
delete last_msg;
}
double ts = seconds_since_boot();
double tms = millis_since_boot();
bool new_segment = false;
bool new_segment = s.logger.part == -1;
if (s.logger.part > -1) {
new_segment = true;
double tms = millis_since_boot();
if (tms - last_camera_seen_tms <= NO_CAMERA_PATIENCE && s.num_encoder > 0) {
for (int cid=0;cid<=MAX_CAM_IDX;cid++) {
new_segment = true;
for (auto &r : s.rotate_state) {
// this *should* be redundant on tici since all camera frames are synced
new_segment &= (((s.rotate_state[cid].stream_frame_id >= s.rotate_state[cid].last_rotate_frame_id + segment_length * MAIN_FPS) &&
(!s.rotate_state[cid].should_rotate) && (s.rotate_state[cid].initialized)) ||
(!s.rotate_state[cid].enabled));
new_segment &= (((r.stream_frame_id >= r.last_rotate_frame_id + segment_length * MAIN_FPS) &&
(!r.should_rotate) && (r.initialized)) ||
(!r.enabled));
#ifndef QCOM2
break; // only look at fcamera frame id if not QCOM2
#endif
}
} else {
new_segment &= tms - last_rotate_tms > segment_length * 1000;
if (new_segment) { LOGW("no camera packet seen. auto rotated"); }
if (tms - last_rotate_tms > segment_length * 1000) {
new_segment = true;
LOGW("no camera packet seen. auto rotated");
}
}
} else if (s.logger.part == -1) {
// always starts first segment immediately
new_segment = true;
}
// rotate to new segment
if (new_segment) {
pthread_mutex_lock(&s.rotate_lock);
last_rotate_tms = millis_since_boot();
err = logger_next(&s.logger, LOG_ROOT, s.segment_path, sizeof(s.segment_path), &s.rotate_segment);
int err = logger_next(&s.logger, LOG_ROOT, s.segment_path, sizeof(s.segment_path), &s.rotate_segment);
assert(err == 0);
if (s.logger.part == 0) { LOGW("logging to %s", s.segment_path); }
if (s.logger.part == 0) {
LOGW("logging to %s", s.segment_path);
}
LOGW("rotated to %s", s.segment_path);
// rotate the encoders
for (int cid=0;cid<=MAX_CAM_IDX;cid++) { s.rotate_state[cid].rotate(); }
// rotate encoders
for (auto &r : s.rotate_state) r.rotate();
pthread_mutex_unlock(&s.rotate_lock);
}
if ((msg_count%1000) == 0) {
if ((msg_count % 1000) == 0) {
double ts = seconds_since_boot();
LOGD("%lu messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count*1.0/(ts-start_ts), bytes_count*0.001/(ts-start_ts));
}
}

View File

@ -49,14 +49,14 @@ class TestLoggerd(unittest.TestCase):
def _check_init_data(self, msgs):
msg = msgs[0]
assert msg.which() == 'initData'
self.assertEqual(msg.which(), 'initData')
def _check_sentinel(self, msgs, route):
start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment
assert msgs[1].sentinel.type == start_type
self.assertTrue(msgs[1].sentinel.type == start_type)
end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
assert msgs[-1].sentinel.type == end_type
self.assertTrue(msgs[-1].sentinel.type == end_type)
def test_init_data_values(self):
os.environ["CLEAN"] = random.choice(["0", "1"])
@ -74,19 +74,19 @@ class TestLoggerd(unittest.TestCase):
lr = list(LogReader(str(self._gen_bootlog())))
initData = lr[0].initData
assert initData.dirty != bool(os.environ["CLEAN"])
assert initData.dongleId == os.environ["DONGLE_ID"]
assert initData.version == VERSION
self.assertTrue(initData.dirty != bool(os.environ["CLEAN"]))
self.assertEqual(initData.dongleId, os.environ["DONGLE_ID"])
self.assertEqual(initData.version, VERSION)
if os.path.isfile("/proc/cmdline"):
with open("/proc/cmdline") as f:
assert list(initData.kernelArgs) == f.read().strip().split(" ")
self.assertEqual(list(initData.kernelArgs), f.read().strip().split(" "))
with open("/proc/version") as f:
assert initData.kernelVersion == f.read()
self.assertEqual(initData.kernelVersion, f.read())
for _, k, v in fake_params:
assert getattr(initData, k) == v
self.assertEqual(getattr(initData, k), v)
def test_bootlog(self):
# generate bootlog with fake launch log
@ -118,7 +118,7 @@ class TestLoggerd(unittest.TestCase):
val = b""
if path.is_file():
val = open(path, "rb").read()
assert getattr(boot, field) == val
self.assertEqual(getattr(boot, field), val)
def test_qlog(self):
qlog_services = [s for s in CEREAL_SERVICES if service_list[s].decimation is not None]
@ -164,11 +164,11 @@ class TestLoggerd(unittest.TestCase):
if s in no_qlog_services:
# check services with no specific decimation aren't in qlog
assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog"
self.assertEqual(recv_cnt, 0, f"got {recv_cnt} {s} msgs in qlog")
else:
# check logged message count matches decimation
expected_cnt = len(msgs) // service_list[s].decimation
assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}"
self.assertEqual(recv_cnt, expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}")
def test_rlog(self):
services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
@ -204,7 +204,7 @@ class TestLoggerd(unittest.TestCase):
for m in lr:
sent = sent_msgs[m.which()].pop(0)
sent.clear_write_flag()
assert sent.to_bytes() == m.as_builder().to_bytes()
self.assertEqual(sent.to_bytes(), m.as_builder().to_bytes())
if __name__ == "__main__":