feat: Reduce latency by 1 frame

To packetize the H.264 raw stream, av_parser_parse2() (called by
av_read_frame()) knows that it has received a full frame only after it
has received some data for the next frame. As a consequence, the client
always waited until the next frame before sending the current frame to
the decoder!

On the device side, we know the packet boundaries. To reduce latency,
make the device always transmit the "frame meta" so that the client can
packetize the stream manually (this was already implemented to send the
PTS, but it was only enabled while recording).

On the client side, replace av_read_frame() with manual packetizing and
parsing.

<https://stackoverflow.com/questions/50682518/replacing-av-read-frame-to-reduce-delay>
<https://trac.ffmpeg.org/ticket/3354>
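
For illustration, here is a minimal sketch of decoding the 12-byte "frame
meta" header described above (plain C++; the helper names are hypothetical
and not part of this patch, and the NO_PTS sentinel value is an assumption
consistent with the sign-bit assert in recvPacket() below):

    #include <cstdint>

    // Layout of the "frame meta" header sent before each raw packet:
    //   bytes 0..7  : PTS in microseconds (big-endian), or NO_PTS
    //   bytes 8..11 : size of the raw packet that follows (big-endian)
    struct FrameHeader {
        uint64_t pts;
        uint32_t packetSize;
    };

    // Assumed sentinel for "no PTS" (config packets such as SPS/PPS).
    static const uint64_t NO_PTS = UINT64_C(0xFFFFFFFFFFFFFFFF);

    static uint64_t read64be(const uint8_t *buf) {
        uint64_t v = 0;
        for (int i = 0; i < 8; ++i) {
            v = (v << 8) | buf[i];
        }
        return v;
    }

    static uint32_t read32be(const uint8_t *buf) {
        return ((uint32_t) buf[0] << 24) | ((uint32_t) buf[1] << 16)
                | ((uint32_t) buf[2] << 8) | (uint32_t) buf[3];
    }

    // Decode a received 12-byte header into its two fields.
    static FrameHeader parseFrameHeader(const uint8_t header[12]) {
        FrameHeader h;
        h.pts = read64be(header);
        h.packetSize = read32be(&header[8]);
        return h;
    }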
rankun 2020-01-15 12:34:44 +08:00
parent c957bfc798
commit cc433da45b
2 changed files with 204 additions and 228 deletions

stream.cpp

@@ -16,12 +16,10 @@ typedef qint32 (*ReadPacketFunc)(void*, quint8*, qint32);
Stream::Stream(QObject *parent)
: QThread(parent)
{
m_quit = false;
}
Stream::~Stream()
{
}
static void avLogCallback(void *avcl, int level, const char *fmt, va_list vl) {
@@ -85,121 +83,6 @@ static quint64 bufferRead64be(quint8* buf) {
return ((quint64) msb << 32) | lsb;
}
static Stream::FrameMeta* frameMetaNew(quint64 pts) {
Stream::FrameMeta* meta = new Stream::FrameMeta;
if (!meta) {
return meta;
}
meta->pts = pts;
meta->next = Q_NULLPTR;
return meta;
}
static void frameMetaDelete(Stream::FrameMeta* frameMeta) {
if (frameMeta) {
delete frameMeta;
}
}
static bool receiverStatePushMeta(Stream::ReceiverState* state, quint64 pts) {
Stream::FrameMeta* frameMeta = frameMetaNew(pts);
if (!frameMeta) {
return false;
}
// append to the list
// (iterate to find the last item, in practice the list should be tiny)
Stream::FrameMeta **p = &state->frameMetaQueue;
while (*p) {
p = &(*p)->next;
}
*p = frameMeta;
return true;
}
static quint64 receiverStateTakeMeta(Stream::ReceiverState* state) {
Stream::FrameMeta *frameMeta = state->frameMetaQueue; // first item
Q_ASSERT(frameMeta); // must not be empty
quint64 pts = frameMeta->pts;
state->frameMetaQueue = frameMeta->next; // remove the item
frameMetaDelete(frameMeta);
return pts;
}
static qint32 readPacketWithMeta(void *opaque, uint8_t *buf, int bufSize) {
Stream* stream = (Stream*)opaque;
Stream::ReceiverState* state = stream->getReceiverState();
// The video stream contains raw packets, without time information. When we
// record, we retrieve the timestamps separately, from a "meta" header
// added by the server before each raw packet.
//
// The "meta" header length is 12 bytes:
// [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ...
// <-------------> <-----> <-----------------------------...
// PTS packet raw packet
// size
//
// It is followed by <packet_size> bytes containing the packet/frame.
if (!state->remaining) {
quint8 header[HEADER_SIZE];
qint32 r = stream->recvData(header, HEADER_SIZE);
if (r == -1) {
return errno ? AVERROR(errno) : AVERROR_EOF;
}
if (r == 0) {
return AVERROR_EOF;
}
// no partial read (net_recv_all())
if (r != HEADER_SIZE) {
return AVERROR(ENOMEM);
}
uint64_t pts = bufferRead64be(header);
state->remaining = bufferRead32be(&header[8]);
if (pts != NO_PTS && !receiverStatePushMeta(state, pts)) {
qCritical("Could not store PTS for recording");
// we cannot save the PTS, the recording would be broken
return AVERROR(ENOMEM);
}
}
Q_ASSERT(state->remaining);
if (bufSize > state->remaining) {
bufSize = state->remaining;
}
qint32 r = stream->recvData(buf, bufSize);
if (r == -1) {
return errno ? AVERROR(errno) : AVERROR_EOF;
}
if (r == 0) {
return AVERROR_EOF;
}
Q_ASSERT(state->remaining >= r);
state->remaining -= r;
return r;
}
static qint32 readRawPacket(void *opaque, quint8 *buf, qint32 bufSize) {
Stream *stream = (Stream*)opaque;
if (stream) {
qint32 len = stream->recvData(buf, bufSize);
if (len == -1) {
return AVERROR(errno);
}
if (len == 0) {
return AVERROR_EOF;
}
return len;
}
return AVERROR_EOF;
}
void Stream::setVideoSocket(VideoSocket* videoSocket)
{
m_videoSocket = videoSocket;
@@ -227,71 +110,23 @@ bool Stream::startDecode()
if (!m_videoSocket) {
return false;
}
m_quit = false;
start();
return true;
}
void Stream::stopDecode()
{
m_quit = true;
if (m_decoder) {
m_decoder->interrupt();
}
wait();
}
Stream::ReceiverState *Stream::getReceiverState()
{
return &m_receiverState;
}
void Stream::run()
{
unsigned char *decoderBuffer = Q_NULLPTR;
AVIOContext *avioCtx = Q_NULLPTR;
AVFormatContext *formatCtx = Q_NULLPTR;
AVCodec *codec = Q_NULLPTR;
AVCodecContext *codecCtx = Q_NULLPTR;
ReadPacketFunc readPacket = Q_NULLPTR;
bool isFormatCtxOpen = false;
// decoder buffer
decoderBuffer = (unsigned char*)av_malloc(BUFSIZE);
if (!decoderBuffer) {
qCritical("Could not allocate buffer");
goto runQuit;
}
// initialize the receiver state
m_receiverState.frameMetaQueue = Q_NULLPTR;
m_receiverState.remaining = 0;
// if recording is enabled, a "header" is sent between raw packets
readPacket = m_recorder ? readPacketWithMeta : readRawPacket;
// io context
avioCtx = avio_alloc_context(decoderBuffer, BUFSIZE, 0, this, readPacket, NULL, NULL);
if (!avioCtx) {
qCritical("Could not allocate avio context");
// avformat_open_input takes ownership of 'decoderBuffer'
// so only free the buffer before avformat_open_input()
av_free(decoderBuffer);
goto runQuit;
}
// format context
formatCtx = avformat_alloc_context();
if (!formatCtx) {
qCritical("Could not allocate format context");
goto runQuit;
}
formatCtx->pb = avioCtx;
if (avformat_open_input(&formatCtx, NULL, NULL, NULL) < 0) {
qCritical("Could not open video stream");
goto runQuit;
}
isFormatCtxOpen = true;
m_codecCtx = Q_NULLPTR;
m_parser = Q_NULLPTR;
// codec
codec = avcodec_find_decoder(AV_CODEC_ID_H264);
@@ -300,6 +135,13 @@ void Stream::run()
goto runQuit;
}
// codecCtx
m_codecCtx = avcodec_alloc_context3(codec);
if (!m_codecCtx) {
qCritical("Could not allocate codec context");
goto runQuit;
}
if (m_decoder && !m_decoder->open(codec)) {
qCritical("Could not open m_decoder");
goto runQuit;
@@ -310,66 +152,203 @@ void Stream::run()
goto runQuit;
}
AVPacket packet;
av_init_packet(&packet);
packet.data = Q_NULLPTR;
packet.size = 0;
while (!av_read_frame(formatCtx, &packet)) {
if (m_quit) {
// if the stream is stopped, the socket had been shut down, so the
// last packet is probably corrupted (but not detected as such by
// FFmpeg) and will not be decoded correctly
av_packet_unref(&packet);
goto runQuit;
}
if (m_decoder && !m_decoder->push(&packet)) {
av_packet_unref(&packet);
goto runQuit;
}
if (m_recorder) {
// we retrieve the PTS in order they were received, so they will
// be assigned to the correct frame
quint64 pts = receiverStateTakeMeta(&m_receiverState);
packet.pts = pts;
packet.dts = pts;
// no need to rescale with av_packet_rescale_ts(), the timestamps
// are in microseconds both in input and output
if (!m_recorder->write(&packet)) {
qCritical("Could not write frame to output file");
av_packet_unref(&packet);
goto runQuit;
}
}
av_packet_unref(&packet);
if (avioCtx->eof_reached) {
break;
}
}
qDebug() << "End of frames";
m_parser = av_parser_init(AV_CODEC_ID_H264);
if (!m_parser) {
qCritical("Could not initialize parser");
goto runQuit;
}
// We must only pass complete frames to av_parser_parse2()!
// It's more complicated, but this allows reducing the latency by 1 frame!
m_parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;
for (;;) {
AVPacket packet;
bool ok = recvPacket(&packet);
if (!ok) {
// end of stream
break;
}
ok = pushPacket(&packet);
av_packet_unref(&packet);
if (!ok) {
// cannot process packet (error already logged)
break;
}
}
qDebug("End of frames");
if (m_hasPending) {
av_packet_unref(&m_pending);
}
av_parser_close(m_parser);
runQuit:
if (m_recorder) {
m_recorder->close();
}
if (avioCtx) {
av_free(avioCtx->buffer);
av_freep(&avioCtx);
}
if (formatCtx && isFormatCtxOpen) {
avformat_close_input(&formatCtx);
}
if (formatCtx) {
avformat_free_context(formatCtx);
}
if (m_decoder) {
m_decoder->close();
}
if (codecCtx) {
avcodec_free_context(&codecCtx);
}
if (m_codecCtx) {
avcodec_free_context(&m_codecCtx);
}
emit onStreamStop();
}
bool Stream::recvPacket(AVPacket *packet)
{
// The video stream contains raw packets, without time information. When we
// record, we retrieve the timestamps separately, from a "meta" header
// added by the server before each raw packet.
//
// The "meta" header length is 12 bytes:
// [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ...
// <-------------> <-----> <-----------------------------...
// PTS packet raw packet
// size
//
// It is followed by <packet_size> bytes containing the packet/frame.
quint8 header[HEADER_SIZE];
qint32 r = recvData(header, HEADER_SIZE);
if (r < HEADER_SIZE) {
return false;
}
quint64 pts = bufferRead64be(header);
quint32 len = bufferRead32be(&header[8]);
assert(pts == NO_PTS || (pts & 0x8000000000000000) == 0);
assert(len);
if (av_new_packet(packet, len)) {
qCritical("Could not allocate packet");
return false;
}
r = recvData(packet->data, len);
if (r < 0 || ((uint32_t) r) < len) {
av_packet_unref(packet);
return false;
}
packet->pts = pts != NO_PTS ? (int64_t) pts : AV_NOPTS_VALUE;
return true;
}
bool Stream::pushPacket(AVPacket *packet)
{
bool isConfig = packet->pts == AV_NOPTS_VALUE;
// A config packet must not be decoded immetiately (it contains no
// frame); instead, it must be concatenated with the future data packet.
if (m_hasPending || isConfig) {
qint32 offset;
if (m_hasPending) {
offset = m_pending.size;
if (av_grow_packet(&m_pending, packet->size)) {
qCritical("Could not grow packet");
return false;
}
} else {
offset = 0;
if (av_new_packet(&m_pending, packet->size)) {
qCritical("Could not create packet");
return false;
}
m_hasPending = true;
}
memcpy(m_pending.data + offset, packet->data, packet->size);
if (!isConfig) {
// prepare the concat packet to send to the decoder
m_pending.pts = packet->pts;
m_pending.dts = packet->dts;
m_pending.flags = packet->flags;
packet = &m_pending;
}
}
if (isConfig) {
// config packet
bool ok = processConfigPacket(packet);
if (!ok) {
return false;
}
} else {
// data packet
bool ok = parse(packet);
if (m_hasPending) {
// the pending packet must be discarded (consumed or error)
m_hasPending = false;
av_packet_unref(&m_pending);
}
if (!ok) {
return false;
}
}
return true;
}
bool Stream::processConfigPacket(AVPacket *packet)
{
if (m_recorder && !m_recorder->write(packet)) {
qCritical("Could not send config packet to recorder");
return false;
}
return true;
}
bool Stream::parse(AVPacket *packet)
{
quint8 *inData = packet->data;
int inLen = packet->size;
quint8 *outData = Q_NULLPTR;
int outLen = 0;
int r = av_parser_parse2(m_parser, m_codecCtx,
&outData, &outLen, inData, inLen,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1);
// PARSER_FLAG_COMPLETE_FRAMES is set
assert(r == inLen);
(void) r;
assert(outLen == inLen);
if (m_parser->key_frame == 1) {
packet->flags |= AV_PKT_FLAG_KEY;
}
bool ok = processFrame(packet);
if (!ok) {
qCritical("Could not process frame");
return false;
}
return true;
}
bool Stream::processFrame(AVPacket *packet)
{
if (m_decoder && !m_decoder->push(packet)) {
return false;
}
if (m_recorder) {
packet->dts = packet->pts;
if (!m_recorder->write(packet)) {
qCritical("Could not send packet to recorder");
return false;
}
}
return true;
}
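
The latency gain hinges on PARSER_FLAG_COMPLETE_FRAMES: by default,
av_parser_parse2() buffers its input and emits a frame only after seeing
the start of the next one, whereas with the flag set (and the caller
guaranteeing exactly one complete frame per call, as recvPacket() does
here) it consumes and emits the input immediately. A condensed sketch of
that contract (illustrative only, assuming FFmpeg 4.x; error handling
omitted):

    extern "C" {
    #include <libavcodec/avcodec.h>
    }
    #include <cassert>

    // Feed exactly one complete H.264 frame; get it back in the same call.
    static bool parseCompleteFrame(AVCodecParserContext *parser,
                                   AVCodecContext *codecCtx,
                                   uint8_t *data, int size) {
        // parser is assumed to have been created with:
        //   parser = av_parser_init(AV_CODEC_ID_H264);
        //   parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;
        uint8_t *outData = nullptr;
        int outSize = 0;
        int consumed = av_parser_parse2(parser, codecCtx,
                                        &outData, &outSize,
                                        data, size,
                                        AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1);
        // No buffering: the whole input is consumed and emitted as one
        // frame, so it can be pushed to the decoder without waiting for
        // any data from the next frame.
        assert(consumed == size);
        (void) consumed;
        return outSize == size;
    }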

stream.h

@@ -18,17 +18,6 @@ class Stream : public QThread
{
Q_OBJECT
public:
typedef struct FrameMeta {
quint64 pts;
struct FrameMeta* next;
} FrameMeta;
typedef struct ReceiverState {
// meta (in order) for frames not consumed yet
FrameMeta* frameMetaQueue;
qint32 remaining; // remaining bytes to receive for the current frame
} ReceiverState;
Stream(QObject *parent = Q_NULLPTR);
virtual ~Stream();
@@ -36,28 +25,36 @@
static bool init();
static void deInit();
void setDecoder(Decoder* vb);
void setVideoSocket(VideoSocket* deviceSocket);
void setDecoder(Decoder* decoder);
void setRecoder(Recorder* recorder);
void setVideoSocket(VideoSocket* deviceSocket);
qint32 recvData(quint8* buf, qint32 bufSize);
bool startDecode();
void stopDecode();
ReceiverState* getReceiverState();
signals:
void onStreamStop();
protected:
void run();
bool recvPacket(AVPacket* packet);
bool pushPacket(AVPacket* packet);
bool processConfigPacket(AVPacket *packet);
bool parse(AVPacket *packet);
bool processFrame(AVPacket *packet);
private:
QPointer<VideoSocket> m_videoSocket;
std::atomic_bool m_quit;
// for recorder
Recorder* m_recorder = Q_NULLPTR;
ReceiverState m_receiverState;
Decoder* m_decoder = Q_NULLPTR;
AVCodecContext* m_codecCtx = Q_NULLPTR;
AVCodecParserContext *m_parser = Q_NULLPTR;
// successive packets may need to be concatenated, until a non-config
// packet is available
bool m_hasPending = false;
AVPacket m_pending;
};
#endif // STREAM_H