Properly handle session packets in delay_buffer

The delay buffer must forward the session packets while preserving
their order relative to media packets.
This commit is contained in:
Romain Vimont 2025-06-15 12:31:24 +02:00
commit fe0bda4bc7
2 changed files with 111 additions and 48 deletions

View file

@ -10,16 +10,18 @@
#define DOWNCAST(SINK) container_of(SINK, struct sc_delay_buffer, frame_sink) #define DOWNCAST(SINK) container_of(SINK, struct sc_delay_buffer, frame_sink)
static bool static bool
sc_delayed_frame_init(struct sc_delayed_frame *dframe, const AVFrame *frame) { sc_delayed_packet_init_frame(struct sc_delayed_packet *dpacket,
dframe->frame = av_frame_alloc(); const AVFrame *frame) {
if (!dframe->frame) { dpacket->type = SC_DELAYED_PACKET_TYPE_FRAME;
dpacket->frame = av_frame_alloc();
if (!dpacket->frame) {
LOG_OOM(); LOG_OOM();
return false; return false;
} }
if (av_frame_ref(dframe->frame, frame)) { if (av_frame_ref(dpacket->frame, frame)) {
LOG_OOM(); LOG_OOM();
av_frame_free(&dframe->frame); av_frame_free(&dpacket->frame);
return false; return false;
} }
@ -27,9 +29,18 @@ sc_delayed_frame_init(struct sc_delayed_frame *dframe, const AVFrame *frame) {
} }
static void static void
sc_delayed_frame_destroy(struct sc_delayed_frame *dframe) { sc_delayed_packet_init_session(struct sc_delayed_packet *dpacket,
av_frame_unref(dframe->frame); const struct sc_stream_session *session) {
av_frame_free(&dframe->frame); dpacket->type = SC_DELAYED_PACKET_TYPE_SESSION;
dpacket->session = *session;
}
static void
sc_delayed_packet_destroy(struct sc_delayed_packet *dpacket) {
if (dpacket->type == SC_DELAYED_PACKET_TYPE_FRAME) {
av_frame_unref(dpacket->frame);
av_frame_free(&dpacket->frame);
}
} }
static int static int
@ -50,43 +61,52 @@ run_buffering(void *data) {
goto stopped; goto stopped;
} }
struct sc_delayed_frame dframe = sc_vecdeque_pop(&db->queue); struct sc_delayed_packet dpacket = sc_vecdeque_pop(&db->queue);
sc_tick max_deadline = sc_tick_now() + db->delay; bool ok;
// PTS (written by the server) are expressed in microseconds if (dpacket.type == SC_DELAYED_PACKET_TYPE_FRAME) {
sc_tick pts = SC_TICK_FROM_US(dframe.frame->pts); sc_tick max_deadline = sc_tick_now() + db->delay;
// PTS (written by the server) are expressed in microseconds
sc_tick pts = SC_TICK_FROM_US(dpacket.frame->pts);
bool timed_out = false; bool timed_out = false;
while (!db->stopped && !timed_out) { while (!db->stopped && !timed_out) {
sc_tick deadline = sc_clock_to_system_time(&db->clock, pts) sc_tick deadline = sc_clock_to_system_time(&db->clock, pts)
+ db->delay; + db->delay;
if (deadline > max_deadline) { if (deadline > max_deadline) {
deadline = max_deadline; deadline = max_deadline;
}
timed_out =
!sc_cond_timedwait(&db->wait_cond, &db->mutex, deadline);
} }
timed_out = bool stopped = db->stopped;
!sc_cond_timedwait(&db->wait_cond, &db->mutex, deadline); sc_mutex_unlock(&db->mutex);
}
bool stopped = db->stopped; if (stopped) {
sc_mutex_unlock(&db->mutex); sc_delayed_packet_destroy(&dpacket);
goto stopped;
if (stopped) { }
sc_delayed_frame_destroy(&dframe);
goto stopped;
}
#ifdef SC_BUFFERING_DEBUG #ifdef SC_BUFFERING_DEBUG
LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick, LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick,
pts, dframe.push_date, sc_tick_now()); pts, dframe.push_date, sc_tick_now());
#endif #endif
bool ok = sc_frame_source_sinks_push(&db->frame_source, dframe.frame); ok = sc_frame_source_sinks_push(&db->frame_source, dpacket.frame);
sc_delayed_frame_destroy(&dframe); } else {
assert(dpacket.type == SC_DELAYED_PACKET_TYPE_SESSION);
sc_mutex_unlock(&db->mutex);
ok = sc_frame_source_sinks_push_session(&db->frame_source,
&dpacket.session);
}
sc_delayed_packet_destroy(&dpacket);
if (!ok) { if (!ok) {
LOGE("Delayed frame could not be pushed, stopping"); LOGE("Delayed packet could not be pushed, stopping");
sc_mutex_lock(&db->mutex); sc_mutex_lock(&db->mutex);
// Prevent to push any new frame // Prevent to push any new packet
db->stopped = true; db->stopped = true;
sc_mutex_unlock(&db->mutex); sc_mutex_unlock(&db->mutex);
goto stopped; goto stopped;
@ -98,8 +118,8 @@ stopped:
// Flush queue // Flush queue
while (!sc_vecdeque_is_empty(&db->queue)) { while (!sc_vecdeque_is_empty(&db->queue)) {
struct sc_delayed_frame *dframe = sc_vecdeque_popref(&db->queue); struct sc_delayed_packet *dpacket = sc_vecdeque_popref(&db->queue);
sc_delayed_frame_destroy(dframe); sc_delayed_packet_destroy(dpacket);
} }
LOGD("Buffering thread ended"); LOGD("Buffering thread ended");
@ -113,6 +133,7 @@ sc_delay_buffer_frame_sink_open(struct sc_frame_sink *sink,
const struct sc_stream_session *session) { const struct sc_stream_session *session) {
struct sc_delay_buffer *db = DOWNCAST(sink); struct sc_delay_buffer *db = DOWNCAST(sink);
(void) ctx; (void) ctx;
(void) session;
bool ok = sc_mutex_init(&db->mutex); bool ok = sc_mutex_init(&db->mutex);
if (!ok) { if (!ok) {
@ -197,24 +218,56 @@ sc_delay_buffer_frame_sink_push(struct sc_frame_sink *sink,
return sc_frame_source_sinks_push(&db->frame_source, frame); return sc_frame_source_sinks_push(&db->frame_source, frame);
} }
struct sc_delayed_frame dframe; struct sc_delayed_packet *dpacket = sc_vecdeque_push_hole(&db->queue);
bool ok = sc_delayed_frame_init(&dframe, frame); if (!dpacket) {
if (!ok) {
sc_mutex_unlock(&db->mutex); sc_mutex_unlock(&db->mutex);
LOG_OOM();
return false; return false;
} }
#ifdef SC_BUFFERING_DEBUG bool ok = sc_delayed_packet_init_frame(dpacket, frame);
dframe.push_date = sc_tick_now();
#endif
ok = sc_vecdeque_push(&db->queue, dframe);
if (!ok) { if (!ok) {
sc_mutex_unlock(&db->mutex); sc_mutex_unlock(&db->mutex);
LOG_OOM(); LOG_OOM();
return false; return false;
} }
#ifdef SC_BUFFERING_DEBUG
dpacket->push_date = sc_tick_now();
#endif
sc_cond_signal(&db->queue_cond);
sc_mutex_unlock(&db->mutex);
return true;
}
static bool
sc_delay_buffer_frame_sink_push_session(struct sc_frame_sink *sink,
const struct sc_stream_session *session) {
struct sc_delay_buffer *db = DOWNCAST(sink);
sc_mutex_lock(&db->mutex);
if (db->stopped) {
sc_mutex_unlock(&db->mutex);
return false;
}
struct sc_delayed_packet *dpacket = sc_vecdeque_push_hole(&db->queue);
if (!dpacket) {
sc_mutex_unlock(&db->mutex);
LOG_OOM();
return false;
}
sc_delayed_packet_init_session(dpacket, session);
#ifdef SC_BUFFERING_DEBUG
dpacket->push_date = sc_tick_now();
#endif
sc_cond_signal(&db->queue_cond); sc_cond_signal(&db->queue_cond);
sc_mutex_unlock(&db->mutex); sc_mutex_unlock(&db->mutex);
@ -236,6 +289,7 @@ sc_delay_buffer_init(struct sc_delay_buffer *db, sc_tick delay,
.open = sc_delay_buffer_frame_sink_open, .open = sc_delay_buffer_frame_sink_open,
.close = sc_delay_buffer_frame_sink_close, .close = sc_delay_buffer_frame_sink_close,
.push = sc_delay_buffer_frame_sink_push, .push = sc_delay_buffer_frame_sink_push,
.push_session = sc_delay_buffer_frame_sink_push_session,
}; };
db->frame_sink.ops = &ops; db->frame_sink.ops = &ops;

View file

@ -18,14 +18,23 @@
// forward declarations // forward declarations
typedef struct AVFrame AVFrame; typedef struct AVFrame AVFrame;
struct sc_delayed_frame { enum sc_delayed_packet_type {
AVFrame *frame; SC_DELAYED_PACKET_TYPE_FRAME,
SC_DELAYED_PACKET_TYPE_SESSION,
};
struct sc_delayed_packet {
enum sc_delayed_packet_type type;
union {
AVFrame *frame;
struct sc_stream_session session;
};
#ifdef SC_BUFFERING_DEBUG #ifdef SC_BUFFERING_DEBUG
sc_tick push_date; sc_tick push_date;
#endif #endif
}; };
struct sc_delayed_frame_queue SC_VECDEQUE(struct sc_delayed_frame); struct sc_delayed_packet_queue SC_VECDEQUE(struct sc_delayed_packet);
struct sc_delay_buffer { struct sc_delay_buffer {
struct sc_frame_source frame_source; // frame source trait struct sc_frame_source frame_source; // frame source trait
@ -40,7 +49,7 @@ struct sc_delay_buffer {
sc_cond wait_cond; sc_cond wait_cond;
struct sc_clock clock; struct sc_clock clock;
struct sc_delayed_frame_queue queue; struct sc_delayed_packet_queue queue;
bool stopped; bool stopped;
}; };