diff --git a/Userland/Libraries/LibThreading/ThreadPool.h b/Userland/Libraries/LibThreading/ThreadPool.h new file mode 100644 index 00000000000..7992d113719 --- /dev/null +++ b/Userland/Libraries/LibThreading/ThreadPool.h @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2024, Ali Mohammad Pur + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace Threading { + +template +struct ThreadPoolLooper { + IterationDecision next(Pool& pool, bool wait) + { + Optional entry; + while (true) { + entry = pool.m_work_queue.with_locked([&](auto& queue) -> Optional { + if (queue.is_empty()) + return {}; + return queue.dequeue(); + }); + if (entry.has_value()) + break; + if (pool.m_should_exit) + return IterationDecision::Break; + + if (!wait) + return IterationDecision::Continue; + + pool.m_mutex.lock(); + pool.m_work_available.wait(); + pool.m_mutex.unlock(); + } + + pool.m_busy_count++; + pool.m_handler(entry.release_value()); + return IterationDecision::Continue; + } +}; + +template class Looper = ThreadPoolLooper> +class ThreadPool { + AK_MAKE_NONCOPYABLE(ThreadPool); + AK_MAKE_NONMOVABLE(ThreadPool); + +public: + using Work = TWork; + friend struct ThreadPoolLooper; + + ThreadPool(Optional concurrency = {}) + requires(IsFunction) + : m_handler([](Work work) { return work(); }) + , m_work_available(m_mutex) + , m_work_done(m_mutex) + { + initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); + } + + explicit ThreadPool(Function handler, Optional concurrency = {}) + : m_handler(move(handler)) + , m_work_available(m_mutex) + , m_work_done(m_mutex) + { + initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); + } + + ~ThreadPool() + { + m_should_exit.store(true, AK::MemoryOrder::memory_order_release); + for (auto& worker : m_workers) { + m_work_available.broadcast(); + (void)worker->join(); + } + } + + void submit(Work work) + { + m_work_queue.with_locked([&](auto& queue) { + queue.enqueue({ move(work) }); + }); + m_work_available.broadcast(); + } + + void wait_for_all() + { + while (true) { + if (m_work_queue.with_locked([](auto& queue) { return queue.is_empty(); })) + break; + m_mutex.lock(); + m_work_done.wait(); + m_mutex.unlock(); + } + + while (m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0) { + m_mutex.lock(); + m_work_done.wait(); + m_mutex.unlock(); + } + } + +private: + void initialize_workers(size_t concurrency) + { + for (size_t i = 0; i < concurrency; ++i) { + m_workers.append(Thread::construct([this]() -> intptr_t { + Looper thread_looper; + for (; !m_should_exit;) { + auto result = thread_looper.next(*this, true); + m_busy_count--; + m_work_done.signal(); + if (result == IterationDecision::Break) + break; + } + + return 0; + }, + "ThreadPool worker"sv)); + } + + for (auto& worker : m_workers) + worker->start(); + } + + Vector> m_workers; + MutexProtected> m_work_queue; + Function m_handler; + Mutex m_mutex; + ConditionVariable m_work_available; + ConditionVariable m_work_done; + Atomic m_should_exit { false }; + Atomic m_busy_count { 0 }; +}; + +}