diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index b50401da84a..b2811e34412 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2021-2024, Andreas Kling + * Copyright (c) 2025, Aliaksandr Kalenik * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -11,6 +12,7 @@ #include #include #include +#include namespace IPC { @@ -139,63 +141,65 @@ void ConnectionBase::wait_for_transport_to_become_readable() m_transport.wait_until_readable(); } -ErrorOr> ConnectionBase::read_as_much_as_possible_from_transport_without_blocking() -{ - Vector bytes; - - if (!m_unprocessed_bytes.is_empty()) { - bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size()); - m_unprocessed_bytes.clear(); - } - - bool should_shut_down = false; - auto schedule_shutdown = [this, &should_shut_down]() { - should_shut_down = true; - deferred_invoke([this] { - shutdown(); - }); - }; - - auto&& [new_bytes, received_fds] = m_transport.read_as_much_as_possible_without_blocking(move(schedule_shutdown)); - bytes.append(new_bytes.data(), new_bytes.size()); - - for (auto const& fd : received_fds) - m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd)); - - if (!bytes.is_empty()) { - m_responsiveness_timer->stop(); - did_become_responsive(); - } else if (should_shut_down) { - return Error::from_string_literal("IPC connection EOF"); - } - - return bytes; -} - ErrorOr ConnectionBase::drain_messages_from_peer() { - auto bytes = TRY(read_as_much_as_possible_from_transport_without_blocking()); + u32 pending_ack_count = 0; + u32 received_ack_count = 0; + auto schedule_shutdown = m_transport.read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) { + auto const& bytes = unparsed_message.bytes; + UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); + if (auto message = try_parse_message(bytes, unprocessed_fds)) { + if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { + LargeMessageWrapper* wrapper = static_cast(message.ptr()); + auto wrapped_message = wrapper->wrapped_message_data(); + unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); + auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds); + VERIFY(parsed_message); + VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID); + pending_ack_count++; + m_unprocessed_messages.append(parsed_message.release_nonnull()); + return; + } - size_t index = 0; - try_parse_messages(bytes, index); + if (message->message_id() == Acknowledgement::MESSAGE_ID) { + VERIFY(message->endpoint_magic() == m_local_endpoint_magic); + received_ack_count += static_cast(message.ptr())->ack_count(); + return; + } - if (index < bytes.size()) { - // Sometimes we might receive a partial message. That's okay, just stash away - // the unprocessed bytes and we'll prepend them to the next incoming message - // in the next run of this function. - auto remaining_bytes = TRY(ByteBuffer::copy(bytes.span().slice(index))); - if (!m_unprocessed_bytes.is_empty()) { - shutdown(); - return Error::from_string_literal("drain_messages_from_peer: Already have unprocessed bytes"); + pending_ack_count++; + m_unprocessed_messages.append(message.release_nonnull()); + } else { + dbgln("Failed to parse IPC message {:hex-dump}", bytes); + VERIFY_NOT_REACHED(); } - m_unprocessed_bytes = move(remaining_bytes); + }); + + if (received_ack_count > 0) { + Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex); + for (size_t i = 0; i < received_ack_count; ++i) + m_acknowledgement_wait_queue->messages.take_first(); + } + + if (is_open() && pending_ack_count > 0) { + auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count); + MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No)); } if (!m_unprocessed_messages.is_empty()) { + m_responsiveness_timer->stop(); + did_become_responsive(); deferred_invoke([this] { handle_messages(); }); + } else if (schedule_shutdown == TransportSocket::ShouldShutdown::Yes) { + deferred_invoke([this] { + shutdown(); + }); + return Error::from_string_literal("IPC connection EOF"); } + return {}; } @@ -222,57 +226,4 @@ OwnPtr ConnectionBase::wait_for_specific_endpoint_message_impl(u32 return {}; } -void ConnectionBase::try_parse_messages(Vector const& bytes, size_t& index) -{ - u32 message_size = 0; - u32 pending_ack_count = 0; - u32 received_ack_count = 0; - for (; index + sizeof(message_size) < bytes.size(); index += message_size) { - memcpy(&message_size, bytes.data() + index, sizeof(message_size)); - if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size) - break; - index += sizeof(message_size); - auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size }; - - if (auto message = try_parse_message(remaining_bytes, m_unprocessed_fds)) { - if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { - LargeMessageWrapper* wrapper = static_cast(message.ptr()); - auto wrapped_message = wrapper->wrapped_message_data(); - m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); - auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds); - VERIFY(parsed_message); - VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID); - pending_ack_count++; - m_unprocessed_messages.append(parsed_message.release_nonnull()); - continue; - } - - if (message->message_id() == Acknowledgement::MESSAGE_ID) { - VERIFY(message->endpoint_magic() == m_local_endpoint_magic); - received_ack_count += static_cast(message.ptr())->ack_count(); - continue; - } - - pending_ack_count++; - m_unprocessed_messages.append(message.release_nonnull()); - continue; - } - - dbgln("Failed to parse IPC message:"); - dbgln("{:hex-dump}", remaining_bytes); - break; - } - - if (received_ack_count > 0) { - Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex); - for (size_t i = 0; i < received_ack_count; ++i) - m_acknowledgement_wait_queue->messages.take_first(); - } - - if (is_open() && pending_ack_count > 0) { - auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count); - MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No)); - } -} - } diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 8577d529f3d..05c2ccf93cf 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2018-2024, Andreas Kling + * Copyright (c) 2025, Aliaksandr Kalenik * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -51,9 +52,7 @@ protected: OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id); void wait_for_transport_to_become_readable(); - ErrorOr> read_as_much_as_possible_from_transport_without_blocking(); ErrorOr drain_messages_from_peer(); - void try_parse_messages(Vector const& bytes, size_t& index); void handle_messages(); @@ -64,8 +63,6 @@ protected: RefPtr m_responsiveness_timer; Vector> m_unprocessed_messages; - UnprocessedFileDescriptors m_unprocessed_fds; - ByteBuffer m_unprocessed_bytes; u32 m_local_endpoint_magic { 0 }; u32 m_peer_endpoint_magic { 0 }; diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 0bb186e0669..ffc83b5c537 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -15,7 +15,6 @@ using MessageSizeType = u32; MessageBuffer::MessageBuffer() { - m_data.resize(sizeof(MessageSizeType)); } ErrorOr MessageBuffer::extend_data_capacity(size_t capacity) @@ -40,13 +39,9 @@ ErrorOr MessageBuffer::append_file_descriptor(int fd) ErrorOr MessageBuffer::transfer_message(Transport& transport) { Checked checked_message_size { m_data.size() }; - checked_message_size -= sizeof(MessageSizeType); - - if (checked_message_size.has_overflow()) + if (checked_message_size.has_overflow()) { return Error::from_string_literal("Message is too large for IPC encoding"); - - MessageSizeType const message_size = checked_message_size.value(); - m_data.span().overwrite(0, reinterpret_cast(&message_size), sizeof(message_size)); + } auto raw_fds = Vector {}; auto num_fds_to_transfer = m_fds.size(); @@ -57,16 +52,15 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) } } - TRY(transport.transfer(m_data.span(), raw_fds)); + TRY(transport.transfer_message(m_data.span(), raw_fds)); return {}; } NonnullOwnPtr LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap) { - auto size = buffer_to_wrap.data().size() - sizeof(MessageSizeType); - u8 const* data = buffer_to_wrap.data().data() + sizeof(MessageSizeType); + auto size = buffer_to_wrap.data().size(); auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size)); - memcpy(wrapped_message_data.data(), data, size); + memcpy(wrapped_message_data.data(), buffer_to_wrap.data().data(), size); Vector files; for (auto& owned_fd : buffer_to_wrap.take_fds()) { files.append(File::adopt_fd(owned_fd->take_fd())); diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index f6c375f7327..01e717253de 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2024, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -49,6 +50,23 @@ void TransportSocket::wait_until_readable() VERIFY(maybe_did_become_readable.value()); } +struct MessageHeader { + u32 size { 0 }; + u32 fd_count { 0 }; +}; + +ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +{ + Vector message_buffer; + message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size()); + MessageHeader header; + header.size = bytes_to_write.size(); + header.fd_count = unowned_fds.size(); + memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); + memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size()); + return transfer(message_buffer.span(), unowned_fds); +} + ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) { auto num_fds_to_transfer = unowned_fds.size(); @@ -98,15 +116,12 @@ ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector schedule_shutdown) +TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { - u8 buffer[4096]; - - ReadResult result; - auto received_fds = Vector {}; - auto& bytes = result.bytes; - + auto should_shutdown = ShouldShutdown::No; while (is_open()) { + u8 buffer[4096]; + auto received_fds = Vector {}; auto maybe_bytes_read = m_socket->receive_message({ buffer, 4096 }, MSG_DONTWAIT, received_fds); if (maybe_bytes_read.is_error()) { auto error = maybe_bytes_read.release_error(); @@ -115,7 +130,7 @@ TransportSocket::ReadResult TransportSocket::read_as_much_as_possible_without_bl } if (error.is_syscall() && error.code() == ECONNRESET) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } @@ -126,15 +141,36 @@ TransportSocket::ReadResult TransportSocket::read_as_much_as_possible_without_bl auto bytes_read = maybe_bytes_read.release_value(); if (bytes_read.is_empty()) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } - bytes.append(bytes_read.data(), bytes_read.size()); - result.fds.append(received_fds.data(), received_fds.size()); + m_unprocessed_bytes.append(bytes_read.data(), bytes_read.size()); + for (auto const& fd : received_fds) { + m_unprocessed_fds.enqueue(File::adopt_fd(fd)); + } } - return result; + size_t index = 0; + while (index + sizeof(MessageHeader) < m_unprocessed_bytes.size()) { + MessageHeader header; + memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader)); + Message message; + for (size_t i = 0; i < header.fd_count; ++i) + message.fds.append(m_unprocessed_fds.dequeue()); + message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size); + callback(move(message)); + index += header.size + sizeof(MessageHeader); + } + + if (index < m_unprocessed_bytes.size()) { + auto remaining_bytes = MUST(ByteBuffer::copy(m_unprocessed_bytes.span().slice(index))); + m_unprocessed_bytes = move(remaining_bytes); + } else { + m_unprocessed_bytes.clear(); + } + + return should_shutdown; } ErrorOr TransportSocket::release_underlying_transport_for_transfer() diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 7bea8731117..ac83139498c 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -1,12 +1,13 @@ /* * Copyright (c) 2024, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ #pragma once -#include +#include namespace IPC { @@ -26,13 +27,17 @@ public: void wait_until_readable(); - ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + ErrorOr transfer_message(ReadonlyBytes, Vector const& unowned_fds); - struct [[nodiscard]] ReadResult { - Vector bytes; - Vector fds; + enum class ShouldShutdown { + No, + Yes, }; - ReadResult read_as_much_as_possible_without_blocking(Function schedule_shutdown); + struct Message { + Vector bytes; + Vector fds; + }; + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&& schedule_shutdown); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -40,7 +45,11 @@ public: ErrorOr clone_for_transfer(); private: + ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + NonnullOwnPtr m_socket; + ByteBuffer m_unprocessed_bytes; + UnprocessedFileDescriptors m_unprocessed_fds; }; } diff --git a/Libraries/LibWeb/HTML/MessagePort.cpp b/Libraries/LibWeb/HTML/MessagePort.cpp index 6e5a8264cfd..4a322d104a4 100644 --- a/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Libraries/LibWeb/HTML/MessagePort.cpp @@ -1,6 +1,7 @@ /* * Copyright (c) 2021, Andreas Kling * Copyright (c) 2023, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -285,75 +286,27 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran } } -ErrorOr MessagePort::parse_message() +void MessagePort::read_from_transport() { - static constexpr size_t HEADER_SIZE = sizeof(u32); + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& unparsed_message) { + auto& bytes = unparsed_message.bytes; + IPC::UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); - auto num_bytes_ready = m_buffered_data.size(); - switch (m_socket_state) { - case SocketState::Header: { - if (num_bytes_ready < HEADER_SIZE) - return ParseDecision::NotEnoughData; + FixedMemoryStream stream { bytes.span(), FixedMemoryStream::Mode::ReadOnly }; + IPC::Decoder decoder { stream, unprocessed_fds }; - m_socket_incoming_message_size = ByteReader::load32(m_buffered_data.data()); - // NOTE: We don't decrement the number of ready bytes because we want to remove the entire - // message + header from the buffer in one go on success - m_socket_state = SocketState::Data; - [[fallthrough]]; - } - case SocketState::Data: { - if (num_bytes_ready < HEADER_SIZE + m_socket_incoming_message_size) - return ParseDecision::NotEnoughData; + auto serialized_transfer_record = MUST(decoder.decode()); - auto payload = m_buffered_data.span().slice(HEADER_SIZE, m_socket_incoming_message_size); - - FixedMemoryStream stream { payload, FixedMemoryStream::Mode::ReadOnly }; - IPC::Decoder decoder { stream, m_unprocessed_fds }; - - auto serialized_transfer_record = TRY(decoder.decode()); - - // Make sure to advance our state machine before dispatching the MessageEvent, - // as dispatching events can run arbitrary JS (and cause us to receive another message!) - m_socket_state = SocketState::Header; - - m_buffered_data.remove(0, HEADER_SIZE + m_socket_incoming_message_size); - - // Note: this is step 7 of message_port_post_message_steps: - // 7. Add a task that runs the following steps to the port message queue of targetPort: queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), GC::create_function(heap(), [this, serialized_transfer_record = move(serialized_transfer_record)]() mutable { this->post_message_task_steps(serialized_transfer_record); })); + }); - break; - } - case SocketState::Error: - return Error::from_errno(ENOMSG); - } - - return ParseDecision::ParseNextMessage; -} - -void MessagePort::read_from_transport() -{ - auto&& [bytes, fds] = m_transport->read_as_much_as_possible_without_blocking([this] { + if (schedule_shutdown == IPC::TransportSocket::ShouldShutdown::Yes) { queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), GC::create_function(heap(), [this] { this->close(); })); - }); - - m_buffered_data.append(bytes.data(), bytes.size()); - - for (auto fd : fds) - m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd)); - - while (true) { - auto parse_decision_or_error = parse_message(); - if (parse_decision_or_error.is_error()) { - dbgln("MessagePort::read_from_socket(): Failed to parse message: {}", parse_decision_or_error.error()); - return; - } - if (parse_decision_or_error.value() == ParseDecision::NotEnoughData) - break; } } diff --git a/Libraries/LibWeb/HTML/MessagePort.h b/Libraries/LibWeb/HTML/MessagePort.h index 29f75fd0285..cf8ad8e1879 100644 --- a/Libraries/LibWeb/HTML/MessagePort.h +++ b/Libraries/LibWeb/HTML/MessagePort.h @@ -1,6 +1,7 @@ /* * Copyright (c) 2021, Andreas Kling * Copyright (c) 2023, Andrew Kaster + * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ @@ -79,12 +80,6 @@ private: ErrorOr send_message_on_transport(SerializedTransferRecord const&); void read_from_transport(); - enum class ParseDecision { - NotEnoughData, - ParseNextMessage, - }; - ErrorOr parse_message(); - // The HTML spec implies(!) that this is MessagePort.[[RemotePort]] GC::Ptr m_remote_port; @@ -93,15 +88,6 @@ private: Optional m_transport; - enum class SocketState : u8 { - Header, - Data, - Error, - } m_socket_state { SocketState::Header }; - size_t m_socket_incoming_message_size { 0 }; - IPC::UnprocessedFileDescriptors m_unprocessed_fds; - Vector m_buffered_data; - GC::Ptr m_worker_event_target; };