LibIPC: Send IPC messages on a secondary thread

To prevent deadlocks when both IPC peers are trying to send to each
other but both sides have too much in their buffer already, we now
move the send operation to a secondary thread where it can block until
the peer is able to handle it.
This commit is contained in:
Andreas Kling 2024-09-06 06:47:26 +02:00 committed by Andreas Kling
commit 4bc5d6a681
Notes: github-actions[bot] 2024-09-19 05:38:57 +00:00
3 changed files with 53 additions and 7 deletions

View file

@ -6,4 +6,4 @@ set(SOURCES
) )
serenity_lib(LibIPC ipc) serenity_lib(LibIPC ipc)
target_link_libraries(LibIPC PRIVATE LibCore LibURL) target_link_libraries(LibIPC PRIVATE LibCore LibURL LibThreading)

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2021, Andreas Kling <kling@serenityos.org> * Copyright (c) 2021-2024, Andreas Kling <andreas@ladybird.org>
* Copyright (c) 2022, the SerenityOS developers. * Copyright (c) 2022, the SerenityOS developers.
* *
* SPDX-License-Identifier: BSD-2-Clause * SPDX-License-Identifier: BSD-2-Clause
@ -26,9 +26,41 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr<Core::LocalS
(void)drain_messages_from_peer(); (void)drain_messages_from_peer();
handle_messages(); handle_messages();
}; };
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, queue = m_send_queue]() -> intptr_t {
for (;;) {
queue->mutex.lock();
if (queue->messages.is_empty())
queue->condition.wait();
if (!queue->running) {
queue->mutex.unlock();
break;
}
auto message = queue->messages.take_first();
queue->mutex.unlock();
if (auto result = message.transfer_message(*m_socket); result.is_error()) {
dbgln("ConnectionBase::send_thread: {}", result.error());
continue;
}
}
return 0;
});
m_send_thread->start();
} }
ConnectionBase::~ConnectionBase() = default; ConnectionBase::~ConnectionBase()
{
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->running = false;
m_send_queue->condition.signal();
}
m_send_thread->detach();
}
bool ConnectionBase::is_open() const bool ConnectionBase::is_open() const
{ {
@ -47,9 +79,10 @@ ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
if (!m_socket->is_open()) if (!m_socket->is_open())
return Error::from_string_literal("Trying to post_message during IPC shutdown"); return Error::from_string_literal("Trying to post_message during IPC shutdown");
if (auto result = buffer.transfer_message(*m_socket); result.is_error()) { {
shutdown_with_error(result.error()); Threading::MutexLocker locker(m_send_queue->mutex);
return result.release_error(); m_send_queue->messages.append(move(buffer));
m_send_queue->condition.signal();
} }
m_responsiveness_timer->start(); m_responsiveness_timer->start();

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org> * Copyright (c) 2018-2024, Andreas Kling <andreas@ladybird.org>
* Copyright (c) 2022, the SerenityOS developers. * Copyright (c) 2022, the SerenityOS developers.
* *
* SPDX-License-Identifier: BSD-2-Clause * SPDX-License-Identifier: BSD-2-Clause
@ -12,6 +12,9 @@
#include <LibCore/EventReceiver.h> #include <LibCore/EventReceiver.h>
#include <LibIPC/File.h> #include <LibIPC/File.h>
#include <LibIPC/Forward.h> #include <LibIPC/Forward.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/Thread.h>
namespace IPC { namespace IPC {
@ -57,6 +60,16 @@ protected:
ByteBuffer m_unprocessed_bytes; ByteBuffer m_unprocessed_bytes;
u32 m_local_endpoint_magic { 0 }; u32 m_local_endpoint_magic { 0 };
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageBuffer> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue;
}; };
template<typename LocalEndpoint, typename PeerEndpoint> template<typename LocalEndpoint, typename PeerEndpoint>