diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp
index 40afea9525..2e9d82b7a8 100644
--- a/rpcs3/Emu/CPU/CPUThread.cpp
+++ b/rpcs3/Emu/CPU/CPUThread.cpp
@@ -11,6 +11,7 @@
 
 #include <thread>
 #include <unordered_map>
+#include <numeric>
 #include <map>
 
 DECLARE(cpu_thread::g_threads_created){0};
@@ -20,6 +21,8 @@ DECLARE(cpu_thread::g_suspend_counter){0};
 LOG_CHANNEL(profiler);
 LOG_CHANNEL(sys_log, "SYS");
 
+static thread_local u64 s_tls_thread_slot = -1;
+
 template <>
 void fmt_class_string<cpu_flag>::format(std::string& out, u64 arg)
 {
@@ -253,20 +256,24 @@ struct cpu_counter
 	alignas(64) atomic_t<u64> cpu_array_sema{0};
 
 	// Semaphore subdivision for each array slot (64 x N in total)
-	atomic_t<u64> cpu_array_bits[6]{};
+	alignas(64) atomic_t<u64> cpu_array_bits[3]{};
+
+	// Copy of array bits for internal use
+	alignas(64) u64 cpu_copy_bits[3]{};
 
 	// All registered threads
 	atomic_t<cpu_thread*> cpu_array[sizeof(cpu_array_bits) * 8]{};
 
-	u64 add(cpu_thread* _this)
+	u64 add(cpu_thread* _this, bool restore = false) noexcept
 	{
-		if (!cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
-		{
-			return -1;
-		}
-
 		u64 array_slot = -1;
 
+		if (!restore && !cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
+		{
+			sys_log.fatal("Too many threads.");
+			return array_slot;
+		}
+
 		for (u32 i = 0;; i = (i + 1) % ::size32(cpu_array_bits))
 		{
 			const auto [bits, ok] = cpu_array_bits[i].fetch_op([](u64& bits) -> u64
@@ -285,44 +292,92 @@ struct cpu_counter
 			{
 				// Get actual slot number
 				array_slot = i * 64 + std::countr_one(bits);
-				break;
+
+				// Register thread
+				if (cpu_array[array_slot].compare_and_swap_test(nullptr, _this)) [[likely]]
+				{
+					break;
+				}
+
+				sys_log.fatal("Unexpected slot registration failure (%u).", array_slot);
+				cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64));
+				continue;
 			}
 		}
 
-		// Register and wait if necessary
-		verify("cpu_counter::add()" HERE), cpu_array[array_slot].exchange(_this) == nullptr;
+		if (!restore)
+		{
+			// First time (thread created)
+			_this->state += cpu_flag::wait;
+			cpu_suspend_lock.lock_unlock();
+		}
 
-		_this->state += cpu_flag::wait;
-		cpu_suspend_lock.lock_unlock();
 		return array_slot;
 	}
 
-	void remove(cpu_thread* _this, u64 slot)
+	void remove(cpu_thread* _this, u64 slot) noexcept
 	{
 		// Unregister and wait if necessary
 		_this->state += cpu_flag::wait;
-		if (cpu_array[slot].exchange(nullptr) != _this)
+
+		std::lock_guard lock(cpu_suspend_lock);
+
+		if (!cpu_array[slot].compare_and_swap_test(_this, nullptr))
+		{
 			sys_log.fatal("Inconsistency for array slot %u", slot);
+			return;
+		}
+
 		cpu_array_bits[slot / 64] &= ~(1ull << (slot % 64));
 		cpu_array_sema--;
-		cpu_suspend_lock.lock_unlock();
+	}
+
+	// Remove temporarily
+	void remove(cpu_thread* _this) noexcept
+	{
+		// Unregister temporarily (called from check_state)
+		const u64 index = s_tls_thread_slot;
+
+		if (index >= std::size(cpu_array))
+		{
+			sys_log.fatal("Index out of bounds (%u).", index);
+			return;
+		}
+
+		if (cpu_array[index].load() == _this && cpu_array[index].compare_and_swap_test(_this, nullptr))
+		{
+			cpu_array_bits[index / 64] &= ~(1ull << (index % 64));
+			return;
+		}
+
+		sys_log.fatal("Thread not found in cpu_array (%s).", _this->get_name());
 	}
 };
 
-template <typename F>
+template <bool UseCopy = false, typename F>
 void for_all_cpu(F func) noexcept
 {
-	auto ctr = g_fxo->get<cpu_counter>();
+	const auto ctr = g_fxo->get<cpu_counter>();
 
 	for (u32 i = 0; i < ::size32(ctr->cpu_array_bits); i++)
 	{
-		for (u64 bits = ctr->cpu_array_bits[i]; bits; bits &= bits - 1)
+		for (u64 bits = (UseCopy ? ctr->cpu_copy_bits[i] : ctr->cpu_array_bits[i].load()); bits; bits &= bits - 1)
 		{
 			const u64 index = i * 64 + std::countr_zero(bits);
 
 			if (cpu_thread* cpu = ctr->cpu_array[index].load())
 			{
-				func(cpu);
+				if constexpr (std::is_invocable_v<F, cpu_thread*, u64>)
+				{
+					func(cpu, index);
+					continue;
+				}
+
+				if constexpr (std::is_invocable_v<F, cpu_thread*>)
+				{
+					func(cpu);
+					continue;
+				}
 			}
 		}
 	}
@@ -382,23 +437,20 @@ void cpu_thread::operator()()
 	}
 
 	// Register thread in g_cpu_array
-	const u64 array_slot = g_fxo->get<cpu_counter>()->add(this);
+	s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this);
 
-	if (array_slot == umax)
+	if (s_tls_thread_slot == umax)
 	{
-		sys_log.fatal("Too many threads.");
 		return;
 	}
 
 	static thread_local struct thread_cleanup_t
 	{
 		cpu_thread* _this;
-		u64 slot;
 		std::string name;
 
-		thread_cleanup_t(cpu_thread* _this, u64 slot)
+		thread_cleanup_t(cpu_thread* _this)
 			: _this(_this)
-			, slot(slot)
 			, name(thread_ctrl::get_name())
 		{
 		}
@@ -415,7 +467,7 @@ void cpu_thread::operator()()
 				ptr->compare_and_swap(_this, nullptr);
 			}
 
-			g_fxo->get<cpu_counter>()->remove(_this, slot);
+			g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);
 
 			_this = nullptr;
 		}
@@ -428,7 +480,7 @@ void cpu_thread::operator()()
 				cleanup();
 			}
 		}
-	} cleanup{this, array_slot};
+	} cleanup{this};
 
 	// Check thread status
 	while (!(state & (cpu_flag::exit + cpu_flag::dbg_global_stop)) && thread_ctrl::state() != thread_state::aborting)
@@ -555,7 +607,8 @@ bool cpu_thread::check_state() noexcept
 	{
 		return retval;
 	}
-	else if (!cpu_sleep_called && state0 & cpu_flag::suspend)
+
+	if (!cpu_sleep_called && state0 & cpu_flag::suspend)
 	{
 		cpu_sleep();
 		cpu_sleep_called = true;
@@ -678,8 +731,11 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 	// Value must be reliable because cpu_flag::wait hasn't been observed only (but not if pause is set)
 	const u64 susp_ctr = g_suspend_counter;
 
+	// cpu_counter object
+	const auto ctr = g_fxo->get<cpu_counter>();
+
 	// Try to push workload
-	auto& queue = g_fxo->get<cpu_counter>()->cpu_suspend_work;
+	auto& queue = ctr->cpu_suspend_work;
 
 	do
 	{
@@ -689,7 +745,7 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		if (!_this && next)
 		{
 			// If _this == nullptr, it only works if this is the first workload pushed
-			g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
+			ctr->cpu_suspend_lock.lock_unlock();
 			continue;
 		}
 	}
@@ -698,34 +754,42 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 	if (!next)
 	{
 		// First thread to push the work to the workload list pauses all threads and processes it
-		std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
+		std::lock_guard lock(ctr->cpu_suspend_lock);
 
-		for_all_cpu([&](cpu_thread* cpu)
+		// Copy of thread bits
+		decltype(ctr->cpu_copy_bits) copy2{};
+
+		for (u32 i = 0; i < ::size32(ctr->cpu_copy_bits); i++)
 		{
-			if (!(cpu->state & cpu_flag::pause) && cpu != _this)
+			copy2[i] = ctr->cpu_copy_bits[i] = ctr->cpu_array_bits[i].load();
+		}
+
+		for_all_cpu([&](cpu_thread* cpu, u64 index)
+		{
+			if (cpu == _this || cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
 			{
-				cpu->state += cpu_flag::pause;
+				// Clear bits as long as wait flag is set
+				ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
+			}
+
+			if (cpu == _this)
+			{
+				copy2[index / 64] &= ~(1ull << (index % 64));
 			}
 		});
 
 		busy_wait(500);
 
-		while (true)
+		while (std::accumulate(std::begin(ctr->cpu_copy_bits), std::end(ctr->cpu_copy_bits), u64{0}, std::bit_or<u64>()))
 		{
-			bool ok = true;
-
-			for_all_cpu([&](cpu_thread* cpu)
+			// Check only CPUs which haven't acknowledged their waiting state yet
+			for_all_cpu<true>([&](cpu_thread* cpu, u64 index)
 			{
-				if (!(cpu->state & cpu_flag::wait) && cpu != _this)
+				if (cpu->state & cpu_flag::wait)
 				{
-					ok = false;
+					ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
 				}
 			});
-
-			if (ok) [[likely]]
-			{
-				break;
-			}
 		}
 
 		// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
@@ -754,12 +818,12 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		// Finalization
 		g_suspend_counter++;
 
-		for_all_cpu([&](cpu_thread* cpu)
+		// Exact bitset for flag pause removal
+		std::memcpy(ctr->cpu_copy_bits, copy2, sizeof(copy2));
+
+		for_all_cpu<true>([&](cpu_thread* cpu)
 		{
-			if (cpu != _this)
-			{
-				cpu->state -= cpu_flag::pause;
-			}
+			cpu->state -= cpu_flag::pause;
 		});
 	}
 	else
@@ -792,11 +856,14 @@ void cpu_thread::stop_all() noexcept
 	{
 		std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
 
-		for_all_cpu([](cpu_thread* cpu)
+		auto on_stop = [](u32, cpu_thread& cpu)
 		{
-			cpu->state += cpu_flag::dbg_global_stop;
-			cpu->abort();
-		});
+			cpu.state += cpu_flag::dbg_global_stop;
+			cpu.abort();
+		};
+
+		idm::select<named_thread<ppu_thread>>(on_stop);
+		idm::select<named_thread<spu_thread>>(on_stop);
 	}
 
 	sys_log.notice("All CPU threads have been signaled.");