LibIPC+LibWeb: Delete LargeMessageWrapper workaround in IPC connection

Bring back 2d625f5c23
This commit is contained in:
Aliaksandr Kalenik 2025-04-10 20:26:46 +02:00
parent 5ed5169d60
commit 7608229219
11 changed files with 23 additions and 177 deletions

View file

@ -12,7 +12,6 @@
#include <LibIPC/Connection.h>
#include <LibIPC/Message.h>
#include <LibIPC/Stub.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
namespace IPC {
@ -40,21 +39,16 @@ bool ConnectionBase::is_open() const
ErrorOr<void> ConnectionBase::post_message(Message const& message)
{
return post_message(message.endpoint_magic(), TRY(message.encode()));
return post_message(TRY(message.encode()));
}
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
{
// NOTE: If this connection is being shut down, but has not yet been destroyed,
// the socket will be closed. Don't try to send more messages.
if (!m_transport->is_open())
return Error::from_string_literal("Trying to post_message during IPC shutdown");
if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) {
auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer);
buffer = MUST(wrapper->encode());
}
MUST(buffer.transfer_message(*m_transport));
m_responsiveness_timer->start();
@ -85,7 +79,7 @@ void ConnectionBase::handle_messages()
}
if (auto response = handler_result.release_value()) {
if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) {
if (auto post_result = post_message(*response); post_result.is_error()) {
dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
}
}
@ -100,24 +94,11 @@ void ConnectionBase::wait_for_transport_to_become_readable()
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
{
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) {
auto const& bytes = unparsed_message.bytes;
UnprocessedFileDescriptors unprocessed_fds;
unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds));
if (auto message = try_parse_message(bytes, unprocessed_fds)) {
if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) {
LargeMessageWrapper* wrapper = static_cast<LargeMessageWrapper*>(message.ptr());
auto wrapped_message = wrapper->wrapped_message_data();
unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds);
VERIFY(parsed_message);
m_unprocessed_messages.append(parsed_message.release_nonnull());
return;
}
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& raw_message) {
if (auto message = try_parse_message(raw_message.bytes, raw_message.fds)) {
m_unprocessed_messages.append(message.release_nonnull());
} else {
dbgln("Failed to parse IPC message {:hex-dump}", bytes);
dbgln("Failed to parse IPC message {:hex-dump}", raw_message.bytes);
VERIFY_NOT_REACHED();
}
});

View file

@ -15,10 +15,6 @@
#include <LibIPC/Forward.h>
#include <LibIPC/Message.h>
#include <LibIPC/Transport.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/Thread.h>
namespace IPC {
@ -30,7 +26,7 @@ public:
[[nodiscard]] bool is_open() const;
ErrorOr<void> post_message(Message const&);
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer);
ErrorOr<void> post_message(MessageBuffer);
void shutdown();
virtual void die() { }
@ -43,7 +39,7 @@ protected:
virtual void may_have_become_unresponsive() { }
virtual void did_become_responsive() { }
virtual void shutdown_with_error(Error const&);
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0;
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, Queue<File>&) = 0;
OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
void wait_for_transport_to_become_readable();
@ -102,7 +98,7 @@ protected:
return {};
}
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, Queue<File>& fds) override
{
auto local_message = LocalEndpoint::decode_message(bytes, fds);
if (!local_message.is_error())

View file

@ -23,7 +23,6 @@
#include <LibIPC/File.h>
#include <LibIPC/Forward.h>
#include <LibIPC/Message.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibURL/Origin.h>
#include <LibURL/URL.h>
@ -38,7 +37,7 @@ inline ErrorOr<T> decode(Decoder&)
class Decoder {
public:
Decoder(Stream& stream, UnprocessedFileDescriptors& files)
Decoder(Stream& stream, Queue<File>& files)
: m_stream(stream)
, m_files(files)
{
@ -63,11 +62,11 @@ public:
ErrorOr<size_t> decode_size();
Stream& stream() { return m_stream; }
UnprocessedFileDescriptors& files() { return m_files; }
Queue<File>& files() { return m_files; }
private:
Stream& m_stream;
UnprocessedFileDescriptors& m_files;
Queue<File>& m_files;
};
template<Arithmetic T>

View file

@ -6,7 +6,6 @@
#include <AK/Checked.h>
#include <LibIPC/Decoder.h>
#include <LibIPC/Encoder.h>
#include <LibIPC/Message.h>
namespace IPC {
@ -47,53 +46,4 @@ ErrorOr<void> MessageBuffer::transfer_message(Transport& transport)
return {};
}
NonnullOwnPtr<LargeMessageWrapper> LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap)
{
auto size = buffer_to_wrap.data().size();
auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size));
memcpy(wrapped_message_data.data<void>(), buffer_to_wrap.data().data(), size);
Vector<File> files;
for (auto& owned_fd : buffer_to_wrap.take_fds()) {
files.append(File::adopt_fd(owned_fd->take_fd()));
}
return make<LargeMessageWrapper>(endpoint_magic, move(wrapped_message_data), move(files));
}
LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<File>&& wrapped_fds)
: m_endpoint_magic(endpoint_magic)
, m_wrapped_message_data(move(wrapped_message_data))
, m_wrapped_fds(move(wrapped_fds))
{
}
ErrorOr<MessageBuffer> LargeMessageWrapper::encode() const
{
MessageBuffer buffer;
Encoder stream { buffer };
TRY(stream.encode(m_endpoint_magic));
TRY(stream.encode(MESSAGE_ID));
TRY(stream.encode(m_wrapped_message_data));
TRY(stream.encode(m_wrapped_fds.size()));
for (auto const& wrapped_fd : m_wrapped_fds) {
TRY(stream.append_file_descriptor(wrapped_fd.take_fd()));
}
return buffer;
}
ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files)
{
Decoder decoder { stream, files };
auto wrapped_message_data = TRY(decoder.decode<Core::AnonymousBuffer>());
Vector<File> wrapped_fds;
auto num_fds = TRY(decoder.decode<u32>());
for (u32 i = 0; i < num_fds; ++i) {
auto fd = TRY(decoder.decode<IPC::File>());
wrapped_fds.append(move(fd));
}
return make<LargeMessageWrapper>(endpoint_magic, wrapped_message_data, move(wrapped_fds));
}
}

View file

@ -8,14 +8,8 @@
#pragma once
#include <AK/Error.h>
#include <AK/RefCounted.h>
#include <AK/RefPtr.h>
#include <AK/Vector.h>
#include <LibCore/AnonymousBuffer.h>
#include <LibCore/Forward.h>
#include <LibCore/System.h>
#include <LibIPC/Transport.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
namespace IPC {
@ -67,30 +61,4 @@ protected:
Message() = default;
};
class LargeMessageWrapper : public Message {
public:
~LargeMessageWrapper() override = default;
static constexpr int MESSAGE_ID = 0x0;
static NonnullOwnPtr<LargeMessageWrapper> create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap);
u32 endpoint_magic() const override { return m_endpoint_magic; }
int message_id() const override { return MESSAGE_ID; }
char const* message_name() const override { return "LargeMessageWrapper"; }
ErrorOr<MessageBuffer> encode() const override;
static ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files);
ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data<u8>(), m_wrapped_message_data.size() }; }
auto take_fds() { return move(m_wrapped_fds); }
LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<IPC::File>&& wrapped_fds);
private:
u32 m_endpoint_magic { 0 };
Core::AnonymousBuffer m_wrapped_message_data;
Vector<File> m_wrapped_fds;
};
}

View file

@ -197,7 +197,7 @@ ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB
return {};
}
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&& callback)
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
@ -248,7 +248,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
Message message;
received_fd_count += header.fd_count;
for (size_t i = 0; i < header.fd_count; ++i)
message.fds.append(m_unprocessed_fds.dequeue());
message.fds.enqueue(m_unprocessed_fds.dequeue());
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size);
callback(move(message));
} else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) {

View file

@ -9,7 +9,6 @@
#include <AK/Queue.h>
#include <LibCore/Socket.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/RWLock.h>
@ -92,9 +91,9 @@ public:
};
struct Message {
Vector<u8> bytes;
Vector<File> fds;
Queue<File> fds;
};
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&&);
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&&);
// Obnoxious name to make it clear that this is a dangerous operation.
ErrorOr<int> release_underlying_transport_for_transfer();
@ -107,7 +106,7 @@ private:
NonnullOwnPtr<Core::LocalSocket> m_socket;
mutable Threading::RWLock m_socket_rw_lock;
ByteBuffer m_unprocessed_bytes;
UnprocessedFileDescriptors m_unprocessed_fds;
Queue<File> m_unprocessed_fds;
// After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer.
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file

View file

@ -1,36 +0,0 @@
/*
* Copyright (c) 2025, Aliaksandr Kalenik <kalenik.aliaksandr@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <LibIPC/File.h>
namespace IPC {
class UnprocessedFileDescriptors {
public:
void enqueue(File&& fd)
{
m_fds.append(move(fd));
}
File dequeue()
{
return m_fds.take_first();
}
void return_fds_to_front_of_queue(Vector<File>&& fds)
{
m_fds.prepend(move(fds));
}
size_t size() const { return m_fds.size(); }
private:
Vector<File> m_fds;
};
}

View file

@ -288,13 +288,9 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran
void MessagePort::read_from_transport()
{
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& unparsed_message) {
auto& bytes = unparsed_message.bytes;
IPC::UnprocessedFileDescriptors unprocessed_fds;
unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds));
FixedMemoryStream stream { bytes.span(), FixedMemoryStream::Mode::ReadOnly };
IPC::Decoder decoder { stream, unprocessed_fds };
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& raw_message) {
FixedMemoryStream stream { raw_message.bytes.span(), FixedMemoryStream::Mode::ReadOnly };
IPC::Decoder decoder { stream, raw_message.fds };
auto serialized_transfer_record = MUST(decoder.decode<SerializedTransferRecord>());

View file

@ -13,7 +13,6 @@
#include <LibCore/Socket.h>
#include <LibIPC/File.h>
#include <LibIPC/Transport.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#include <LibWeb/Bindings/Transferable.h>
#include <LibWeb/DOM/EventTarget.h>
#include <LibWeb/Forward.h>

View file

@ -404,7 +404,7 @@ public:)~~~");
static i32 static_message_id() { return (int)MessageID::@message.pascal_name@; }
virtual const char* message_name() const override { return "@endpoint.name@::@message.pascal_name@"; }
static ErrorOr<NonnullOwnPtr<@message.pascal_name@>> decode(Stream& stream, IPC::UnprocessedFileDescriptors& files)
static ErrorOr<NonnullOwnPtr<@message.pascal_name@>> decode(Stream& stream, Queue<IPC::File>& files)
{
IPC::Decoder decoder { stream, files };)~~~");
@ -649,7 +649,7 @@ void generate_proxy_method(SourceGenerator& message_generator, Endpoint const& e
}
} else {
message_generator.append(R"~~~());
MUST(m_connection.post_message(@endpoint.magic@, move(message_buffer))); )~~~");
MUST(m_connection.post_message(move(message_buffer))); )~~~");
}
message_generator.appendln(R"~~~(
@ -720,7 +720,7 @@ public:
static u32 static_magic() { return @endpoint.magic@; }
static ErrorOr<NonnullOwnPtr<IPC::Message>> decode_message(ReadonlyBytes buffer, [[maybe_unused]] IPC::UnprocessedFileDescriptors& files)
static ErrorOr<NonnullOwnPtr<IPC::Message>> decode_message(ReadonlyBytes buffer, [[maybe_unused]] Queue<IPC::File>& files)
{
FixedMemoryStream stream { buffer };
auto message_endpoint_magic = TRY(stream.read_value<u32>());)~~~");
@ -757,11 +757,6 @@ public:
do_decode_message(message.response_name());
}
generator.append(R"~~~(
case (int)IPC::LargeMessageWrapper::MESSAGE_ID:
return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files));
)~~~");
generator.append(R"~~~(
default:)~~~");
if constexpr (GENERATE_DEBUG) {
@ -903,7 +898,6 @@ void build(StringBuilder& builder, Vector<Endpoint> const& endpoints)
#include <LibIPC/File.h>
#include <LibIPC/Message.h>
#include <LibIPC/Stub.h>
#include <LibIPC/UnprocessedFileDescriptors.h>
#if defined(AK_COMPILER_CLANG)
#pragma clang diagnostic push