loggerd: fix double encoder open (#19703)

* fix double encoder open/close

* simplify rotation

* clean up

* cleanup

Co-authored-by: Comma Device <device@comma.ai>
pull/19635/head
Adeeb Shihadeh 2021-01-11 19:48:50 -08:00 committed by GitHub
parent 9ca8f1a357
commit 87e8ef1fca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 175 deletions

View File

@ -456,11 +456,6 @@ int encoder_encode_frame(EncoderState *s,
int err;
pthread_mutex_lock(&s->lock);
if (s->opening) {
encoder_open(s, s->next_path);
s->opening = false;
}
if (!s->open) {
pthread_mutex_unlock(&s->lock);
return -1;
@ -538,20 +533,16 @@ int encoder_encode_frame(EncoderState *s,
*frame_segment = s->segment;
}
if (s->closing) {
encoder_close(s);
s->closing = false;
}
pthread_mutex_unlock(&s->lock);
return ret;
}
void encoder_open(EncoderState *s, const char* path) {
void encoder_open(EncoderState *s, const char* path, int segment) {
int err;
pthread_mutex_lock(&s->lock);
s->segment = segment;
snprintf(s->vid_path, sizeof(s->vid_path), "%s/%s", path, s->filename);
LOGD("encoder_open %s remuxing:%d", s->vid_path, s->remuxing);
@ -647,23 +638,6 @@ void encoder_close(EncoderState *s) {
pthread_mutex_unlock(&s->lock);
}
void encoder_rotate(EncoderState *s, const char* new_path, int new_segment) {
pthread_mutex_lock(&s->lock);
snprintf(s->next_path, sizeof(s->next_path), "%s", new_path);
s->next_segment = new_segment;
if (s->open) {
if (s->next_segment == -1) {
s->closing = true;
} else {
s->rotating = true;
}
} else {
s->segment = s->next_segment;
s->opening = true;
}
pthread_mutex_unlock(&s->lock);
}
void encoder_destroy(EncoderState *s) {
int err;
@ -707,89 +681,3 @@ void encoder_destroy(EncoderState *s) {
free(s->v_ptr2);
}
}
#if 0
int main() {
int err;
EncoderState state;
EncoderState *s = &state;
memset(s, 0, sizeof(*s));
int w = 1164;
int h = 874;
encoder_init(s, w, h, 20);
printf("inited\n");
encoder_open(s, "/sdcard/t1");
// uint8_t *tmpy = malloc(640*480);
// uint8_t *tmpu = malloc((640/2)*(480/2));
// uint8_t *tmpv = malloc((640/2)*(480/2));
// memset(tmpy, 0, 640*480);
// // memset(tmpu, 0xff, (640/2)*(480/2));
// memset(tmpv, 0, (640/2)*(480/2));
// #if 0
// FILE *infile = fopen("/sdcard/camera_t2.yuv", "rb");
uint8_t *inbuf = malloc(w*h*3/2);
memset(inbuf, 0, w*h*3/2);
for (int i=0; i<20*3+5; i++) {
// fread(inbuf, w*h*3/2, 1, infile);
uint8_t *tmpy = inbuf;
uint8_t *tmpu = inbuf + w*h;
uint8_t *tmpv = inbuf + w*h + (w/2)*(h/2);
for (int y=0; y<h/2; y++) {
for (int x=0; x<w/2; x++) {
tmpu[y * (w/2) + x] = (i ^ y ^ x);
}
}
encoder_encode_frame(s, 20000*i, tmpy, tmpu, tmpv);
}
// #endif
// while(1);
printf("done\n");
// encoder_close(s);
// printf("restart\n");
// fclose(s->of);
// s->of = fopen("/sdcard/tmpout2.hevc", "wb");
// if (s->codec_config) {
// fwrite(s->codec_config, s->codec_config_len, 1, s->of);
// }
// encoder_open(s, "/sdcard/t1");
encoder_rotate(s, "/sdcard/t2");
for (int i=0; i<20*3+5; i++) {
// fread(inbuf, w*h*3/2, 1, infile);
uint8_t *tmpy = inbuf;
uint8_t *tmpu = inbuf + w*h;
uint8_t *tmpv = inbuf + w*h + (w/2)*(h/2);
for (int y=0; y<h/2; y++) {
for (int x=0; x<w/2; x++) {
tmpu[y * (w/2) + x] = (i ^ y ^ x);
}
}
encoder_encode_frame(s, 20000*i, tmpy, tmpu, tmpv);
}
encoder_close(s);
return 0;
}
#endif

View File

@ -28,12 +28,6 @@ struct EncoderState {
int counter;
int segment;
bool rotating;
bool closing;
bool opening;
char next_path[1024];
int next_segment;
const char* filename;
FILE *of;
@ -72,7 +66,6 @@ int encoder_encode_frame(EncoderState *s,
const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,
int in_width, int in_height,
int *frame_segment, VisionIpcBufExtra *extra);
void encoder_open(EncoderState *s, const char* path);
void encoder_rotate(EncoderState *s, const char* new_path, int new_segment);
void encoder_open(EncoderState *s, const char* path, int segment);
void encoder_close(EncoderState *s);
void encoder_destroy(EncoderState *s);

View File

@ -184,12 +184,6 @@ struct LoggerdState {
char segment_path[4096];
int rotate_segment;
pthread_mutex_t rotate_lock;
// video encders
int num_encoder;
std::atomic<int> rotate_seq_id;
std::atomic<int> should_close;
std::atomic<int> finish_close;
RotateState rotate_state[LOG_CAMERA_ID_MAX-1];
};
LoggerdState s;
@ -199,21 +193,15 @@ void encoder_thread(int cam_idx) {
assert(cam_idx < LOG_CAMERA_ID_MAX-1);
LogCameraInfo &cam_info = cameras_logged[cam_idx];
set_thread_name(cam_info.filename);
RotateState &rotate_state = s.rotate_state[cam_idx];
std::vector<EncoderState*> encoders;
pthread_mutex_lock(&s.rotate_lock);
int my_idx = s.num_encoder;
s.num_encoder += 1;
pthread_mutex_unlock(&s.rotate_lock);
set_thread_name(cam_info.filename);
int cnt = 0;
LoggerHandle *lh = NULL;
std::vector<EncoderState*> encoders;
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
while (!do_exit) {
if (!vipc_client.connect(false)){
util::sleep_for(100);
@ -247,10 +235,7 @@ void encoder_thread(int cam_idx) {
continue;
}
//uint64_t current_time = nanos_since_boot();
//uint64_t diff = current_time - extra.timestamp_eof;
//double msdiff = (double) diff / 1000000.0;
// printf("logger latency to tsEof: %f\n", msdiff);
//printf("logger latency to tsEof: %f\n", (double)(nanos_since_boot() - extra.timestamp_eof) / 1000000.0);
// all the rotation stuff
{
@ -264,56 +249,32 @@ void encoder_thread(int cam_idx) {
// rotate the encoder if the logger is on a newer segment
if (rotate_state.should_rotate) {
LOGW("camera %d rotate encoder to %s", cam_idx, s.segment_path);
if (!rotate_state.initialized) {
rotate_state.last_rotate_frame_id = extra.frame_id - 1;
rotate_state.initialized = true;
}
// poll for our turn
while (s.rotate_seq_id != my_idx && !do_exit) util::sleep_for(10);
LOGW("camera %d rotate encoder to %s.", cam_idx, s.segment_path);
for (auto &e : encoders) {
encoder_rotate(e, s.segment_path, s.rotate_segment);
}
s.rotate_seq_id = (my_idx + 1) % s.num_encoder;
if (lh) {
lh_close(lh);
}
lh = logger_get_handle(&s.logger);
pthread_mutex_lock(&s.rotate_lock);
s.should_close += 1;
pthread_mutex_unlock(&s.rotate_lock);
while(s.should_close > 0 && s.should_close < s.num_encoder && !do_exit) util::sleep_for(10);
pthread_mutex_lock(&s.rotate_lock);
s.should_close = s.should_close == s.num_encoder ? 1 - s.num_encoder : s.should_close + 1;
for (auto &e : encoders) {
encoder_close(e);
encoder_open(e, e->next_path);
e->segment = e->next_segment;
e->rotating = false;
encoder_open(e, s.segment_path, s.rotate_segment);
}
s.finish_close += 1;
pthread_mutex_unlock(&s.rotate_lock);
// wait for all to finish
while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) util::sleep_for(10);
s.finish_close = 0;
rotate_state.finish_rotate();
}
}
rotate_state.setStreamFrameId(extra.frame_id);
// encode a frame
{
// encode hevc
int out_segment = -1;
int out_id = encoder_encode_frame(encoders[0],
buf->y, buf->u, buf->v,
@ -358,7 +319,6 @@ void encoder_thread(int cam_idx) {
lh_close(lh);
lh = NULL;
}
}
LOG("encoder destroy");
@ -551,10 +511,6 @@ int main(int argc, char** argv) {
}
// init encoders
s.rotate_seq_id = 0;
s.should_close = 0;
s.finish_close = 0;
s.num_encoder = 0;
pthread_mutex_init(&s.rotate_lock, NULL);
// TODO: create these threads dynamically on frame packet presence
@ -641,7 +597,7 @@ int main(int argc, char** argv) {
bool new_segment = s.logger.part == -1;
if (s.logger.part > -1) {
double tms = millis_since_boot();
if (tms - last_camera_seen_tms <= NO_CAMERA_PATIENCE && s.num_encoder > 0) {
if (tms - last_camera_seen_tms <= NO_CAMERA_PATIENCE && encoder_threads.size() > 0) {
new_segment = true;
for (auto &r : s.rotate_state) {
// this *should* be redundant on tici since all camera frames are synced