LibIPC: Move send thread from IPC connection to the transport layer

By doing this we also make MessagePort, that relies on IPC transport,
to send messages from separate thread, which solves the problem when
WebWorker and WebContent could deadlock if both were trying to post
messages at the same time.

Fixes https://github.com/LadybirdBrowser/ladybird/issues/4254
This commit is contained in:
Aliaksandr Kalenik 2025-04-08 04:55:50 +02:00
parent 74cedf0077
commit 09118ac43d
6 changed files with 118 additions and 111 deletions

View file

@ -29,41 +29,9 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l
(void)drain_messages_from_peer();
handle_messages();
});
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
for (;;) {
send_queue->mutex.lock();
while (send_queue->messages.is_empty() && send_queue->running)
send_queue->condition.wait();
if (!send_queue->running) {
send_queue->mutex.unlock();
break;
}
auto message_buffer = send_queue->messages.take_first();
send_queue->mutex.unlock();
if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) {
dbgln("ConnectionBase::send_thread: {}", result.error());
continue;
}
}
return 0;
});
m_send_thread->start();
}
ConnectionBase::~ConnectionBase()
{
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->running = false;
m_send_queue->condition.signal();
}
(void)m_send_thread->join();
}
ConnectionBase::~ConnectionBase() = default;
bool ConnectionBase::is_open() const
{
@ -87,11 +55,7 @@ ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf
buffer = MUST(wrapper->encode());
}
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->messages.append(move(buffer));
m_send_queue->condition.signal();
}
MUST(buffer.transfer_message(m_transport));
m_responsiveness_timer->start();
return {};

View file

@ -60,16 +60,6 @@ protected:
Vector<NonnullOwnPtr<Message>> m_unprocessed_messages;
u32 m_local_endpoint_magic { 0 };
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageBuffer> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue;
};
template<typename LocalEndpoint, typename PeerEndpoint>

View file

@ -43,16 +43,7 @@ ErrorOr<void> MessageBuffer::transfer_message(Transport& transport)
return Error::from_string_literal("Message is too large for IPC encoding");
}
auto raw_fds = Vector<int, 1> {};
auto num_fds_to_transfer = m_fds.size();
if (num_fds_to_transfer > 0) {
raw_fds.ensure_capacity(num_fds_to_transfer);
for (auto& owned_fd : m_fds) {
raw_fds.unchecked_append(owned_fd->value());
}
}
TRY(transport.transfer_message(m_data.span(), raw_fds));
transport.post_message(m_data, m_fds);
return {};
}

View file

@ -19,32 +19,6 @@
namespace IPC {
class AutoCloseFileDescriptor : public RefCounted<AutoCloseFileDescriptor> {
public:
AutoCloseFileDescriptor(int fd)
: m_fd(fd)
{
}
~AutoCloseFileDescriptor()
{
if (m_fd != -1)
(void)Core::System::close(m_fd);
}
int value() const { return m_fd; }
int take_fd()
{
int fd = m_fd;
m_fd = -1;
return fd;
}
private:
int m_fd;
};
class MessageBuffer {
public:
MessageBuffer();

View file

@ -15,14 +15,46 @@ namespace IPC {
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
: m_socket(move(socket))
, m_socket_write_mutex(make<Threading::Mutex>())
, m_fds_retained_until_received_by_peer(make<Threading::MutexProtected<Queue<File>>>())
, m_fds_retained_until_received_by_peer(make<Queue<NonnullRefPtr<AutoCloseFileDescriptor>>>())
{
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([&socket = *m_socket, send_queue = m_send_queue]() -> intptr_t {
for (;;) {
send_queue->mutex.lock();
while (send_queue->messages.is_empty() && send_queue->running)
send_queue->condition.wait();
if (!send_queue->running) {
send_queue->mutex.unlock();
break;
}
auto [bytes, fds] = send_queue->messages.take_first();
send_queue->mutex.unlock();
if (auto result = send_message(socket, bytes, fds); result.is_error()) {
dbgln("TransportSocket::send_thread: {}", result.error());
}
}
return 0;
});
m_send_thread->start();
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
}
TransportSocket::~TransportSocket() = default;
TransportSocket::~TransportSocket()
{
if (m_send_thread) {
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->running = false;
m_send_queue->condition.signal();
}
(void)m_send_thread->join();
}
}
void TransportSocket::set_up_read_hook(Function<void()> hook)
{
@ -62,37 +94,50 @@ struct MessageHeader {
u32 fd_count { 0 };
};
ErrorOr<void> TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Vector<int, 1> const& unowned_fds)
void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<NonnullRefPtr<AutoCloseFileDescriptor>> const& fds) const
{
Vector<u8> message_buffer;
message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size());
MessageHeader header;
header.payload_size = bytes_to_write.size();
header.fd_count = unowned_fds.size();
header.fd_count = fds.size();
header.type = MessageHeader::Type::Payload;
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size());
return transfer(message_buffer.span(), unowned_fds);
for (auto const& fd : fds)
m_fds_retained_until_received_by_peer->enqueue(fd);
auto raw_fds = Vector<int, 1> {};
auto num_fds_to_transfer = fds.size();
if (num_fds_to_transfer > 0) {
raw_fds.ensure_capacity(num_fds_to_transfer);
for (auto& owned_fd : fds) {
raw_fds.unchecked_append(owned_fd->value());
}
}
queue_message_on_send_thread({ move(message_buffer), move(raw_fds) });
}
ErrorOr<void> TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector<int, 1> const& unowned_fds)
void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const
{
m_fds_retained_until_received_by_peer->with_locked([&unowned_fds](auto& queue) {
for (auto const& fd : unowned_fds)
queue.enqueue(MUST(File::clone_fd(fd)));
});
Threading::MutexLocker lock(*m_socket_write_mutex);
Threading::MutexLocker lock(m_send_queue->mutex);
m_send_queue->messages.append(move(message_to_send));
m_send_queue->condition.signal();
}
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector<int, 1> const& unowned_fds)
{
auto num_fds_to_transfer = unowned_fds.size();
while (!bytes_to_write.is_empty()) {
ErrorOr<ssize_t> maybe_nwritten = 0;
if (num_fds_to_transfer > 0) {
maybe_nwritten = m_socket->send_message(bytes_to_write, 0, unowned_fds);
maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds);
if (!maybe_nwritten.is_error())
num_fds_to_transfer = 0;
} else {
maybe_nwritten = m_socket->write_some(bytes_to_write);
maybe_nwritten = socket.write_some(bytes_to_write);
}
if (maybe_nwritten.is_error()) {
@ -102,7 +147,7 @@ ErrorOr<void> TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector<int
// or next time the socket is writable
Vector<struct pollfd, 1> pollfds;
if (pollfds.is_empty())
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
@ -196,20 +241,21 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
return ShouldShutdown::Yes;
if (acknowledged_fd_count > 0) {
m_fds_retained_until_received_by_peer->with_locked([&acknowledged_fd_count](auto& queue) {
while (acknowledged_fd_count > 0) {
queue.dequeue();
--acknowledged_fd_count;
}
});
while (acknowledged_fd_count > 0) {
(void)m_fds_retained_until_received_by_peer->dequeue();
--acknowledged_fd_count;
}
}
if (received_fd_count > 0) {
Vector<u8> message_buffer;
message_buffer.resize(sizeof(MessageHeader));
MessageHeader header;
header.payload_size = 0;
header.fd_count = received_fd_count;
header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
MUST(transfer({ &header, sizeof(MessageHeader) }, {}));
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
queue_message_on_send_thread({ move(message_buffer), {} });
}
if (index < m_unprocessed_bytes.size()) {

View file

@ -8,11 +8,40 @@
#pragma once
#include <AK/Queue.h>
#include <LibCore/Socket.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/Thread.h>
namespace IPC {
class AutoCloseFileDescriptor : public RefCounted<AutoCloseFileDescriptor> {
public:
AutoCloseFileDescriptor(int fd)
: m_fd(fd)
{
}
~AutoCloseFileDescriptor()
{
if (m_fd != -1)
(void)Core::System::close(m_fd);
}
int value() const { return m_fd; }
int take_fd()
{
int fd = m_fd;
m_fd = -1;
return fd;
}
private:
int m_fd;
};
class TransportSocket {
AK_MAKE_NONCOPYABLE(TransportSocket);
AK_MAKE_DEFAULT_MOVABLE(TransportSocket);
@ -29,7 +58,7 @@ public:
void wait_until_readable();
ErrorOr<void> transfer_message(ReadonlyBytes, Vector<int, 1> const& unowned_fds);
void post_message(Vector<u8> const&, Vector<NonnullRefPtr<AutoCloseFileDescriptor>> const&) const;
enum class ShouldShutdown {
No,
@ -47,17 +76,30 @@ public:
ErrorOr<IPC::File> clone_for_transfer();
private:
ErrorOr<void> transfer(ReadonlyBytes, Vector<int, 1> const& unowned_fds);
static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector<int, 1> const& unowned_fds);
NonnullOwnPtr<Core::LocalSocket> m_socket;
NonnullOwnPtr<Threading::Mutex> m_socket_write_mutex;
ByteBuffer m_unprocessed_bytes;
UnprocessedFileDescriptors m_unprocessed_fds;
// After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer.
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
NonnullOwnPtr<Threading::MutexProtected<Queue<File>>> m_fds_retained_until_received_by_peer;
NonnullOwnPtr<Queue<NonnullRefPtr<AutoCloseFileDescriptor>>> m_fds_retained_until_received_by_peer;
struct MessageToSend {
Vector<u8> bytes;
Vector<int, 1> fds;
};
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageToSend> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue;
void queue_message_on_send_thread(MessageToSend&&) const;
};
}