LibIPC: Change TransportSocket to write large messages in small chunks

With this change TransportSocket becomes capable of sending large
messages without relying on workarounds, such as sending the message as
a shared memory file descriptor when it can't fully fit into the socket
buffer.

It's implemented by combining all enqueued messages into two buffers:
one for bytes and another for fds, and repeatedly attempts to write them
in smaller chunks, waiting for the socket to become writable again if
the receiver needs time to consume the data.

Another significant improvement brought by this change is that we no
longer drop messages queued for sending if the socket doesn't become
writable after a 100ms timeout. Instead, we return the message to the
send buffer and continue waiting for the socket to become writable.
This commit is contained in:
Aliaksandr Kalenik 2025-04-09 20:54:41 +02:00 committed by Alexander Kalenik
parent fc111537bb
commit d6080d1fdc
Notes: github-actions[bot] 2025-04-09 23:31:08 +00:00
2 changed files with 95 additions and 62 deletions

View file

@ -13,26 +13,77 @@
namespace IPC {
void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
{
Threading::MutexLocker locker(m_mutex);
m_bytes.append(bytes.data(), bytes.size());
m_fds.append(fds.data(), fds.size());
m_condition.signal();
}
SendQueue::Running SendQueue::block_until_message_enqueued()
{
Threading::MutexLocker locker(m_mutex);
while (m_bytes.is_empty() && m_fds.is_empty() && m_running)
m_condition.wait();
return m_running ? Running::Yes : Running::No;
}
SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes)
{
Threading::MutexLocker locker(m_mutex);
auto bytes_to_send = min(max_bytes, m_bytes.size());
Vector<u8> bytes;
bytes.append(m_bytes.data(), bytes_to_send);
m_bytes.remove(0, bytes_to_send);
return { move(bytes), move(m_fds) };
}
void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds)
{
Threading::MutexLocker locker(m_mutex);
m_bytes.prepend(bytes.data(), bytes.size());
m_fds.prepend(fds.data(), fds.size());
}
void SendQueue::stop()
{
Threading::MutexLocker locker(m_mutex);
m_running = false;
m_condition.signal();
}
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
: m_socket(move(socket))
{
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
for (;;) {
send_queue->mutex.lock();
while (send_queue->messages.is_empty() && send_queue->running)
send_queue->condition.wait();
if (!send_queue->running) {
send_queue->mutex.unlock();
if (send_queue->block_until_message_enqueued() == SendQueue::Running::No)
break;
auto [bytes, fds] = send_queue->dequeue(4096);
ReadonlyBytes bytes_to_send = bytes;
auto result = send_message(*m_socket, bytes_to_send, fds);
if (result.is_error()) {
dbgln("TransportSocket::send_thread: {}", result.error());
VERIFY_NOT_REACHED();
}
auto [bytes, fds] = send_queue->messages.take_first();
send_queue->mutex.unlock();
if (!bytes.is_empty() || !fds.is_empty()) {
send_queue->return_unsent_data_to_front_of_queue(bytes_to_send, fds);
}
if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) {
dbgln("TransportSocket::send_thread: {}", result.error());
{
Vector<struct pollfd, 1> pollfds;
if (pollfds.is_empty())
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
}
}
return 0;
@ -45,11 +96,7 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
TransportSocket::~TransportSocket()
{
{
Threading::MutexLocker locker(m_send_queue->mutex);
m_send_queue->running = false;
m_send_queue->condition.signal();
}
m_send_queue->stop();
(void)m_send_thread->join();
}
@ -114,55 +161,27 @@ void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<Nonn
}
}
queue_message_on_send_thread({ move(message_buffer), move(raw_fds) });
m_send_queue->enqueue_message(move(message_buffer), move(raw_fds));
}
void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const
{
Threading::MutexLocker lock(m_send_queue->mutex);
m_send_queue->messages.append(move(message_to_send));
m_send_queue->condition.signal();
}
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector<int, 1> const& unowned_fds)
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector<int>& unowned_fds)
{
auto num_fds_to_transfer = unowned_fds.size();
while (!bytes_to_write.is_empty()) {
ErrorOr<ssize_t> maybe_nwritten = 0;
if (num_fds_to_transfer > 0) {
maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds);
if (!maybe_nwritten.is_error())
if (!maybe_nwritten.is_error()) {
num_fds_to_transfer = 0;
unowned_fds.clear();
}
} else {
maybe_nwritten = socket.write_some(bytes_to_write);
}
if (maybe_nwritten.is_error()) {
if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK)) {
// FIXME: Refactor this to pass the unwritten bytes back to the caller to send 'later'
// or next time the socket is writable
Vector<struct pollfd, 1> pollfds;
if (pollfds.is_empty())
pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
constexpr u32 POLL_TIMEOUT_MS = 100;
result = Core::System::poll(pollfds, POLL_TIMEOUT_MS);
} while (result.is_error() && result.error().code() == EINTR);
if (!result.is_error() && result.value() != 0)
continue;
switch (error.code()) {
case EPIPE:
return Error::from_string_literal("IPC::transfer_message: Disconnected from peer");
case EAGAIN:
return Error::from_string_literal("IPC::transfer_message: Timed out waiting for socket to become writable");
default:
return Error::from_syscall("IPC::transfer_message write"sv, -error.code());
}
return {};
} else {
return error;
}
@ -252,7 +271,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
header.fd_count = received_fd_count;
header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
queue_message_on_send_thread({ move(message_buffer), {} });
m_send_queue->enqueue_message(move(message_buffer), {});
}
if (index < m_unprocessed_bytes.size()) {

View file

@ -42,6 +42,31 @@ private:
int m_fd;
};
class SendQueue : public AtomicRefCounted<SendQueue> {
public:
enum class Running {
No,
Yes,
};
Running block_until_message_enqueued();
void stop();
void enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds);
struct BytesAndFds {
Vector<u8> bytes;
Vector<int> fds;
};
BytesAndFds dequeue(size_t max_bytes);
void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds);
private:
Vector<u8> m_bytes;
Vector<int> m_fds;
Threading::Mutex m_mutex;
Threading::ConditionVariable m_condition { m_mutex };
bool m_running { true };
};
class TransportSocket {
AK_MAKE_NONCOPYABLE(TransportSocket);
AK_MAKE_NONMOVABLE(TransportSocket);
@ -76,7 +101,7 @@ public:
ErrorOr<IPC::File> clone_for_transfer();
private:
static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector<int, 1> const& unowned_fds);
static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector<int>& unowned_fds);
NonnullOwnPtr<Core::LocalSocket> m_socket;
ByteBuffer m_unprocessed_bytes;
@ -87,19 +112,8 @@ private:
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
Queue<NonnullRefPtr<AutoCloseFileDescriptor>> m_fds_retained_until_received_by_peer;
struct MessageToSend {
Vector<u8> bytes;
Vector<int, 1> fds;
};
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageToSend> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue;
void queue_message_on_send_thread(MessageToSend&&) const;
};
}