LibIPC: Make TransportSocket responsible for reading entire messages

With this change, the responsibility for prepending messages with their
size and ensuring the entire message is received before returning it to
the caller is moved to TransportSocket. This removes the need to
duplicate this logic in both LibIPC and MessagePort.

Another advantage of reducing message granularity at IPC::Transport
layer is that it will make it easier to support alternative transport
implementations (like Mach ports, which unlike Unix domain sockets are
not stream oriented).
This commit is contained in:
Aliaksandr Kalenik 2025-04-07 04:17:36 +02:00 committed by Alexander Kalenik
commit a371f849e3
Notes: github-actions[bot] 2025-04-07 15:00:45 +00:00
7 changed files with 130 additions and 204 deletions

View file

@ -1,5 +1,6 @@
/*
* Copyright (c) 2021-2024, Andreas Kling <andreas@ladybird.org>
* Copyright (c) 2025, Aliaksandr Kalenik <kalenik.aliaksandr@gmail.com>
* Copyright (c) 2022, the SerenityOS developers.
*
* SPDX-License-Identifier: BSD-2-Clause
@ -11,6 +12,7 @@
#include <LibIPC/Connection.h>
#include <LibIPC/Message.h>
#include <LibIPC/Stub.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
namespace IPC {
@ -139,63 +141,65 @@ void ConnectionBase::wait_for_transport_to_become_readable()
m_transport.wait_until_readable();
}
ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_transport_without_blocking()
{
Vector<u8> bytes;
if (!m_unprocessed_bytes.is_empty()) {
bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size());
m_unprocessed_bytes.clear();
}
bool should_shut_down = false;
auto schedule_shutdown = [this, &should_shut_down]() {
should_shut_down = true;
deferred_invoke([this] {
shutdown();
});
};
auto&& [new_bytes, received_fds] = m_transport.read_as_much_as_possible_without_blocking(move(schedule_shutdown));
bytes.append(new_bytes.data(), new_bytes.size());
for (auto const& fd : received_fds)
m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd));
if (!bytes.is_empty()) {
m_responsiveness_timer->stop();
did_become_responsive();
} else if (should_shut_down) {
return Error::from_string_literal("IPC connection EOF");
}
return bytes;
}
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
{
auto bytes = TRY(read_as_much_as_possible_from_transport_without_blocking());
u32 pending_ack_count = 0;
u32 received_ack_count = 0;
auto schedule_shutdown = m_transport.read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) {
auto const& bytes = unparsed_message.bytes;
UnprocessedFileDescriptors unprocessed_fds;
unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds));
if (auto message = try_parse_message(bytes, unprocessed_fds)) {
if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) {
LargeMessageWrapper* wrapper = static_cast<LargeMessageWrapper*>(message.ptr());
auto wrapped_message = wrapper->wrapped_message_data();
unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds);
VERIFY(parsed_message);
VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID);
pending_ack_count++;
m_unprocessed_messages.append(parsed_message.release_nonnull());
return;
}
size_t index = 0;
try_parse_messages(bytes, index);
if (message->message_id() == Acknowledgement::MESSAGE_ID) {
VERIFY(message->endpoint_magic() == m_local_endpoint_magic);
received_ack_count += static_cast<Acknowledgement*>(message.ptr())->ack_count();
return;
}
if (index < bytes.size()) {
// Sometimes we might receive a partial message. That's okay, just stash away
// the unprocessed bytes and we'll prepend them to the next incoming message
// in the next run of this function.
auto remaining_bytes = TRY(ByteBuffer::copy(bytes.span().slice(index)));
if (!m_unprocessed_bytes.is_empty()) {
shutdown();
return Error::from_string_literal("drain_messages_from_peer: Already have unprocessed bytes");
pending_ack_count++;
m_unprocessed_messages.append(message.release_nonnull());
} else {
dbgln("Failed to parse IPC message {:hex-dump}", bytes);
VERIFY_NOT_REACHED();
}
m_unprocessed_bytes = move(remaining_bytes);
});
if (received_ack_count > 0) {
Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex);
for (size_t i = 0; i < received_ack_count; ++i)
m_acknowledgement_wait_queue->messages.take_first();
}
if (is_open() && pending_ack_count > 0) {
auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count);
MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No));
}
if (!m_unprocessed_messages.is_empty()) {
m_responsiveness_timer->stop();
did_become_responsive();
deferred_invoke([this] {
handle_messages();
});
} else if (schedule_shutdown == TransportSocket::ShouldShutdown::Yes) {
deferred_invoke([this] {
shutdown();
});
return Error::from_string_literal("IPC connection EOF");
}
return {};
}
@ -222,57 +226,4 @@ OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32
return {};
}
void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
{
u32 message_size = 0;
u32 pending_ack_count = 0;
u32 received_ack_count = 0;
for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
memcpy(&message_size, bytes.data() + index, sizeof(message_size));
if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
break;
index += sizeof(message_size);
auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
if (auto message = try_parse_message(remaining_bytes, m_unprocessed_fds)) {
if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) {
LargeMessageWrapper* wrapper = static_cast<LargeMessageWrapper*>(message.ptr());
auto wrapped_message = wrapper->wrapped_message_data();
m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds);
VERIFY(parsed_message);
VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID);
pending_ack_count++;
m_unprocessed_messages.append(parsed_message.release_nonnull());
continue;
}
if (message->message_id() == Acknowledgement::MESSAGE_ID) {
VERIFY(message->endpoint_magic() == m_local_endpoint_magic);
received_ack_count += static_cast<Acknowledgement*>(message.ptr())->ack_count();
continue;
}
pending_ack_count++;
m_unprocessed_messages.append(message.release_nonnull());
continue;
}
dbgln("Failed to parse IPC message:");
dbgln("{:hex-dump}", remaining_bytes);
break;
}
if (received_ack_count > 0) {
Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex);
for (size_t i = 0; i < received_ack_count; ++i)
m_acknowledgement_wait_queue->messages.take_first();
}
if (is_open() && pending_ack_count > 0) {
auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count);
MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No));
}
}
}