diff --git a/app/src/screen.c b/app/src/screen.c index 126caf9b..02f2f9cb 100644 --- a/app/src/screen.c +++ b/app/src/screen.c @@ -308,7 +308,7 @@ screen_init(struct screen *screen, const struct screen_params *params) { .on_new_frame = sc_video_buffer_on_new_frame, }; - bool ok = sc_video_buffer_init(&screen->vb, 0, &cbs, screen); + bool ok = sc_video_buffer_init(&screen->vb, 1000, &cbs, screen); if (!ok) { LOGE("Could not initialize video buffer"); return false; diff --git a/app/src/v4l2_sink.c b/app/src/v4l2_sink.c index 8f8b98ee..63f14dd9 100644 --- a/app/src/v4l2_sink.c +++ b/app/src/v4l2_sink.c @@ -159,7 +159,7 @@ sc_v4l2_sink_open(struct sc_v4l2_sink *vs) { .on_new_frame = sc_video_buffer_on_new_frame, }; - bool ok = sc_video_buffer_init(&vs->vb, 0, &cbs, vs); + bool ok = sc_video_buffer_init(&vs->vb, 1, &cbs, vs); if (!ok) { LOGE("Could not initialize video buffer"); return false; diff --git a/app/src/video_buffer.c b/app/src/video_buffer.c index 2b527db8..47978768 100644 --- a/app/src/video_buffer.c +++ b/app/src/video_buffer.c @@ -1,11 +1,102 @@ #include "video_buffer.h" #include +#include + #include #include #include "util/log.h" +static void +sc_clock_history_init(struct sc_clock_history *history) { + history->count = 0; + history->head = 0; + history->system_sum = 0; + history->stream_sum = 0; +} + +static void +sc_clock_history_push(struct sc_clock_history *history, + sc_tick system, sc_tick stream) { + struct sc_clock_point *point = &history->points[history->head]; + if (history->count == SC_CLOCK_RANGE) { + history->system_sum -= point->system; + history->stream_sum -= point->stream; + } else { + ++history->count; + } + + point->system = system; + point->stream = stream; + + history->system_sum += system; + history->stream_sum += stream; + + LOGD("#### %ld %ld\n", history->stream_sum / history->count, history->system_sum / history->count); + + history->head = (history->head + 1) % SC_CLOCK_RANGE; +} + +static void +sc_clock_history_get_average_point(struct sc_clock_history *history, + struct sc_clock_point *point) { + assert(history->count); + point->system = history->system_sum / history->count; + point->stream = history->stream_sum / history->count; +} + +static void +sc_clock_init(struct sc_clock *clock) { + clock->coeff = 1; + clock->offset = 0; + clock->weight = 0; + + clock->last.system = 0; + clock->last.stream = 0; + + sc_clock_history_init(&clock->history); +} + +static sc_tick +sc_clock_to_system_ts(struct sc_clock *clock, sc_tick stream_ts) { + assert(clock->weight); // sc_clock_update() must have been called + return (sc_tick) (stream_ts * clock->coeff) + clock->offset; +} + +static void +sc_clock_update(struct sc_clock *clock, sc_tick now, sc_tick stream_ts) { + double instant_coeff; + if (clock->weight) { + sc_tick system_delta = now - clock->last.system; + sc_tick stream_delta = stream_ts - clock->last.stream; + instant_coeff = (double) system_delta / stream_delta; + } else { + // This is the first update, we cannot compute delta + instant_coeff = 1; + } + + sc_clock_history_push(&clock->history, now, stream_ts); + + if (clock->weight < SC_CLOCK_RANGE) { + ++clock->weight; + } + + // (1-t) * avg + t * new + clock->coeff = ((clock->weight - 1) * clock->coeff + instant_coeff) + / clock->weight; + + struct sc_clock_point center; + sc_clock_history_get_average_point(&clock->history, ¢er); + + clock->offset = center.system - (sc_tick) (center.stream * clock->coeff); + + LOGD("%g x + %ld", clock->coeff, clock->offset); + + clock->last.system = now; + clock->last.stream = stream_ts; +} + static struct sc_video_buffer_frame * sc_video_buffer_frame_new(const AVFrame *frame) { struct sc_video_buffer_frame *vb_frame = malloc(sizeof(*vb_frame)); @@ -61,27 +152,50 @@ run_buffering(void *data) { } if (vb->b.stopped) { - // Flush queue - while (!sc_queue_is_empty(&vb->b.queue)) { - struct sc_video_buffer_frame *vb_frame; - sc_queue_take(&vb->b.queue, next, &vb_frame); - sc_video_buffer_frame_delete(vb_frame); - } - break; + sc_mutex_unlock(&vb->b.mutex); + goto stopped; } struct sc_video_buffer_frame *vb_frame; sc_queue_take(&vb->b.queue, next, &vb_frame); - sc_mutex_unlock(&vb->b.mutex); + sc_tick now = sc_tick_now(); + int64_t pts = vb_frame->frame->pts / 1000; + LOGD("==== pts = %ld", pts); - usleep(vb->buffering_ms * 1000); + bool timed_out = false; + while (!vb->b.stopped && !timed_out) { + sc_tick deadline = sc_clock_to_system_ts(&vb->b.clock, pts) + + vb->buffering_ms / 2; + if (deadline > now + vb->buffering_ms) { + deadline = now + vb->buffering_ms; + } + + timed_out = + !sc_cond_timedwait(&vb->b.wait_cond, &vb->b.mutex, deadline); + } + + if (vb->b.stopped) { + sc_video_buffer_frame_delete(vb_frame); + sc_mutex_unlock(&vb->b.mutex); + goto stopped; + } + + sc_mutex_unlock(&vb->b.mutex); sc_video_buffer_offer(vb, vb_frame->frame); sc_video_buffer_frame_delete(vb_frame); } +stopped: + // Flush queue + while (!sc_queue_is_empty(&vb->b.queue)) { + struct sc_video_buffer_frame *vb_frame; + sc_queue_take(&vb->b.queue, next, &vb_frame); + sc_video_buffer_frame_delete(vb_frame); + } + LOGD("Buffering thread ended"); return 0; @@ -112,6 +226,16 @@ sc_video_buffer_init(struct sc_video_buffer *vb, unsigned buffering_ms, return false; } + ok = sc_cond_init(&vb->b.wait_cond); + if (!ok) { + LOGC("Could not create wait cond"); + sc_cond_destroy(&vb->b.queue_cond); + sc_mutex_destroy(&vb->b.mutex); + sc_frame_buffer_destroy(&vb->fb); + return false; + } + + sc_clock_init(&vb->b.clock); sc_queue_init(&vb->b.queue); } @@ -143,6 +267,8 @@ sc_video_buffer_stop(struct sc_video_buffer *vb) { if (vb->buffering_ms) { sc_mutex_lock(&vb->b.mutex); vb->b.stopped = true; + sc_cond_signal(&vb->b.queue_cond); + sc_cond_signal(&vb->b.wait_cond); sc_mutex_unlock(&vb->b.mutex); } } @@ -158,6 +284,7 @@ void sc_video_buffer_destroy(struct sc_video_buffer *vb) { sc_frame_buffer_destroy(&vb->fb); if (vb->buffering_ms) { + sc_cond_destroy(&vb->b.wait_cond); sc_cond_destroy(&vb->b.queue_cond); sc_mutex_destroy(&vb->b.mutex); } @@ -176,9 +303,12 @@ sc_video_buffer_push(struct sc_video_buffer *vb, const AVFrame *frame) { return false; } + sc_clock_update(&vb->b.clock, sc_tick_now(), vb_frame->frame->pts); + sc_mutex_lock(&vb->b.mutex); sc_queue_push(&vb->b.queue, next, vb_frame); sc_cond_signal(&vb->b.queue_cond); + sc_cond_signal(&vb->b.wait_cond); sc_mutex_unlock(&vb->b.mutex); return true; diff --git a/app/src/video_buffer.h b/app/src/video_buffer.h index 0633ff73..3fba902e 100644 --- a/app/src/video_buffer.h +++ b/app/src/video_buffer.h @@ -14,11 +14,41 @@ typedef struct AVFrame AVFrame; struct sc_video_buffer_frame { AVFrame *frame; + sc_tick system_pts; struct sc_video_buffer_frame *next; }; struct sc_video_buffer_frame_queue SC_QUEUE(struct sc_video_buffer_frame); +#define SC_CLOCK_RANGE 32 + +struct sc_clock_point { + sc_tick system; + sc_tick stream; +}; + +struct sc_clock_history { + struct sc_clock_point points[SC_CLOCK_RANGE]; + unsigned count; + unsigned head; + + sc_tick system_sum; + sc_tick stream_sum; +}; + +struct sc_clock { + double coeff; + sc_tick offset; + unsigned weight; // 0 <= weight && weight <= SC_CLOCK_RANGE + + struct { + sc_tick system; + sc_tick stream; + } last; + + struct sc_clock_history history; +}; + struct sc_video_buffer { struct sc_frame_buffer fb; @@ -29,6 +59,9 @@ struct sc_video_buffer { sc_thread thread; sc_mutex mutex; sc_cond queue_cond; + sc_cond wait_cond; + + struct sc_clock clock; struct sc_video_buffer_frame_queue queue; bool stopped; } b; // buffering