From ab35325003c37983ee5d1b384e777a78843f195d Mon Sep 17 00:00:00 2001 From: Aliaksandr Kalenik Date: Mon, 7 Apr 2025 23:41:24 +0200 Subject: [PATCH] LibIPC: Move early fd deallocation workaround to the transport layer Reimplements c3121c9d at the transport layer, allowing us to solve the same problem once, in a single place, for both the LibIPC connection and MessagePort. This avoids exposing a workaround for a macOS specific Unix domain socket issue to higher abstraction layers. --- Libraries/LibIPC/Connection.cpp | 39 ++-------- Libraries/LibIPC/Connection.h | 28 +------ Libraries/LibIPC/Message.cpp | 28 ------- Libraries/LibIPC/Message.h | 24 ------ Libraries/LibIPC/TransportSocket.cpp | 77 +++++++++++++++---- Libraries/LibIPC/TransportSocket.h | 8 ++ .../Tools/CodeGenerators/IPCCompiler/main.cpp | 2 - 7 files changed, 78 insertions(+), 128 deletions(-) diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index 0b31709cde8..51d1a13a4c1 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -16,11 +16,10 @@ namespace IPC { -ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic, u32 peer_endpoint_magic) +ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic) : m_local_stub(local_stub) , m_transport(move(transport)) , m_local_endpoint_magic(local_endpoint_magic) - , m_peer_endpoint_magic(peer_endpoint_magic) { m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); }); @@ -32,8 +31,7 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l }); m_send_queue = adopt_ref(*new SendQueue); - m_acknowledgement_wait_queue = adopt_ref(*new AcknowledgementWaitQueue); - m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue, acknowledgement_wait_queue = m_acknowledgement_wait_queue]() -> intptr_t { + m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { for (;;) { send_queue->mutex.lock(); while (send_queue->messages.is_empty() && send_queue->running) @@ -44,14 +42,9 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l break; } - auto [message_buffer, needs_acknowledgement] = send_queue->messages.take_first(); + auto message_buffer = send_queue->messages.take_first(); send_queue->mutex.unlock(); - if (needs_acknowledgement == MessageNeedsAcknowledgement::Yes) { - Threading::MutexLocker lock(acknowledgement_wait_queue->mutex); - acknowledgement_wait_queue->messages.append(message_buffer); - } - if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) { dbgln("ConnectionBase::send_thread: {}", result.error()); continue; @@ -82,7 +75,7 @@ ErrorOr ConnectionBase::post_message(Message const& message) return post_message(message.endpoint_magic(), TRY(message.encode())); } -ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer, MessageNeedsAcknowledgement needs_acknowledgement) +ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer) { // NOTE: If this connection is being shut down, but has not yet been destroyed, // the socket will be closed. Don't try to send more messages. @@ -96,7 +89,7 @@ ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf { Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->messages.append({ move(buffer), needs_acknowledgement }); + m_send_queue->messages.append(move(buffer)); m_send_queue->condition.signal(); } @@ -143,8 +136,6 @@ void ConnectionBase::wait_for_transport_to_become_readable() ErrorOr ConnectionBase::drain_messages_from_peer() { - u32 pending_ack_count = 0; - u32 received_ack_count = 0; auto schedule_shutdown = m_transport.read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) { auto const& bytes = unparsed_message.bytes; UnprocessedFileDescriptors unprocessed_fds; @@ -156,19 +147,10 @@ ErrorOr ConnectionBase::drain_messages_from_peer() unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds); VERIFY(parsed_message); - VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID); - pending_ack_count++; m_unprocessed_messages.append(parsed_message.release_nonnull()); return; } - if (message->message_id() == Acknowledgement::MESSAGE_ID) { - VERIFY(message->endpoint_magic() == m_local_endpoint_magic); - received_ack_count += static_cast(message.ptr())->ack_count(); - return; - } - - pending_ack_count++; m_unprocessed_messages.append(message.release_nonnull()); } else { dbgln("Failed to parse IPC message {:hex-dump}", bytes); @@ -176,17 +158,6 @@ ErrorOr ConnectionBase::drain_messages_from_peer() } }); - if (received_ack_count > 0) { - Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex); - for (size_t i = 0; i < received_ack_count; ++i) - m_acknowledgement_wait_queue->messages.take_first(); - } - - if (is_open() && pending_ack_count > 0) { - auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count); - MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No)); - } - if (!m_unprocessed_messages.is_empty()) { m_responsiveness_timer->stop(); did_become_responsive(); diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 05c2ccf93cf..6a2d2f2ff59 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -28,14 +28,9 @@ class ConnectionBase : public Core::EventReceiver { public: virtual ~ConnectionBase() override; - enum class MessageNeedsAcknowledgement { - No, - Yes, - }; - [[nodiscard]] bool is_open() const; ErrorOr post_message(Message const&); - ErrorOr post_message(u32 endpoint_magic, MessageBuffer, MessageNeedsAcknowledgement = MessageNeedsAcknowledgement::Yes); + ErrorOr post_message(u32 endpoint_magic, MessageBuffer); void shutdown(); virtual void die() { } @@ -43,7 +38,7 @@ public: Transport& transport() { return m_transport; } protected: - explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic, u32 peer_endpoint_magic); + explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic); virtual void may_have_become_unresponsive() { } virtual void did_become_responsive() { } @@ -65,38 +60,23 @@ protected: Vector> m_unprocessed_messages; u32 m_local_endpoint_magic { 0 }; - u32 m_peer_endpoint_magic { 0 }; - - struct MessageToSend { - MessageBuffer buffer; - MessageNeedsAcknowledgement needs_acknowledgement { MessageNeedsAcknowledgement::Yes }; - }; struct SendQueue : public AtomicRefCounted { - AK::SinglyLinkedList messages; + AK::SinglyLinkedList messages; Threading::Mutex mutex; Threading::ConditionVariable condition { mutex }; bool running { true }; }; - // After a message is sent, it is moved to the acknowledgement wait queue until an acknowledgement is received from the peer. - // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file - // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 - struct AcknowledgementWaitQueue : public AtomicRefCounted { - AK::SinglyLinkedList messages; - Threading::Mutex mutex; - }; - RefPtr m_send_thread; RefPtr m_send_queue; - RefPtr m_acknowledgement_wait_queue; }; template class Connection : public ConnectionBase { public: Connection(IPC::Stub& local_stub, Transport transport) - : ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic(), PeerEndpoint::static_magic()) + : ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic()) { } diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index ffc83b5c537..35af54f6276 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -105,32 +105,4 @@ ErrorOr> LargeMessageWrapper::decode(u32 endp return make(endpoint_magic, wrapped_message_data, move(wrapped_fds)); } -Acknowledgement::Acknowledgement(u32 endpoint_magic, u32 ack_count) - : m_endpoint_magic(endpoint_magic) - , m_ack_count(ack_count) -{ -} - -NonnullOwnPtr Acknowledgement::create(u32 endpoint_magic, u32 ack_count) -{ - return make(endpoint_magic, ack_count); -} - -ErrorOr Acknowledgement::encode() const -{ - MessageBuffer buffer; - Encoder stream { buffer }; - TRY(stream.encode(m_endpoint_magic)); - TRY(stream.encode(MESSAGE_ID)); - TRY(stream.encode(m_ack_count)); - return buffer; -} - -ErrorOr> Acknowledgement::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files) -{ - Decoder decoder { stream, files }; - auto ack_count = TRY(decoder.decode()); - return make(endpoint_magic, ack_count); -} - } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index 6a4ccae1940..7d1b741225a 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -119,28 +119,4 @@ private: Vector m_wrapped_fds; }; -class Acknowledgement : public Message { -public: - ~Acknowledgement() override = default; - - static constexpr int MESSAGE_ID = 0xFFFFFFFF; - - static NonnullOwnPtr create(u32 endpoint_magic, u32 ack_count); - - u32 endpoint_magic() const override { return m_endpoint_magic; } - int message_id() const override { return MESSAGE_ID; } - char const* message_name() const override { return "Acknowledgement"; } - ErrorOr encode() const override; - - static ErrorOr> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files); - - u32 ack_count() const { return m_ack_count; } - - Acknowledgement(u32 endpoint_magic, u32 number_of_acknowledged_messages); - -private: - u32 m_endpoint_magic { 0 }; - u32 m_ack_count { 0 }; -}; - } diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 8522d19f0ad..ca0f8c387df 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -15,6 +15,8 @@ namespace IPC { TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) + , m_socket_write_mutex(make()) + , m_fds_retained_until_received_by_peer(make>>()) { (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); @@ -51,7 +53,12 @@ void TransportSocket::wait_until_readable() } struct MessageHeader { - u32 size { 0 }; + enum class Type : u8 { + Payload = 0, + FileDescriptorAcknowledgement = 1, + }; + Type type { Type::Payload }; + u32 payload_size { 0 }; u32 fd_count { 0 }; }; @@ -60,8 +67,9 @@ ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Ve Vector message_buffer; message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size()); MessageHeader header; - header.size = bytes_to_write.size(); + header.payload_size = bytes_to_write.size(); header.fd_count = unowned_fds.size(); + header.type = MessageHeader::Type::Payload; memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size()); return transfer(message_buffer.span(), unowned_fds); @@ -69,6 +77,13 @@ ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Ve ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) { + m_fds_retained_until_received_by_peer->with_locked([&unowned_fds](auto& queue) { + for (auto const& fd : unowned_fds) + queue.enqueue(MUST(File::clone_fd(fd))); + }); + + Threading::MutexLocker lock(*m_socket_write_mutex); + auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; @@ -118,7 +133,7 @@ ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector&& callback) { - auto should_shutdown = ShouldShutdown::No; + bool should_shutdown = false; while (is_open()) { u8 buffer[4096]; auto received_fds = Vector {}; @@ -130,7 +145,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib } if (error.is_syscall() && error.code() == ECONNRESET) { - should_shutdown = ShouldShutdown::Yes; + should_shutdown = true; break; } @@ -141,7 +156,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib auto bytes_read = maybe_bytes_read.release_value(); if (bytes_read.is_empty()) { - should_shutdown = ShouldShutdown::Yes; + should_shutdown = true; break; } @@ -151,20 +166,50 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib } } + u32 received_fd_count = 0; + u32 acknowledged_fd_count = 0; size_t index = 0; while (index + sizeof(MessageHeader) <= m_unprocessed_bytes.size()) { MessageHeader header; memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader)); - if (header.size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index) - break; - if (header.fd_count > m_unprocessed_fds.size()) - break; - Message message; - for (size_t i = 0; i < header.fd_count; ++i) - message.fds.append(m_unprocessed_fds.dequeue()); - message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size); - callback(move(message)); - index += header.size + sizeof(MessageHeader); + if (header.type == MessageHeader::Type::Payload) { + if (header.payload_size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index) + break; + if (header.fd_count > m_unprocessed_fds.size()) + break; + Message message; + received_fd_count += header.fd_count; + for (size_t i = 0; i < header.fd_count; ++i) + message.fds.append(m_unprocessed_fds.dequeue()); + message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size); + callback(move(message)); + } else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) { + VERIFY(header.payload_size == 0); + acknowledged_fd_count += header.fd_count; + } else { + VERIFY_NOT_REACHED(); + } + index += header.payload_size + sizeof(MessageHeader); + } + + if (should_shutdown) + return ShouldShutdown::Yes; + + if (acknowledged_fd_count > 0) { + m_fds_retained_until_received_by_peer->with_locked([&acknowledged_fd_count](auto& queue) { + while (acknowledged_fd_count > 0) { + queue.dequeue(); + --acknowledged_fd_count; + } + }); + } + + if (received_fd_count > 0) { + MessageHeader header; + header.payload_size = 0; + header.fd_count = received_fd_count; + header.type = MessageHeader::Type::FileDescriptorAcknowledgement; + MUST(transfer({ &header, sizeof(MessageHeader) }, {})); } if (index < m_unprocessed_bytes.size()) { @@ -174,7 +219,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib m_unprocessed_bytes.clear(); } - return should_shutdown; + return ShouldShutdown::No; } ErrorOr TransportSocket::release_underlying_transport_for_transfer() diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index ac83139498c..39c092cf827 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -7,7 +7,9 @@ #pragma once +#include #include +#include namespace IPC { @@ -48,8 +50,14 @@ private: ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); NonnullOwnPtr m_socket; + NonnullOwnPtr m_socket_write_mutex; ByteBuffer m_unprocessed_bytes; UnprocessedFileDescriptors m_unprocessed_fds; + + // After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer. + // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file + // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 + NonnullOwnPtr>> m_fds_retained_until_received_by_peer; }; } diff --git a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp index 0c7c316b877..4d041620171 100644 --- a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp +++ b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp @@ -760,8 +760,6 @@ public: generator.append(R"~~~( case (int)IPC::LargeMessageWrapper::MESSAGE_ID: return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files)); - case (int)IPC::Acknowledgement::MESSAGE_ID: - return TRY(IPC::Acknowledgement::decode(message_endpoint_magic, stream, files)); )~~~"); generator.append(R"~~~(