LibIPC: Use AllocatingMemoryStream in TransportSocket send queue

A memory stream is a more suitable container for the socket send queue,
as using it results in fewer allocations than trying to emulate a stream
with a Vector.

commit 466c793fdb
Author: Aliaksandr Kalenik (2025-04-14 03:22:10 +02:00)
Committed by: Andreas Kling
2 changed files with 22 additions and 17 deletions
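
The allocation argument hinges on the queue's access pattern: bytes are appended at the tail, and an arbitrary-length prefix is removed from the head after each (possibly partial) socket write. With a contiguous Vector<u8>, every removal shifts the surviving bytes and growth periodically reallocates and copies the whole buffer, while a chunk-based memory stream can drop a prefix by advancing a cursor and freeing whole chunks. A rough standard-C++ sketch of that difference (illustrative stand-in types only, not the AK implementation):

    // Illustrative only: standard-C++ stand-ins, not the actual AK types.
    #include <cstddef>
    #include <cstdint>
    #include <deque>
    #include <vector>

    // Vector-style queue: dropping the sent prefix shifts every surviving byte,
    // and growing the buffer periodically reallocates and copies all of it.
    void discard_from_vector(std::vector<uint8_t>& bytes, size_t sent)
    {
        // Precondition: sent <= bytes.size()
        bytes.erase(bytes.begin(), bytes.begin() + static_cast<std::ptrdiff_t>(sent));
    }

    // Chunked-stream-style queue (roughly the layout a chunk-based memory
    // stream uses): discarding the sent prefix only advances a cursor and
    // frees whole chunks, so the remaining bytes never move and nothing is
    // reallocated.
    struct ChunkedQueue {
        std::deque<std::vector<uint8_t>> chunks; // fixed-size chunks, e.g. 4 KiB
        size_t head_offset { 0 };                // consumed bytes in the front chunk
    };

    void discard_from_chunks(ChunkedQueue& queue, size_t sent)
    {
        while (sent > 0 && !queue.chunks.empty()) {
            size_t available = queue.chunks.front().size() - queue.head_offset;
            if (sent < available) {
                queue.head_offset += sent; // just move the read cursor
                return;
            }
            sent -= available;
            queue.chunks.pop_front(); // release a whole chunk; no bytes are copied
            queue.head_offset = 0;
        }
    }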

Changed file 1 of 2 (TransportSocket implementation):

@@ -16,7 +16,7 @@ namespace IPC {
 void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
 {
     Threading::MutexLocker locker(m_mutex);
-    m_bytes.append(bytes.data(), bytes.size());
+    VERIFY(MUST(m_stream.write_some(bytes.span())) == bytes.size());
     m_fds.append(fds.data(), fds.size());
     m_condition.signal();
 }
@@ -24,26 +24,27 @@ void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
 SendQueue::Running SendQueue::block_until_message_enqueued()
 {
     Threading::MutexLocker locker(m_mutex);
-    while (m_bytes.is_empty() && m_fds.is_empty() && m_running)
+    while (m_stream.is_eof() && m_fds.is_empty() && m_running)
         m_condition.wait();
     return m_running ? Running::Yes : Running::No;
 }

-SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes)
+SendQueue::BytesAndFds SendQueue::peek(size_t max_bytes)
 {
     Threading::MutexLocker locker(m_mutex);
-    auto bytes_to_send = min(max_bytes, m_bytes.size());
-    Vector<u8> bytes;
-    bytes.append(m_bytes.data(), bytes_to_send);
-    m_bytes.remove(0, bytes_to_send);
-    return { move(bytes), move(m_fds) };
+    BytesAndFds result;
+    auto bytes_to_send = min(max_bytes, m_stream.used_buffer_size());
+    result.bytes.resize(bytes_to_send);
+    m_stream.peek_some(result.bytes);
+    result.fds = m_fds;
+    return result;
 }

-void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds)
+void SendQueue::discard(size_t bytes_count, size_t fds_count)
 {
     Threading::MutexLocker locker(m_mutex);
-    m_bytes.prepend(bytes.data(), bytes.size());
-    m_fds.prepend(fds.data(), fds.size());
+    MUST(m_stream.discard(bytes_count));
+    m_fds.remove(0, fds_count);
 }

 void SendQueue::stop()
@@ -62,7 +63,8 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
         if (send_queue->block_until_message_enqueued() == SendQueue::Running::No)
             break;

-        auto [bytes, fds] = send_queue->dequeue(4096);
+        auto [bytes, fds] = send_queue->peek(4096);
+        auto fds_count = fds.size();
         ReadonlyBytes remaining_to_send_bytes = bytes;

         Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
@@ -72,8 +74,10 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
             VERIFY_NOT_REACHED();
         }

-        if (!remaining_to_send_bytes.is_empty() || !fds.is_empty()) {
-            send_queue->return_unsent_data_to_front_of_queue(remaining_to_send_bytes, fds);
+        auto written_bytes_count = bytes.size() - remaining_to_send_bytes.size();
+        auto written_fds_count = fds_count - fds.size();
+        if (written_bytes_count > 0 || written_fds_count > 0) {
+            send_queue->discard(written_bytes_count, written_fds_count);
         }

         if (!m_socket->is_open())
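
With peek() and discard() in place, the send loop no longer hands ownership of the bytes out of the queue and pushes leftovers back on failure; it copies a bounded prefix, attempts the write, and then tells the queue how much actually went out. A minimal sketch of that bookkeeping, using hypothetical stand-in types rather than the real SendQueue and Core::LocalSocket:

    // Hypothetical stand-ins; the real code uses SendQueue, Core::LocalSocket and
    // scatter/gather writes that can also transfer file descriptors. (C++20)
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <span>
    #include <utility>
    #include <vector>

    struct FakeQueue {
        std::vector<uint8_t> bytes;
        std::vector<int> fds;

        // Copy out up to max_bytes plus all pending fds, leaving the queue intact.
        std::pair<std::vector<uint8_t>, std::vector<int>> peek(size_t max_bytes) const
        {
            size_t count = std::min(max_bytes, bytes.size());
            return { std::vector<uint8_t>(bytes.begin(),
                         bytes.begin() + static_cast<std::ptrdiff_t>(count)),
                fds };
        }

        // Remove only what the caller reports as actually written.
        void discard(size_t bytes_written, size_t fds_written)
        {
            bytes.erase(bytes.begin(), bytes.begin() + static_cast<std::ptrdiff_t>(bytes_written));
            fds.erase(fds.begin(), fds.begin() + static_cast<std::ptrdiff_t>(fds_written));
        }
    };

    // Stand-in for the socket write: pretends only half of the bytes went out
    // and that every fd was transferred.
    static void fake_partial_write(std::span<uint8_t const>& remaining, std::vector<int>& fds)
    {
        remaining = remaining.subspan(remaining.size() / 2);
        fds.clear();
    }

    void send_once(FakeQueue& queue)
    {
        auto [bytes, fds] = queue.peek(4096);
        auto fds_count = fds.size();

        std::span<uint8_t const> remaining { bytes };
        fake_partial_write(remaining, fds);

        // Unsent data never left the queue, so nothing has to be "returned to
        // the front"; only the prefix known to have been written is dropped.
        auto written_bytes = bytes.size() - remaining.size();
        auto written_fds = fds_count - fds.size();
        if (written_bytes > 0 || written_fds > 0)
            queue.discard(written_bytes, written_fds);
    }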

Changed file 2 of 2 (TransportSocket header):

@@ -7,6 +7,7 @@
 #pragma once

+#include <AK/MemoryStream.h>
 #include <AK/Queue.h>
 #include <LibCore/Socket.h>
 #include <LibThreading/ConditionVariable.h>
@@ -56,11 +57,11 @@ public:
        Vector<u8> bytes;
        Vector<int> fds;
    };
-   BytesAndFds dequeue(size_t max_bytes);
-   void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds);
+   BytesAndFds peek(size_t max_bytes);
+   void discard(size_t bytes_count, size_t fds_count);

private:
-   Vector<u8> m_bytes;
+   AllocatingMemoryStream m_stream;
    Vector<int> m_fds;
    Threading::Mutex m_mutex;
    Threading::ConditionVariable m_condition { m_mutex };