diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index a93d25875fb..2e0d103977a 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -70,16 +70,21 @@ bool ConnectionBase::is_open() const ErrorOr ConnectionBase::post_message(Message const& message) { - return post_message(TRY(message.encode())); + return post_message(message.endpoint_magic(), TRY(message.encode())); } -ErrorOr ConnectionBase::post_message(MessageBuffer buffer) +ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer) { // NOTE: If this connection is being shut down, but has not yet been destroyed, // the socket will be closed. Don't try to send more messages. if (!m_transport.is_open()) return Error::from_string_literal("Trying to post_message during IPC shutdown"); + if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) { + auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer); + buffer = MUST(wrapper->encode()); + } + { Threading::MutexLocker locker(m_send_queue->mutex); m_send_queue->messages.append(move(buffer)); @@ -114,7 +119,7 @@ void ConnectionBase::handle_messages() } if (auto response = handler_result.release_value()) { - if (auto post_result = post_message(*response); post_result.is_error()) { + if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) { dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error()); } } @@ -221,6 +226,15 @@ void ConnectionBase::try_parse_messages(Vector const& bytes, size_t& index) auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size }; if (auto message = try_parse_message(remaining_bytes, m_unprocessed_fds)) { + if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { + LargeMessageWrapper* wrapper = static_cast(message.ptr()); + auto wrapped_message = wrapper->wrapped_message_data(); + m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); + auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds); + VERIFY(parsed_message); + m_unprocessed_messages.append(parsed_message.release_nonnull()); + continue; + } m_unprocessed_messages.append(message.release_nonnull()); continue; } diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 9d100df40d5..e489f64ac1f 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -27,7 +28,7 @@ public: [[nodiscard]] bool is_open() const; ErrorOr post_message(Message const&); - ErrorOr post_message(MessageBuffer); + ErrorOr post_message(u32 endpoint_magic, MessageBuffer); void shutdown(); virtual void die() { } @@ -40,7 +41,7 @@ protected: virtual void may_have_become_unresponsive() { } virtual void did_become_responsive() { } virtual void shutdown_with_error(Error const&); - virtual OwnPtr try_parse_message(ReadonlyBytes, Queue&) = 0; + virtual OwnPtr try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0; OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id); void wait_for_transport_to_become_readable(); @@ -57,7 +58,7 @@ protected: RefPtr m_responsiveness_timer; Vector> m_unprocessed_messages; - Queue m_unprocessed_fds; // unused on Windows + UnprocessedFileDescriptors m_unprocessed_fds; ByteBuffer m_unprocessed_bytes; u32 m_local_endpoint_magic { 0 }; @@ -113,7 +114,7 @@ protected: return {}; } - virtual OwnPtr try_parse_message(ReadonlyBytes bytes, Queue& fds) override + virtual OwnPtr try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override { auto local_message = LocalEndpoint::decode_message(bytes, fds); if (!local_message.is_error()) diff --git a/Libraries/LibIPC/Decoder.h b/Libraries/LibIPC/Decoder.h index 709ad106eaa..c7fdd892b72 100644 --- a/Libraries/LibIPC/Decoder.h +++ b/Libraries/LibIPC/Decoder.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -37,7 +38,7 @@ inline ErrorOr decode(Decoder&) class Decoder { public: - Decoder(Stream& stream, Queue& files) + Decoder(Stream& stream, UnprocessedFileDescriptors& files) : m_stream(stream) , m_files(files) { @@ -62,11 +63,11 @@ public: ErrorOr decode_size(); Stream& stream() { return m_stream; } - Queue& files() { return m_files; } + UnprocessedFileDescriptors& files() { return m_files; } private: Stream& m_stream; - Queue& m_files; + UnprocessedFileDescriptors& m_files; }; template diff --git a/Libraries/LibIPC/File.cpp b/Libraries/LibIPC/File.cpp index 6f664564059..317b82d05f6 100644 --- a/Libraries/LibIPC/File.cpp +++ b/Libraries/LibIPC/File.cpp @@ -14,7 +14,7 @@ namespace IPC { template<> ErrorOr decode(Decoder& decoder) { - auto file = TRY(decoder.files().try_dequeue()); + auto file = decoder.files().dequeue(); TRY(Core::System::set_close_on_exec(file.fd(), true)); return file; } diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 58778e6ae87..063ed4d27ff 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -5,6 +5,8 @@ */ #include +#include +#include #include namespace IPC { @@ -59,4 +61,54 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) return {}; } +NonnullOwnPtr LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap) +{ + auto size = buffer_to_wrap.data().size() - sizeof(MessageSizeType); + u8 const* data = buffer_to_wrap.data().data() + sizeof(MessageSizeType); + auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size)); + memcpy(wrapped_message_data.data(), data, size); + Vector files; + for (auto& owned_fd : buffer_to_wrap.take_fds()) { + files.append(File::adopt_fd(owned_fd->take_fd())); + } + return make(endpoint_magic, move(wrapped_message_data), move(files)); +} + +LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds) + : m_endpoint_magic(endpoint_magic) + , m_wrapped_message_data(move(wrapped_message_data)) + , m_wrapped_fds(move(wrapped_fds)) +{ +} + +ErrorOr LargeMessageWrapper::encode() const +{ + MessageBuffer buffer; + Encoder stream { buffer }; + TRY(stream.encode(m_endpoint_magic)); + TRY(stream.encode(MESSAGE_ID)); + TRY(stream.encode(m_wrapped_message_data)); + TRY(stream.encode(m_wrapped_fds.size())); + for (auto const& wrapped_fd : m_wrapped_fds) { + TRY(stream.append_file_descriptor(wrapped_fd.take_fd())); + } + + return buffer; +} + +ErrorOr> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files) +{ + Decoder decoder { stream, files }; + auto wrapped_message_data = TRY(decoder.decode()); + + Vector wrapped_fds; + auto num_fds = TRY(decoder.decode()); + for (u32 i = 0; i < num_fds; ++i) { + auto fd = TRY(decoder.decode()); + wrapped_fds.append(move(fd)); + } + + return make(endpoint_magic, wrapped_message_data, move(wrapped_fds)); +} + } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index 8321973269f..9dfbbebce68 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -11,9 +11,11 @@ #include #include #include +#include #include #include #include +#include namespace IPC { @@ -32,6 +34,13 @@ public: int value() const { return m_fd; } + int take_fd() + { + int fd = m_fd; + m_fd = -1; + return fd; + } + private: int m_fd; }; @@ -40,6 +49,12 @@ class MessageBuffer { public: MessageBuffer(); + MessageBuffer(Vector data, Vector, 1> fds) + : m_data(move(data)) + , m_fds(move(fds)) + { + } + ErrorOr extend_data_capacity(size_t capacity); ErrorOr append_data(u8 const* values, size_t count); @@ -47,8 +62,12 @@ public: ErrorOr transfer_message(Transport& transport); + auto const& data() const { return m_data; } + auto take_fds() { return move(m_fds); } + private: Vector m_data; + bool m_fds_taken { false }; Vector, 1> m_fds; #ifdef AK_OS_WINDOWS Vector m_handle_offsets; @@ -75,4 +94,30 @@ protected: Message() = default; }; +class LargeMessageWrapper : public Message { +public: + ~LargeMessageWrapper() override = default; + + static constexpr int MESSAGE_ID = 0x0; + + static NonnullOwnPtr create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap); + + u32 endpoint_magic() const override { return m_endpoint_magic; } + int message_id() const override { return MESSAGE_ID; } + char const* message_name() const override { return "LargeMessageWrapper"; } + ErrorOr encode() const override; + + static ErrorOr> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files); + + ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data(), m_wrapped_message_data.size() }; } + auto take_fds() { return move(m_wrapped_fds); } + + LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds); + +private: + u32 m_endpoint_magic { 0 }; + Core::AnonymousBuffer m_wrapped_message_data; + Vector m_wrapped_fds; +}; + } diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index c9e92880024..f6c375f7327 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -15,9 +15,8 @@ namespace IPC { TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) { - socklen_t socket_buffer_size = 128 * KiB; - (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &socket_buffer_size, sizeof(socket_buffer_size)); - (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &socket_buffer_size, sizeof(socket_buffer_size)); + (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); + (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); } TransportSocket::~TransportSocket() = default; diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index f533cc5922e..7bea8731117 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -15,6 +15,8 @@ class TransportSocket { AK_MAKE_DEFAULT_MOVABLE(TransportSocket); public: + static constexpr socklen_t SOCKET_BUFFER_SIZE = 128 * KiB; + explicit TransportSocket(NonnullOwnPtr socket); ~TransportSocket(); diff --git a/Libraries/LibIPC/UnprocessedFileDescriptors.h b/Libraries/LibIPC/UnprocessedFileDescriptors.h new file mode 100644 index 00000000000..ecdda2b7244 --- /dev/null +++ b/Libraries/LibIPC/UnprocessedFileDescriptors.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2025, Aliaksandr Kalenik + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include + +namespace IPC { + +class UnprocessedFileDescriptors { +public: + void enqueue(File&& fd) + { + m_fds.append(move(fd)); + } + + File dequeue() + { + return m_fds.take_first(); + } + + void return_fds_to_front_of_queue(Vector&& fds) + { + m_fds.prepend(move(fds)); + } + +private: + Vector m_fds; +}; + +} diff --git a/Libraries/LibWeb/HTML/MessagePort.h b/Libraries/LibWeb/HTML/MessagePort.h index f671a7434a5..29f75fd0285 100644 --- a/Libraries/LibWeb/HTML/MessagePort.h +++ b/Libraries/LibWeb/HTML/MessagePort.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -98,7 +99,7 @@ private: Error, } m_socket_state { SocketState::Header }; size_t m_socket_incoming_message_size { 0 }; - Queue m_unprocessed_fds; + IPC::UnprocessedFileDescriptors m_unprocessed_fds; Vector m_buffered_data; GC::Ptr m_worker_event_target; diff --git a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp index c7fe62dfa4a..4d041620171 100644 --- a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp +++ b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp @@ -404,7 +404,7 @@ public:)~~~"); static i32 static_message_id() { return (int)MessageID::@message.pascal_name@; } virtual const char* message_name() const override { return "@endpoint.name@::@message.pascal_name@"; } - static ErrorOr> decode(Stream& stream, Queue& files) + static ErrorOr> decode(Stream& stream, IPC::UnprocessedFileDescriptors& files) { IPC::Decoder decoder { stream, files };)~~~"); @@ -649,7 +649,7 @@ void generate_proxy_method(SourceGenerator& message_generator, Endpoint const& e } } else { message_generator.append(R"~~~()); - MUST(m_connection.post_message(move(message_buffer))); )~~~"); + MUST(m_connection.post_message(@endpoint.magic@, move(message_buffer))); )~~~"); } message_generator.appendln(R"~~~( @@ -720,7 +720,7 @@ public: static u32 static_magic() { return @endpoint.magic@; } - static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] Queue& files) + static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] IPC::UnprocessedFileDescriptors& files) { FixedMemoryStream stream { buffer }; auto message_endpoint_magic = TRY(stream.read_value());)~~~"); @@ -757,6 +757,11 @@ public: do_decode_message(message.response_name()); } + generator.append(R"~~~( + case (int)IPC::LargeMessageWrapper::MESSAGE_ID: + return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files)); +)~~~"); + generator.append(R"~~~( default:)~~~"); if constexpr (GENERATE_DEBUG) { @@ -898,6 +903,7 @@ void build(StringBuilder& builder, Vector const& endpoints) #include #include #include +#include #if defined(AK_COMPILER_CLANG) #pragma clang diagnostic push