feat: record asynchronously

The record file was written from the stream thread. As a consequence,
any blocking I/O to write the file delayed the decoder.

For maximum performance even when recording is enabled, send
(refcounted) packets to a separate recording thread.
This commit is contained in:
rankun 2020-01-15 14:07:59 +08:00
parent 65a7cb7467
commit c79a93c429
7 changed files with 189 additions and 16 deletions

View file

@ -6,11 +6,12 @@
static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us
Recorder::Recorder(const QString& fileName)
: m_fileName(fileName)
Recorder::Recorder(const QString& fileName, QObject* parent)
: QThread(parent)
, m_fileName(fileName)
, m_format(guessRecordFormat(fileName))
{
queueInit(&m_queue);
}
Recorder::~Recorder()
@ -18,6 +19,75 @@ Recorder::~Recorder()
}
Recorder::RecordPacket* Recorder::packetNew(const AVPacket *packet) {
RecordPacket* rec = new RecordPacket;
if (!rec) {
return Q_NULLPTR;
}
if (av_packet_ref(&rec->packet, packet)) {
delete rec;
return Q_NULLPTR;
}
rec->next = Q_NULLPTR;
return rec;
}
void Recorder::packetDelete(Recorder::RecordPacket *rec) {
av_packet_unref(&rec->packet);
delete rec;
}
void Recorder::queueInit(Recorder::RecorderQueue *queue) {
queue->first = Q_NULLPTR;
// queue->last is undefined if queue->first == NULL
}
bool Recorder::queueIsEmpty(Recorder::RecorderQueue *queue) {
return !queue->first;
}
bool Recorder::queuePush(Recorder::RecorderQueue *queue, const AVPacket *packet) {
RecordPacket *rec = packetNew(packet);
if (!rec) {
qCritical("Could not allocate record packet");
return false;
}
rec->next = Q_NULLPTR;
if (queueIsEmpty(queue)) {
queue->first = queue->last = rec;
} else {
// chain rec after the (current) last packet
queue->last->next = rec;
// the last packet is now rec
queue->last = rec;
}
return true;
}
Recorder::RecordPacket* Recorder::queueTake(Recorder::RecorderQueue *queue) {
assert(!queueIsEmpty(queue));
RecordPacket *rec = queue->first;
assert(rec);
queue->first = rec->next;
// no need to update queue->last if the queue is left empty:
// queue->last is undefined if queue->first == NULL
return rec;
}
void Recorder::queueClear(Recorder::RecorderQueue *queue) {
RecordPacket *rec = queue->first;
while (rec) {
RecordPacket *current = rec;
rec = rec->next;
packetDelete(current);
}
queue->first = Q_NULLPTR;
}
void Recorder::setFrameSize(const QSize &declaredFrameSize)
{
m_declaredFrameSize = declaredFrameSize;
@ -201,3 +271,62 @@ Recorder::RecorderFormat Recorder::guessRecordFormat(const QString &fileName)
return Recorder::RECORDER_FORMAT_NULL;
}
void Recorder::run() {
for (;;) {
QMutexLocker locker(&m_mutex);
while (!m_stopped && queueIsEmpty(&m_queue)) {
m_recvDataCond.wait(&m_mutex);
}
// if stopped is set, continue to process the remaining events (to
// finish the recording) before actually stopping
if (m_stopped && queueIsEmpty(&m_queue)) {
break;
}
RecordPacket *rec = queueTake(&m_queue);
//mutex_unlock(recorder->mutex);
bool ok = write(&rec->packet);
packetDelete(rec);
if (!ok) {
qCritical("Could not record packet");
QMutexLocker locker(&m_mutex);
m_failed = true;
// discard pending packets
queueClear(&m_queue);
break;
}
}
qDebug("Recorder thread ended");
}
bool Recorder::startRecorder() {
start();
return true;
}
void Recorder::stopRecorder() {
QMutexLocker locker(&m_mutex);
m_stopped = true;
m_recvDataCond.wakeOne();
}
bool Recorder::push(const AVPacket *packet) {
QMutexLocker locker(&m_mutex);
assert(!m_stopped);
if (m_failed) {
// reject any new packet (this will stop the stream)
return false;
}
bool ok = queuePush(&m_queue, packet);
m_recvDataCond.wakeOne();
return ok;
}

View file

@ -2,14 +2,18 @@
#define RECORDER_H
#include <QString>
#include <QSize>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
extern "C"
{
#include "libavformat/avformat.h"
}
class Recorder
class Recorder : public QThread
{
Q_OBJECT
public:
enum RecorderFormat {
RECORDER_FORMAT_NULL = 0,
@ -17,7 +21,7 @@ public:
RECORDER_FORMAT_MKV,
};
Recorder(const QString& fileName);
Recorder(const QString& fileName, QObject *parent = Q_NULLPTR);
virtual ~Recorder();
void setFrameSize(const QSize& declaredFrameSize);
@ -25,6 +29,9 @@ public:
bool open(const AVCodec* inputCodec);
void close();
bool write(AVPacket* packet);
bool startRecorder();
void stopRecorder();
bool push(const AVPacket *packet);
private:
const AVOutputFormat* findMuxer(const char* name);
@ -33,12 +40,39 @@ private:
QString recorderGetFormatName(Recorder::RecorderFormat format);
RecorderFormat guessRecordFormat(const QString& fileName);
private:
struct RecordPacket {
AVPacket packet;
RecordPacket *next;
};
struct RecorderQueue {
RecordPacket *first = Q_NULLPTR;
RecordPacket *last = Q_NULLPTR; // undefined if first is NULL
};
Recorder::RecordPacket* packetNew(const AVPacket *packet);
void packetDelete(Recorder::RecordPacket *rec);
void queueInit(Recorder::RecorderQueue *queue);
bool queueIsEmpty(Recorder::RecorderQueue *queue);
bool queuePush(Recorder::RecorderQueue *queue, const AVPacket *packet);
Recorder::RecordPacket* queueTake(Recorder::RecorderQueue *queue);
void queueClear(Recorder::RecorderQueue *queue);
protected:
void run();
private:
QString m_fileName = "";
AVFormatContext* m_formatCtx = Q_NULLPTR;
QSize m_declaredFrameSize;
bool m_headerWritten = false;
RecorderFormat m_format = RECORDER_FORMAT_NULL;
QMutex m_mutex;
QWaitCondition m_recvDataCond;
bool m_stopped = false; // set on recorder_stop() by the stream reader
bool m_failed = false; // set on packet write failure
RecorderQueue m_queue;
};
#endif // RECORDER_H

View file

@ -147,10 +147,17 @@ void Stream::run()
goto runQuit;
}
if (m_recorder && !m_recorder->open(codec)) {
qCritical("Could not open recorder");
goto runQuit;
}
if (m_recorder) {
if (!m_recorder->open(codec)) {
qCritical("Could not open recorder");
goto runQuit;
}
if (!m_recorder->startRecorder()) {
qCritical("Could not start recorder");
goto runQuit;
}
}
m_parser = av_parser_init(AV_CODEC_ID_H264);
if (!m_parser) {
@ -188,6 +195,10 @@ void Stream::run()
runQuit:
if (m_recorder) {
if (m_recorder->isRunning()) {
m_recorder->stopRecorder();
m_recorder->wait();
}
m_recorder->close();
}
if (m_decoder) {
@ -300,7 +311,7 @@ bool Stream::pushPacket(AVPacket *packet)
bool Stream::processConfigPacket(AVPacket *packet)
{
if (m_recorder && !m_recorder->write(packet)) {
if (m_recorder && !m_recorder->push(packet)) {
qCritical("Could not send config packet to recorder");
return false;
}
@ -344,7 +355,7 @@ bool Stream::processFrame(AVPacket *packet)
if (m_recorder) {
packet->dts = packet->pts;
if (!m_recorder->write(packet)) {
if (!m_recorder->push(packet)) {
qCritical("Could not send packet to recorder");
return false;
}

View file

@ -3,7 +3,6 @@
#include <QThread>
#include <QPointer>
#include <QMutex>
extern "C"
{

View file

@ -53,7 +53,7 @@ bool DeviceManage::disconnectDevice(const QString &serial)
if (!serial.isEmpty() && m_devices.contains(serial)) {
auto it = m_devices.find(serial);
if (it->data()) {
it->data()->deleteLater();
delete it->data();
ret = true;
}
}
@ -66,7 +66,7 @@ void DeviceManage::disconnectAllDevice()
while (i.hasNext()) {
i.next();
if (i.value()) {
i.value()->deleteLater();
delete i.value();
}
}
}

View file

@ -64,7 +64,7 @@ Dialog::Dialog(QWidget *parent) :
Dialog::~Dialog()
{
on_stopServerBtn_clicked();
m_deviceManage.disconnectAllDevice();
delete ui;
}

View file

@ -94,7 +94,7 @@ void myMessageOutput(QtMsgType type, const QMessageLogContext &context, const QS
}
if (QtDebugMsg < type) {
if (g_mainDlg && !msg.contains("app_proces")) {
if (g_mainDlg && g_mainDlg->isVisible() && !msg.contains("app_proces")) {
g_mainDlg->outLog(msg);
}
}