diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index 3f78f34d45..1e229e463b 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -1,15 +1,12 @@
 #include "atomic.hpp"
 
-#ifdef __linux__
+// TODO: something for other platforms
+#if defined(__linux__) || !defined(_WIN32)
 #define USE_FUTEX
 #endif
 
 #include "Utilities/sync.h"
 
-#ifdef USE_POSIX
-#include <semaphore.h>
-#endif
-
 #include <mutex>
 #include <condition_variable>
 #include <memory>
@@ -82,13 +79,59 @@ namespace
 		// Reference counter, owning pointer, collision bit and optionally selected slot
 		atomic_t<u64> addr_ref{};
 
-		// Counter for waiting threads for the semaphore and allocated semaphore id
-		atomic_t<u64> sema_var{};
+		// Allocated semaphore bits (max 60)
+		atomic_t<u64> sema_bits{};
+
+		// Semaphores (one per thread), data is platform-specific but 0 means empty
+		atomic_t<u32> sema_data[60]{};
+
+		atomic_t<u32>* sema_alloc()
+		{
+			const auto [bits, ok] = sema_bits.fetch_op([](u64& bits)
+			{
+				if (bits + 1 < (1ull << 60))
+				{
+					// Set lowest clear bit
+					bits |= bits + 1;
+					return true;
+				}
+
+				return false;
+			});
+
+			if (ok) [[likely]]
+			{
+				// Find lowest clear bit
+				const auto sema = &sema_data[std::countr_one(bits)];
+
+#if defined(USE_FUTEX) || defined(_WIN32)
+				sema->release(1);
+#endif
+
+				return sema;
+			}
+
+			return nullptr;
+		}
+
+		void sema_free(atomic_t<u32>* sema)
+		{
+			if (sema < sema_data || sema >= std::end(sema_data))
+			{
+				std::abort();
+			}
+
+			// Clear sema
+			sema->release(0);
+
+			// Clear sema bit
+			sema_bits &= ~(1ull << (sema - sema_data));
+		}
 	};
 }
 
 // Main hashtable for atomic wait.
-static sync_var s_hashtable[s_hashtable_size]{};
+alignas(64) static sync_var s_hashtable[s_hashtable_size]{};
 
 namespace
 {
@@ -105,10 +148,10 @@ namespace
 static constexpr u32 s_slot_gcount = (s_hashtable_power > 7 ? 4096 : 256) / 64;
 
 // Array of slot branch objects
-static slot_info s_slot_list[s_slot_gcount * 64]{};
+alignas(64) static slot_info s_slot_list[s_slot_gcount * 64]{};
 
 // Allocation bits
-static atomic_t<u64> s_slot_bits[s_slot_gcount]{};
+alignas(64) static atomic_t<u64> s_slot_bits[s_slot_gcount]{};
 
 static u64 slot_alloc()
 {
@@ -123,7 +166,7 @@ static u64 slot_alloc()
 
 	for (u32 i = 0;; i++)
 	{
-		const u32 group = (i + start) % s_slot_gcount;
+		const u32 group = (i + start * 8) % s_slot_gcount;
 
 		const auto [bits, ok] = s_slot_bits[group].fetch_op([](u64& bits)
 		{
@@ -227,154 +270,14 @@ static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
 	if (ok > 1 && _old & s_collision_bit)
 	{
+		if (loc->sema_bits)
+			std::abort();
+
 		// Deallocate slot on last waiter
 		slot_free(_old);
 	}
 }
 
-// Number of search groups (defines max semaphore count as gcount * 64)
-static constexpr u32 s_sema_gcount = 128;
-
-static constexpr u64 s_sema_mask = (s_sema_gcount * 64 - 1);
-
-#ifdef USE_POSIX
-using sema_handle = sem_t;
-#elif defined(USE_FUTEX)
-namespace
-{
-	struct alignas(64) sema_handle
-	{
-		atomic_t<u32> sema;
-	};
-}
-#elif defined(_WIN32)
-using sema_handle = std::uint16_t;
-#else
-namespace
-{
-	struct dumb_sema
-	{
-		u64 count = 0;
-		std::mutex mutex;
-		std::condition_variable cond;
-	};
-}
-
-using sema_handle = std::unique_ptr<dumb_sema>;
-#endif
-
-// Array of native semaphores
-static sema_handle s_sema_list[64 * s_sema_gcount]{};
-
-// Array of associated reference counters
-static atomic_t<u64> s_sema_refs[64 * s_sema_gcount]{};
-
-// Allocation bits (reserve first bit)
-static atomic_t<u64> s_sema_bits[s_sema_gcount]{1};
-
-static u32 sema_alloc()
-{
-	// Diversify search start points to reduce contention and increase immediate success chance
-#ifdef _WIN32
-	const u32 start = GetCurrentProcessorNumber();
-#elif __linux__
-	const u32 start = sched_getcpu();
-#else
-	const u32 start = __rdtsc();
-#endif
-
-	for (u32 i = 0; i < s_sema_gcount * 3; i++)
-	{
-		const u32 group = (i + start) % s_sema_gcount;
-
-		const auto [bits, ok] = s_sema_bits[group].fetch_op([](u64& bits)
-		{
-			if (~bits)
-			{
-				// Set lowest clear bit
-				bits |= bits + 1;
-				return true;
-			}
-
-			return false;
-		});
-
-		if (ok)
-		{
-			// Find lowest clear bit
-			const u32 id = group * 64 + static_cast<u32>(std::countr_one(bits));
-
-#ifdef USE_POSIX
-			// Initialize semaphore (should be very fast)
-			sem_init(&s_sema_list[id], 0, 0);
-#elif defined(_WIN32) || defined(USE_FUTEX)
-			// Do nothing
-#else
-			if (!s_sema_list[id])
-			{
-				s_sema_list[id] = std::make_unique<dumb_sema>();
-			}
-#endif
-
-			// Initialize ref counter
-			if (s_sema_refs[id]++)
-			{
-				std::abort();
-			}
-
-			return id;
-		}
-	}
-
-	return 0;
-}
-
-static void sema_free(u32 id)
-{
-	if (id && id < 64 * s_sema_gcount)
-	{
-		// Dereference first
-		if (--s_sema_refs[id])
-		{
-			return;
-		}
-
-#ifdef USE_POSIX
-		// Destroy semaphore (should be very fast)
-		sem_destroy(&s_sema_list[id]);
-#else
-		// No action required
-#endif
-
-		// Reset allocation bit
-		s_sema_bits[id / 64] &= ~(1ull << (id % 64));
-	}
-}
-
-static bool sema_get(u32 id)
-{
-	if (id && id < 64 * s_sema_gcount)
-	{
-		// Increment only if the semaphore is allocated
-		if (s_sema_refs[id].fetch_op([](u64& refs)
-		{
-			if (refs)
-			{
-				// Increase reference from non-zero value
-				refs++;
-				return true;
-			}
-
-			return false;
-		}).second)
-		{
-			return true;
-		}
-	}
-
-	return false;
-}
-
 void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask)
 {
 	if (!timeout)
 	{
@@ -444,7 +347,14 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 	if (!ok)
 	{
 		// Expected only on top level
-		return;
+		if (timeout + 1 || ptr_cmp(data, size, old_value, mask))
+		{
+			return;
+		}
+
+		// TODO
+		busy_wait(30000);
+		continue;
 	}
 
 	if (!_old || (_old & s_pointer_mask) == (iptr & s_pointer_mask))
 	{
@@ -470,111 +380,45 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 		lv = eq_bits + 1;
 	}
 
-	// Now try to reference a semaphore (allocate it if needed)
-	u32 sema_id = static_cast<u32>(slot->sema_var & s_sema_mask);
+	auto sema = slot->sema_alloc();
 
-	for (u32 loop_count = 0; loop_count < 7; loop_count++)
+	while (!sema)
 	{
-		// Try to allocate a semaphore
-		if (!sema_id)
+		if (timeout + 1 || ptr_cmp(data, size, old_value, mask))
 		{
-			const u32 sema = sema_alloc();
-
-			if (!sema)
-			{
-				break;
-			}
-
-			sema_id = slot->sema_var.atomic_op([&](u64& value) -> u32
-			{
-				if (value & s_sema_mask)
-				{
-					return static_cast<u32>(value & s_sema_mask);
-				}
-
-				// Insert allocated semaphore
-				value += s_sema_mask + 1;
-				value |= sema;
-				return 0;
-			});
-
-			if (sema_id)
-			{
-				// Drop unnecessary allocation
-				sema_free(sema);
-			}
-			else
-			{
-				sema_id = sema;
-				break;
-			}
+			slot_free(iptr, &s_hashtable[iptr % s_hashtable_size]);
+			return;
 		}
 
-		if (!sema_get(sema_id))
-		{
-			sema_id = 0;
-			continue;
-		}
-
-		// Try to increment sig (check semaphore validity)
-		const auto [_old, _new] = slot->sema_var.fetch_op([&](u64& value) -> u64
-		{
-			if ((value & ~s_sema_mask) == ~s_sema_mask)
-			{
-				// Signal overflow
-				return 0;
-			}
-
-			if ((value & s_sema_mask) != sema_id)
-			{
-				return 0;
-			}
-
-			value += s_sema_mask + 1;
-			return value;
-		});
-
-		if (!_new)
-		{
-			sema_free(sema_id);
-
-			if ((_old & ~s_sema_mask) == ~s_sema_mask)
-			{
-				// Break on signal overflow
-				sema_id = -1;
-				break;
-			}
-
-			sema_id = _new & s_sema_mask;
-			continue;
-		}
-
-		break;
+		// TODO
+		busy_wait(30000);
+		sema = slot->sema_alloc();
 	}
 
+	// Can skip unqueue process if true
+#ifdef USE_FUTEX
+	bool fallback = true;
+#else
 	bool fallback = false;
+#endif
 
-	if (sema_id && ptr_cmp(data, size, old_value, mask))
+	while (ptr_cmp(data, size, old_value, mask))
 	{
 #ifdef USE_FUTEX
 		struct timespec ts;
 		ts.tv_sec = timeout / 1'000'000'000;
 		ts.tv_nsec = timeout % 1'000'000'000;
 
-		if (s_sema_list[sema_id].sema.try_dec(0))
+		if (sema->load() > 1) [[unlikely]]
 		{
-			fallback = true;
+			// Signaled prematurely
+			sema->release(1);
 		}
 		else
 		{
-			futex(&s_sema_list[sema_id].sema, FUTEX_WAIT_PRIVATE, 0, timeout + 1 ? &ts : nullptr);
-
-			if (s_sema_list[sema_id].sema.try_dec(0))
-			{
-				fallback = true;
-			}
+			futex(sema, FUTEX_WAIT_PRIVATE, 1, timeout + 1 ? &ts : nullptr);
 		}
-#elif defined(_WIN32) && !defined(USE_POSIX)
+#elif defined(_WIN32)
 		LARGE_INTEGER qw;
 		qw.QuadPart = -static_cast<s64>(timeout / 100);
 
@@ -584,142 +428,75 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 			qw.QuadPart -= 1;
 		}
 
-		if (!NtWaitForKeyedEvent(nullptr, &s_sema_list[sema_id], false, timeout + 1 ? &qw : nullptr))
+		if (fallback)
 		{
-			fallback = true;
-		}
-#elif defined(USE_POSIX)
-		struct timespec ts;
-		clock_gettime(CLOCK_REALTIME, &ts);
-		ts.tv_sec += timeout / 1'000'000'000;
-		ts.tv_nsec += timeout % 1'000'000'000;
-		ts.tv_sec += ts.tv_nsec / 1'000'000'000;
-		ts.tv_nsec %= 1'000'000'000;
-
-		// It's pretty unreliable because it uses absolute time, which may jump backwards. Sigh.
-		if (timeout + 1)
-		{
-			if (sem_timedwait(&s_sema_list[sema_id], &ts) == 0)
-			{
-				fallback = true;
-			}
-		}
-		else
-		{
-			if (sem_wait(&s_sema_list[sema_id]) == 0)
-			{
-				fallback = true;
-			}
-		}
-#else
-		dumb_sema& sema = *s_sema_list[sema_id];
-
-		std::unique_lock lock(sema.mutex);
-
-		if (timeout + 1)
-		{
-			sema.cond.wait_for(lock, std::chrono::nanoseconds(timeout), [&]
-			{
-				return sema.count > 0;
-			});
-		}
-		else
-		{
-			sema.cond.wait(lock, [&]
-			{
-				return sema.count > 0;
-			});
+			// Restart waiting
+			sema->release(1);
+			fallback = false;
 		}
 
-		if (sema.count > 0)
+		if (!NtWaitForKeyedEvent(nullptr, sema, false, timeout + 1 ? &qw : nullptr))
 		{
-			sema.count--;
+			// Error code assumed to be timeout
 			fallback = true;
 		}
 #endif
+
+		if (timeout + 1)
+		{
+			// TODO: reduce timeout instead
+			break;
+		}
 	}
 
-	if (!sema_id)
+	while (!fallback)
 	{
-		fallback = true;
-	}
+#if defined(_WIN32)
+		static LARGE_INTEGER instant{};
 
-	while (true)
-	{
-		// Try to decrement
-		const auto [prev, ok] = slot->sema_var.fetch_op([&](u64& value)
-		{
-			if (value)
-			{
-				// If timeout
-				if (!fallback)
-				{
-					if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
-					{
-						// Give up if signaled or semaphore has already changed
-						return false;
-					}
-
-					value -= s_sema_mask + 1;
-
-					if ((value & ~s_sema_mask) == 0)
-					{
-						// Remove allocated sema on last waiter
-						value = 0;
-					}
-				}
-
-				return true;
-			}
-
-			return false;
-		});
-
-		if (ok || fallback)
+		if (sema->compare_and_swap_test(1, 2))
 		{
+			// Succeeded in self-notifying
 			break;
 		}
 
-#ifdef USE_FUTEX
-		if (s_sema_list[sema_id].sema.try_dec(0))
+		if (!NtWaitForKeyedEvent(nullptr, sema, false, &instant))
 		{
-			fallback = true;
-		}
-#elif defined(_WIN32) && !defined(USE_POSIX)
-		static LARGE_INTEGER instant{};
-
-		if (!NtWaitForKeyedEvent(nullptr, &s_sema_list[sema_id], false, &instant))
-		{
-			fallback = true;
-		}
-#elif defined(USE_POSIX)
-		if (sem_trywait(&s_sema_list[sema_id]) == 0)
-		{
-			fallback = true;
-		}
-#else
-		dumb_sema& sema = *s_sema_list[sema_id];
-
-		std::unique_lock lock(sema.mutex);
-
-		if (sema.count > 0)
-		{
-			sema.count--;
-			fallback = true;
+			// Succeeded in obtaining an event without waiting
+			break;
 		}
 #endif
 	}
 
-	if (sema_id)
-	{
-		sema_free(sema_id);
-	}
+	slot->sema_free(sema);
 
 	slot_free(iptr, &s_hashtable[iptr % s_hashtable_size]);
 
 	s_tls_wait_cb(nullptr);
 }
 
+// Platform specific wake-up function
+static inline bool alert_sema(atomic_t<u32>* sema)
+{
+#ifdef USE_FUTEX
+	if (sema->load() == 1 && sema->compare_and_swap_test(1, 2))
+	{
+		// Use "wake all" arg for robustness, only 1 thread is expected
+		futex(sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
+		return true;
+	}
+#elif defined(_WIN32)
+	if (sema->load() == 1 && sema->compare_and_swap_test(1, 2))
+	{
+		// Can wait in rare cases, which is its annoying weakness
+		NtReleaseKeyedEvent(nullptr, sema, 1, nullptr);
+		return true;
+	}
+#endif
+
+	return false;
+}
+
 void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
 {
 	if (cb)
@@ -747,58 +524,15 @@ void atomic_storage_futex::notify_one(const void* data)
 		return;
 	}
 
-	const u64 value = slot->sema_var;
-
-	if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask))
+	for (u64 bits = slot->sema_bits; bits; bits &= bits - 1)
 	{
-		return;
-	}
+		const auto sema = &slot->sema_data[std::countr_zero(bits)];
 
-	const u32 sema_id = static_cast<u32>(value & s_sema_mask);
-
-	if (!sema_get(sema_id))
-	{
-		return;
-	}
-
-	const auto [_, ok] = slot->sema_var.fetch_op([&](u64& value)
-	{
-		if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
+		if (alert_sema(sema))
 		{
-			return false;
+			break;
 		}
-
-		value -= s_sema_mask + 1;
-
-		// Reset allocated semaphore on last waiter
-		if ((value & ~s_sema_mask) == 0)
-		{
-			value = 0;
-		}
-
-		return true;
-	});
-
-	if (ok)
-	{
-#ifdef USE_POSIX
-		sem_post(&s_sema_list[sema_id]);
-#elif defined(USE_FUTEX)
-		s_sema_list[sema_id].sema++;
-		futex(&s_sema_list[sema_id].sema, FUTEX_WAKE_PRIVATE, 1);
-#elif defined(_WIN32)
-		NtReleaseKeyedEvent(nullptr, &s_sema_list[sema_id], 1, nullptr);
-#else
-		dumb_sema& sema = *s_sema_list[sema_id];
-
-		sema.mutex.lock();
-		sema.count += 1;
-		sema.mutex.unlock();
-		sema.cond.notify_one();
-#endif
 	}
-
-	sema_free(sema_id);
 }
 
 void atomic_storage_futex::notify_all(const void* data)
 {
@@ -812,54 +546,13 @@
 		return;
 	}
 
-	const u64 value = slot->sema_var;
-
-	if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask))
+	for (u64 bits = slot->sema_bits; bits; bits &= bits - 1)
 	{
-		return;
-	}
+		const auto sema = &slot->sema_data[std::countr_zero(bits)];
 
-	const u32 sema_id = static_cast<u32>(value & s_sema_mask);
-
-	if (!sema_get(sema_id))
-	{
-		return;
-	}
-
-	const auto [_, count] = slot->sema_var.fetch_op([&](u64& value) -> u32
-	{
-		if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
+		if (alert_sema(sema))
 		{
-			return 0;
+			continue;
 		}
-
-		return (std::exchange(value, 0) & ~s_sema_mask) / (s_sema_mask + 1);
-	});
-
-#ifdef USE_POSIX
-	for (u32 i = 0; i < count; i++)
-	{
-		sem_post(&s_sema_list[sema_id]);
 	}
-#elif defined(USE_FUTEX)
-	s_sema_list[sema_id].sema += count;
-	futex(&s_sema_list[sema_id].sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
-#elif defined(_WIN32)
-	for (u32 i = 0; i < count; i++)
-	{
-		NtReleaseKeyedEvent(nullptr, &s_sema_list[sema_id], count, nullptr);
-	}
-#else
-	if (count)
-	{
-		dumb_sema& sema = *s_sema_list[sema_id];
-
-		sema.mutex.lock();
-		sema.count += count;
-		sema.mutex.unlock();
-		sema.cond.notify_all();
-	}
-#endif
-
-	sema_free(sema_id);
 }
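
For illustration only (not part of the patch): the new sync_var packs up to 60 per-waiter semaphores into a single u64 bitmask; bits |= bits + 1 claims the lowest clear bit, and std::countr_one of the previous value gives the index that was just taken. A minimal standalone sketch of that allocation idea, with hypothetical names (g_bits, bits_alloc, bits_free) and plain std::atomic in place of rpcs3's atomic_t:

#include <atomic>
#include <bit>
#include <cstdint>

// Hypothetical illustration of the bitmask allocator pattern used by sema_alloc()/sema_free() above.
static std::atomic<std::uint64_t> g_bits{0};

// Claims the lowest free slot out of 60; returns its index, or -1 if all slots are taken.
int bits_alloc()
{
	std::uint64_t old = g_bits.load();

	while (old + 1 < (1ull << 60))
	{
		// old | (old + 1) sets the lowest clear bit of old
		if (g_bits.compare_exchange_weak(old, old | (old + 1)))
		{
			return std::countr_one(old); // index of the bit just set
		}
	}

	return -1;
}

// Releases a previously claimed slot.
void bits_free(int index)
{
	g_bits.fetch_and(~(1ull << index));
}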