LibIPC+Everywhere: Introduce an IPC Transport abstraction

This abstraction will help us to support multiple IPC transport
mechanisms going forward. For now, we only have a TransportSocket that
implements the same behavior we previously had, using Unix Sockets for
IPC.
This commit is contained in:
Andrew Kaster 2024-10-22 15:47:33 -06:00 committed by Andrew Kaster
commit 7372b2af48
Notes: github-actions[bot] 2024-10-23 19:14:14 +00:00
45 changed files with 415 additions and 234 deletions

View file

@ -12,6 +12,7 @@
#include <LibIPC/Decoder.h>
#include <LibIPC/Encoder.h>
#include <LibIPC/File.h>
#include <LibIPC/Transport.h>
#include <LibWeb/Bindings/ExceptionOrUtils.h>
#include <LibWeb/Bindings/Intrinsics.h>
#include <LibWeb/Bindings/MessagePortPrototype.h>
@ -71,6 +72,11 @@ void MessagePort::visit_edges(Cell::Visitor& visitor)
visitor.visit(m_worker_event_target);
}
bool MessagePort::is_entangled() const
{
return m_transport.has_value();
}
void MessagePort::set_worker_event_target(JS::NonnullGCPtr<DOM::EventTarget> target)
{
m_worker_event_target = target;
@ -91,10 +97,14 @@ WebIDL::ExceptionOr<void> MessagePort::transfer_steps(HTML::TransferDataHolder&
m_remote_port->m_has_been_shipped = true;
// 2. Set dataHolder.[[RemotePort]] to remotePort.
auto fd = MUST(m_socket->release_fd());
m_socket = nullptr;
data_holder.fds.append(IPC::File::adopt_fd(fd));
data_holder.data.append(IPC_FILE_TAG);
if constexpr (IsSame<IPC::Transport, IPC::TransportSocket>) {
auto fd = MUST(m_transport->release_underlying_transport_for_transfer());
m_transport = {};
data_holder.fds.append(IPC::File::adopt_fd(fd));
data_holder.data.append(IPC_FILE_TAG);
} else {
VERIFY(false && "Don't know how to transfer IPC::Transport type");
}
}
// 4. Otherwise, set dataHolder.[[RemotePort]] to null.
@ -118,12 +128,16 @@ WebIDL::ExceptionOr<void> MessagePort::transfer_receiving_steps(HTML::TransferDa
// (This will disentangle dataHolder.[[RemotePort]] from the original port that was transferred.)
auto fd_tag = data_holder.data.take_first();
if (fd_tag == IPC_FILE_TAG) {
auto fd = data_holder.fds.take_first();
m_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd()));
if constexpr (IsSame<IPC::Transport, IPC::TransportSocket>) {
auto fd = data_holder.fds.take_first();
m_transport = IPC::Transport(MUST(Core::LocalSocket::adopt_fd(fd.take_fd())));
m_socket->on_ready_to_read = [strong_this = JS::make_handle(this)]() {
strong_this->read_from_socket();
};
m_transport->set_up_read_hook([strong_this = JS::make_handle(this)]() {
strong_this->read_from_transport();
});
} else {
VERIFY(false && "Don't know how to receive IPC::Transport type");
}
} else if (fd_tag != 0) {
dbgln("Unexpected byte {:x} in MessagePort transfer data", fd_tag);
VERIFY_NOT_REACHED();
@ -138,7 +152,7 @@ void MessagePort::disentangle()
m_remote_port->m_remote_port = nullptr;
m_remote_port = nullptr;
m_socket = nullptr;
m_transport = {};
m_worker_event_target = nullptr;
}
@ -160,6 +174,7 @@ void MessagePort::entangle_with(MessagePort& remote_port)
remote_port.m_remote_port = this;
m_remote_port = &remote_port;
// FIXME: Abstract such that we can entangle different transport types
auto create_paired_sockets = []() -> Array<NonnullOwnPtr<Core::LocalSocket>, 2> {
int fds[2] = {};
MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
@ -174,16 +189,16 @@ void MessagePort::entangle_with(MessagePort& remote_port)
};
auto sockets = create_paired_sockets();
m_socket = move(sockets[0]);
m_remote_port->m_socket = move(sockets[1]);
m_transport = IPC::Transport(move(sockets[0]));
m_remote_port->m_transport = IPC::Transport(move(sockets[1]));
m_socket->on_ready_to_read = [strong_this = JS::make_handle(this)]() {
strong_this->read_from_socket();
};
m_transport->set_up_read_hook([strong_this = JS::make_handle(this)]() {
strong_this->read_from_transport();
});
m_remote_port->m_socket->on_ready_to_read = [remote_port = JS::make_handle(m_remote_port)]() {
remote_port->read_from_socket();
};
m_remote_port->m_transport->set_up_read_hook([remote_port = JS::make_handle(m_remote_port)]() {
remote_port->read_from_transport();
});
}
// https://html.spec.whatwg.org/multipage/web-messaging.html#dom-messageport-postmessage-options
@ -243,7 +258,7 @@ WebIDL::ExceptionOr<void> MessagePort::message_port_post_message_steps(JS::GCPtr
// 6. If targetPort is null, or if doomed is true, then return.
// IMPLEMENTATION DEFINED: Actually check the socket here, not the target port.
// If there's no target message port in the same realm, we still want to send the message over IPC
if (!m_socket || doomed) {
if (!m_transport.has_value() || doomed) {
return {};
}
@ -253,13 +268,13 @@ WebIDL::ExceptionOr<void> MessagePort::message_port_post_message_steps(JS::GCPtr
return {};
}
ErrorOr<void> MessagePort::send_message_on_socket(SerializedTransferRecord const& serialize_with_transfer_result)
ErrorOr<void> MessagePort::send_message_on_transport(SerializedTransferRecord const& serialize_with_transfer_result)
{
IPC::MessageBuffer buffer;
IPC::Encoder encoder(buffer);
MUST(encoder.encode(serialize_with_transfer_result));
TRY(buffer.transfer_message(*m_socket));
TRY(buffer.transfer_message(*m_transport));
return {};
}
@ -267,9 +282,9 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran
{
// FIXME: Use the correct task source?
queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), JS::create_heap_function(heap(), [this, serialize_with_transfer_result = move(serialize_with_transfer_result)]() mutable {
if (!m_socket || !m_socket->is_open())
if (!m_transport.has_value() || !m_transport->is_open())
return;
if (auto result = send_message_on_socket(serialize_with_transfer_result); result.is_error()) {
if (auto result = send_message_on_transport(serialize_with_transfer_result); result.is_error()) {
dbgln("Failed to post message: {}", result.error());
disentangle();
}
@ -320,18 +335,13 @@ ErrorOr<MessagePort::ParseDecision> MessagePort::parse_message()
return ParseDecision::ParseNextMessage;
}
void MessagePort::read_from_socket()
void MessagePort::read_from_transport()
{
u8 buffer[4096] {};
Vector<int> fds;
// FIXME: What if pending bytes is > 4096? Should we loop here?
auto maybe_bytes = m_socket->receive_message(buffer, MSG_NOSIGNAL, fds);
if (maybe_bytes.is_error()) {
dbgln("MessagePort::read_from_socket(): Failed to receive message: {}", maybe_bytes.error());
return;
}
auto bytes = maybe_bytes.release_value();
auto&& [bytes, fds] = m_transport->read_as_much_as_possible_without_blocking([this] {
queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), JS::create_heap_function(heap(), [this] {
this->close();
}));
});
m_buffered_data.append(bytes.data(), bytes.size());
@ -406,7 +416,7 @@ void MessagePort::start()
if (!is_entangled())
return;
VERIFY(m_socket);
VERIFY(m_transport.has_value());
// TODO: The start() method steps are to enable this's port message queue, if it is not already enabled.
}