diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 0c7a2064cd4..62136877af2 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -16,7 +16,7 @@ namespace IPC { void SendQueue::enqueue_message(Vector&& bytes, Vector&& fds) { Threading::MutexLocker locker(m_mutex); - m_bytes.append(bytes.data(), bytes.size()); + VERIFY(MUST(m_stream.write_some(bytes.span())) == bytes.size()); m_fds.append(fds.data(), fds.size()); m_condition.signal(); } @@ -24,26 +24,27 @@ void SendQueue::enqueue_message(Vector&& bytes, Vector&& fds) SendQueue::Running SendQueue::block_until_message_enqueued() { Threading::MutexLocker locker(m_mutex); - while (m_bytes.is_empty() && m_fds.is_empty() && m_running) + while (m_stream.is_eof() && m_fds.is_empty() && m_running) m_condition.wait(); return m_running ? Running::Yes : Running::No; } -SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes) +SendQueue::BytesAndFds SendQueue::peek(size_t max_bytes) { Threading::MutexLocker locker(m_mutex); - auto bytes_to_send = min(max_bytes, m_bytes.size()); - Vector bytes; - bytes.append(m_bytes.data(), bytes_to_send); - m_bytes.remove(0, bytes_to_send); - return { move(bytes), move(m_fds) }; + BytesAndFds result; + auto bytes_to_send = min(max_bytes, m_stream.used_buffer_size()); + result.bytes.resize(bytes_to_send); + m_stream.peek_some(result.bytes); + result.fds = m_fds; + return result; } -void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds) +void SendQueue::discard(size_t bytes_count, size_t fds_count) { Threading::MutexLocker locker(m_mutex); - m_bytes.prepend(bytes.data(), bytes.size()); - m_fds.prepend(fds.data(), fds.size()); + MUST(m_stream.discard(bytes_count)); + m_fds.remove(0, fds_count); } void SendQueue::stop() @@ -62,7 +63,8 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) if (send_queue->block_until_message_enqueued() == SendQueue::Running::No) break; - auto [bytes, fds] = send_queue->dequeue(4096); + auto [bytes, fds] = send_queue->peek(4096); + auto fds_count = fds.size(); ReadonlyBytes remaining_to_send_bytes = bytes; Threading::RWLockLocker lock(m_socket_rw_lock); @@ -72,8 +74,10 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) VERIFY_NOT_REACHED(); } - if (!remaining_to_send_bytes.is_empty() || !fds.is_empty()) { - send_queue->return_unsent_data_to_front_of_queue(remaining_to_send_bytes, fds); + auto written_bytes_count = bytes.size() - remaining_to_send_bytes.size(); + auto written_fds_count = fds_count - fds.size(); + if (written_bytes_count > 0 || written_fds_count > 0) { + send_queue->discard(written_bytes_count, written_fds_count); } if (!m_socket->is_open()) diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index e7a19f4e159..0469c1d8b07 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -7,6 +7,7 @@ #pragma once +#include #include #include #include @@ -56,11 +57,11 @@ public: Vector bytes; Vector fds; }; - BytesAndFds dequeue(size_t max_bytes); - void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds); + BytesAndFds peek(size_t max_bytes); + void discard(size_t bytes_count, size_t fds_count); private: - Vector m_bytes; + AllocatingMemoryStream m_stream; Vector m_fds; Threading::Mutex m_mutex; Threading::ConditionVariable m_condition { m_mutex };