diff --git a/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp b/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp index c28004631d..bd20b915af 100644 --- a/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp +++ b/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp @@ -228,7 +228,7 @@ error_code cellMsgDialogOpen2(u32 type, vm::cptr msgString, vm::ptrCreate(msgString.get_ptr()); - lv2_obj::awake(ppu); + lv2_obj::awake(&ppu); }); while (!ppu.state.test_and_reset(cpu_flag::signal)) diff --git a/rpcs3/Emu/Cell/PPUThread.cpp b/rpcs3/Emu/Cell/PPUThread.cpp index 31c9d621a2..1841e60eb4 100644 --- a/rpcs3/Emu/Cell/PPUThread.cpp +++ b/rpcs3/Emu/Cell/PPUThread.cpp @@ -615,7 +615,7 @@ void ppu_thread::cpu_task() void ppu_thread::cpu_sleep() { vm::temporary_unlock(*this); - lv2_obj::awake(*this); + lv2_obj::awake(this); } void ppu_thread::cpu_mem() diff --git a/rpcs3/Emu/Cell/lv2/lv2.cpp b/rpcs3/Emu/Cell/lv2/lv2.cpp index 4829104f6b..1302647245 100644 --- a/rpcs3/Emu/Cell/lv2/lv2.cpp +++ b/rpcs3/Emu/Cell/lv2/lv2.cpp @@ -1006,10 +1006,10 @@ DECLARE(lv2_obj::g_ppu); DECLARE(lv2_obj::g_pending); DECLARE(lv2_obj::g_waiting); -void lv2_obj::sleep_timeout(cpu_thread& thread, u64 timeout) -{ - std::lock_guard lock(g_mutex); +thread_local DECLARE(lv2_obj::g_to_awake); +void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout) +{ const u64 start_time = get_guest_system_time(); if (auto ppu = static_cast(thread.id_type() == 1 ? &thread : nullptr)) @@ -1055,20 +1055,26 @@ void lv2_obj::sleep_timeout(cpu_thread& thread, u64 timeout) } } - schedule_all(); + if (!g_to_awake.empty()) + { + // Schedule pending entries + awake_unlocked({}); + } + else + { + schedule_all(); + } } -void lv2_obj::awake(cpu_thread& cpu, u32 prio) +void lv2_obj::awake_unlocked(cpu_thread* cpu, u32 prio) { // Check thread type - if (cpu.id_type() != 1) return; - - std::lock_guard lock(g_mutex); + AUDIT(!cpu || cpu->id_type() == 1); if (prio < INT32_MAX) { // Priority set - if (static_cast(cpu).prio.exchange(prio) == prio || !unqueue(g_ppu, &cpu)) + if (static_cast(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, cpu)) { return; } @@ -1080,7 +1086,7 @@ void lv2_obj::awake(cpu_thread& cpu, u32 prio) for (std::size_t i = 0, pos = -1; i < g_ppu.size(); i++) { - if (g_ppu[i] == &cpu) + if (g_ppu[i] == cpu) { pos = i; prio = g_ppu[i]->prio; @@ -1091,45 +1097,58 @@ void lv2_obj::awake(cpu_thread& cpu, u32 prio) } } - unqueue(g_ppu, &cpu); - unqueue(g_pending, &cpu); + unqueue(g_ppu, cpu); + unqueue(g_pending, cpu); - static_cast(cpu).start_time = start_time; + static_cast(cpu)->start_time = start_time; } - // Emplace current thread - for (std::size_t i = 0; i <= g_ppu.size(); i++) + const auto emplace_thread = [](cpu_thread* const cpu) { - if (i < g_ppu.size() && g_ppu[i] == &cpu) + for (auto it = g_ppu.cbegin(), end = g_ppu.cend();; it++) { - LOG_TRACE(PPU, "sleep() - suspended (p=%zu)", g_pending.size()); - break; - } - - // Use priority, also preserve FIFO order - if (i == g_ppu.size() || g_ppu[i]->prio > static_cast(cpu).prio) - { - LOG_TRACE(PPU, "awake(): %s", cpu.id); - g_ppu.insert(g_ppu.cbegin() + i, &static_cast(cpu)); - - // Unregister timeout if necessary - for (auto it = g_waiting.cbegin(), end = g_waiting.cend(); it != end; it++) + if (it != end && *it == cpu) { - if (it->second == &cpu) - { - g_waiting.erase(it); - break; - } + LOG_TRACE(PPU, "sleep() - suspended (p=%zu)", g_pending.size()); + return; } - break; + // Use priority, also preserve FIFO order + if (it == end || (*it)->prio > static_cast(cpu)->prio) + { + g_ppu.insert(it, static_cast(cpu)); + break; + } } - } - // Remove pending if necessary - if (!g_pending.empty() && &cpu == get_current_cpu_thread()) + // Unregister timeout if necessary + for (auto it = g_waiting.cbegin(), end = g_waiting.cend(); it != end; it++) + { + if (it->second == cpu) + { + g_waiting.erase(it); + break; + } + } + + LOG_TRACE(PPU, "awake(): %s", cpu->id); + }; + + if (cpu) { - unqueue(g_pending, &cpu); + // Emplace current thread + emplace_thread(cpu); + } + else for (const auto _cpu : g_to_awake) + { + // Emplace threads from list + emplace_thread(_cpu); + } + + // Remove pending if necessary + if (!g_pending.empty() && cpu && cpu == get_current_cpu_thread()) + { + unqueue(g_pending, cpu); } // Suspend threads if necessary diff --git a/rpcs3/Emu/Cell/lv2/sys_cond.cpp b/rpcs3/Emu/Cell/lv2/sys_cond.cpp index 6f63ffa9f6..bf5c10d705 100644 --- a/rpcs3/Emu/Cell/lv2/sys_cond.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_cond.cpp @@ -104,7 +104,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id) if (cond.ret) { - cond->awake(*cond.ret); + cond->awake(cond.ret); } return CELL_OK; @@ -145,7 +145,7 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id) if (cond.ret) { - cond->awake(*cond.ret); + cond->awake(cond.ret); } return CELL_OK; @@ -194,7 +194,7 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id) if (cond.ret && cond.ret != (cpu_thread*)(2)) { - cond->awake(*cond.ret); + cond->awake(cond.ret); } else if (!cond.ret) { @@ -233,22 +233,24 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout) } else { + // Further function result + ppu.gpr[3] = CELL_OK; + std::lock_guard lock(cond->mutex->mutex); // Register waiter cond->sq.emplace_back(&ppu); - cond->sleep(ppu, timeout); // Unlock the mutex cond->mutex->lock_count = 0; if (auto cpu = cond->mutex->reown()) { - cond->mutex->awake(*cpu); + cond->mutex->append(cpu); } - // Further function result - ppu.gpr[3] = CELL_OK; + // Sleep current thread and schedule mutex waiter + cond->sleep(ppu, timeout); } while (!ppu.state.test_and_reset(cpu_flag::signal)) diff --git a/rpcs3/Emu/Cell/lv2/sys_event.cpp b/rpcs3/Emu/Cell/lv2/sys_event.cpp index 610976b49d..cd2cb3f8ad 100644 --- a/rpcs3/Emu/Cell/lv2/sys_event.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_event.cpp @@ -1,4 +1,4 @@ -#include "stdafx.h" +#include "stdafx.h" #include "sys_event.h" #include "Emu/System.h" @@ -48,7 +48,7 @@ bool lv2_event_queue::send(lv2_event event) std::tie(ppu.gpr[4], ppu.gpr[5], ppu.gpr[6], ppu.gpr[7]) = event; - awake(ppu); + awake(&ppu); } else { @@ -171,14 +171,19 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode) { std::lock_guard lock(queue->mutex); - for (auto cpu : queue->sq) + if (queue->type == SYS_PPU_QUEUE) { - if (queue->type == SYS_PPU_QUEUE) + for (auto cpu : queue->sq) { static_cast(*cpu).gpr[3] = CELL_ECANCELED; - queue->awake(*cpu); + queue->append(cpu); } - else + + lv2_obj::awake_all(); + } + else + { + for (auto cpu : queue->sq) { static_cast(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED); cpu->state += cpu_flag::signal; diff --git a/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp b/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp index 55671f0a19..678dbaa588 100644 --- a/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp @@ -1,4 +1,4 @@ -#include "stdafx.h" +#include "stdafx.h" #include "sys_event_flag.h" #include "Emu/System.h" @@ -310,7 +310,7 @@ error_code sys_event_flag_set(u32 id, u64 bitptn) if (ppu.gpr[3] == CELL_OK) { flag->waiters--; - flag->awake(ppu); + flag->append(cpu); return true; } @@ -318,6 +318,7 @@ error_code sys_event_flag_set(u32 id, u64 bitptn) }); flag->sq.erase(tail, flag->sq.end()); + lv2_obj::awake_all(); } return CELL_OK; @@ -376,8 +377,10 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr num) ppu.gpr[6] = pattern; flag->waiters--; - flag->awake(ppu); + flag->append(thread); } + + lv2_obj::awake_all(); } if (ppu.test_stopped()) diff --git a/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp b/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp index bde908e6d8..fc59274e4f 100644 --- a/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp @@ -175,7 +175,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u3 if (mode != 1) { - cond->awake(*cond.ret); + cond->awake(cond.ret); } return CELL_OK; @@ -195,8 +195,6 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id fmt::throw_exception("Unknown mode (%d)" HERE, mode); } - std::basic_string threads; - const auto cond = idm::check(lwcond_id, [&](lv2_lwcond& cond) -> s32 { lv2_lwmutex* mutex; @@ -234,7 +232,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id } else { - threads.push_back(cpu); + lv2_obj::append(cpu); } result++; @@ -251,9 +249,9 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id return CELL_ESRCH; } - for (auto cpu : threads) + if (mode == 2) { - cond->awake(*cpu); + lv2_obj::awake_all(); } if (mode == 1) @@ -275,7 +273,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id std::shared_ptr mutex; - const auto cond = idm::get(lwcond_id, [&](lv2_lwcond& cond) -> cpu_thread* + const auto cond = idm::get(lwcond_id, [&](lv2_lwcond& cond) { mutex = idm::get_unlocked(lwmutex_id); @@ -289,18 +287,23 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id // Add a waiter cond.waiters++; cond.sq.emplace_back(&ppu); - cond.sleep(ppu, timeout); - std::lock_guard lock2(mutex->mutex); - - // Process lwmutex sleep queue - if (const auto cpu = mutex->schedule(mutex->sq, mutex->protocol)) { - return cpu; + std::lock_guard lock2(mutex->mutex); + + // Process lwmutex sleep queue + if (const auto cpu = mutex->schedule(mutex->sq, mutex->protocol)) + { + cond.append(cpu); + } + else + { + mutex->signaled |= 1; + } } - mutex->signaled |= 1; - return nullptr; + // Sleep current thread and schedule lwmutex waiter + cond.sleep(ppu, timeout); }); if (!cond || !mutex) @@ -308,11 +311,6 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id return CELL_ESRCH; } - if (cond.ret) - { - cond->awake(*cond.ret); - } - while (!ppu.state.test_and_reset(cpu_flag::signal)) { if (ppu.is_stopped()) diff --git a/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp b/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp index 21722cb7ae..d44fc9c970 100644 --- a/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp @@ -212,7 +212,7 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id) if (mutex.ret) { - mutex->awake(*mutex.ret); + mutex->awake(mutex.ret); } return CELL_OK; @@ -245,7 +245,7 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id) if (mutex.ret) { - mutex->awake(*mutex.ret); + mutex->awake(mutex.ret); } return CELL_OK; diff --git a/rpcs3/Emu/Cell/lv2/sys_mutex.cpp b/rpcs3/Emu/Cell/lv2/sys_mutex.cpp index 0772f695bc..ca3ed83d3c 100644 --- a/rpcs3/Emu/Cell/lv2/sys_mutex.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_mutex.cpp @@ -237,7 +237,7 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id) if (auto cpu = mutex->reown()) { - mutex->awake(*cpu); + mutex->awake(cpu); } } else if (mutex.ret) diff --git a/rpcs3/Emu/Cell/lv2/sys_net.cpp b/rpcs3/Emu/Cell/lv2/sys_net.cpp index 2abbc6a3a8..13ef3cd972 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net.cpp @@ -203,9 +203,11 @@ extern void network_thread_init() for (ppu_thread* ppu : s_to_awake) { network_clear_queue(*ppu); - lv2_obj::awake(*ppu); + lv2_obj::append(ppu); } + lv2_obj::awake_all(); + s_to_awake.clear(); socklist.clear(); @@ -309,7 +311,7 @@ s32 sys_net_bnet_accept(ppu_thread& ppu, s32 s, vm::ptr addr, if (native_socket != -1 || (result = get_last_error(!sock.so_nbio))) { - lv2_obj::awake(ppu); + lv2_obj::awake(&ppu); return true; } } @@ -532,7 +534,7 @@ s32 sys_net_bnet_connect(ppu_thread& ppu, s32 s, vm::ptr addr, result = native_error ? get_last_error(false, native_error) : 0; } - lv2_obj::awake(ppu); + lv2_obj::awake(&ppu); return true; } @@ -946,7 +948,7 @@ s32 sys_net_bnet_recvfrom(ppu_thread& ppu, s32 s, vm::ptr buf, u32 len, s3 if (native_result >= 0 || (result = get_last_error(!sock.so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0))) { - lv2_obj::awake(ppu); + lv2_obj::awake(&ppu); return true; } } @@ -1115,7 +1117,7 @@ s32 sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr buf, u32 len, s32 if (native_result >= 0 || (result = get_last_error(!sock.so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0))) { - lv2_obj::awake(ppu); + lv2_obj::awake(&ppu); return true; } } diff --git a/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp b/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp index cb65b87820..d15d9a3351 100644 --- a/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp @@ -49,7 +49,7 @@ void _sys_ppu_thread_exit(ppu_thread& ppu, u64 errorcode) std::lock_guard lock(id_manager::g_mutex); // Schedule joiner and unqueue - lv2_obj::awake(*idm::check_unlocked>(jid), -2); + lv2_obj::awake(idm::check_unlocked>(jid), -2); } // Unqueue @@ -63,7 +63,7 @@ void sys_ppu_thread_yield(ppu_thread& ppu) { sys_ppu_thread.trace("sys_ppu_thread_yield()"); - lv2_obj::awake(ppu, -4); + lv2_obj::yield(ppu); } error_code sys_ppu_thread_join(ppu_thread& ppu, u32 thread_id, vm::ptr vptr) @@ -219,7 +219,7 @@ error_code sys_ppu_thread_set_priority(ppu_thread& ppu, u32 thread_id, s32 prio) { if (thread.prio != prio) { - lv2_obj::awake(thread, prio); + lv2_obj::awake(&thread, prio); } }); @@ -359,7 +359,7 @@ error_code sys_ppu_thread_start(ppu_thread& ppu, u32 thread_id) const auto thread = idm::get>(thread_id, [&](ppu_thread& thread) { - lv2_obj::awake(thread, -2); + lv2_obj::awake(&thread, -2); }); if (!thread) diff --git a/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp b/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp index 05d3eca652..7b2522a024 100644 --- a/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp @@ -250,7 +250,7 @@ error_code sys_rwlock_runlock(ppu_thread& ppu, u32 rw_lock_id) { rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty(); - rwlock->awake(*cpu); + rwlock->awake(cpu); } else { @@ -348,16 +348,19 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout) // If the last waiter quit the writer sleep queue, wake blocked readers if (!rwlock->rq.empty() && rwlock->wq.empty() && rwlock->owner < 0) { - verify(HERE), rwlock->owner & 1; - - rwlock->owner -= s64{2} * rwlock->rq.size(); - - while (auto cpu = rwlock->schedule(rwlock->rq, SYS_SYNC_PRIORITY)) + rwlock->owner.atomic_op([&](s64& owner) { - rwlock->awake(*cpu); + owner -= -2 * static_cast(rwlock->rq.size()); // Add readers to value + owner &= -2; // Clear wait bit + }); + + // Protocol doesn't matter here since they are all enqueued anyways + while (auto cpu = rwlock->schedule(rwlock->rq, SYS_SYNC_FIFO)) + { + rwlock->append(cpu); } - rwlock->owner &= ~1; + lv2_obj::awake_all(); } ppu.gpr[3] = CELL_ETIMEDOUT; @@ -437,18 +440,17 @@ error_code sys_rwlock_wunlock(ppu_thread& ppu, u32 rw_lock_id) { rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty(); - rwlock->awake(*cpu); + rwlock->awake(cpu); } else if (auto readers = rwlock->rq.size()) { - rwlock->owner = (s64{-2} * readers) | 1; - - while (auto cpu = rwlock->schedule(rwlock->rq, SYS_SYNC_PRIORITY)) + while (auto cpu = rwlock->schedule(rwlock->rq, SYS_SYNC_FIFO)) { - rwlock->awake(*cpu); + rwlock->append(cpu); } - rwlock->owner &= ~1; + rwlock->owner.release(-2 * static_cast(readers)); + lv2_obj::awake_all(); } else { diff --git a/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp b/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp index 3bfaddbc67..ae9b7a06ae 100644 --- a/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp @@ -1,4 +1,4 @@ -#include "stdafx.h" +#include "stdafx.h" #include "sys_semaphore.h" #include "Emu/System.h" @@ -242,8 +242,10 @@ error_code sys_semaphore_post(ppu_thread& ppu, u32 sem_id, s32 count) // Wake threads for (s32 i = std::min(-std::min(val, 0), count); i > 0; i--) { - sem->awake(*verify(HERE, sem->schedule(sem->sq, sem->protocol))); + sem->append(verify(HERE, sem->schedule(sem->sq, sem->protocol))); } + + lv2_obj::awake_all(); } return CELL_OK; diff --git a/rpcs3/Emu/Cell/lv2/sys_sync.h b/rpcs3/Emu/Cell/lv2/sys_sync.h index 301edcdea5..9a68261f47 100644 --- a/rpcs3/Emu/Cell/lv2/sys_sync.h +++ b/rpcs3/Emu/Cell/lv2/sys_sync.h @@ -115,27 +115,42 @@ struct lv2_obj return res; } +private: // Remove the current thread from the scheduling queue, register timeout - static void sleep_timeout(cpu_thread&, u64 timeout); + static void sleep_unlocked(cpu_thread&, u64 timeout); - static void sleep(cpu_thread& thread, u64 timeout = 0) + // Schedule the thread + static void awake_unlocked(cpu_thread*, u32 prio = -1); + +public: + static void sleep(cpu_thread& cpu, const u64 timeout = 0) { - vm::temporary_unlock(thread); - sleep_timeout(thread, timeout); + vm::temporary_unlock(cpu); + std::lock_guard{g_mutex}, sleep_unlocked(cpu, timeout); + g_to_awake.clear(); + } + + static inline void awake(cpu_thread* const thread, const u32 prio = -1) + { + std::lock_guard lock(g_mutex); + awake_unlocked(thread, prio); } static void yield(cpu_thread& thread) { vm::temporary_unlock(thread); - awake(thread, -4); + awake(&thread, -4); } - // Schedule the thread - static void awake(cpu_thread&, u32 prio); - - static void awake(cpu_thread& thread) + static inline void awake_all() { - awake(thread, -1); + awake({}); + g_to_awake.clear(); + } + + static inline void append(cpu_thread* const thread) + { + g_to_awake.emplace_back(thread); } static void cleanup(); @@ -216,7 +231,7 @@ struct lv2_obj } template - static bool wait_timeout(u64 usec, cpu_thread* const cpu = nullptr) + static bool wait_timeout(u64 usec, cpu_thread* const cpu = {}) { static_assert(UINT64_MAX / cond_variable::max_timeout >= g_cfg.core.clocks_scale.max, "timeout may overflow during scaling"); @@ -287,6 +302,9 @@ private: // Scheduler mutex static shared_mutex g_mutex; + // Pending list of threads to run + static thread_local std::vector g_to_awake; + // Scheduler queue for active PPU threads static std::deque g_ppu;