LibIPC+Meta: Keep message buffer alive until acknowledged by peer

This change ensures that instead of immediately deallocating the message
buffer after sending, we retain it in an acknowledgement wait queue
until an acknowledgement is received from the peer. This is necessary
to handle a behavior of the macOS kernel, which may prematurely
garbage-collect file descriptors contained within the message buffer
before the peer receives them.

The acknowledgement mechanism assumes messages are received in the same
order they were sent so, each acknowledgement message simply indicates
the count of successfully received messages, specifying how many entries
can safely be removed from the acknowledgement wait queue.
This commit is contained in:
Aliaksandr Kalenik 2025-04-05 00:05:20 +02:00 committed by Andreas Kling
commit c3121c9d8a
Notes: github-actions[bot] 2025-04-05 21:15:26 +00:00
5 changed files with 121 additions and 16 deletions

View file

@ -14,10 +14,11 @@
namespace IPC {
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic)
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic, u32 peer_endpoint_magic)
: m_local_stub(local_stub)
, m_transport(move(transport))
, m_local_endpoint_magic(local_endpoint_magic)
, m_peer_endpoint_magic(peer_endpoint_magic)
{
m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
@ -29,21 +30,27 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l
});
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, queue = m_send_queue]() -> intptr_t {
m_acknowledgement_wait_queue = adopt_ref(*new AcknowledgementWaitQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue, acknowledgement_wait_queue = m_acknowledgement_wait_queue]() -> intptr_t {
for (;;) {
queue->mutex.lock();
while (queue->messages.is_empty() && queue->running)
queue->condition.wait();
send_queue->mutex.lock();
while (send_queue->messages.is_empty() && send_queue->running)
send_queue->condition.wait();
if (!queue->running) {
queue->mutex.unlock();
if (!send_queue->running) {
send_queue->mutex.unlock();
break;
}
auto message = queue->messages.take_first();
queue->mutex.unlock();
auto [message_buffer, needs_acknowledgement] = send_queue->messages.take_first();
send_queue->mutex.unlock();
if (auto result = message.transfer_message(m_transport); result.is_error()) {
if (needs_acknowledgement == MessageNeedsAcknowledgement::Yes) {
Threading::MutexLocker lock(acknowledgement_wait_queue->mutex);
acknowledgement_wait_queue->messages.append(message_buffer);
}
if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) {
dbgln("ConnectionBase::send_thread: {}", result.error());
continue;
}
@ -73,7 +80,7 @@ ErrorOr<void> ConnectionBase::post_message(Message const& message)
return post_message(message.endpoint_magic(), TRY(message.encode()));
}
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer, MessageNeedsAcknowledgement needs_acknowledgement)
{
// NOTE: If this connection is being shut down, but has not yet been destroyed,
// the socket will be closed. Don't try to send more messages.
@ -87,7 +94,7 @@ ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->messages.append(move(buffer));
m_send_queue->messages.append({ move(buffer), needs_acknowledgement });
m_send_queue->condition.signal();
}
@ -218,6 +225,8 @@ OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32
void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
{
u32 message_size = 0;
u32 pending_ack_count = 0;
u32 received_ack_count = 0;
for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
memcpy(&message_size, bytes.data() + index, sizeof(message_size));
if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
@ -232,9 +241,19 @@ void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds);
VERIFY(parsed_message);
VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID);
pending_ack_count++;
m_unprocessed_messages.append(parsed_message.release_nonnull());
continue;
}
if (message->message_id() == Acknowledgement::MESSAGE_ID) {
VERIFY(message->endpoint_magic() == m_local_endpoint_magic);
received_ack_count += static_cast<Acknowledgement*>(message.ptr())->ack_count();
continue;
}
pending_ack_count++;
m_unprocessed_messages.append(message.release_nonnull());
continue;
}
@ -243,6 +262,17 @@ void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
dbgln("{:hex-dump}", remaining_bytes);
break;
}
if (received_ack_count > 0) {
Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex);
for (size_t i = 0; i < received_ack_count; ++i)
m_acknowledgement_wait_queue->messages.take_first();
}
if (is_open() && pending_ack_count > 0) {
auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count);
MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No));
}
}
}