diff --git a/QtScrcpy/device/stream/stream.cpp b/QtScrcpy/device/stream/stream.cpp index 648aef2..03123d9 100644 --- a/QtScrcpy/device/stream/stream.cpp +++ b/QtScrcpy/device/stream/stream.cpp @@ -16,12 +16,10 @@ typedef qint32 (*ReadPacketFunc)(void*, quint8*, qint32); Stream::Stream(QObject *parent) : QThread(parent) { - m_quit = false; } Stream::~Stream() { - } static void avLogCallback(void *avcl, int level, const char *fmt, va_list vl) { @@ -85,121 +83,6 @@ static quint64 bufferRead64be(quint8* buf) { return ((quint64) msb << 32) | lsb; } -static Stream::FrameMeta* frameMetaNew(quint64 pts) { - Stream::FrameMeta* meta = new Stream::FrameMeta; - if (!meta) { - return meta; - } - meta->pts = pts; - meta->next = Q_NULLPTR; - return meta; -} - -static void frameMetaDelete(Stream::FrameMeta* frameMeta) { - if (frameMeta) { - delete frameMeta; - } -} - -static bool receiverStatePushMeta(Stream::ReceiverState* state, quint64 pts) { - Stream::FrameMeta* frameMeta = frameMetaNew(pts); - if (!frameMeta) { - return false; - } - - // append to the list - // (iterate to find the last item, in practice the list should be tiny) - Stream::FrameMeta **p = &state->frameMetaQueue; - while (*p) { - p = &(*p)->next; - } - *p = frameMeta; - return true; -} - -static quint64 receiverStateTakeMeta(Stream::ReceiverState* state) { - Stream::FrameMeta *frameMeta = state->frameMetaQueue; // first item - Q_ASSERT(frameMeta); // must not be empty - quint64 pts = frameMeta->pts; - state->frameMetaQueue = frameMeta->next; // remove the item - frameMetaDelete(frameMeta); - return pts; -} - -static qint32 readPacketWithMeta(void *opaque, uint8_t *buf, int bufSize) { - Stream* stream = (Stream*)opaque; - Stream::ReceiverState* state = stream->getReceiverState(); - - // The video stream contains raw packets, without time information. When we - // record, we retrieve the timestamps separately, from a "meta" header - // added by the server before each raw packet. 
- // - // The "meta" header length is 12 bytes: - // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ... - // <-------------> <-----> <-----------------------------... - // PTS packet raw packet - // size - // - // It is followed by bytes containing the packet/frame. - - if (!state->remaining) { - quint8 header[HEADER_SIZE]; - qint32 r = stream->recvData(header, HEADER_SIZE); - if (r == -1) { - return errno ? AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; - } - // no partial read (net_recv_all()) - if (r != HEADER_SIZE) { - return AVERROR(ENOMEM); - } - - uint64_t pts = bufferRead64be(header); - state->remaining = bufferRead32be(&header[8]); - - if (pts != NO_PTS && !receiverStatePushMeta(state, pts)) { - qCritical("Could not store PTS for recording"); - // we cannot save the PTS, the recording would be broken - return AVERROR(ENOMEM); - } - } - - Q_ASSERT(state->remaining); - - if (bufSize > state->remaining) { - bufSize = state->remaining; - } - - qint32 r = stream->recvData(buf, bufSize); - if (r == -1) { - return errno ? 
AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; - } - - Q_ASSERT(state->remaining >= r); - state->remaining -= r; - return r; -} - -static qint32 readRawPacket(void *opaque, quint8 *buf, qint32 bufSize) { - Stream *stream = (Stream*)opaque; - if (stream) { - qint32 len = stream->recvData(buf, bufSize); - if (len == -1) { - return AVERROR(errno); - } - if (len == 0) { - return AVERROR_EOF; - } - return len; - } - return AVERROR_EOF; -} - void Stream::setVideoSocket(VideoSocket* videoSocket) { m_videoSocket = videoSocket; @@ -227,71 +110,23 @@ bool Stream::startDecode() if (!m_videoSocket) { return false; } - m_quit = false; start(); return true; } void Stream::stopDecode() { - m_quit = true; if (m_decoder) { m_decoder->interrupt(); } wait(); } -Stream::ReceiverState *Stream::getReceiverState() -{ - return &m_receiverState; -} - void Stream::run() { - unsigned char *decoderBuffer = Q_NULLPTR; - AVIOContext *avioCtx = Q_NULLPTR; - AVFormatContext *formatCtx = Q_NULLPTR; AVCodec *codec = Q_NULLPTR; - AVCodecContext *codecCtx = Q_NULLPTR; - ReadPacketFunc readPacket = Q_NULLPTR; - bool isFormatCtxOpen = false; - - // decoder buffer - decoderBuffer = (unsigned char*)av_malloc(BUFSIZE); - if (!decoderBuffer) { - qCritical("Could not allocate buffer"); - goto runQuit; - } - - // initialize the receiver state - m_receiverState.frameMetaQueue = Q_NULLPTR; - m_receiverState.remaining = 0; - - // if recording is enabled, a "header" is sent between raw packets - readPacket = m_recorder ? 
readPacketWithMeta: readRawPacket; - - // io context - avioCtx = avio_alloc_context(decoderBuffer, BUFSIZE, 0, this, readPacket, NULL, NULL); - if (!avioCtx) { - qCritical("Could not allocate avio context"); - // avformat_open_input takes ownership of 'decoderBuffer' - // so only free the buffer before avformat_open_input() - av_free(decoderBuffer); - goto runQuit; - } - - // format context - formatCtx = avformat_alloc_context(); - if (!formatCtx) { - qCritical("Could not allocate format context"); - goto runQuit; - } - formatCtx->pb = avioCtx; - if (avformat_open_input(&formatCtx, NULL, NULL, NULL) < 0) { - qCritical("Could not open video stream"); - goto runQuit; - } - isFormatCtxOpen = true; + m_codecCtx = Q_NULLPTR; + m_parser = Q_NULLPTR; // codec codec = avcodec_find_decoder(AV_CODEC_ID_H264); @@ -300,6 +135,13 @@ void Stream::run() goto runQuit; } + // codeCtx + m_codecCtx = avcodec_alloc_context3(codec); + if (!m_codecCtx) { + qCritical("Could not allocate codec context"); + goto runQuit; + } + if (m_decoder && !m_decoder->open(codec)) { qCritical("Could not open m_decoder"); goto runQuit; @@ -310,66 +152,203 @@ void Stream::run() goto runQuit; } - AVPacket packet; - av_init_packet(&packet); - packet.data = Q_NULLPTR; - packet.size = 0; + m_parser = av_parser_init(AV_CODEC_ID_H264); + if (!m_parser) { + qCritical("Could not initialize parser"); + goto runQuit; + } - while (!av_read_frame(formatCtx, &packet)) { - if (m_quit) { - // if the stream is stopped, the socket had been shutdown, so the - // last packet is probably corrupted (but not detected as such by - // FFmpeg) and will not be decoded correctly - av_packet_unref(&packet); - goto runQuit; - } - if (m_decoder && !m_decoder->push(&packet)) { - av_packet_unref(&packet); - goto runQuit; - } - if (m_recorder) { - // we retrieve the PTS in order they were received, so they will - // be assigned to the correct frame - quint64 pts = receiverStateTakeMeta(&m_receiverState); - packet.pts = pts; - packet.dts 
= pts; - // no need to rescale with av_packet_rescale_ts(), the timestamps - // are in microseconds both in input and output - if (!m_recorder->write(&packet)) { - qCritical("Could not write frame to output file"); - av_packet_unref(&packet); - goto runQuit; - } + // We must only pass complete frames to av_parser_parse2()! + // It's more complicated, but this allows to reduce the latency by 1 frame! + m_parser->flags |= PARSER_FLAG_COMPLETE_FRAMES; + + for (;;) { + AVPacket packet; + bool ok = recvPacket(&packet); + if (!ok) { + // end of stream + break; } + ok = pushPacket(&packet); av_packet_unref(&packet); - - if (avioCtx->eof_reached) { + if (!ok) { + // cannot process packet (error already logged) break; } } - qDebug() << "End of frames"; + + qDebug("End of frames"); + + if (m_hasPending) { + av_packet_unref(&m_pending); + } + + av_parser_close(m_parser); runQuit: if (m_recorder) { m_recorder->close(); } - if (avioCtx) { - av_free(avioCtx->buffer); - av_freep(&avioCtx); - } - if (formatCtx && isFormatCtxOpen) { - avformat_close_input(&formatCtx); - } - if (formatCtx) { - avformat_free_context(formatCtx); - } if (m_decoder) { m_decoder->close(); } - if (codecCtx) { - avcodec_free_context(&codecCtx); + if (m_codecCtx) { + avcodec_free_context(&m_codecCtx); } emit onStreamStop(); } + +bool Stream::recvPacket(AVPacket *packet) +{ + // The video stream contains raw packets, without time information. When we + // record, we retrieve the timestamps separately, from a "meta" header + // added by the server before each raw packet. + // + // The "meta" header length is 12 bytes: + // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ... + // <-------------> <-----> <-----------------------------... + // PTS packet raw packet + // size + // + // It is followed by bytes containing the packet/frame. 
+
+    quint8 header[HEADER_SIZE];
+    qint32 r = recvData(header, HEADER_SIZE);
+    if (r < HEADER_SIZE) {
+        return false;
+    }
+
+    quint64 pts = bufferRead64be(header);
+    quint32 len = bufferRead32be(&header[8]);
+    assert(pts == NO_PTS || (pts & 0x8000000000000000) == 0);
+    assert(len);
+
+    if (av_new_packet(packet, len)) {
+        qCritical("Could not allocate packet");
+        return false;
+    }
+
+    r = recvData(packet->data, len);
+    if (r < 0 || ((uint32_t) r) < len) {
+        av_packet_unref(packet);
+        return false;
+    }
+
+    packet->pts = pts != NO_PTS ? (int64_t) pts : AV_NOPTS_VALUE;
+
+    return true;
+}
+
+bool Stream::pushPacket(AVPacket *packet)
+{
+    bool isConfig = packet->pts == AV_NOPTS_VALUE;
+
+    // A config packet must not be decoded immediately (it contains no
+    // frame); instead, it must be concatenated with the future data packet.
+    if (m_hasPending || isConfig) {
+        qint32 offset;
+        if (m_hasPending) {
+            offset = m_pending.size;
+            if (av_grow_packet(&m_pending, packet->size)) {
+                qCritical("Could not grow packet");
+                return false;
+            }
+        } else {
+            offset = 0;
+            if (av_new_packet(&m_pending, packet->size)) {
+                qCritical("Could not create packet");
+                return false;
+            }
+            m_hasPending = true;
+        }
+
+        memcpy(m_pending.data + offset, packet->data, packet->size);
+
+        if (!isConfig) {
+            // prepare the concat packet to send to the decoder
+            m_pending.pts = packet->pts;
+            m_pending.dts = packet->dts;
+            m_pending.flags = packet->flags;
+            packet = &m_pending;
+        }
+    }
+
+    if (isConfig) {
+        // config packet
+        bool ok = processConfigPacket(packet);
+        if (!ok) {
+            return false;
+        }
+    } else {
+        // data packet
+        bool ok = parse(packet);
+
+        if (m_hasPending) {
+            // the pending packet must be discarded (consumed or error)
+            m_hasPending = false;
+            av_packet_unref(&m_pending);
+        }
+
+        if (!ok) {
+            return false;
+        }
+    }
+    return true;
+}
+
+bool Stream::processConfigPacket(AVPacket *packet)
+{
+    if (m_recorder && !m_recorder->write(packet)) {
+        qCritical("Could not send config packet to 
recorder"); + return false; + } + return true; +} + +bool Stream::parse(AVPacket *packet) +{ + quint8 *inData = packet->data; + int inLen = packet->size; + quint8 *outData = Q_NULLPTR; + int outLen = 0; + int r = av_parser_parse2(m_parser, m_codecCtx, + &outData, &outLen, inData, inLen, + AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1); + + // PARSER_FLAG_COMPLETE_FRAMES is set + assert(r == inLen); + (void) r; + assert(outLen == inLen); + + if (m_parser->key_frame == 1) { + packet->flags |= AV_PKT_FLAG_KEY; + } + + bool ok = processFrame(packet); + if (!ok) { + qCritical("Could not process frame"); + return false; + } + + return true; +} + +bool Stream::processFrame(AVPacket *packet) +{ + if (m_decoder && !m_decoder->push(packet)) { + return false; + } + + if (m_recorder) { + packet->dts = packet->pts; + + if (!m_recorder->write(packet)) { + qCritical("Could not send packet to recorder"); + return false; + } + } + + return true; +} diff --git a/QtScrcpy/device/stream/stream.h b/QtScrcpy/device/stream/stream.h index 0c4605e..c44e63b 100644 --- a/QtScrcpy/device/stream/stream.h +++ b/QtScrcpy/device/stream/stream.h @@ -18,17 +18,6 @@ class Stream : public QThread { Q_OBJECT public: - typedef struct FrameMeta { - quint64 pts; - struct FrameMeta* next; - } FrameMeta; - - typedef struct ReceiverState { - // meta (in order) for frames not consumed yet - FrameMeta* frameMetaQueue; - qint32 remaining; // remaining bytes to receive for the current frame - } ReceiverState; - Stream(QObject *parent = Q_NULLPTR); virtual ~Stream(); @@ -36,28 +25,36 @@ public: static bool init(); static void deInit(); - void setDecoder(Decoder* vb); - void setVideoSocket(VideoSocket* deviceSocket); + void setDecoder(Decoder* decoder); void setRecoder(Recorder* recorder); + void setVideoSocket(VideoSocket* deviceSocket); qint32 recvData(quint8* buf, qint32 bufSize); bool startDecode(); void stopDecode(); - ReceiverState* getReceiverState(); signals: void onStreamStop(); protected: void run(); + bool 
recvPacket(AVPacket* packet); + bool pushPacket(AVPacket* packet); + bool processConfigPacket(AVPacket *packet); + bool parse(AVPacket *packet); + bool processFrame(AVPacket *packet); private: QPointer m_videoSocket; - std::atomic_bool m_quit; - // for recorder Recorder* m_recorder = Q_NULLPTR; - ReceiverState m_receiverState; Decoder* m_decoder = Q_NULLPTR; + + AVCodecContext* m_codecCtx = Q_NULLPTR; + AVCodecParserContext *m_parser = Q_NULLPTR; + // successive packets may need to be concatenated, until a non-config + // packet is available + bool m_hasPending = false; + AVPacket m_pending; }; #endif // STREAM_H