diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index 8f15685393d..cc02f30d85b 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace IPC { @@ -39,16 +40,21 @@ bool ConnectionBase::is_open() const ErrorOr ConnectionBase::post_message(Message const& message) { - return post_message(TRY(message.encode())); + return post_message(message.endpoint_magic(), TRY(message.encode())); } -ErrorOr ConnectionBase::post_message(MessageBuffer buffer) +ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer) { // NOTE: If this connection is being shut down, but has not yet been destroyed, // the socket will be closed. Don't try to send more messages. if (!m_transport->is_open()) return Error::from_string_literal("Trying to post_message during IPC shutdown"); + if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) { + auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer); + buffer = MUST(wrapper->encode()); + } + MUST(buffer.transfer_message(*m_transport)); m_responsiveness_timer->start(); @@ -79,7 +85,7 @@ void ConnectionBase::handle_messages() } if (auto response = handler_result.release_value()) { - if (auto post_result = post_message(*response); post_result.is_error()) { + if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) { dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error()); } } @@ -94,11 +100,24 @@ void ConnectionBase::wait_for_transport_to_become_readable() ErrorOr ConnectionBase::drain_messages_from_peer() { - auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& raw_message) { - if (auto message = try_parse_message(raw_message.bytes, raw_message.fds)) { + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) { + auto const& bytes = unparsed_message.bytes; + UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); + if (auto message = try_parse_message(bytes, unprocessed_fds)) { + if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { + LargeMessageWrapper* wrapper = static_cast(message.ptr()); + auto wrapped_message = wrapper->wrapped_message_data(); + unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); + auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds); + VERIFY(parsed_message); + m_unprocessed_messages.append(parsed_message.release_nonnull()); + return; + } + m_unprocessed_messages.append(message.release_nonnull()); } else { - dbgln("Failed to parse IPC message {:hex-dump}", raw_message.bytes); + dbgln("Failed to parse IPC message {:hex-dump}", bytes); VERIFY_NOT_REACHED(); } }); diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index c1644e66b6a..0f5b918226f 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -15,6 +15,10 @@ #include #include #include +#include +#include +#include +#include namespace IPC { @@ -26,7 +30,7 @@ public: [[nodiscard]] bool is_open() const; ErrorOr post_message(Message const&); - ErrorOr post_message(MessageBuffer); + ErrorOr post_message(u32 endpoint_magic, MessageBuffer); void shutdown(); virtual void die() { } @@ -39,7 +43,7 @@ protected: virtual void may_have_become_unresponsive() { } virtual void did_become_responsive() { } virtual void shutdown_with_error(Error const&); - virtual OwnPtr try_parse_message(ReadonlyBytes, Queue&) = 0; + virtual OwnPtr try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0; OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id); void wait_for_transport_to_become_readable(); @@ -98,7 +102,7 @@ protected: return {}; } - virtual OwnPtr try_parse_message(ReadonlyBytes bytes, Queue& fds) override + virtual OwnPtr try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override { auto local_message = LocalEndpoint::decode_message(bytes, fds); if (!local_message.is_error()) diff --git a/Libraries/LibIPC/Decoder.h b/Libraries/LibIPC/Decoder.h index 7fa284c26a2..c7fdd892b72 100644 --- a/Libraries/LibIPC/Decoder.h +++ b/Libraries/LibIPC/Decoder.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -37,7 +38,7 @@ inline ErrorOr decode(Decoder&) class Decoder { public: - Decoder(Stream& stream, Queue& files) + Decoder(Stream& stream, UnprocessedFileDescriptors& files) : m_stream(stream) , m_files(files) { @@ -62,11 +63,11 @@ public: ErrorOr decode_size(); Stream& stream() { return m_stream; } - Queue& files() { return m_files; } + UnprocessedFileDescriptors& files() { return m_files; } private: Stream& m_stream; - Queue& m_files; + UnprocessedFileDescriptors& m_files; }; template diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 680cc329d08..5652bac21cb 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -6,6 +6,7 @@ #include #include +#include #include namespace IPC { @@ -46,4 +47,53 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) return {}; } +NonnullOwnPtr LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap) +{ + auto size = buffer_to_wrap.data().size(); + auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size)); + memcpy(wrapped_message_data.data(), buffer_to_wrap.data().data(), size); + Vector files; + for (auto& owned_fd : buffer_to_wrap.take_fds()) { + files.append(File::adopt_fd(owned_fd->take_fd())); + } + return make(endpoint_magic, move(wrapped_message_data), move(files)); +} + +LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds) + : m_endpoint_magic(endpoint_magic) + , m_wrapped_message_data(move(wrapped_message_data)) + , m_wrapped_fds(move(wrapped_fds)) +{ +} + +ErrorOr LargeMessageWrapper::encode() const +{ + MessageBuffer buffer; + Encoder stream { buffer }; + TRY(stream.encode(m_endpoint_magic)); + TRY(stream.encode(MESSAGE_ID)); + TRY(stream.encode(m_wrapped_message_data)); + TRY(stream.encode(m_wrapped_fds.size())); + for (auto const& wrapped_fd : m_wrapped_fds) { + TRY(stream.append_file_descriptor(wrapped_fd.take_fd())); + } + + return buffer; +} + +ErrorOr> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files) +{ + Decoder decoder { stream, files }; + auto wrapped_message_data = TRY(decoder.decode()); + + Vector wrapped_fds; + auto num_fds = TRY(decoder.decode()); + for (u32 i = 0; i < num_fds; ++i) { + auto fd = TRY(decoder.decode()); + wrapped_fds.append(move(fd)); + } + + return make(endpoint_magic, wrapped_message_data, move(wrapped_fds)); +} + } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index e79638618ed..65333340f2a 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -8,8 +8,14 @@ #pragma once #include +#include +#include #include +#include +#include +#include #include +#include namespace IPC { @@ -61,4 +67,30 @@ protected: Message() = default; }; +class LargeMessageWrapper : public Message { +public: + ~LargeMessageWrapper() override = default; + + static constexpr int MESSAGE_ID = 0x0; + + static NonnullOwnPtr create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap); + + u32 endpoint_magic() const override { return m_endpoint_magic; } + int message_id() const override { return MESSAGE_ID; } + char const* message_name() const override { return "LargeMessageWrapper"; } + ErrorOr encode() const override; + + static ErrorOr> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files); + + ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data(), m_wrapped_message_data.size() }; } + auto take_fds() { return move(m_wrapped_fds); } + + LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds); + +private: + u32 m_endpoint_magic { 0 }; + Core::AnonymousBuffer m_wrapped_message_data; + Vector m_wrapped_fds; +}; + } diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 056d956407c..86239539ce7 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -173,7 +173,7 @@ ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB return {}; } -TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) +TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { bool should_shutdown = false; while (is_open()) { @@ -222,7 +222,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib Message message; received_fd_count += header.fd_count; for (size_t i = 0; i < header.fd_count; ++i) - message.fds.enqueue(m_unprocessed_fds.dequeue()); + message.fds.append(m_unprocessed_fds.dequeue()); message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size); callback(move(message)); } else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 2a4dd3a983b..2c466d0f08c 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -65,9 +66,9 @@ public: }; struct Message { Vector bytes; - Queue fds; + Vector fds; }; - ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&& schedule_shutdown); + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&& schedule_shutdown); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -79,7 +80,7 @@ private: NonnullOwnPtr m_socket; ByteBuffer m_unprocessed_bytes; - Queue m_unprocessed_fds; + UnprocessedFileDescriptors m_unprocessed_fds; // After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer. // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file diff --git a/Libraries/LibIPC/UnprocessedFileDescriptors.h b/Libraries/LibIPC/UnprocessedFileDescriptors.h new file mode 100644 index 00000000000..991c5598b8a --- /dev/null +++ b/Libraries/LibIPC/UnprocessedFileDescriptors.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2025, Aliaksandr Kalenik + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include + +namespace IPC { + +class UnprocessedFileDescriptors { +public: + void enqueue(File&& fd) + { + m_fds.append(move(fd)); + } + + File dequeue() + { + return m_fds.take_first(); + } + + void return_fds_to_front_of_queue(Vector&& fds) + { + m_fds.prepend(move(fds)); + } + + size_t size() const { return m_fds.size(); } + +private: + Vector m_fds; +}; + +} diff --git a/Libraries/LibWeb/HTML/MessagePort.cpp b/Libraries/LibWeb/HTML/MessagePort.cpp index 97d6a808e80..03fe434b6ec 100644 --- a/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Libraries/LibWeb/HTML/MessagePort.cpp @@ -288,9 +288,13 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran void MessagePort::read_from_transport() { - auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& raw_message) { - FixedMemoryStream stream { raw_message.bytes.span(), FixedMemoryStream::Mode::ReadOnly }; - IPC::Decoder decoder { stream, raw_message.fds }; + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& unparsed_message) { + auto& bytes = unparsed_message.bytes; + IPC::UnprocessedFileDescriptors unprocessed_fds; + unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); + + FixedMemoryStream stream { bytes.span(), FixedMemoryStream::Mode::ReadOnly }; + IPC::Decoder decoder { stream, unprocessed_fds }; auto serialized_transfer_record = MUST(decoder.decode()); diff --git a/Libraries/LibWeb/HTML/MessagePort.h b/Libraries/LibWeb/HTML/MessagePort.h index 24e3bbeccbf..0d48fd0adfb 100644 --- a/Libraries/LibWeb/HTML/MessagePort.h +++ b/Libraries/LibWeb/HTML/MessagePort.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp index c7fe62dfa4a..4d041620171 100644 --- a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp +++ b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp @@ -404,7 +404,7 @@ public:)~~~"); static i32 static_message_id() { return (int)MessageID::@message.pascal_name@; } virtual const char* message_name() const override { return "@endpoint.name@::@message.pascal_name@"; } - static ErrorOr> decode(Stream& stream, Queue& files) + static ErrorOr> decode(Stream& stream, IPC::UnprocessedFileDescriptors& files) { IPC::Decoder decoder { stream, files };)~~~"); @@ -649,7 +649,7 @@ void generate_proxy_method(SourceGenerator& message_generator, Endpoint const& e } } else { message_generator.append(R"~~~()); - MUST(m_connection.post_message(move(message_buffer))); )~~~"); + MUST(m_connection.post_message(@endpoint.magic@, move(message_buffer))); )~~~"); } message_generator.appendln(R"~~~( @@ -720,7 +720,7 @@ public: static u32 static_magic() { return @endpoint.magic@; } - static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] Queue& files) + static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] IPC::UnprocessedFileDescriptors& files) { FixedMemoryStream stream { buffer }; auto message_endpoint_magic = TRY(stream.read_value());)~~~"); @@ -757,6 +757,11 @@ public: do_decode_message(message.response_name()); } + generator.append(R"~~~( + case (int)IPC::LargeMessageWrapper::MESSAGE_ID: + return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files)); +)~~~"); + generator.append(R"~~~( default:)~~~"); if constexpr (GENERATE_DEBUG) { @@ -898,6 +903,7 @@ void build(StringBuilder& builder, Vector const& endpoints) #include #include #include +#include #if defined(AK_COMPILER_CLANG) #pragma clang diagnostic push