LibIPC+LibWeb: Flush MessagePort messages before closing

The spec isn't super clear on what disentagling a MessagePort means. But
we are required to send all pending messages before closing the port.

This is a bit tricky because the transport socket performs writes on a
background thread. From the main thread, where the disentanglement will
occur, we don't really know the state of the write thread. So what we do
here is stop the background thread then flush all remaining data from
the main thread.
This commit is contained in:
Timothy Flynn 2025-05-20 16:21:17 -04:00 committed by Tim Flynn
commit 36da270dbe
Notes: github-actions[bot] 2025-05-21 10:56:21 +00:00
3 changed files with 85 additions and 36 deletions

View file

@ -74,43 +74,15 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
break;
auto [bytes, fds] = send_queue->peek(4096);
auto fds_count = fds.size();
ReadonlyBytes remaining_to_send_bytes = bytes;
ReadonlyBytes remaining_bytes_to_send = bytes;
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
if (!m_socket->is_open())
if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed)
break;
auto result = send_message(*m_socket, remaining_to_send_bytes, fds);
if (result.is_error()) {
if (result.error().is_errno() && result.error().code() == EPIPE) {
// The socket is closed from the other end, we can stop sending.
break;
}
dbgln("TransportSocket::send_thread: {}", result.error());
VERIFY_NOT_REACHED();
}
auto written_bytes_count = bytes.size() - remaining_to_send_bytes.size();
auto written_fds_count = fds_count - fds.size();
if (written_bytes_count > 0 || written_fds_count > 0) {
send_queue->discard(written_bytes_count, written_fds_count);
}
if (!m_socket->is_open())
break;
{
Vector<struct pollfd, 1> pollfds;
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
}
}
return 0;
});
m_send_thread->start();
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
@ -118,9 +90,16 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
}
TransportSocket::~TransportSocket()
{
stop_send_thread();
}
void TransportSocket::stop_send_thread()
{
m_send_queue->stop();
(void)m_send_thread->join();
if (m_send_thread->needs_to_be_joined())
(void)m_send_thread->join();
}
void TransportSocket::set_up_read_hook(Function<void()> hook)
@ -142,6 +121,21 @@ void TransportSocket::close()
m_socket->close();
}
void TransportSocket::close_after_sending_all_pending_messages()
{
stop_send_thread();
auto [bytes, fds] = m_send_queue->peek(NumericLimits<size_t>::max());
ReadonlyBytes remaining_bytes_to_send = bytes;
while (!remaining_bytes_to_send.is_empty() || !fds.is_empty()) {
if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed)
break;
}
close();
}
void TransportSocket::wait_until_readable()
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
@ -217,6 +211,47 @@ ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB
return {};
}
TransportSocket::TransferState TransportSocket::transfer_data(ReadonlyBytes& bytes, Vector<int>& fds)
{
auto byte_count = bytes.size();
auto fd_count = fds.size();
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
if (!m_socket->is_open())
return TransferState::SocketClosed;
if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) {
if (result.error().is_errno() && result.error().code() == EPIPE) {
// The socket is closed from the other end, we can stop sending.
return TransferState::SocketClosed;
}
dbgln("TransportSocket::send_thread: {}", result.error());
VERIFY_NOT_REACHED();
}
auto written_byte_count = byte_count - bytes.size();
auto written_fd_count = fd_count - fds.size();
if (written_byte_count > 0 || written_fd_count > 0)
m_send_queue->discard(written_byte_count, written_fd_count);
if (!m_socket->is_open())
return TransferState::SocketClosed;
{
Vector<struct pollfd, 1> pollfds;
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
}
return TransferState::Continue;
}
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);