diff --git a/Utilities/lockless.h b/Utilities/lockless.h
index f7b98cd65c..430ce4cd30 100644
--- a/Utilities/lockless.h
+++ b/Utilities/lockless.h
@@ -66,9 +66,9 @@ public:
 	constexpr lf_fifo() = default;
 
 	// Get current "push" position
-	u32 size()
+	u32 size() const
 	{
-		return reinterpret_cast<atomic_t<u32>&>(m_ctrl).load(); // Hack
+		return reinterpret_cast<const atomic_t<u32>&>(m_ctrl).load(); // Hack
 	}
 
 	// Acquire the place for one or more elements.
@@ -78,7 +78,7 @@ public:
 	}
 
 	// Get current "pop" position
-	u32 peek()
+	u32 peek() const
 	{
 		return m_ctrl.load().pop;
 	}
@@ -145,3 +145,192 @@ public:
 		}
 	}
 };
+
+// Fixed-size single-producer single-consumer queue
+// m_push and m_pop are free-running counters: the slot index is (counter % N)
+// and the number of stored elements is (m_push - m_pop)
+template <typename T, std::uint32_t N>
+class lf_spsc
+{
+	// If N is a power of 2, m_push/m_pop can safely overflow and the algorithm is simplified
+	static_assert(N && (1u << 31) % N == 0, "lf_spsc<> error: size must be power of 2");
+
+protected:
+	volatile std::uint32_t m_push{0};
+	volatile std::uint32_t m_pop{0};
+	T m_data[N]{};
+
+public:
+	constexpr lf_spsc() = default;
+
+	// Try to push (producer only)
+	template <typename T2>
+	bool try_push(T2&& data)
+	{
+		const std::uint32_t pos = m_push;
+
+		if (pos - m_pop >= N)
+		{
+			return false;
+		}
+
+		_mm_lfence();
+		m_data[pos % N] = std::forward<T2>(data);
+		_mm_sfence();
+		m_push = pos + 1;
+		return true;
+	}
+
+	// Try to get push pointer (producer only)
+	operator T*()
+	{
+		const std::uint32_t pos = m_push;
+
+		if (pos - m_pop >= N)
+		{
+			return nullptr;
+		}
+
+		_mm_lfence();
+		return m_data + (pos % N);
+	}
+
+	// Increment push counter (producer only)
+	void end_push()
+	{
+		const std::uint32_t pos = m_push;
+
+		if (pos - m_pop < N)
+		{
+			_mm_sfence();
+			m_push = pos + 1;
+		}
+	}
+
+	// Unsafe access
+	T& get_push(std::size_t i)
+	{
+		_mm_lfence();
+		return m_data[(m_push + i) % N];
+	}
+
+	// Try to pop (consumer only)
+	template <typename T2>
+	bool try_pop(T2& out)
+	{
+		const std::uint32_t pos = m_pop;
+
+		if (m_push - pos <= 0)
+		{
+			return false;
+		}
+
+		_mm_lfence();
+		out = std::move(m_data[pos % N]);
+		_mm_sfence();
+		m_pop = pos + 1;
+		return true;
+	}
+
+	// Increment pop counter (consumer only)
+	void end_pop()
+	{
+		const std::uint32_t pos = m_pop;
+
+		if (m_push - pos > 0)
+		{
+			_mm_sfence();
+			m_pop = pos + 1;
+		}
+	}
+
+	// Get size (consumer only)
+	std::uint32_t size() const
+	{
+		return m_push - m_pop;
+	}
+
+	// Direct access (consumer only)
+	T& operator [](std::size_t i)
+	{
+		_mm_lfence();
+		return m_data[(m_pop + i) % N];
+	}
+};
+
+// Fixed-size multi-producer single-consumer queue
+template <typename T, std::uint32_t N>
+class lf_mpsc : lf_spsc<T, N>
+{
+protected:
+	using lf_spsc<T, N>::m_push;
+	using lf_spsc<T, N>::m_pop;
+	using lf_spsc<T, N>::m_data;
+
+	// m_lock packs two counters: the low 32 bits count acquired slots (c_ack units),
+	// the high 32 bits count completed pushes (c_rel units)
+	enum : std::uint64_t
+	{
+		c_ack = 1ull << 0,
+		c_rel = 1ull << 32,
+	};
+
+	atomic_t<std::uint64_t> m_lock{0};
+
+	void release(std::uint64_t value)
+	{
+		// Push all pending elements at once when possible
+		if (value && value % c_rel == value / c_rel)
+		{
+			_mm_sfence();
+			m_push += value % c_rel;
+			m_lock.compare_and_swap_test(value, 0);
+		}
+	}
+
+public:
+	constexpr lf_mpsc() = default;
+
+	// Try to get push pointer
+	operator T*()
+	{
+		const std::uint64_t old = m_lock.fetch_add(c_ack);
+		const std::uint32_t pos = m_push;
+
+		// old % c_rel is the number of slots acquired before this call and not yet published;
+		// fail if they cover the whole buffer or the remaining free space
+		if (old % c_rel >= N || pos - m_pop >= N - (old % c_rel))
+		{
+			release(m_lock.sub_fetch(c_ack));
+			return nullptr;
+		}
+
+		return m_data + ((pos + old) % N);
+	}
+
+	// Increment push counter (producer only)
+	void end_push()
+	{
+		release(m_lock.add_fetch(c_rel));
+	}
+
+	// Try to push
+	template <typename T2>
+	bool try_push(T2&& data)
+	{
+		if (T* ptr = *this)
+		{
+			*ptr = std::forward<T2>(data);
+			end_push();
+			return true;
+		}
+
+		return false;
+	}
+
+	// Enable consumer methods
+	using lf_spsc<T, N>::try_pop;
+	using lf_spsc<T, N>::end_pop;
+	using lf_spsc<T, N>::size;
+	using lf_spsc<T, N>::operator [];
+};