diff --git a/Userland/Libraries/LibIPC/CMakeLists.txt b/Userland/Libraries/LibIPC/CMakeLists.txt index a6c508d8a34..7af0aa27752 100644 --- a/Userland/Libraries/LibIPC/CMakeLists.txt +++ b/Userland/Libraries/LibIPC/CMakeLists.txt @@ -6,4 +6,4 @@ set(SOURCES ) serenity_lib(LibIPC ipc) -target_link_libraries(LibIPC PRIVATE LibCore LibURL) +target_link_libraries(LibIPC PRIVATE LibCore LibURL LibThreading) diff --git a/Userland/Libraries/LibIPC/Connection.cpp b/Userland/Libraries/LibIPC/Connection.cpp index 9fb4502d7f7..2f441f93df1 100644 --- a/Userland/Libraries/LibIPC/Connection.cpp +++ b/Userland/Libraries/LibIPC/Connection.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, Andreas Kling + * Copyright (c) 2021-2024, Andreas Kling * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -26,9 +26,41 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr intptr_t { + for (;;) { + queue->mutex.lock(); + if (queue->messages.is_empty()) + queue->condition.wait(); + + if (!queue->running) { + queue->mutex.unlock(); + break; + } + + auto message = queue->messages.take_first(); + queue->mutex.unlock(); + + if (auto result = message.transfer_message(*m_socket); result.is_error()) { + dbgln("ConnectionBase::send_thread: {}", result.error()); + continue; + } + } + return 0; + }); + m_send_thread->start(); } -ConnectionBase::~ConnectionBase() = default; +ConnectionBase::~ConnectionBase() +{ + { + Threading::MutexLocker locker(m_send_queue->mutex); + m_send_queue->running = false; + m_send_queue->condition.signal(); + } + m_send_thread->detach(); +} bool ConnectionBase::is_open() const { @@ -47,9 +79,10 @@ ErrorOr ConnectionBase::post_message(MessageBuffer buffer) if (!m_socket->is_open()) return Error::from_string_literal("Trying to post_message during IPC shutdown"); - if (auto result = buffer.transfer_message(*m_socket); result.is_error()) { - shutdown_with_error(result.error()); - return result.release_error(); + { + Threading::MutexLocker locker(m_send_queue->mutex); + m_send_queue->messages.append(move(buffer)); + m_send_queue->condition.signal(); } m_responsiveness_timer->start(); diff --git a/Userland/Libraries/LibIPC/Connection.h b/Userland/Libraries/LibIPC/Connection.h index df066084f10..982c8038106 100644 --- a/Userland/Libraries/LibIPC/Connection.h +++ b/Userland/Libraries/LibIPC/Connection.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, Andreas Kling + * Copyright (c) 2018-2024, Andreas Kling * Copyright (c) 2022, the SerenityOS developers. * * SPDX-License-Identifier: BSD-2-Clause @@ -12,6 +12,9 @@ #include #include #include +#include +#include +#include namespace IPC { @@ -57,6 +60,16 @@ protected: ByteBuffer m_unprocessed_bytes; u32 m_local_endpoint_magic { 0 }; + + struct SendQueue : public AtomicRefCounted { + AK::SinglyLinkedList messages; + Threading::Mutex mutex; + Threading::ConditionVariable condition { mutex }; + bool running { true }; + }; + + RefPtr m_send_thread; + RefPtr m_send_queue; }; template