LibIPC: Move early fd deallocation workaround to the transport layer

Reimplements c3121c9d at the transport layer, allowing us to solve the
same problem once, in a single place, for both the LibIPC connection and
MessagePort. This avoids exposing a workaround for a macOS-specific Unix
domain socket issue to higher abstraction layers.
This commit is contained in:
Aliaksandr Kalenik 2025-04-07 23:41:24 +02:00
parent 4c7207db81
commit b4ff6c2c93
7 changed files with 78 additions and 128 deletions

View file

@ -16,11 +16,10 @@
namespace IPC {
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic, u32 peer_endpoint_magic)
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic)
: m_local_stub(local_stub)
, m_transport(move(transport))
, m_local_endpoint_magic(local_endpoint_magic)
, m_peer_endpoint_magic(peer_endpoint_magic)
{
m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
@ -32,8 +31,7 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l
});
m_send_queue = adopt_ref(*new SendQueue);
m_acknowledgement_wait_queue = adopt_ref(*new AcknowledgementWaitQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue, acknowledgement_wait_queue = m_acknowledgement_wait_queue]() -> intptr_t {
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
for (;;) {
send_queue->mutex.lock();
while (send_queue->messages.is_empty() && send_queue->running)
@ -44,14 +42,9 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l
break;
}
auto [message_buffer, needs_acknowledgement] = send_queue->messages.take_first();
auto message_buffer = send_queue->messages.take_first();
send_queue->mutex.unlock();
if (needs_acknowledgement == MessageNeedsAcknowledgement::Yes) {
Threading::MutexLocker lock(acknowledgement_wait_queue->mutex);
acknowledgement_wait_queue->messages.append(message_buffer);
}
if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) {
dbgln("ConnectionBase::send_thread: {}", result.error());
continue;
@ -82,7 +75,7 @@ ErrorOr<void> ConnectionBase::post_message(Message const& message)
return post_message(message.endpoint_magic(), TRY(message.encode()));
}
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer, MessageNeedsAcknowledgement needs_acknowledgement)
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
{
// NOTE: If this connection is being shut down, but has not yet been destroyed,
// the socket will be closed. Don't try to send more messages.
@ -96,7 +89,7 @@ ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->messages.append({ move(buffer), needs_acknowledgement });
m_send_queue->messages.append(move(buffer));
m_send_queue->condition.signal();
}
@ -143,8 +136,6 @@ void ConnectionBase::wait_for_transport_to_become_readable()
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
{
u32 pending_ack_count = 0;
u32 received_ack_count = 0;
auto schedule_shutdown = m_transport.read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) {
auto const& bytes = unparsed_message.bytes;
UnprocessedFileDescriptors unprocessed_fds;
@ -156,19 +147,10 @@ ErrorOr<void> ConnectionBase::drain_messages_from_peer()
unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds);
VERIFY(parsed_message);
VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID);
pending_ack_count++;
m_unprocessed_messages.append(parsed_message.release_nonnull());
return;
}
if (message->message_id() == Acknowledgement::MESSAGE_ID) {
VERIFY(message->endpoint_magic() == m_local_endpoint_magic);
received_ack_count += static_cast<Acknowledgement*>(message.ptr())->ack_count();
return;
}
pending_ack_count++;
m_unprocessed_messages.append(message.release_nonnull());
} else {
dbgln("Failed to parse IPC message {:hex-dump}", bytes);
@ -176,17 +158,6 @@ ErrorOr<void> ConnectionBase::drain_messages_from_peer()
}
});
if (received_ack_count > 0) {
Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex);
for (size_t i = 0; i < received_ack_count; ++i)
m_acknowledgement_wait_queue->messages.take_first();
}
if (is_open() && pending_ack_count > 0) {
auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count);
MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No));
}
if (!m_unprocessed_messages.is_empty()) {
m_responsiveness_timer->stop();
did_become_responsive();

View file

@ -28,14 +28,9 @@ class ConnectionBase : public Core::EventReceiver {
public:
virtual ~ConnectionBase() override;
enum class MessageNeedsAcknowledgement {
No,
Yes,
};
[[nodiscard]] bool is_open() const;
ErrorOr<void> post_message(Message const&);
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer, MessageNeedsAcknowledgement = MessageNeedsAcknowledgement::Yes);
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer);
void shutdown();
virtual void die() { }
@ -43,7 +38,7 @@ public:
Transport& transport() { return m_transport; }
protected:
explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic, u32 peer_endpoint_magic);
explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic);
virtual void may_have_become_unresponsive() { }
virtual void did_become_responsive() { }
@ -65,38 +60,23 @@ protected:
Vector<NonnullOwnPtr<Message>> m_unprocessed_messages;
u32 m_local_endpoint_magic { 0 };
u32 m_peer_endpoint_magic { 0 };
struct MessageToSend {
MessageBuffer buffer;
MessageNeedsAcknowledgement needs_acknowledgement { MessageNeedsAcknowledgement::Yes };
};
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageToSend> messages;
AK::SinglyLinkedList<MessageBuffer> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
// After a message is sent, it is moved to the acknowledgement wait queue until an acknowledgement is received from the peer.
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
struct AcknowledgementWaitQueue : public AtomicRefCounted<AcknowledgementWaitQueue> {
AK::SinglyLinkedList<MessageBuffer> messages;
Threading::Mutex mutex;
};
RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue;
RefPtr<AcknowledgementWaitQueue> m_acknowledgement_wait_queue;
};
template<typename LocalEndpoint, typename PeerEndpoint>
class Connection : public ConnectionBase {
public:
Connection(IPC::Stub& local_stub, Transport transport)
: ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic(), PeerEndpoint::static_magic())
: ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic())
{
}

View file

@ -105,32 +105,4 @@ ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> LargeMessageWrapper::decode(u32 endp
return make<LargeMessageWrapper>(endpoint_magic, wrapped_message_data, move(wrapped_fds));
}
Acknowledgement::Acknowledgement(u32 endpoint_magic, u32 ack_count)
: m_endpoint_magic(endpoint_magic)
, m_ack_count(ack_count)
{
}
NonnullOwnPtr<Acknowledgement> Acknowledgement::create(u32 endpoint_magic, u32 ack_count)
{
return make<Acknowledgement>(endpoint_magic, ack_count);
}
ErrorOr<MessageBuffer> Acknowledgement::encode() const
{
MessageBuffer buffer;
Encoder stream { buffer };
TRY(stream.encode(m_endpoint_magic));
TRY(stream.encode(MESSAGE_ID));
TRY(stream.encode(m_ack_count));
return buffer;
}
ErrorOr<NonnullOwnPtr<Acknowledgement>> Acknowledgement::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files)
{
Decoder decoder { stream, files };
auto ack_count = TRY(decoder.decode<u32>());
return make<Acknowledgement>(endpoint_magic, ack_count);
}
}

View file

@ -119,28 +119,4 @@ private:
Vector<File> m_wrapped_fds;
};
class Acknowledgement : public Message {
public:
~Acknowledgement() override = default;
static constexpr int MESSAGE_ID = 0xFFFFFFFF;
static NonnullOwnPtr<Acknowledgement> create(u32 endpoint_magic, u32 ack_count);
u32 endpoint_magic() const override { return m_endpoint_magic; }
int message_id() const override { return MESSAGE_ID; }
char const* message_name() const override { return "Acknowledgement"; }
ErrorOr<MessageBuffer> encode() const override;
static ErrorOr<NonnullOwnPtr<Acknowledgement>> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files);
u32 ack_count() const { return m_ack_count; }
Acknowledgement(u32 endpoint_magic, u32 number_of_acknowledged_messages);
private:
u32 m_endpoint_magic { 0 };
u32 m_ack_count { 0 };
};
}

View file

@ -15,6 +15,8 @@ namespace IPC {
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
: m_socket(move(socket))
, m_socket_write_mutex(make<Threading::Mutex>())
, m_fds_retained_until_received_by_peer(make<Threading::MutexProtected<Queue<File>>>())
{
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
@ -51,7 +53,12 @@ void TransportSocket::wait_until_readable()
}
struct MessageHeader {
u32 size { 0 };
enum class Type : u8 {
Payload = 0,
FileDescriptorAcknowledgement = 1,
};
Type type { Type::Payload };
u32 payload_size { 0 };
u32 fd_count { 0 };
};
@ -60,8 +67,9 @@ ErrorOr<void> TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Ve
Vector<u8> message_buffer;
message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size());
MessageHeader header;
header.size = bytes_to_write.size();
header.payload_size = bytes_to_write.size();
header.fd_count = unowned_fds.size();
header.type = MessageHeader::Type::Payload;
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size());
return transfer(message_buffer.span(), unowned_fds);
@ -69,6 +77,13 @@ ErrorOr<void> TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Ve
ErrorOr<void> TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector<int, 1> const& unowned_fds)
{
m_fds_retained_until_received_by_peer->with_locked([&unowned_fds](auto& queue) {
for (auto const& fd : unowned_fds)
queue.enqueue(MUST(File::clone_fd(fd)));
});
Threading::MutexLocker lock(*m_socket_write_mutex);
auto num_fds_to_transfer = unowned_fds.size();
while (!bytes_to_write.is_empty()) {
ErrorOr<ssize_t> maybe_nwritten = 0;
@ -118,7 +133,7 @@ ErrorOr<void> TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector<int
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&& callback)
{
auto should_shutdown = ShouldShutdown::No;
bool should_shutdown = false;
while (is_open()) {
u8 buffer[4096];
auto received_fds = Vector<int> {};
@ -130,7 +145,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
}
if (error.is_syscall() && error.code() == ECONNRESET) {
should_shutdown = ShouldShutdown::Yes;
should_shutdown = true;
break;
}
@ -141,7 +156,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
auto bytes_read = maybe_bytes_read.release_value();
if (bytes_read.is_empty()) {
should_shutdown = ShouldShutdown::Yes;
should_shutdown = true;
break;
}
@ -151,20 +166,50 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
}
}
u32 received_fd_count = 0;
u32 acknowledged_fd_count = 0;
size_t index = 0;
while (index + sizeof(MessageHeader) <= m_unprocessed_bytes.size()) {
MessageHeader header;
memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader));
if (header.size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index)
break;
if (header.fd_count > m_unprocessed_fds.size())
break;
Message message;
for (size_t i = 0; i < header.fd_count; ++i)
message.fds.append(m_unprocessed_fds.dequeue());
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size);
callback(move(message));
index += header.size + sizeof(MessageHeader);
if (header.type == MessageHeader::Type::Payload) {
if (header.payload_size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index)
break;
if (header.fd_count > m_unprocessed_fds.size())
break;
Message message;
received_fd_count += header.fd_count;
for (size_t i = 0; i < header.fd_count; ++i)
message.fds.append(m_unprocessed_fds.dequeue());
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size);
callback(move(message));
} else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) {
VERIFY(header.payload_size == 0);
acknowledged_fd_count += header.fd_count;
} else {
VERIFY_NOT_REACHED();
}
index += header.payload_size + sizeof(MessageHeader);
}
if (should_shutdown)
return ShouldShutdown::Yes;
if (acknowledged_fd_count > 0) {
m_fds_retained_until_received_by_peer->with_locked([&acknowledged_fd_count](auto& queue) {
while (acknowledged_fd_count > 0) {
queue.dequeue();
--acknowledged_fd_count;
}
});
}
if (received_fd_count > 0) {
MessageHeader header;
header.payload_size = 0;
header.fd_count = received_fd_count;
header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
MUST(transfer({ &header, sizeof(MessageHeader) }, {}));
}
if (index < m_unprocessed_bytes.size()) {
@ -174,7 +219,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
m_unprocessed_bytes.clear();
}
return should_shutdown;
return ShouldShutdown::No;
}
ErrorOr<int> TransportSocket::release_underlying_transport_for_transfer()

View file

@ -7,7 +7,9 @@
#pragma once
#include <AK/Queue.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibThreading/MutexProtected.h>
namespace IPC {
@ -48,8 +50,14 @@ private:
ErrorOr<void> transfer(ReadonlyBytes, Vector<int, 1> const& unowned_fds);
NonnullOwnPtr<Core::LocalSocket> m_socket;
NonnullOwnPtr<Threading::Mutex> m_socket_write_mutex;
ByteBuffer m_unprocessed_bytes;
UnprocessedFileDescriptors m_unprocessed_fds;
// When a file descriptor is sent, a clone of it is kept in this queue until an acknowledgement is received from the peer.
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
NonnullOwnPtr<Threading::MutexProtected<Queue<File>>> m_fds_retained_until_received_by_peer;
};
}

View file

@ -760,8 +760,6 @@ public:
generator.append(R"~~~(
case (int)IPC::LargeMessageWrapper::MESSAGE_ID:
return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files));
case (int)IPC::Acknowledgement::MESSAGE_ID:
return TRY(IPC::Acknowledgement::decode(message_endpoint_magic, stream, files));
)~~~");
generator.append(R"~~~(