PPU/LV2: Make thread-lists scheduling atomic

This commit is contained in:
eladash 2019-04-25 17:27:50 +03:00 committed by Ivan
parent 500a4fa2fb
commit 050339bb3e
14 changed files with 166 additions and 115 deletions

View file

@ -228,7 +228,7 @@ error_code cellMsgDialogOpen2(u32 type, vm::cptr<char> msgString, vm::ptr<CellMs
Emu.CallAfter([&]()
{
dlg->Create(msgString.get_ptr());
lv2_obj::awake(ppu);
lv2_obj::awake(&ppu);
});
while (!ppu.state.test_and_reset(cpu_flag::signal))

View file

@ -615,7 +615,7 @@ void ppu_thread::cpu_task()
void ppu_thread::cpu_sleep()
{
vm::temporary_unlock(*this);
lv2_obj::awake(*this);
lv2_obj::awake(this);
}
void ppu_thread::cpu_mem()

View file

@ -1006,10 +1006,10 @@ DECLARE(lv2_obj::g_ppu);
DECLARE(lv2_obj::g_pending);
DECLARE(lv2_obj::g_waiting);
void lv2_obj::sleep_timeout(cpu_thread& thread, u64 timeout)
{
std::lock_guard lock(g_mutex);
thread_local DECLARE(lv2_obj::g_to_awake);
void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout)
{
const u64 start_time = get_guest_system_time();
if (auto ppu = static_cast<ppu_thread*>(thread.id_type() == 1 ? &thread : nullptr))
@ -1055,20 +1055,26 @@ void lv2_obj::sleep_timeout(cpu_thread& thread, u64 timeout)
}
}
schedule_all();
if (!g_to_awake.empty())
{
// Schedule pending entries
awake_unlocked({});
}
else
{
schedule_all();
}
}
void lv2_obj::awake(cpu_thread& cpu, u32 prio)
void lv2_obj::awake_unlocked(cpu_thread* cpu, u32 prio)
{
// Check thread type
if (cpu.id_type() != 1) return;
std::lock_guard lock(g_mutex);
AUDIT(!cpu || cpu->id_type() == 1);
if (prio < INT32_MAX)
{
// Priority set
if (static_cast<ppu_thread&>(cpu).prio.exchange(prio) == prio || !unqueue(g_ppu, &cpu))
if (static_cast<ppu_thread*>(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, cpu))
{
return;
}
@ -1080,7 +1086,7 @@ void lv2_obj::awake(cpu_thread& cpu, u32 prio)
for (std::size_t i = 0, pos = -1; i < g_ppu.size(); i++)
{
if (g_ppu[i] == &cpu)
if (g_ppu[i] == cpu)
{
pos = i;
prio = g_ppu[i]->prio;
@ -1091,45 +1097,58 @@ void lv2_obj::awake(cpu_thread& cpu, u32 prio)
}
}
unqueue(g_ppu, &cpu);
unqueue(g_pending, &cpu);
unqueue(g_ppu, cpu);
unqueue(g_pending, cpu);
static_cast<ppu_thread&>(cpu).start_time = start_time;
static_cast<ppu_thread*>(cpu)->start_time = start_time;
}
// Emplace current thread
for (std::size_t i = 0; i <= g_ppu.size(); i++)
const auto emplace_thread = [](cpu_thread* const cpu)
{
if (i < g_ppu.size() && g_ppu[i] == &cpu)
for (auto it = g_ppu.cbegin(), end = g_ppu.cend();; it++)
{
LOG_TRACE(PPU, "sleep() - suspended (p=%zu)", g_pending.size());
break;
}
// Use priority, also preserve FIFO order
if (i == g_ppu.size() || g_ppu[i]->prio > static_cast<ppu_thread&>(cpu).prio)
{
LOG_TRACE(PPU, "awake(): %s", cpu.id);
g_ppu.insert(g_ppu.cbegin() + i, &static_cast<ppu_thread&>(cpu));
// Unregister timeout if necessary
for (auto it = g_waiting.cbegin(), end = g_waiting.cend(); it != end; it++)
if (it != end && *it == cpu)
{
if (it->second == &cpu)
{
g_waiting.erase(it);
break;
}
LOG_TRACE(PPU, "sleep() - suspended (p=%zu)", g_pending.size());
return;
}
break;
// Use priority, also preserve FIFO order
if (it == end || (*it)->prio > static_cast<ppu_thread*>(cpu)->prio)
{
g_ppu.insert(it, static_cast<ppu_thread*>(cpu));
break;
}
}
}
// Remove pending if necessary
if (!g_pending.empty() && &cpu == get_current_cpu_thread())
// Unregister timeout if necessary
for (auto it = g_waiting.cbegin(), end = g_waiting.cend(); it != end; it++)
{
if (it->second == cpu)
{
g_waiting.erase(it);
break;
}
}
LOG_TRACE(PPU, "awake(): %s", cpu->id);
};
if (cpu)
{
unqueue(g_pending, &cpu);
// Emplace current thread
emplace_thread(cpu);
}
else for (const auto _cpu : g_to_awake)
{
// Emplace threads from list
emplace_thread(_cpu);
}
// Remove pending if necessary
if (!g_pending.empty() && cpu && cpu == get_current_cpu_thread())
{
unqueue(g_pending, cpu);
}
// Suspend threads if necessary

View file

@ -104,7 +104,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
if (cond.ret)
{
cond->awake(*cond.ret);
cond->awake(cond.ret);
}
return CELL_OK;
@ -145,7 +145,7 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
if (cond.ret)
{
cond->awake(*cond.ret);
cond->awake(cond.ret);
}
return CELL_OK;
@ -194,7 +194,7 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
if (cond.ret && cond.ret != (cpu_thread*)(2))
{
cond->awake(*cond.ret);
cond->awake(cond.ret);
}
else if (!cond.ret)
{
@ -233,22 +233,24 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
}
else
{
// Further function result
ppu.gpr[3] = CELL_OK;
std::lock_guard lock(cond->mutex->mutex);
// Register waiter
cond->sq.emplace_back(&ppu);
cond->sleep(ppu, timeout);
// Unlock the mutex
cond->mutex->lock_count = 0;
if (auto cpu = cond->mutex->reown<ppu_thread>())
{
cond->mutex->awake(*cpu);
cond->mutex->append(cpu);
}
// Further function result
ppu.gpr[3] = CELL_OK;
// Sleep current thread and schedule mutex waiter
cond->sleep(ppu, timeout);
}
while (!ppu.state.test_and_reset(cpu_flag::signal))

View file

@ -1,4 +1,4 @@
#include "stdafx.h"
#include "stdafx.h"
#include "sys_event.h"
#include "Emu/System.h"
@ -48,7 +48,7 @@ bool lv2_event_queue::send(lv2_event event)
std::tie(ppu.gpr[4], ppu.gpr[5], ppu.gpr[6], ppu.gpr[7]) = event;
awake(ppu);
awake(&ppu);
}
else
{
@ -171,14 +171,19 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
{
std::lock_guard lock(queue->mutex);
for (auto cpu : queue->sq)
if (queue->type == SYS_PPU_QUEUE)
{
if (queue->type == SYS_PPU_QUEUE)
for (auto cpu : queue->sq)
{
static_cast<ppu_thread&>(*cpu).gpr[3] = CELL_ECANCELED;
queue->awake(*cpu);
queue->append(cpu);
}
else
lv2_obj::awake_all();
}
else
{
for (auto cpu : queue->sq)
{
static_cast<spu_thread&>(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED);
cpu->state += cpu_flag::signal;

View file

@ -1,4 +1,4 @@
#include "stdafx.h"
#include "stdafx.h"
#include "sys_event_flag.h"
#include "Emu/System.h"
@ -310,7 +310,7 @@ error_code sys_event_flag_set(u32 id, u64 bitptn)
if (ppu.gpr[3] == CELL_OK)
{
flag->waiters--;
flag->awake(ppu);
flag->append(cpu);
return true;
}
@ -318,6 +318,7 @@ error_code sys_event_flag_set(u32 id, u64 bitptn)
});
flag->sq.erase(tail, flag->sq.end());
lv2_obj::awake_all();
}
return CELL_OK;
@ -376,8 +377,10 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr<u32> num)
ppu.gpr[6] = pattern;
flag->waiters--;
flag->awake(ppu);
flag->append(thread);
}
lv2_obj::awake_all();
}
if (ppu.test_stopped())

View file

@ -175,7 +175,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u3
if (mode != 1)
{
cond->awake(*cond.ret);
cond->awake(cond.ret);
}
return CELL_OK;
@ -195,8 +195,6 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
fmt::throw_exception("Unknown mode (%d)" HERE, mode);
}
std::basic_string<cpu_thread*> threads;
const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond) -> s32
{
lv2_lwmutex* mutex;
@ -234,7 +232,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
else
{
threads.push_back(cpu);
lv2_obj::append(cpu);
}
result++;
@ -251,9 +249,9 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
return CELL_ESRCH;
}
for (auto cpu : threads)
// NOTE(review): threads are queued with lv2_obj::append() above and the previous code woke all of them
// unconditionally; flush the pending list for every mode that does not hand waiters to the lwmutex
// (same `mode != 1` condition as _sys_lwcond_signal) - confirm against the append() branch.
if (mode != 1)
{
cond->awake(*cpu);
lv2_obj::awake_all();
}
if (mode == 1)
@ -275,7 +273,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
std::shared_ptr<lv2_lwmutex> mutex;
const auto cond = idm::get<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond) -> cpu_thread*
const auto cond = idm::get<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond)
{
mutex = idm::get_unlocked<lv2_obj, lv2_lwmutex>(lwmutex_id);
@ -289,18 +287,23 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
// Add a waiter
cond.waiters++;
cond.sq.emplace_back(&ppu);
cond.sleep(ppu, timeout);
std::lock_guard lock2(mutex->mutex);
// Process lwmutex sleep queue
if (const auto cpu = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol))
{
return cpu;
std::lock_guard lock2(mutex->mutex);
// Process lwmutex sleep queue
if (const auto cpu = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol))
{
cond.append(cpu);
}
else
{
mutex->signaled |= 1;
}
}
mutex->signaled |= 1;
return nullptr;
// Sleep current thread and schedule lwmutex waiter
cond.sleep(ppu, timeout);
});
if (!cond || !mutex)
@ -308,11 +311,6 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
return CELL_ESRCH;
}
if (cond.ret)
{
cond->awake(*cond.ret);
}
while (!ppu.state.test_and_reset(cpu_flag::signal))
{
if (ppu.is_stopped())

View file

@ -212,7 +212,7 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id)
if (mutex.ret)
{
mutex->awake(*mutex.ret);
mutex->awake(mutex.ret);
}
return CELL_OK;
@ -245,7 +245,7 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id)
if (mutex.ret)
{
mutex->awake(*mutex.ret);
mutex->awake(mutex.ret);
}
return CELL_OK;

View file

@ -237,7 +237,7 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id)
if (auto cpu = mutex->reown<ppu_thread>())
{
mutex->awake(*cpu);
mutex->awake(cpu);
}
}
else if (mutex.ret)

View file

@ -203,9 +203,11 @@ extern void network_thread_init()
for (ppu_thread* ppu : s_to_awake)
{
network_clear_queue(*ppu);
lv2_obj::awake(*ppu);
lv2_obj::append(ppu);
}
lv2_obj::awake_all();
s_to_awake.clear();
socklist.clear();
@ -309,7 +311,7 @@ s32 sys_net_bnet_accept(ppu_thread& ppu, s32 s, vm::ptr<sys_net_sockaddr> addr,
if (native_socket != -1 || (result = get_last_error(!sock.so_nbio)))
{
lv2_obj::awake(ppu);
lv2_obj::awake(&ppu);
return true;
}
}
@ -532,7 +534,7 @@ s32 sys_net_bnet_connect(ppu_thread& ppu, s32 s, vm::ptr<sys_net_sockaddr> addr,
result = native_error ? get_last_error(false, native_error) : 0;
}
lv2_obj::awake(ppu);
lv2_obj::awake(&ppu);
return true;
}
@ -946,7 +948,7 @@ s32 sys_net_bnet_recvfrom(ppu_thread& ppu, s32 s, vm::ptr<void> buf, u32 len, s3
if (native_result >= 0 || (result = get_last_error(!sock.so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0)))
{
lv2_obj::awake(ppu);
lv2_obj::awake(&ppu);
return true;
}
}
@ -1115,7 +1117,7 @@ s32 sys_net_bnet_sendto(ppu_thread& ppu, s32 s, vm::cptr<void> buf, u32 len, s32
if (native_result >= 0 || (result = get_last_error(!sock.so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0)))
{
lv2_obj::awake(ppu);
lv2_obj::awake(&ppu);
return true;
}
}

View file

@ -49,7 +49,7 @@ void _sys_ppu_thread_exit(ppu_thread& ppu, u64 errorcode)
std::lock_guard lock(id_manager::g_mutex);
// Schedule joiner and unqueue
lv2_obj::awake(*idm::check_unlocked<named_thread<ppu_thread>>(jid), -2);
lv2_obj::awake(idm::check_unlocked<named_thread<ppu_thread>>(jid), -2);
}
// Unqueue
@ -63,7 +63,7 @@ void sys_ppu_thread_yield(ppu_thread& ppu)
{
sys_ppu_thread.trace("sys_ppu_thread_yield()");
lv2_obj::awake(ppu, -4);
lv2_obj::yield(ppu);
}
error_code sys_ppu_thread_join(ppu_thread& ppu, u32 thread_id, vm::ptr<u64> vptr)
@ -219,7 +219,7 @@ error_code sys_ppu_thread_set_priority(ppu_thread& ppu, u32 thread_id, s32 prio)
{
if (thread.prio != prio)
{
lv2_obj::awake(thread, prio);
lv2_obj::awake(&thread, prio);
}
});
@ -359,7 +359,7 @@ error_code sys_ppu_thread_start(ppu_thread& ppu, u32 thread_id)
const auto thread = idm::get<named_thread<ppu_thread>>(thread_id, [&](ppu_thread& thread)
{
lv2_obj::awake(thread, -2);
lv2_obj::awake(&thread, -2);
});
if (!thread)

View file

@ -250,7 +250,7 @@ error_code sys_rwlock_runlock(ppu_thread& ppu, u32 rw_lock_id)
{
rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty();
rwlock->awake(*cpu);
rwlock->awake(cpu);
}
else
{
@ -348,16 +348,19 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
// If the last waiter quit the writer sleep queue, wake blocked readers
if (!rwlock->rq.empty() && rwlock->wq.empty() && rwlock->owner < 0)
{
verify(HERE), rwlock->owner & 1;
rwlock->owner -= s64{2} * rwlock->rq.size();
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_PRIORITY))
rwlock->owner.atomic_op([&](s64& owner)
{
rwlock->awake(*cpu);
// Reader ownership is stored as a negative multiple of 2 (compare the `-2 * readers` stores on write-unlock),
// so the woken readers have to be subtracted; `owner -= -2 * n` would remove readers instead of adding them.
owner -= 2 * static_cast<s64>(rwlock->rq.size()); // Add readers to value
owner &= -2; // Clear wait bit
});
// Protocol doesn't matter here since they are all enqueued anyways
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_FIFO))
{
rwlock->append(cpu);
}
rwlock->owner &= ~1;
lv2_obj::awake_all();
}
ppu.gpr[3] = CELL_ETIMEDOUT;
@ -437,18 +440,17 @@ error_code sys_rwlock_wunlock(ppu_thread& ppu, u32 rw_lock_id)
{
rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty();
rwlock->awake(*cpu);
rwlock->awake(cpu);
}
else if (auto readers = rwlock->rq.size())
{
rwlock->owner = (s64{-2} * readers) | 1;
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_PRIORITY))
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_FIFO))
{
rwlock->awake(*cpu);
rwlock->append(cpu);
}
rwlock->owner &= ~1;
rwlock->owner.release(-2 * static_cast<s64>(readers));
lv2_obj::awake_all();
}
else
{

View file

@ -1,4 +1,4 @@
#include "stdafx.h"
#include "stdafx.h"
#include "sys_semaphore.h"
#include "Emu/System.h"
@ -242,8 +242,10 @@ error_code sys_semaphore_post(ppu_thread& ppu, u32 sem_id, s32 count)
// Wake threads
for (s32 i = std::min<s32>(-std::min<s32>(val, 0), count); i > 0; i--)
{
sem->awake(*verify(HERE, sem->schedule<ppu_thread>(sem->sq, sem->protocol)));
sem->append(verify(HERE, sem->schedule<ppu_thread>(sem->sq, sem->protocol)));
}
lv2_obj::awake_all();
}
return CELL_OK;

View file

@ -115,27 +115,42 @@ struct lv2_obj
return res;
}
private:
// Remove the current thread from the scheduling queue, register timeout
static void sleep_timeout(cpu_thread&, u64 timeout);
static void sleep_unlocked(cpu_thread&, u64 timeout);
static void sleep(cpu_thread& thread, u64 timeout = 0)
// Schedule the thread
static void awake_unlocked(cpu_thread*, u32 prio = -1);
public:
static void sleep(cpu_thread& cpu, const u64 timeout = 0)
{
vm::temporary_unlock(thread);
sleep_timeout(thread, timeout);
vm::temporary_unlock(cpu);
std::lock_guard{g_mutex}, sleep_unlocked(cpu, timeout);
g_to_awake.clear();
}
// Schedule the thread: locking entry point that acquires the scheduler mutex and forwards to awake_unlocked().
// A null `thread` (as passed by awake_all()) makes awake_unlocked() emplace the threads collected in g_to_awake.
// `prio` keeps its default of -1 unless the caller overrides it.
static inline void awake(cpu_thread* const thread, const u32 prio = -1)
{
	// Keep g_mutex held for the whole scheduling operation
	const std::lock_guard guard{g_mutex};
	awake_unlocked(thread, prio);
}
static void yield(cpu_thread& thread)
{
vm::temporary_unlock(thread);
awake(thread, -4);
awake(&thread, -4);
}
// Schedule the thread
static void awake(cpu_thread&, u32 prio);
static void awake(cpu_thread& thread)
static inline void awake_all()
{
awake(thread, -1);
awake({});
g_to_awake.clear();
}
// Queue a thread on the calling thread's pending list (g_to_awake) without scheduling it yet.
// No lock is taken here: the list is thread_local and is consumed later by awake_all() or sleep().
static inline void append(cpu_thread* const thread)
{
	g_to_awake.push_back(thread);
}
static void cleanup();
@ -216,7 +231,7 @@ struct lv2_obj
}
template<bool is_usleep = false>
static bool wait_timeout(u64 usec, cpu_thread* const cpu = nullptr)
static bool wait_timeout(u64 usec, cpu_thread* const cpu = {})
{
static_assert(UINT64_MAX / cond_variable::max_timeout >= g_cfg.core.clocks_scale.max, "timeout may overflow during scaling");
@ -287,6 +302,9 @@ private:
// Scheduler mutex
static shared_mutex g_mutex;
// Pending list of threads to run
static thread_local std::vector<class cpu_thread*> g_to_awake;
// Scheduler queue for active PPU threads
static std::deque<class ppu_thread*> g_ppu;