From 1af8282608a7536aa6636a492c3baac1db56097a Mon Sep 17 00:00:00 2001 From: Aliaksandr Kalenik Date: Mon, 7 Apr 2025 04:17:36 +0200 Subject: [PATCH] LibIPC: Make TransportSocket responsible for reading entire messages With this change, the responsibility for prepending messages with their size and ensuring the entire message is received before returning it to the caller is moved to TransportSocket. This removes the need to duplicate this logic in both LibIPC and MessagePort. Another advantage of reducing message granularity at IPC::Transport layer is that it will make it easier to support alternative transport implementations (like Mach ports, which unlike Unix domain sockets are not stream oriented). --- Libraries/LibIPC/Connection.cpp | 147 +++++++++----------------- Libraries/LibIPC/Connection.h | 5 +- Libraries/LibIPC/Message.cpp | 16 +-- Libraries/LibIPC/TransportSocket.cpp | 60 ++++++++--- Libraries/LibIPC/TransportSocket.h | 21 ++-- Libraries/LibWeb/HTML/MessagePort.cpp | 69 ++---------- Libraries/LibWeb/HTML/MessagePort.h | 16 +-- 7 files changed, 130 insertions(+), 204 deletions(-) diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index b50401da84a..b2811e34412 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2021-2024, Andreas Kling + * Copyright (c) 2025, Aliaksandr Kalenik * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -11,6 +12,7 @@ #include #include #include +#include namespace IPC { @@ -139,63 +141,65 @@ void ConnectionBase::wait_for_transport_to_become_readable() m_transport.wait_until_readable(); } -ErrorOr> ConnectionBase::read_as_much_as_possible_from_transport_without_blocking() -{ - Vector bytes; - - if (!m_unprocessed_bytes.is_empty()) { - bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size()); - m_unprocessed_bytes.clear(); - } - - bool should_shut_down = false; - auto schedule_shutdown = [this, &should_shut_down]() { - should_shut_down = true; - deferred_invoke([this] { - shutdown(); - }); - }; - - auto&& [new_bytes, received_fds] = m_transport.read_as_much_as_possible_without_blocking(move(schedule_shutdown)); - bytes.append(new_bytes.data(), new_bytes.size()); - - for (auto const& fd : received_fds) - m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd)); - - if (!bytes.is_empty()) { - m_responsiveness_timer->stop(); - did_become_responsive(); - } else if (should_shut_down) { - return Error::from_string_literal("IPC connection EOF"); - } - - return bytes; -} - ErrorOr ConnectionBase::drain_messages_from_peer() { - auto bytes = TRY(read_as_much_as_possible_from_transport_without_blocking()); + u32 pending_ack_count = 0; + u32 received_ack_count = 0; + auto schedule_shutdown = m_transport.read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) { + auto const& bytes = unparsed_message.bytes; + UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); + if (auto message = try_parse_message(bytes, unprocessed_fds)) { + if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { + LargeMessageWrapper* wrapper = static_cast(message.ptr()); + auto wrapped_message = wrapper->wrapped_message_data(); + unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); + auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds); + VERIFY(parsed_message); + VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID); + pending_ack_count++; + m_unprocessed_messages.append(parsed_message.release_nonnull()); + return; + } - size_t index = 0; - try_parse_messages(bytes, index); + if (message->message_id() == Acknowledgement::MESSAGE_ID) { + VERIFY(message->endpoint_magic() == m_local_endpoint_magic); + received_ack_count += static_cast(message.ptr())->ack_count(); + return; + } - if (index < bytes.size()) { - // Sometimes we might receive a partial message. That's okay, just stash away - // the unprocessed bytes and we'll prepend them to the next incoming message - // in the next run of this function. - auto remaining_bytes = TRY(ByteBuffer::copy(bytes.span().slice(index))); - if (!m_unprocessed_bytes.is_empty()) { - shutdown(); - return Error::from_string_literal("drain_messages_from_peer: Already have unprocessed bytes"); + pending_ack_count++; + m_unprocessed_messages.append(message.release_nonnull()); + } else { + dbgln("Failed to parse IPC message {:hex-dump}", bytes); + VERIFY_NOT_REACHED(); } - m_unprocessed_bytes = move(remaining_bytes); + }); + + if (received_ack_count > 0) { + Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex); + for (size_t i = 0; i < received_ack_count; ++i) + m_acknowledgement_wait_queue->messages.take_first(); + } + + if (is_open() && pending_ack_count > 0) { + auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count); + MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No)); } if (!m_unprocessed_messages.is_empty()) { + m_responsiveness_timer->stop(); + did_become_responsive(); deferred_invoke([this] { handle_messages(); }); + } else if (schedule_shutdown == TransportSocket::ShouldShutdown::Yes) { + deferred_invoke([this] { + shutdown(); + }); + return Error::from_string_literal("IPC connection EOF"); } + return {}; } @@ -222,57 +226,4 @@ OwnPtr ConnectionBase::wait_for_specific_endpoint_message_impl(u32 return {}; } -void ConnectionBase::try_parse_messages(Vector const& bytes, size_t& index) -{ - u32 message_size = 0; - u32 pending_ack_count = 0; - u32 received_ack_count = 0; - for (; index + sizeof(message_size) < bytes.size(); index += message_size) { - memcpy(&message_size, bytes.data() + index, sizeof(message_size)); - if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size) - break; - index += sizeof(message_size); - auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size }; - - if (auto message = try_parse_message(remaining_bytes, m_unprocessed_fds)) { - if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { - LargeMessageWrapper* wrapper = static_cast(message.ptr()); - auto wrapped_message = wrapper->wrapped_message_data(); - m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); - auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds); - VERIFY(parsed_message); - VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID); - pending_ack_count++; - m_unprocessed_messages.append(parsed_message.release_nonnull()); - continue; - } - - if (message->message_id() == Acknowledgement::MESSAGE_ID) { - VERIFY(message->endpoint_magic() == m_local_endpoint_magic); - received_ack_count += static_cast(message.ptr())->ack_count(); - continue; - } - - pending_ack_count++; - m_unprocessed_messages.append(message.release_nonnull()); - continue; - } - - dbgln("Failed to parse IPC message:"); - dbgln("{:hex-dump}", remaining_bytes); - break; - } - - if (received_ack_count > 0) { - Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex); - for (size_t i = 0; i < received_ack_count; ++i) - m_acknowledgement_wait_queue->messages.take_first(); - } - - if (is_open() && pending_ack_count > 0) { - auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count); - MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No)); - } -} - } diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 8577d529f3d..05c2ccf93cf 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2018-2024, Andreas Kling + * Copyright (c) 2025, Aliaksandr Kalenik * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -51,9 +52,7 @@ protected: OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id); void wait_for_transport_to_become_readable(); - ErrorOr> read_as_much_as_possible_from_transport_without_blocking(); ErrorOr drain_messages_from_peer(); - void try_parse_messages(Vector const& bytes, size_t& index); void handle_messages(); @@ -64,8 +63,6 @@ protected: RefPtr m_responsiveness_timer; Vector> m_unprocessed_messages; - UnprocessedFileDescriptors m_unprocessed_fds; - ByteBuffer m_unprocessed_bytes; u32 m_local_endpoint_magic { 0 }; u32 m_peer_endpoint_magic { 0 }; diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 0bb186e0669..ffc83b5c537 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -15,7 +15,6 @@ using MessageSizeType = u32; MessageBuffer::MessageBuffer() { - m_data.resize(sizeof(MessageSizeType)); } ErrorOr MessageBuffer::extend_data_capacity(size_t capacity) @@ -40,13 +39,9 @@ ErrorOr MessageBuffer::append_file_descriptor(int fd) ErrorOr MessageBuffer::transfer_message(Transport& transport) { Checked checked_message_size { m_data.size() }; - checked_message_size -= sizeof(MessageSizeType); - - if (checked_message_size.has_overflow()) + if (checked_message_size.has_overflow()) { return Error::from_string_literal("Message is too large for IPC encoding"); - - MessageSizeType const message_size = checked_message_size.value(); - m_data.span().overwrite(0, reinterpret_cast(&message_size), sizeof(message_size)); + } auto raw_fds = Vector {}; auto num_fds_to_transfer = m_fds.size(); @@ -57,16 +52,15 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) } } - TRY(transport.transfer(m_data.span(), raw_fds)); + TRY(transport.transfer_message(m_data.span(), raw_fds)); return {}; } NonnullOwnPtr LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap) { - auto size = buffer_to_wrap.data().size() - sizeof(MessageSizeType); - u8 const* data = buffer_to_wrap.data().data() + sizeof(MessageSizeType); + auto size = buffer_to_wrap.data().size(); auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size)); - memcpy(wrapped_message_data.data(), data, size); + memcpy(wrapped_message_data.data(), buffer_to_wrap.data().data(), size); Vector files; for (auto& owned_fd : buffer_to_wrap.take_fds()) { files.append(File::adopt_fd(owned_fd->take_fd())); diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index f6c375f7327..01e717253de 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2024, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -49,6 +50,23 @@ void TransportSocket::wait_until_readable() VERIFY(maybe_did_become_readable.value()); } +struct MessageHeader { + u32 size { 0 }; + u32 fd_count { 0 }; +}; + +ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +{ + Vector message_buffer; + message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size()); + MessageHeader header; + header.size = bytes_to_write.size(); + header.fd_count = unowned_fds.size(); + memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); + memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size()); + return transfer(message_buffer.span(), unowned_fds); +} + ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) { auto num_fds_to_transfer = unowned_fds.size(); @@ -98,15 +116,12 @@ ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector schedule_shutdown) +TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { - u8 buffer[4096]; - - ReadResult result; - auto received_fds = Vector {}; - auto& bytes = result.bytes; - + auto should_shutdown = ShouldShutdown::No; while (is_open()) { + u8 buffer[4096]; + auto received_fds = Vector {}; auto maybe_bytes_read = m_socket->receive_message({ buffer, 4096 }, MSG_DONTWAIT, received_fds); if (maybe_bytes_read.is_error()) { auto error = maybe_bytes_read.release_error(); @@ -115,7 +130,7 @@ TransportSocket::ReadResult TransportSocket::read_as_much_as_possible_without_bl } if (error.is_syscall() && error.code() == ECONNRESET) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } @@ -126,15 +141,36 @@ TransportSocket::ReadResult TransportSocket::read_as_much_as_possible_without_bl auto bytes_read = maybe_bytes_read.release_value(); if (bytes_read.is_empty()) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } - bytes.append(bytes_read.data(), bytes_read.size()); - result.fds.append(received_fds.data(), received_fds.size()); + m_unprocessed_bytes.append(bytes_read.data(), bytes_read.size()); + for (auto const& fd : received_fds) { + m_unprocessed_fds.enqueue(File::adopt_fd(fd)); + } } - return result; + size_t index = 0; + while (index + sizeof(MessageHeader) < m_unprocessed_bytes.size()) { + MessageHeader header; + memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader)); + Message message; + for (size_t i = 0; i < header.fd_count; ++i) + message.fds.append(m_unprocessed_fds.dequeue()); + message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size); + callback(move(message)); + index += header.size + sizeof(MessageHeader); + } + + if (index < m_unprocessed_bytes.size()) { + auto remaining_bytes = MUST(ByteBuffer::copy(m_unprocessed_bytes.span().slice(index))); + m_unprocessed_bytes = move(remaining_bytes); + } else { + m_unprocessed_bytes.clear(); + } + + return should_shutdown; } ErrorOr TransportSocket::release_underlying_transport_for_transfer() diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 7bea8731117..ac83139498c 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -1,12 +1,13 @@ /* * Copyright (c) 2024, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ #pragma once -#include +#include namespace IPC { @@ -26,13 +27,17 @@ public: void wait_until_readable(); - ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + ErrorOr transfer_message(ReadonlyBytes, Vector const& unowned_fds); - struct [[nodiscard]] ReadResult { - Vector bytes; - Vector fds; + enum class ShouldShutdown { + No, + Yes, }; - ReadResult read_as_much_as_possible_without_blocking(Function schedule_shutdown); + struct Message { + Vector bytes; + Vector fds; + }; + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&& schedule_shutdown); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -40,7 +45,11 @@ public: ErrorOr clone_for_transfer(); private: + ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + NonnullOwnPtr m_socket; + ByteBuffer m_unprocessed_bytes; + UnprocessedFileDescriptors m_unprocessed_fds; }; } diff --git a/Libraries/LibWeb/HTML/MessagePort.cpp b/Libraries/LibWeb/HTML/MessagePort.cpp index 6e5a8264cfd..4a322d104a4 100644 --- a/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Libraries/LibWeb/HTML/MessagePort.cpp @@ -1,6 +1,7 @@ /* * Copyright (c) 2021, Andreas Kling * Copyright (c) 2023, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -285,75 +286,27 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran } } -ErrorOr MessagePort::parse_message() +void MessagePort::read_from_transport() { - static constexpr size_t HEADER_SIZE = sizeof(u32); + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& unparsed_message) { + auto& bytes = unparsed_message.bytes; + IPC::UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); - auto num_bytes_ready = m_buffered_data.size(); - switch (m_socket_state) { - case SocketState::Header: { - if (num_bytes_ready < HEADER_SIZE) - return ParseDecision::NotEnoughData; + FixedMemoryStream stream { bytes.span(), FixedMemoryStream::Mode::ReadOnly }; + IPC::Decoder decoder { stream, unprocessed_fds }; - m_socket_incoming_message_size = ByteReader::load32(m_buffered_data.data()); - // NOTE: We don't decrement the number of ready bytes because we want to remove the entire - // message + header from the buffer in one go on success - m_socket_state = SocketState::Data; - [[fallthrough]]; - } - case SocketState::Data: { - if (num_bytes_ready < HEADER_SIZE + m_socket_incoming_message_size) - return ParseDecision::NotEnoughData; + auto serialized_transfer_record = MUST(decoder.decode()); - auto payload = m_buffered_data.span().slice(HEADER_SIZE, m_socket_incoming_message_size); - - FixedMemoryStream stream { payload, FixedMemoryStream::Mode::ReadOnly }; - IPC::Decoder decoder { stream, m_unprocessed_fds }; - - auto serialized_transfer_record = TRY(decoder.decode()); - - // Make sure to advance our state machine before dispatching the MessageEvent, - // as dispatching events can run arbitrary JS (and cause us to receive another message!) - m_socket_state = SocketState::Header; - - m_buffered_data.remove(0, HEADER_SIZE + m_socket_incoming_message_size); - - // Note: this is step 7 of message_port_post_message_steps: - // 7. Add a task that runs the following steps to the port message queue of targetPort: queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), GC::create_function(heap(), [this, serialized_transfer_record = move(serialized_transfer_record)]() mutable { this->post_message_task_steps(serialized_transfer_record); })); + }); - break; - } - case SocketState::Error: - return Error::from_errno(ENOMSG); - } - - return ParseDecision::ParseNextMessage; -} - -void MessagePort::read_from_transport() -{ - auto&& [bytes, fds] = m_transport->read_as_much_as_possible_without_blocking([this] { + if (schedule_shutdown == IPC::TransportSocket::ShouldShutdown::Yes) { queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), GC::create_function(heap(), [this] { this->close(); })); - }); - - m_buffered_data.append(bytes.data(), bytes.size()); - - for (auto fd : fds) - m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd)); - - while (true) { - auto parse_decision_or_error = parse_message(); - if (parse_decision_or_error.is_error()) { - dbgln("MessagePort::read_from_socket(): Failed to parse message: {}", parse_decision_or_error.error()); - return; - } - if (parse_decision_or_error.value() == ParseDecision::NotEnoughData) - break; } } diff --git a/Libraries/LibWeb/HTML/MessagePort.h b/Libraries/LibWeb/HTML/MessagePort.h index 29f75fd0285..cf8ad8e1879 100644 --- a/Libraries/LibWeb/HTML/MessagePort.h +++ b/Libraries/LibWeb/HTML/MessagePort.h @@ -1,6 +1,7 @@ /* * Copyright (c) 2021, Andreas Kling * Copyright (c) 2023, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -79,12 +80,6 @@ private: ErrorOr send_message_on_transport(SerializedTransferRecord const&); void read_from_transport(); - enum class ParseDecision { - NotEnoughData, - ParseNextMessage, - }; - ErrorOr parse_message(); - // The HTML spec implies(!) that this is MessagePort.[[RemotePort]] GC::Ptr m_remote_port; @@ -93,15 +88,6 @@ private: Optional m_transport; - enum class SocketState : u8 { - Header, - Data, - Error, - } m_socket_state { SocketState::Header }; - size_t m_socket_incoming_message_size { 0 }; - IPC::UnprocessedFileDescriptors m_unprocessed_fds; - Vector m_buffered_data; - GC::Ptr m_worker_event_target; };