From 36da270dbef3e1438b6b6ba157161ab58ec0f16e Mon Sep 17 00:00:00 2001 From: Timothy Flynn Date: Tue, 20 May 2025 16:21:17 -0400 Subject: [PATCH] LibIPC+LibWeb: Flush MessagePort messages before closing The spec isn't super clear on what disentagling a MessagePort means. But we are required to send all pending messages before closing the port. This is a bit tricky because the transport socket performs writes on a background thread. From the main thread, where the disentanglement will occur, we don't really know the state of the write thread. So what we do here is stop the background thread then flush all remaining data from the main thread. --- Libraries/LibIPC/TransportSocket.cpp | 101 +++++++++++++++++--------- Libraries/LibIPC/TransportSocket.h | 10 +++ Libraries/LibWeb/HTML/MessagePort.cpp | 10 ++- 3 files changed, 85 insertions(+), 36 deletions(-) diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index b28be78d966..783780a210d 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -74,43 +74,15 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) break; auto [bytes, fds] = send_queue->peek(4096); - auto fds_count = fds.size(); - ReadonlyBytes remaining_to_send_bytes = bytes; + ReadonlyBytes remaining_bytes_to_send = bytes; - Threading::RWLockLocker lock(m_socket_rw_lock); - if (!m_socket->is_open()) + if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed) break; - auto result = send_message(*m_socket, remaining_to_send_bytes, fds); - if (result.is_error()) { - if (result.error().is_errno() && result.error().code() == EPIPE) { - // The socket is closed from the other end, we can stop sending. - break; - } - dbgln("TransportSocket::send_thread: {}", result.error()); - VERIFY_NOT_REACHED(); - } - - auto written_bytes_count = bytes.size() - remaining_to_send_bytes.size(); - auto written_fds_count = fds_count - fds.size(); - if (written_bytes_count > 0 || written_fds_count > 0) { - send_queue->discard(written_bytes_count, written_fds_count); - } - - if (!m_socket->is_open()) - break; - - { - Vector pollfds; - pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); - - ErrorOr result { 0 }; - do { - result = Core::System::poll(pollfds, -1); - } while (result.is_error() && result.error().code() == EINTR); - } } + return 0; }); + m_send_thread->start(); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); @@ -118,9 +90,16 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) } TransportSocket::~TransportSocket() +{ + stop_send_thread(); +} + +void TransportSocket::stop_send_thread() { m_send_queue->stop(); - (void)m_send_thread->join(); + + if (m_send_thread->needs_to_be_joined()) + (void)m_send_thread->join(); } void TransportSocket::set_up_read_hook(Function hook) @@ -142,6 +121,21 @@ void TransportSocket::close() m_socket->close(); } +void TransportSocket::close_after_sending_all_pending_messages() +{ + stop_send_thread(); + + auto [bytes, fds] = m_send_queue->peek(NumericLimits::max()); + ReadonlyBytes remaining_bytes_to_send = bytes; + + while (!remaining_bytes_to_send.is_empty() || !fds.is_empty()) { + if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed) + break; + } + + close(); +} + void TransportSocket::wait_until_readable() { Threading::RWLockLocker lock(m_socket_rw_lock); @@ -217,6 +211,47 @@ ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB return {}; } +TransportSocket::TransferState TransportSocket::transfer_data(ReadonlyBytes& bytes, Vector& fds) +{ + auto byte_count = bytes.size(); + auto fd_count = fds.size(); + + Threading::RWLockLocker lock(m_socket_rw_lock); + + if (!m_socket->is_open()) + return TransferState::SocketClosed; + + if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) { + if (result.error().is_errno() && result.error().code() == EPIPE) { + // The socket is closed from the other end, we can stop sending. + return TransferState::SocketClosed; + } + + dbgln("TransportSocket::send_thread: {}", result.error()); + VERIFY_NOT_REACHED(); + } + + auto written_byte_count = byte_count - bytes.size(); + auto written_fd_count = fd_count - fds.size(); + if (written_byte_count > 0 || written_fd_count > 0) + m_send_queue->discard(written_byte_count, written_fd_count); + + if (!m_socket->is_open()) + return TransferState::SocketClosed; + + { + Vector pollfds; + pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); + + ErrorOr result { 0 }; + do { + result = Core::System::poll(pollfds, -1); + } while (result.is_error() && result.error().code() == EINTR); + } + + return TransferState::Continue; +} + TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { Threading::RWLockLocker lock(m_socket_rw_lock); diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 3304c2b911d..74727235e73 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -73,7 +73,9 @@ public: void set_up_read_hook(Function); bool is_open() const; + void close(); + void close_after_sending_all_pending_messages(); void wait_until_readable(); @@ -95,8 +97,16 @@ public: ErrorOr clone_for_transfer(); private: + enum class TransferState { + Continue, + SocketClosed, + }; + [[nodiscard]] TransferState transfer_data(ReadonlyBytes& bytes, Vector& fds); + static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector& unowned_fds); + void stop_send_thread(); + NonnullOwnPtr m_socket; mutable Threading::RWLock m_socket_rw_lock; ByteBuffer m_unprocessed_bytes; diff --git a/Libraries/LibWeb/HTML/MessagePort.cpp b/Libraries/LibWeb/HTML/MessagePort.cpp index 123dc56b4eb..a14ed948970 100644 --- a/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Libraries/LibWeb/HTML/MessagePort.cpp @@ -146,11 +146,15 @@ WebIDL::ExceptionOr MessagePort::transfer_receiving_steps(HTML::TransferDa void MessagePort::disentangle() { - if (m_remote_port) + if (m_remote_port) { m_remote_port->m_remote_port = nullptr; - m_remote_port = nullptr; + m_remote_port = nullptr; + } - m_transport.clear(); + if (m_transport) { + m_transport->close_after_sending_all_pending_messages(); + m_transport.clear(); + } m_worker_event_target = nullptr; }