Patch summary
=============
Adds a blocking wait() to lf_queue by moving the list head into a new
non-template base class, lf_queue_base, that stores it as an integer word
(atomic_t<std::uintptr_t>) instead of a typed pointer:

  * Utilities/cond.cpp   - defines lf_queue_base::wait() and
                           lf_queue_base::imp_notify() on top of
                           balanced_wait_until() / balanced_awaken().
  * Utilities/lockless.h - declares lf_queue_base; lf_queue<T> now derives
                           from it and casts the head word to/from
                           lf_queue_item<T>*. push() treats a set low bit in
                           the previous head value as "not a pointer" (links
                           nullptr instead) and calls imp_notify() afterwards.
  * Utilities/sync.h     - relaxes the size static_asserts from 4 bytes to
                           4 or 8 bytes and passes sizeof(T) instead of
                           sizeof(u32) to OptWaitOnAddress.

Reconstruction notes
====================
This text was recovered from a copy in which all line breaks had been
collapsed to spaces and every <...> list had been stripped. Line structure
was rebuilt so that each hunk matches the line counts in its @@ header.
Angle-bracket content was reinstated from usage visible in the hunks
themselves (lf_queue_item<T>, std::uintptr_t, Args, atomic_t<T>, Pred).
Two items could not be derived from the hunks and are best guesses -
TODO confirm against the tree before applying:
  * the system header in the first cond.cpp hunk (assumed <limits.h>);
  * the exact template parameter lists on the context lines above
    balanced_wait_until() and balanced_awaken() in sync.h.

Review notes (not changed here)
===============================
  * NOTE(review): within the hunks shown, no code stores the value 1 into
    m_head. The wait() predicate returns +1 whenever head != 1, and push()
    only notifies when the previous head had its low bit set. Unless
    balanced_wait_until() itself installs that value, wait() presumably
    returns without blocking on an empty queue - verify.
  * NOTE(review): ~lf_queue() casts the raw head word to a pointer and
    deletes it without testing the low bit that push() checks - confirm the
    head can never be 1 at destruction.

diff --git a/Utilities/cond.cpp b/Utilities/cond.cpp
index f15d30cdac..f6d819333d 100644
--- a/Utilities/cond.cpp
+++ b/Utilities/cond.cpp
@@ -1,5 +1,6 @@
 #include "cond.h"
 #include "sync.h"
+#include "lockless.h"
 
 #include <limits.h>
 
@@ -267,3 +268,27 @@ void cond_x16::imp_notify() noexcept
 
 	balanced_awaken(m_cvx16, utils::popcnt16(wait_mask));
 }
+
+bool lf_queue_base::wait(u64 _timeout)
+{
+	return balanced_wait_until(m_head, _timeout, [](std::uintptr_t& head, auto... ret) -> int
+	{
+		if (head != 1)
+		{
+			return +1;
+		}
+
+		if constexpr (sizeof...(ret))
+		{
+			head = 0;
+			return -1;
+		}
+
+		return 0;
+	});
+}
+
+void lf_queue_base::imp_notify()
+{
+	balanced_awaken(m_head, 1);
+}
diff --git a/Utilities/lockless.h b/Utilities/lockless.h
index 67eb7434ea..0f8d17f23b 100644
--- a/Utilities/lockless.h
+++ b/Utilities/lockless.h
@@ -311,17 +311,28 @@ public:
 	}
 };
 
+class lf_queue_base
+{
+protected:
+	atomic_t<std::uintptr_t> m_head = 0;
+
+	void imp_notify();
+
+public:
+	// Wait for new elements pushed, no other thread shall call wait() or pop_all() simultaneously
+	bool wait(u64 usec_timeout = -1);
+};
+
 // Linked list-based multi-producer queue (the consumer drains the whole queue at once)
 template <typename T>
-class lf_queue
+class lf_queue : public lf_queue_base
 {
-	// Elements are added by replacing m_head
-	atomic_t<lf_queue_item<T>*> m_head = nullptr;
+	using lf_queue_base::m_head;
 
 	// Extract all elements and reverse element order (FILO to FIFO)
 	lf_queue_item<T>* reverse() noexcept
 	{
-		if (auto* head = m_head.load() ? m_head.exchange(nullptr) : nullptr)
+		if (auto* head = m_head.load() ? reinterpret_cast<lf_queue_item<T>*>(m_head.exchange(0)) : nullptr)
 		{
 			if (auto* prev = head->m_link)
 			{
@@ -347,18 +358,23 @@ public:
 
 	~lf_queue()
 	{
-		delete m_head.load();
+		delete reinterpret_cast<lf_queue_item<T>*>(m_head.load());
 	}
 
 	template <typename... Args>
 	void push(Args&&... args)
 	{
-		auto* old = m_head.load();
-		auto* item = new lf_queue_item<T>(old, std::forward<Args>(args)...);
+		auto _old = m_head.load();
+		auto* item = new lf_queue_item<T>(_old & 1 ? nullptr : reinterpret_cast<lf_queue_item<T>*>(_old), std::forward<Args>(args)...);
 
-		while (!m_head.compare_exchange(old, item))
+		while (!m_head.compare_exchange(_old, reinterpret_cast<std::uintptr_t>(item)))
 		{
-			item->m_link = old;
+			item->m_link = _old & 1 ? nullptr : reinterpret_cast<lf_queue_item<T>*>(_old);
+		}
+
+		if (_old & 1)
+		{
+			lf_queue_base::imp_notify();
 		}
 	}
 
diff --git a/Utilities/sync.h b/Utilities/sync.h
index 6f9c1eeb71..95a7e37226 100644
--- a/Utilities/sync.h
+++ b/Utilities/sync.h
@@ -153,7 +153,7 @@ inline int futex(volatile void* uaddr, int futex_op, uint val, const timespec* t
 template <typename T, typename Pred>
 bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
 {
-	static_assert(sizeof(T) == 4);
+	static_assert(sizeof(T) == 4 || sizeof(T) == 8);
 
 	const bool is_inf = usec_timeout > u64{UINT32_MAX / 1000} * 1000000;
 
@@ -184,7 +184,7 @@ bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
 	{
 		while (!test_pred(value))
 		{
-			if (OptWaitOnAddress(&var, &value, sizeof(u32), is_inf ? INFINITE : usec_timeout / 1000))
+			if (OptWaitOnAddress(&var, &value, sizeof(T), is_inf ? INFINITE : usec_timeout / 1000))
 			{
 				if (!test_pred(value) && !test_pred(value, nullptr))
 				{
@@ -260,7 +260,7 @@ bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
 template <typename T>
 void balanced_awaken(atomic_t<T>& var, u32 weight)
 {
-	static_assert(sizeof(T) == 4);
+	static_assert(sizeof(T) == 4 || sizeof(T) == 8);
 
 #ifdef _WIN32
 	if (OptWaitOnAddress)