diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 86239539ce7..67c32044488 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -13,26 +13,79 @@ namespace IPC { +void SendQueue::enqueue_message(Vector&& bytes, Vector&& fds) +{ + Threading::MutexLocker locker(m_mutex); + m_bytes.append(bytes.data(), bytes.size()); + m_fds.append(fds.data(), fds.size()); + m_condition.signal(); +} + +SendQueue::Running SendQueue::block_until_message_enqueued() +{ + Threading::MutexLocker locker(m_mutex); + while (m_bytes.is_empty() && m_fds.is_empty() && m_running) + m_condition.wait(); + return m_running ? Running::Yes : Running::No; +} + +SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes) +{ + Threading::MutexLocker locker(m_mutex); + auto bytes_to_send = min(max_bytes, m_bytes.size()); + Vector bytes; + bytes.append(m_bytes.data(), bytes_to_send); + m_bytes.remove(0, bytes_to_send); + return { move(bytes), move(m_fds) }; +} + +void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds) +{ + Threading::MutexLocker locker(m_mutex); + m_bytes.prepend(bytes.data(), bytes.size()); + m_fds.prepend(fds.data(), fds.size()); +} + +void SendQueue::stop() +{ + Threading::MutexLocker locker(m_mutex); + m_running = false; + m_condition.signal(); +} + TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) { m_send_queue = adopt_ref(*new SendQueue); m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { for (;;) { - send_queue->mutex.lock(); - while (send_queue->messages.is_empty() && send_queue->running) - send_queue->condition.wait(); - - if (!send_queue->running) { - send_queue->mutex.unlock(); + if (send_queue->block_until_message_enqueued() == SendQueue::Running::No) break; + + auto [bytes, fds] = send_queue->dequeue(4096); + ReadonlyBytes remaining_to_send_bytes = bytes; + + auto result = send_message(*m_socket, remaining_to_send_bytes, fds); + if (result.is_error()) { + dbgln("TransportSocket::send_thread: {}", result.error()); + VERIFY_NOT_REACHED(); } - auto [bytes, fds] = send_queue->messages.take_first(); - send_queue->mutex.unlock(); + if (!remaining_to_send_bytes.is_empty() || !fds.is_empty()) { + send_queue->return_unsent_data_to_front_of_queue(remaining_to_send_bytes, fds); + } - if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) { - dbgln("TransportSocket::send_thread: {}", result.error()); + if (!m_socket->is_open()) + break; + + { + Vector pollfds; + pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); + + ErrorOr result { 0 }; + do { + result = Core::System::poll(pollfds, -1); + } while (result.is_error() && result.error().code() == EINTR); } } return 0; @@ -45,11 +98,7 @@ TransportSocket::TransportSocket(NonnullOwnPtr socket) TransportSocket::~TransportSocket() { - { - Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->running = false; - m_send_queue->condition.signal(); - } + m_send_queue->stop(); (void)m_send_thread->join(); } @@ -114,61 +163,31 @@ void TransportSocket::post_message(Vector const& bytes_to_write, Vectorenqueue_message(move(message_buffer), move(raw_fds)); } -void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const -{ - Threading::MutexLocker lock(m_send_queue->mutex); - m_send_queue->messages.append(move(message_to_send)); - m_send_queue->condition.signal(); -} - -ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector const& unowned_fds) +ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector& unowned_fds) { auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; if (num_fds_to_transfer > 0) { maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds); - if (!maybe_nwritten.is_error()) - num_fds_to_transfer = 0; } else { maybe_nwritten = socket.write_some(bytes_to_write); } if (maybe_nwritten.is_error()) { - if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK)) { - - // FIXME: Refactor this to pass the unwritten bytes back to the caller to send 'later' - // or next time the socket is writable - Vector pollfds; - if (pollfds.is_empty()) - pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 }); - - ErrorOr result { 0 }; - do { - constexpr u32 POLL_TIMEOUT_MS = 100; - result = Core::System::poll(pollfds, POLL_TIMEOUT_MS); - } while (result.is_error() && result.error().code() == EINTR); - - if (!result.is_error() && result.value() != 0) - continue; - - switch (error.code()) { - case EPIPE: - return Error::from_string_literal("IPC::transfer_message: Disconnected from peer"); - case EAGAIN: - return Error::from_string_literal("IPC::transfer_message: Timed out waiting for socket to become writable"); - default: - return Error::from_syscall("IPC::transfer_message write"sv, -error.code()); - } + if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK || error.code() == EINTR)) { + return {}; } else { return error; } } bytes_to_write = bytes_to_write.slice(maybe_nwritten.value()); + num_fds_to_transfer = 0; + unowned_fds.clear(); } return {}; } @@ -252,7 +271,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib header.fd_count = received_fd_count; header.type = MessageHeader::Type::FileDescriptorAcknowledgement; memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); - queue_message_on_send_thread({ move(message_buffer), {} }); + m_send_queue->enqueue_message(move(message_buffer), {}); } if (index < m_unprocessed_bytes.size()) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 2c466d0f08c..b980fdb739c 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -42,6 +42,31 @@ private: int m_fd; }; +class SendQueue : public AtomicRefCounted { +public: + enum class Running { + No, + Yes, + }; + Running block_until_message_enqueued(); + void stop(); + + void enqueue_message(Vector&& bytes, Vector&& fds); + struct BytesAndFds { + Vector bytes; + Vector fds; + }; + BytesAndFds dequeue(size_t max_bytes); + void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector const& fds); + +private: + Vector m_bytes; + Vector m_fds; + Threading::Mutex m_mutex; + Threading::ConditionVariable m_condition { m_mutex }; + bool m_running { true }; +}; + class TransportSocket { AK_MAKE_NONCOPYABLE(TransportSocket); AK_MAKE_NONMOVABLE(TransportSocket); @@ -68,7 +93,7 @@ public: Vector bytes; Vector fds; }; - ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&& schedule_shutdown); + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&&); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -76,7 +101,7 @@ public: ErrorOr clone_for_transfer(); private: - static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector const& unowned_fds); + static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector& unowned_fds); NonnullOwnPtr m_socket; ByteBuffer m_unprocessed_bytes; @@ -87,19 +112,8 @@ private: // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 Queue> m_fds_retained_until_received_by_peer; - struct MessageToSend { - Vector bytes; - Vector fds; - }; - struct SendQueue : public AtomicRefCounted { - AK::SinglyLinkedList messages; - Threading::Mutex mutex; - Threading::ConditionVariable condition { mutex }; - bool running { true }; - }; RefPtr m_send_thread; RefPtr m_send_queue; - void queue_message_on_send_thread(MessageToSend&&) const; }; }