From 14bac7b287561d4ecea0fa9697890aa543a2654c Mon Sep 17 00:00:00 2001 From: Aliaksandr Kalenik Date: Tue, 8 Apr 2025 04:55:50 +0200 Subject: [PATCH] LibIPC: Move send thread from IPC connection to the transport layer By doing this we also make MessagePort, that relies on IPC transport, to send messages from separate thread, which solves the problem when WebWorker and WebContent could deadlock if both were trying to post messages at the same time. Fixes https://github.com/LadybirdBrowser/ladybird/issues/4254 --- Libraries/LibIPC/Connection.cpp | 40 +----------- Libraries/LibIPC/Connection.h | 10 --- Libraries/LibIPC/Message.cpp | 11 +--- Libraries/LibIPC/Message.h | 26 -------- Libraries/LibIPC/TransportSocket.cpp | 92 +++++++++++++++++++++------- Libraries/LibIPC/TransportSocket.h | 50 +++++++++++++-- 6 files changed, 118 insertions(+), 111 deletions(-) diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index 51d1a13a4c1..f623c42b772 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -29,41 +29,9 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l (void)drain_messages_from_peer(); handle_messages(); }); - - m_send_queue = adopt_ref(*new SendQueue); - m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { - for (;;) { - send_queue->mutex.lock(); - while (send_queue->messages.is_empty() && send_queue->running) - send_queue->condition.wait(); - - if (!send_queue->running) { - send_queue->mutex.unlock(); - break; - } - - auto message_buffer = send_queue->messages.take_first(); - send_queue->mutex.unlock(); - - if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) { - dbgln("ConnectionBase::send_thread: {}", result.error()); - continue; - } - } - return 0; - }); - m_send_thread->start(); } -ConnectionBase::~ConnectionBase() -{ - { - Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->running = false; - m_send_queue->condition.signal(); - } - (void)m_send_thread->join(); -} +ConnectionBase::~ConnectionBase() = default; bool ConnectionBase::is_open() const { @@ -87,11 +55,7 @@ ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf buffer = MUST(wrapper->encode()); } - { - Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->messages.append(move(buffer)); - m_send_queue->condition.signal(); - } + MUST(buffer.transfer_message(m_transport)); m_responsiveness_timer->start(); return {}; diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 6a2d2f2ff59..7aae27de1e4 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -60,16 +60,6 @@ protected: Vector> m_unprocessed_messages; u32 m_local_endpoint_magic { 0 }; - - struct SendQueue : public AtomicRefCounted { - AK::SinglyLinkedList messages; - Threading::Mutex mutex; - Threading::ConditionVariable condition { mutex }; - bool running { true }; - }; - - RefPtr m_send_thread; - RefPtr m_send_queue; }; template diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 35af54f6276..5652bac21cb 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -43,16 +43,7 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) return Error::from_string_literal("Message is too large for IPC encoding"); } - auto raw_fds = Vector {}; - auto num_fds_to_transfer = m_fds.size(); - if (num_fds_to_transfer > 0) { - raw_fds.ensure_capacity(num_fds_to_transfer); - for (auto& owned_fd : m_fds) { - raw_fds.unchecked_append(owned_fd->value()); - } - } - - TRY(transport.transfer_message(m_data.span(), raw_fds)); + transport.post_message(m_data, m_fds); return {}; } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index 7d1b741225a..65333340f2a 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -19,32 +19,6 @@ namespace IPC { -class AutoCloseFileDescriptor : public RefCounted { -public: - AutoCloseFileDescriptor(int fd) - : m_fd(fd) - { - } - - ~AutoCloseFileDescriptor() - { - if (m_fd != -1) - (void)Core::System::close(m_fd); - } - - int value() const { return m_fd; } - - int take_fd() - { - int fd = m_fd; - m_fd = -1; - return fd; - } - -private: - int m_fd; -}; - class MessageBuffer { public: MessageBuffer(); diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index ca0f8c387df..61d31b60c83 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -15,14 +15,46 @@ namespace IPC { TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) - , m_socket_write_mutex(make()) - , m_fds_retained_until_received_by_peer(make>>()) + , m_fds_retained_until_received_by_peer(make>>()) { + m_send_queue = adopt_ref(*new SendQueue); + m_send_thread = Threading::Thread::construct([&socket = *m_socket, send_queue = m_send_queue]() -> intptr_t { + for (;;) { + send_queue->mutex.lock(); + while (send_queue->messages.is_empty() && send_queue->running) + send_queue->condition.wait(); + + if (!send_queue->running) { + send_queue->mutex.unlock(); + break; + } + + auto [bytes, fds] = send_queue->messages.take_first(); + send_queue->mutex.unlock(); + + if (auto result = send_message(socket, bytes, fds); result.is_error()) { + dbgln("TransportSocket::send_thread: {}", result.error()); + } + } + return 0; + }); + m_send_thread->start(); + (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); } -TransportSocket::~TransportSocket() = default; +TransportSocket::~TransportSocket() +{ + if (m_send_thread) { + { + Threading::MutexLocker locker(m_send_queue->mutex); + m_send_queue->running = false; + m_send_queue->condition.signal(); + } + (void)m_send_thread->join(); + } +} void TransportSocket::set_up_read_hook(Function hook) { @@ -62,37 +94,50 @@ struct MessageHeader { u32 fd_count { 0 }; }; -ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +void TransportSocket::post_message(Vector const& bytes_to_write, Vector> const& fds) const { Vector message_buffer; message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size()); MessageHeader header; header.payload_size = bytes_to_write.size(); - header.fd_count = unowned_fds.size(); + header.fd_count = fds.size(); header.type = MessageHeader::Type::Payload; memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size()); - return transfer(message_buffer.span(), unowned_fds); + + for (auto const& fd : fds) + m_fds_retained_until_received_by_peer->enqueue(fd); + + auto raw_fds = Vector {}; + auto num_fds_to_transfer = fds.size(); + if (num_fds_to_transfer > 0) { + raw_fds.ensure_capacity(num_fds_to_transfer); + for (auto& owned_fd : fds) { + raw_fds.unchecked_append(owned_fd->value()); + } + } + + queue_message_on_send_thread({ move(message_buffer), move(raw_fds) }); } -ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const { - m_fds_retained_until_received_by_peer->with_locked([&unowned_fds](auto& queue) { - for (auto const& fd : unowned_fds) - queue.enqueue(MUST(File::clone_fd(fd))); - }); - - Threading::MutexLocker lock(*m_socket_write_mutex); + Threading::MutexLocker lock(m_send_queue->mutex); + m_send_queue->messages.append(move(message_to_send)); + m_send_queue->condition.signal(); +} +ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector const& unowned_fds) +{ auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; if (num_fds_to_transfer > 0) { - maybe_nwritten = m_socket->send_message(bytes_to_write, 0, unowned_fds); + maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds); if (!maybe_nwritten.is_error()) num_fds_to_transfer = 0; } else { - maybe_nwritten = m_socket->write_some(bytes_to_write); + maybe_nwritten = socket.write_some(bytes_to_write); } if (maybe_nwritten.is_error()) { @@ -102,7 +147,7 @@ ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector pollfds; if (pollfds.is_empty()) - pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); + pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 }); ErrorOr result { 0 }; do { @@ -196,20 +241,21 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib return ShouldShutdown::Yes; if (acknowledged_fd_count > 0) { - m_fds_retained_until_received_by_peer->with_locked([&acknowledged_fd_count](auto& queue) { - while (acknowledged_fd_count > 0) { - queue.dequeue(); - --acknowledged_fd_count; - } - }); + while (acknowledged_fd_count > 0) { + (void)m_fds_retained_until_received_by_peer->dequeue(); + --acknowledged_fd_count; + } } if (received_fd_count > 0) { + Vector message_buffer; + message_buffer.resize(sizeof(MessageHeader)); MessageHeader header; header.payload_size = 0; header.fd_count = received_fd_count; header.type = MessageHeader::Type::FileDescriptorAcknowledgement; - MUST(transfer({ &header, sizeof(MessageHeader) }, {})); + memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); + queue_message_on_send_thread({ move(message_buffer), {} }); } if (index < m_unprocessed_bytes.size()) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 39c092cf827..a1e7257601d 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -8,11 +8,40 @@ #pragma once #include +#include #include +#include #include +#include namespace IPC { +class AutoCloseFileDescriptor : public RefCounted { +public: + AutoCloseFileDescriptor(int fd) + : m_fd(fd) + { + } + + ~AutoCloseFileDescriptor() + { + if (m_fd != -1) + (void)Core::System::close(m_fd); + } + + int value() const { return m_fd; } + + int take_fd() + { + int fd = m_fd; + m_fd = -1; + return fd; + } + +private: + int m_fd; +}; + class TransportSocket { AK_MAKE_NONCOPYABLE(TransportSocket); AK_MAKE_DEFAULT_MOVABLE(TransportSocket); @@ -29,7 +58,7 @@ public: void wait_until_readable(); - ErrorOr transfer_message(ReadonlyBytes, Vector const& unowned_fds); + void post_message(Vector const&, Vector> const&) const; enum class ShouldShutdown { No, @@ -47,17 +76,30 @@ public: ErrorOr clone_for_transfer(); private: - ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector const& unowned_fds); NonnullOwnPtr m_socket; - NonnullOwnPtr m_socket_write_mutex; ByteBuffer m_unprocessed_bytes; UnprocessedFileDescriptors m_unprocessed_fds; // After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer. // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 - NonnullOwnPtr>> m_fds_retained_until_received_by_peer; + NonnullOwnPtr>> m_fds_retained_until_received_by_peer; + + struct MessageToSend { + Vector bytes; + Vector fds; + }; + struct SendQueue : public AtomicRefCounted { + AK::SinglyLinkedList messages; + Threading::Mutex mutex; + Threading::ConditionVariable condition { mutex }; + bool running { true }; + }; + RefPtr m_send_thread; + RefPtr m_send_queue; + void queue_message_on_send_thread(MessageToSend&&) const; }; }