From 3fcdbef327dec160b8b91e1425251abc8e444098 Mon Sep 17 00:00:00 2001 From: Tim Ledbetter Date: Thu, 10 Apr 2025 10:16:13 +0100 Subject: [PATCH] =?UTF-8?q?Revert=20"LibIPC:=20Change=20TransportSocket=20?= =?UTF-8?q?to=20write=20large=20messages=20in=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …small chunks. This reverts commit d6080d1fdc8dcb6a7ca1c9b9ead0bd69b9d60ede. --- Libraries/LibIPC/TransportSocket.cpp | 119 +++++++++++---------------- Libraries/LibIPC/TransportSocket.h | 38 +++------ 2 files changed, 62 insertions(+), 95 deletions(-) diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 5800e165333..056d956407c 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -13,77 +13,26 @@ namespace IPC { -void SendQueue::enqueue_message(Vector&& bytes, Vector&& fds) -{ - Threading::MutexLocker locker(m_mutex); - m_bytes.append(bytes.data(), bytes.size()); - m_fds.append(fds.data(), fds.size()); - m_condition.signal(); -} - -SendQueue::Running SendQueue::block_until_message_enqueued() -{ - Threading::MutexLocker locker(m_mutex); - while (m_bytes.is_empty() && m_fds.is_empty() && m_running) - m_condition.wait(); - return m_running ? Running::Yes : Running::No; -} - -SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes) -{ - Threading::MutexLocker locker(m_mutex); - auto bytes_to_send = min(max_bytes, m_bytes.size()); - Vector bytes; - bytes.append(m_bytes.data(), bytes_to_send); - m_bytes.remove(0, bytes_to_send); - return { move(bytes), move(m_fds) }; -} - -void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds) -{ - Threading::MutexLocker locker(m_mutex); - m_bytes.prepend(bytes.data(), bytes.size()); - m_fds.prepend(fds.data(), fds.size()); -} - -void SendQueue::stop() -{ - Threading::MutexLocker locker(m_mutex); - m_running = false; - m_condition.signal(); -} - TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) { m_send_queue = adopt_ref(*new SendQueue); m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { for (;;) { - if (send_queue->block_until_message_enqueued() == SendQueue::Running::No) + send_queue->mutex.lock(); + while (send_queue->messages.is_empty() && send_queue->running) + send_queue->condition.wait(); + + if (!send_queue->running) { + send_queue->mutex.unlock(); break; + } - auto [bytes, fds] = send_queue->dequeue(4096); - ReadonlyBytes bytes_to_send = bytes; + auto [bytes, fds] = send_queue->messages.take_first(); + send_queue->mutex.unlock(); - auto result = send_message(*m_socket, bytes_to_send, fds); - if (result.is_error()) { + if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) { dbgln("TransportSocket::send_thread: {}", result.error()); - VERIFY_NOT_REACHED(); - } - - if (!bytes.is_empty() || !fds.is_empty()) { - send_queue->return_unsent_data_to_front_of_queue(bytes_to_send, fds); - } - - { - Vector pollfds; - if (pollfds.is_empty()) - pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); - - ErrorOr result { 0 }; - do { - result = Core::System::poll(pollfds, -1); - } while (result.is_error() && result.error().code() == EINTR); } } return 0; @@ -96,7 +45,11 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) TransportSocket::~TransportSocket() { - m_send_queue->stop(); + { + Threading::MutexLocker locker(m_send_queue->mutex); + m_send_queue->running = false; + m_send_queue->condition.signal(); + } (void)m_send_thread->join(); } @@ -161,27 +114,55 @@ void TransportSocket::post_message(Vector const& bytes_to_write, Vectorenqueue_message(move(message_buffer), move(raw_fds)); + queue_message_on_send_thread({ move(message_buffer), move(raw_fds) }); } -ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector& unowned_fds) +void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const +{ + Threading::MutexLocker lock(m_send_queue->mutex); + m_send_queue->messages.append(move(message_to_send)); + m_send_queue->condition.signal(); +} + +ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector const& unowned_fds) { auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; if (num_fds_to_transfer > 0) { maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds); - if (!maybe_nwritten.is_error()) { + if (!maybe_nwritten.is_error()) num_fds_to_transfer = 0; - unowned_fds.clear(); - } } else { maybe_nwritten = socket.write_some(bytes_to_write); } if (maybe_nwritten.is_error()) { if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK)) { - return {}; + + // FIXME: Refactor this to pass the unwritten bytes back to the caller to send 'later' + // or next time the socket is writable + Vector pollfds; + if (pollfds.is_empty()) + pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 }); + + ErrorOr result { 0 }; + do { + constexpr u32 POLL_TIMEOUT_MS = 100; + result = Core::System::poll(pollfds, POLL_TIMEOUT_MS); + } while (result.is_error() && result.error().code() == EINTR); + + if (!result.is_error() && result.value() != 0) + continue; + + switch (error.code()) { + case EPIPE: + return Error::from_string_literal("IPC::transfer_message: Disconnected from peer"); + case EAGAIN: + return Error::from_string_literal("IPC::transfer_message: Timed out waiting for socket to become writable"); + default: + return Error::from_syscall("IPC::transfer_message write"sv, -error.code()); + } } else { return error; } @@ -271,7 +252,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib header.fd_count = received_fd_count; header.type = MessageHeader::Type::FileDescriptorAcknowledgement; memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); - m_send_queue->enqueue_message(move(message_buffer), {}); + queue_message_on_send_thread({ move(message_buffer), {} }); } if (index < m_unprocessed_bytes.size()) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 1c3c2a64b0d..2a4dd3a983b 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -41,31 +41,6 @@ private: int m_fd; }; -class SendQueue : public AtomicRefCounted { -public: - enum class Running { - No, - Yes, - }; - Running block_until_message_enqueued(); - void stop(); - - void enqueue_message(Vector&& bytes, Vector&& fds); - struct BytesAndFds { - Vector bytes; - Vector fds; - }; - BytesAndFds dequeue(size_t max_bytes); - void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds); - -private: - Vector m_bytes; - Vector m_fds; - Threading::Mutex m_mutex; - Threading::ConditionVariable m_condition { m_mutex }; - bool m_running { true }; -}; - class TransportSocket { AK_MAKE_NONCOPYABLE(TransportSocket); AK_MAKE_NONMOVABLE(TransportSocket); @@ -100,7 +75,7 @@ public: ErrorOr clone_for_transfer(); private: - static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector& unowned_fds); + static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector const& unowned_fds); NonnullOwnPtr m_socket; ByteBuffer m_unprocessed_bytes; @@ -111,8 +86,19 @@ private: // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 Queue> m_fds_retained_until_received_by_peer; + struct MessageToSend { + Vector bytes; + Vector fds; + }; + struct SendQueue : public AtomicRefCounted { + AK::SinglyLinkedList messages; + Threading::Mutex mutex; + Threading::ConditionVariable condition { mutex }; + bool running { true }; + }; RefPtr m_send_thread; RefPtr m_send_queue; + void queue_message_on_send_thread(MessageToSend&&) const; }; }