From fe0bda4bc74d159fcd8aee7459c7c11a48e8a163 Mon Sep 17 00:00:00 2001 From: Romain Vimont Date: Sun, 15 Jun 2025 12:31:24 +0200 Subject: [PATCH] Properly handle session packets in delay_buffer The delay buffer must forward the session packets while preserving their order relative to media packets. --- app/src/delay_buffer.c | 142 ++++++++++++++++++++++++++++------------- app/src/delay_buffer.h | 17 +++-- 2 files changed, 111 insertions(+), 48 deletions(-) diff --git a/app/src/delay_buffer.c b/app/src/delay_buffer.c index 2a6fcdb3..a46f43b2 100644 --- a/app/src/delay_buffer.c +++ b/app/src/delay_buffer.c @@ -10,16 +10,18 @@ #define DOWNCAST(SINK) container_of(SINK, struct sc_delay_buffer, frame_sink) static bool -sc_delayed_frame_init(struct sc_delayed_frame *dframe, const AVFrame *frame) { - dframe->frame = av_frame_alloc(); - if (!dframe->frame) { +sc_delayed_packet_init_frame(struct sc_delayed_packet *dpacket, + const AVFrame *frame) { + dpacket->type = SC_DELAYED_PACKET_TYPE_FRAME; + dpacket->frame = av_frame_alloc(); + if (!dpacket->frame) { LOG_OOM(); return false; } - if (av_frame_ref(dframe->frame, frame)) { + if (av_frame_ref(dpacket->frame, frame)) { LOG_OOM(); - av_frame_free(&dframe->frame); + av_frame_free(&dpacket->frame); return false; } @@ -27,9 +29,18 @@ sc_delayed_frame_init(struct sc_delayed_frame *dframe, const AVFrame *frame) { } static void -sc_delayed_frame_destroy(struct sc_delayed_frame *dframe) { - av_frame_unref(dframe->frame); - av_frame_free(&dframe->frame); +sc_delayed_packet_init_session(struct sc_delayed_packet *dpacket, + const struct sc_stream_session *session) { + dpacket->type = SC_DELAYED_PACKET_TYPE_SESSION; + dpacket->session = *session; +} + +static void +sc_delayed_packet_destroy(struct sc_delayed_packet *dpacket) { + if (dpacket->type == SC_DELAYED_PACKET_TYPE_FRAME) { + av_frame_unref(dpacket->frame); + av_frame_free(&dpacket->frame); + } } static int @@ -50,43 +61,52 @@ run_buffering(void *data) { goto stopped; } - struct sc_delayed_frame dframe = sc_vecdeque_pop(&db->queue); + struct sc_delayed_packet dpacket = sc_vecdeque_pop(&db->queue); - sc_tick max_deadline = sc_tick_now() + db->delay; - // PTS (written by the server) are expressed in microseconds - sc_tick pts = SC_TICK_FROM_US(dframe.frame->pts); + bool ok; + if (dpacket.type == SC_DELAYED_PACKET_TYPE_FRAME) { + sc_tick max_deadline = sc_tick_now() + db->delay; + // PTS (written by the server) are expressed in microseconds + sc_tick pts = SC_TICK_FROM_US(dpacket.frame->pts); - bool timed_out = false; - while (!db->stopped && !timed_out) { - sc_tick deadline = sc_clock_to_system_time(&db->clock, pts) - + db->delay; - if (deadline > max_deadline) { - deadline = max_deadline; + bool timed_out = false; + while (!db->stopped && !timed_out) { + sc_tick deadline = sc_clock_to_system_time(&db->clock, pts) + + db->delay; + if (deadline > max_deadline) { + deadline = max_deadline; + } + + timed_out = + !sc_cond_timedwait(&db->wait_cond, &db->mutex, deadline); } - timed_out = - !sc_cond_timedwait(&db->wait_cond, &db->mutex, deadline); - } + bool stopped = db->stopped; + sc_mutex_unlock(&db->mutex); - bool stopped = db->stopped; - sc_mutex_unlock(&db->mutex); - - if (stopped) { - sc_delayed_frame_destroy(&dframe); - goto stopped; - } + if (stopped) { + sc_delayed_packet_destroy(&dpacket); + goto stopped; + } #ifdef SC_BUFFERING_DEBUG - LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick, - pts, dframe.push_date, sc_tick_now()); + LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick, + pts, dframe.push_date, sc_tick_now()); #endif - bool ok = sc_frame_source_sinks_push(&db->frame_source, dframe.frame); - sc_delayed_frame_destroy(&dframe); + ok = sc_frame_source_sinks_push(&db->frame_source, dpacket.frame); + } else { + assert(dpacket.type == SC_DELAYED_PACKET_TYPE_SESSION); + sc_mutex_unlock(&db->mutex); + ok = sc_frame_source_sinks_push_session(&db->frame_source, + &dpacket.session); + } + + sc_delayed_packet_destroy(&dpacket); if (!ok) { - LOGE("Delayed frame could not be pushed, stopping"); + LOGE("Delayed packet could not be pushed, stopping"); sc_mutex_lock(&db->mutex); - // Prevent to push any new frame + // Prevent to push any new packet db->stopped = true; sc_mutex_unlock(&db->mutex); goto stopped; @@ -98,8 +118,8 @@ stopped: // Flush queue while (!sc_vecdeque_is_empty(&db->queue)) { - struct sc_delayed_frame *dframe = sc_vecdeque_popref(&db->queue); - sc_delayed_frame_destroy(dframe); + struct sc_delayed_packet *dpacket = sc_vecdeque_popref(&db->queue); + sc_delayed_packet_destroy(dpacket); } LOGD("Buffering thread ended"); @@ -113,6 +133,7 @@ sc_delay_buffer_frame_sink_open(struct sc_frame_sink *sink, const struct sc_stream_session *session) { struct sc_delay_buffer *db = DOWNCAST(sink); (void) ctx; + (void) session; bool ok = sc_mutex_init(&db->mutex); if (!ok) { @@ -197,24 +218,56 @@ sc_delay_buffer_frame_sink_push(struct sc_frame_sink *sink, return sc_frame_source_sinks_push(&db->frame_source, frame); } - struct sc_delayed_frame dframe; - bool ok = sc_delayed_frame_init(&dframe, frame); - if (!ok) { + struct sc_delayed_packet *dpacket = sc_vecdeque_push_hole(&db->queue); + if (!dpacket) { sc_mutex_unlock(&db->mutex); + LOG_OOM(); return false; } -#ifdef SC_BUFFERING_DEBUG - dframe.push_date = sc_tick_now(); -#endif - - ok = sc_vecdeque_push(&db->queue, dframe); + bool ok = sc_delayed_packet_init_frame(dpacket, frame); if (!ok) { sc_mutex_unlock(&db->mutex); LOG_OOM(); return false; } +#ifdef SC_BUFFERING_DEBUG + dpacket->push_date = sc_tick_now(); +#endif + + sc_cond_signal(&db->queue_cond); + + sc_mutex_unlock(&db->mutex); + + return true; +} + +static bool +sc_delay_buffer_frame_sink_push_session(struct sc_frame_sink *sink, + const struct sc_stream_session *session) { + struct sc_delay_buffer *db = DOWNCAST(sink); + + sc_mutex_lock(&db->mutex); + + if (db->stopped) { + sc_mutex_unlock(&db->mutex); + return false; + } + + struct sc_delayed_packet *dpacket = sc_vecdeque_push_hole(&db->queue); + if (!dpacket) { + sc_mutex_unlock(&db->mutex); + LOG_OOM(); + return false; + } + + sc_delayed_packet_init_session(dpacket, session); + +#ifdef SC_BUFFERING_DEBUG + dpacket->push_date = sc_tick_now(); +#endif + sc_cond_signal(&db->queue_cond); sc_mutex_unlock(&db->mutex); @@ -236,6 +289,7 @@ sc_delay_buffer_init(struct sc_delay_buffer *db, sc_tick delay, .open = sc_delay_buffer_frame_sink_open, .close = sc_delay_buffer_frame_sink_close, .push = sc_delay_buffer_frame_sink_push, + .push_session = sc_delay_buffer_frame_sink_push_session, }; db->frame_sink.ops = &ops; diff --git a/app/src/delay_buffer.h b/app/src/delay_buffer.h index 61cd77e4..420684be 100644 --- a/app/src/delay_buffer.h +++ b/app/src/delay_buffer.h @@ -18,14 +18,23 @@ // forward declarations typedef struct AVFrame AVFrame; -struct sc_delayed_frame { - AVFrame *frame; +enum sc_delayed_packet_type { + SC_DELAYED_PACKET_TYPE_FRAME, + SC_DELAYED_PACKET_TYPE_SESSION, +}; + +struct sc_delayed_packet { + enum sc_delayed_packet_type type; + union { + AVFrame *frame; + struct sc_stream_session session; + }; #ifdef SC_BUFFERING_DEBUG sc_tick push_date; #endif }; -struct sc_delayed_frame_queue SC_VECDEQUE(struct sc_delayed_frame); +struct sc_delayed_packet_queue SC_VECDEQUE(struct sc_delayed_packet); struct sc_delay_buffer { struct sc_frame_source frame_source; // frame source trait @@ -40,7 +49,7 @@ struct sc_delay_buffer { sc_cond wait_cond; struct sc_clock clock; - struct sc_delayed_frame_queue queue; + struct sc_delayed_packet_queue queue; bool stopped; };