LV2: allocation-free synchronization syscalls

* Show waiters' ID in kernel explorer.
* Remove deque dependency from sys_sync.h.
Eladash 2022-07-25 18:57:47 +03:00 committed by Ivan
parent c7fbc16357
commit 73aaff1b29
26 changed files with 547 additions and 275 deletions
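The recurring pattern across these files: each LV2 object's sleep queue turns from a std::deque<cpu_thread*> into an intrusive singly-linked list. The list head is an atomic_t<T*> member of the object, and the links are the new next_cpu fields inside ppu_thread and spu_thread themselves, so enqueuing a waiter allocates nothing and only stitches pointers under the object's lock. A minimal compilable sketch of the idea, using plain std::atomic and a toy waiter struct as stand-ins for RPCS3's atomic_t and thread classes:

#include <atomic>
#include <cassert>

struct waiter
{
    int id;
    std::atomic<waiter*> next_cpu{nullptr}; // intrusive link, stored in the waiter itself
};

// Append to the tail of the list headed by 'first' (callers hold the object's mutex).
void emplace(std::atomic<waiter*>& first, waiter* w)
{
    std::atomic<waiter*>* it = &first;
    while (waiter* ptr = it->load())
        it = &ptr->next_cpu;
    it->store(w); // no heap allocation: the node is the waiter
}

int main()
{
    std::atomic<waiter*> sq{nullptr};
    waiter a{1}, b{2};
    emplace(sq, &a);
    emplace(sq, &b);
    assert(sq.load() == &a && a.next_cpu.load() == &b);
}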

View file

@ -946,6 +946,24 @@ u32* cpu_thread::get_pc2()
return nullptr;
}
cpu_thread* cpu_thread::get_next_cpu()
{
switch (id_type())
{
case 1:
{
return static_cast<ppu_thread*>(this)->next_cpu;
}
case 2:
{
return static_cast<spu_thread*>(this)->next_cpu;
}
default: break;
}
return nullptr;
}
std::shared_ptr<CPUDisAsm> make_disasm(const cpu_thread* cpu);
void cpu_thread::dump_all(std::string& ret) const
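get_next_cpu() exists so that code holding only a cpu_thread* (the kernel explorer's waiter listing, sys_event_queue_destroy) can walk a wait queue even though the link member lives in the derived classes. A sketch of the same dispatch with stand-in types:

#include <cstdio>

struct ppu_thread;
struct spu_thread;

struct cpu_thread
{
    int type; // 1 = PPU, 2 = SPU (stand-in for id_type())
    cpu_thread* get_next_cpu();
};

struct ppu_thread : cpu_thread { ppu_thread* next_cpu = nullptr; };
struct spu_thread : cpu_thread { spu_thread* next_cpu = nullptr; };

cpu_thread* cpu_thread::get_next_cpu()
{
    // The base class has no link member, so downcast to reach it
    switch (type)
    {
    case 1: return static_cast<ppu_thread*>(this)->next_cpu;
    case 2: return static_cast<spu_thread*>(this)->next_cpu;
    default: return nullptr;
    }
}

int main()
{
    ppu_thread a, b;
    a.type = b.type = 1;
    a.next_cpu = &b;
    for (cpu_thread* it = &a; it; it = it->get_next_cpu())
        std::printf("waiter\n"); // prints twice
}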

View file

@ -140,6 +140,7 @@ public:
u32 get_pc() const;
u32* get_pc2(); // Last PC before stepping for the debugger (may be null)
cpu_thread* get_next_cpu(); // Access next_cpu member if there is one
void notify();
cpu_thread& operator=(thread_state);

View file

@ -323,6 +323,10 @@ public:
std::shared_ptr<utils::serial> optional_savestate_state;
bool interrupt_thread_executing = false;
atomic_t<ppu_thread*> next_cpu{}; // LV2 sleep queues' node link
atomic_t<ppu_thread*> next_ppu{}; // LV2 PPU running queue's node link
bool ack_suspend = false;
be_t<u64>* get_stack_arg(s32 i, u64 align = alignof(u64));
void exec_task();
void fast_call(u32 addr, u64 rtoc);
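A PPU thread can sit in an object's sleep queue and in the scheduler's run queue at the same time, hence two independent links: next_cpu for LV2 sleep queues, next_ppu for the PPU run queue. The generic unqueue() in sys_sync.h takes a pointer-to-member so one routine serves both lists; a compilable model with std::atomic standing in for atomic_t:

#include <atomic>
#include <cassert>

struct ppu_thread
{
    std::atomic<ppu_thread*> next_cpu{nullptr}; // link for an object's sleep queue
    std::atomic<ppu_thread*> next_ppu{nullptr}; // link for the scheduler's run queue
};

// Remove 'obj' from the list headed by 'first', following the given link member.
ppu_thread* unqueue(std::atomic<ppu_thread*>& first, ppu_thread* obj,
                    std::atomic<ppu_thread*> ppu_thread::* link)
{
    std::atomic<ppu_thread*>* it = &first;
    while (ppu_thread* cur = it->load())
    {
        if (cur == obj)
        {
            it->store((cur->*link).load()); // splice out
            (cur->*link).store(nullptr);
            return cur;
        }
        it = &(cur->*link);
    }
    return nullptr;
}

int main()
{
    ppu_thread a, b;
    std::atomic<ppu_thread*> run_queue{&a};
    a.next_ppu = &b;
    assert(unqueue(run_queue, &b, &ppu_thread::next_ppu) == &b);
    assert(a.next_ppu.load() == nullptr);
}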

View file

@ -4832,7 +4832,7 @@ bool spu_thread::stop_and_signal(u32 code)
if (queue->events.empty())
{
queue->sq.emplace_back(this);
lv2_obj::emplace(queue->sq, this);
group->run_state = SPU_THREAD_GROUP_STATUS_WAITING;
group->waiter_spu_index = index;

View file

@ -815,6 +815,8 @@ public:
const u32 option; // sys_spu_thread_initialize option
const u32 lv2_id; // The actual id that is used by syscalls
atomic_t<spu_thread*> next_cpu{}; // LV2 thread queues' node link
// Thread name
atomic_ptr<std::string> spu_tname;

View file

@ -49,6 +49,7 @@
#include "sys_crypto_engine.h"
#include <optional>
#include <deque>
extern std::string ppu_get_syscall_name(u64 code);
@ -1187,14 +1188,18 @@ std::string ppu_get_syscall_name(u64 code)
}
DECLARE(lv2_obj::g_mutex);
DECLARE(lv2_obj::g_ppu);
DECLARE(lv2_obj::g_pending);
DECLARE(lv2_obj::g_waiting);
DECLARE(lv2_obj::g_to_sleep);
DECLARE(lv2_obj::g_ppu){};
DECLARE(lv2_obj::g_pending){};
thread_local DECLARE(lv2_obj::g_to_notify){};
thread_local DECLARE(lv2_obj::g_to_awake);
// Scheduler queue for timeouts (wait until -> thread)
static std::deque<std::pair<u64, class cpu_thread*>> g_waiting;
// Threads which must call lv2_obj::sleep before the scheduler starts
static std::deque<class cpu_thread*> g_to_sleep;
namespace cpu_counter
{
void remove(cpu_thread*) noexcept;
@ -1260,7 +1265,7 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
if (auto ppu = thread.try_get<ppu_thread>())
{
ppu_log.trace("sleep() - waiting (%zu)", g_pending.size());
ppu_log.trace("sleep() - waiting (%zu)", g_pending);
const auto [_, ok] = ppu->state.fetch_op([&](bs_t<cpu_flag>& val)
{
@ -1280,10 +1285,11 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
}
// Find and remove the thread
if (!unqueue(g_ppu, ppu))
if (!unqueue(g_ppu, ppu, &ppu_thread::next_ppu))
{
if (unqueue(g_to_sleep, ppu))
if (auto it = std::find(g_to_sleep.begin(), g_to_sleep.end(), ppu); it != g_to_sleep.end())
{
g_to_sleep.erase(it);
ppu->start_time = start_time;
on_to_sleep_update();
}
@ -1293,15 +1299,19 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
return;
}
unqueue(g_pending, ppu);
if (std::exchange(ppu->ack_suspend, false))
{
g_pending--;
}
ppu->raddr = 0; // Clear reservation
ppu->start_time = start_time;
}
else if (auto spu = thread.try_get<spu_thread>())
{
if (unqueue(g_to_sleep, spu))
if (auto it = std::find(g_to_sleep.begin(), g_to_sleep.end(), spu); it != g_to_sleep.end())
{
g_to_sleep.erase(it);
on_to_sleep_update();
}
@ -1344,7 +1354,7 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
default:
{
// Priority set
if (static_cast<ppu_thread*>(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, cpu))
if (static_cast<ppu_thread*>(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, static_cast<ppu_thread*>(cpu), &ppu_thread::next_ppu))
{
return true;
}
@ -1353,36 +1363,66 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
}
case yield_cmd:
{
usz i = 0;
// Yield command
for (usz i = 0;; i++)
for (auto ppu_next = &g_ppu;; i++)
{
if (i + 1 >= g_ppu.size())
const auto ppu = +*ppu_next;
if (!ppu)
{
return false;
}
if (const auto ppu = g_ppu[i]; ppu == cpu)
if (ppu == cpu)
{
usz j = i + 1;
auto ppu2_next = &ppu->next_ppu;
for (; j < g_ppu.size(); j++)
if (auto next = +*ppu2_next; !next || next->prio != ppu->prio)
{
if (g_ppu[j]->prio != ppu->prio)
return false;
}
for (;; i++)
{
const auto next = +*ppu2_next;
if (auto next2 = +next->next_ppu; !next2 || next2->prio != ppu->prio)
{
break;
}
ppu2_next = &next->next_ppu;
}
if (j == i + 1)
if (ppu2_next == &ppu->next_ppu)
{
// Empty 'same prio' threads list
return false;
}
// Rotate current thread to the last position of the 'same prio' threads list
std::rotate(g_ppu.begin() + i, g_ppu.begin() + i + 1, g_ppu.begin() + j);
auto ppu2 = +*ppu2_next;
if (j <= g_cfg.core.ppu_threads + 0u)
// Rotate current thread to the last position of the 'same prio' threads list
ppu_next->release(ppu2);
// Exchange forward pointers
if (ppu->next_ppu != ppu2)
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(+ppu->next_ppu);
ppu->next_ppu.release(ppu2_val);
ppu2_next->release(ppu);
}
else
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(ppu);
ppu->next_ppu.release(ppu2_val);
}
if (i <= g_cfg.core.ppu_threads + 0u)
{
// Threads were rotated, but no context switch was made
return false;
@ -1392,7 +1432,10 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
cpu = nullptr; // Disable current thread enqueuing, also enable threads list enqueuing
break;
}
ppu_next = &ppu->next_ppu;
}
break;
}
case enqueue_cmd:
@ -1403,20 +1446,25 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
const auto emplace_thread = [](cpu_thread* const cpu)
{
for (auto it = g_ppu.cbegin(), end = g_ppu.cend();; it++)
for (auto it = &g_ppu;;)
{
if (it != end && *it == cpu)
const auto next = +*it;
if (next == cpu)
{
ppu_log.trace("sleep() - suspended (p=%zu)", g_pending.size());
ppu_log.trace("sleep() - suspended (p=%zu)", g_pending);
return false;
}
// Use priority, also preserve FIFO order
if (it == end || (*it)->prio > static_cast<ppu_thread*>(cpu)->prio)
if (!next || next->prio > static_cast<ppu_thread*>(cpu)->prio)
{
g_ppu.insert(it, static_cast<ppu_thread*>(cpu));
it->release(static_cast<ppu_thread*>(cpu));
static_cast<ppu_thread*>(cpu)->next_ppu.release(next);
break;
}
it = &next->next_ppu;
}
// Unregister timeout if necessary
@ -1448,20 +1496,28 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
}
// Remove pending if necessary
if (!g_pending.empty() && ((cpu && cpu == get_current_cpu_thread()) || prio == yield_cmd))
if (g_pending && ((cpu && cpu == get_current_cpu_thread()) || prio == yield_cmd))
{
unqueue(g_pending, get_current_cpu_thread());
if (auto cur = cpu_thread::get_current<ppu_thread>())
{
if (std::exchange(cur->ack_suspend, false))
{
g_pending--;
}
}
}
// Suspend threads if necessary
for (usz i = g_cfg.core.ppu_threads; changed_queue && i < g_ppu.size(); i++)
{
const auto target = g_ppu[i];
auto target = +g_ppu;
if (!target->state.test_and_set(cpu_flag::suspend))
// Suspend threads if necessary
for (usz i = 0, thread_count = g_cfg.core.ppu_threads; changed_queue && target; target = target->next_ppu, i++)
{
if (i >= thread_count && cpu_flag::suspend - target->state)
{
ppu_log.trace("suspend(): %s", target->id);
g_pending.emplace_back(target);
target->ack_suspend = true;
g_pending++;
ensure(!target->state.test_and_set(cpu_flag::suspend));
if (is_paused(target->state - cpu_flag::suspend))
{
@ -1476,23 +1532,23 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
void lv2_obj::cleanup()
{
g_ppu.clear();
g_pending.clear();
g_waiting.clear();
g_ppu = nullptr;
g_to_sleep.clear();
g_waiting.clear();
g_pending = 0;
}
void lv2_obj::schedule_all(bool notify_later)
{
if (g_pending.empty() && g_to_sleep.empty())
if (!g_pending && g_to_sleep.empty())
{
usz notify_later_idx = notify_later ? 0 : std::size(g_to_notify) - 1;
// Wake up threads
for (usz i = 0, x = std::min<usz>(g_cfg.core.ppu_threads, g_ppu.size()); i < x; i++)
{
const auto target = g_ppu[i];
auto target = +g_ppu;
// Wake up threads
for (usz x = g_cfg.core.ppu_threads; target && x; target = target->next_ppu, x--)
{
if (target->state & cpu_flag::suspend)
{
ppu_log.trace("schedule(): %s", target->id);
@ -1557,9 +1613,19 @@ ppu_thread_status lv2_obj::ppu_state(ppu_thread* ppu, bool lock_idm, bool lock_l
opt_lock[1].emplace(lv2_obj::g_mutex);
}
const auto it = std::find(g_ppu.begin(), g_ppu.end(), ppu);
usz pos = umax;
usz i = 0;
if (it == g_ppu.end())
for (auto target = +g_ppu; target; target = target->next_ppu, i++)
{
if (target == ppu)
{
pos = i;
break;
}
}
if (pos == umax)
{
if (!ppu->interrupt_thread_executing)
{
@ -1569,7 +1635,7 @@ ppu_thread_status lv2_obj::ppu_state(ppu_thread* ppu, bool lock_idm, bool lock_l
return PPU_THREAD_STATUS_SLEEP;
}
if (it - g_ppu.begin() >= g_cfg.core.ppu_threads)
if (pos >= g_cfg.core.ppu_threads + 0u)
{
return PPU_THREAD_STATUS_RUNNABLE;
}
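Summarizing the scheduler changes: g_ppu becomes a priority-ordered intrusive list threaded through next_ppu, the emplace_thread lambda walks to the first node with a larger prio value (preserving FIFO order among equals), yield_cmd splices the current thread behind the last node of equal priority, and g_pending shrinks from a deque to a plain counter paired with the new ppu_thread::ack_suspend flag, since the old container was only ever queried for its size or for the current thread. A sketch of the priority-FIFO insertion, assuming plain pointers instead of atomic_t:

#include <cassert>

struct ppu_thread
{
    int prio;
    ppu_thread* next_ppu = nullptr;
};

// Insert keeping ascending prio, preserving FIFO order among equal prio
// (models the emplace_thread lambda above).
void enqueue(ppu_thread*& head, ppu_thread* t)
{
    ppu_thread** it = &head;
    while (*it && (*it)->prio <= t->prio)
        it = &(*it)->next_ppu;
    t->next_ppu = *it;
    *it = t;
}

int main()
{
    ppu_thread a{100}, b{100}, c{50};
    ppu_thread* run = nullptr;
    enqueue(run, &a);
    enqueue(run, &b); // equal prio: queued after a (FIFO)
    enqueue(run, &c); // lower value = higher priority: queued first
    assert(run == &c && c.next_ppu == &a && a.next_ppu == &b);
}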

View file

@ -112,7 +112,7 @@ error_code sys_cond_destroy(ppu_thread& ppu, u32 cond_id)
{
std::lock_guard lock(cond.mutex->mutex);
if (cond.waiters)
if (cond.sq)
{
return CELL_EBUSY;
}
@ -143,7 +143,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&](lv2_cond& cond)
{
if (cond.waiters)
if (cond.sq)
{
lv2_obj::notify_all_t notify;
@ -158,7 +158,6 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
}
// TODO: Is EBUSY returned after requeueing, on sys_cond_destroy?
cond.waiters--;
if (cond.mutex->try_own(*cpu, cpu->id))
{
@ -184,13 +183,15 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&](lv2_cond& cond)
{
if (cond.waiters)
if (cond.sq)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex->mutex);
for (auto cpu : cond.sq)
for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return;
@ -198,9 +199,10 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
}
cpu_thread* result = nullptr;
cond.waiters -= ::size32(cond.sq);
decltype(cond.sq) sq{+cond.sq};
cond.sq.release(nullptr);
while (const auto cpu = cond.schedule<ppu_thread>(cond.sq, SYS_SYNC_PRIORITY))
while (const auto cpu = cond.schedule<ppu_thread>(sq, SYS_SYNC_PRIORITY))
{
if (cond.mutex->try_own(*cpu, cpu->id))
{
@ -210,7 +212,7 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
if (result)
{
lv2_obj::awake(result);
cond.awake(result, true);
}
}
});
@ -236,13 +238,13 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
return -1;
}
if (cond.waiters)
if (cond.sq)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex->mutex);
for (auto cpu : cond.sq)
for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
{
if (cpu->id == thread_id)
{
@ -254,8 +256,6 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
ensure(cond.unqueue(cond.sq, cpu));
cond.waiters--;
if (cond.mutex->try_own(*cpu, cpu->id))
{
cond.awake(cpu, true);
@ -315,9 +315,8 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
else
{
// Register waiter
cond.sq.emplace_back(&ppu);
cond.waiters++;
}
lv2_obj::emplace(cond.sq, &ppu);
}
if (ppu.loaded_from_savestate)
{
@ -361,8 +360,26 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
{
std::lock_guard lock(cond->mutex->mutex);
const bool cond_sleep = std::find(cond->sq.begin(), cond->sq.end(), &ppu) != cond->sq.end();
const bool mutex_sleep = std::find(cond->mutex->sq.begin(), cond->mutex->sq.end(), &ppu) != cond->mutex->sq.end();
bool mutex_sleep = false;
bool cond_sleep = false;
for (auto cpu = +cond->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
cond_sleep = true;
break;
}
}
for (auto cpu = +cond->mutex->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
mutex_sleep = true;
break;
}
}
if (!cond_sleep && !mutex_sleep)
{
@ -402,8 +419,6 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
if (cond->unqueue(cond->sq, &ppu))
{
// TODO: Is EBUSY returned after requeueing, on sys_cond_destroy?
cond->waiters--;
ppu.gpr[3] = CELL_ETIMEDOUT;
// Own or requeue
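Two things to note in the sys_cond changes: the waiters counter is gone (emptiness is now just a test of the sq head pointer), and sys_cond_signal_all detaches the entire list into a local head before rescheduling, so the object's queue is emptied in one release and later arrivals cannot be swept into this wakeup. A sketch of the detach-then-drain step, with a toy thread type:

#include <atomic>
#include <cstdio>

struct ppu_thread
{
    int id;
    std::atomic<ppu_thread*> next_cpu{nullptr};
};

int main()
{
    std::atomic<ppu_thread*> sq{nullptr};
    ppu_thread a{1}, b{2};
    a.next_cpu = &b;
    sq = &a;

    // Models: decltype(cond.sq) sq{+cond.sq}; cond.sq.release(nullptr);
    ppu_thread* local = sq.exchange(nullptr);

    for (ppu_thread* it = local; it;)
    {
        ppu_thread* next = it->next_cpu.exchange(nullptr);
        std::printf("wake thread %d\n", it->id);
        it = next;
    }
}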

View file

@ -27,8 +27,7 @@ struct lv2_cond final : lv2_obj
const u32 mtx_id;
std::shared_ptr<lv2_mutex> mutex; // Associated Mutex
atomic_t<u32> waiters{0};
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
lv2_cond(u64 key, u64 name, u32 mtx_id, std::shared_ptr<lv2_mutex> mutex)
: key(key)

View file

@ -120,7 +120,7 @@ CellError lv2_event_queue::send(lv2_event event, bool notify_later)
return CELL_ENOTCONN;
}
if (sq.empty())
if (!pq && !sq)
{
if (events.size() < this->size + 0u)
{
@ -135,7 +135,7 @@ CellError lv2_event_queue::send(lv2_event event, bool notify_later)
if (type == SYS_PPU_QUEUE)
{
// Store event in registers
auto& ppu = static_cast<ppu_thread&>(*schedule<ppu_thread>(sq, protocol));
auto& ppu = static_cast<ppu_thread&>(*schedule<ppu_thread>(pq, protocol));
if (ppu.state & cpu_flag::again)
{
@ -241,11 +241,15 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
std::unique_lock<shared_mutex> qlock;
cpu_thread* head{};
const auto queue = idm::withdraw<lv2_obj, lv2_event_queue>(equeue_id, [&](lv2_event_queue& queue) -> CellError
{
qlock = std::unique_lock{queue.mutex};
if (!mode && !queue.sq.empty())
head = queue.type == SYS_PPU_QUEUE ? static_cast<cpu_thread*>(+queue.pq) : +queue.sq;
if (!mode && head)
{
return CELL_EBUSY;
}
@ -258,13 +262,13 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
lv2_obj::on_id_destroy(queue, queue.key);
if (queue.sq.empty())
if (!head)
{
qlock.unlock();
}
else
{
for (auto cpu : queue.sq)
for (auto cpu = head; cpu; cpu = cpu->get_next_cpu())
{
if (cpu->state & cpu_flag::again)
{
@ -296,15 +300,18 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
if (qlock.owns_lock())
{
std::deque<cpu_thread*> sq;
sq = std::move(queue->sq);
if (sys_event.warning)
{
fmt::append(lost_data, "Forcefully awaken waiters (%u):\n", sq.size());
u32 size = 0;
for (auto cpu : sq)
for (auto cpu = head; cpu; cpu = cpu->get_next_cpu())
{
size++;
}
fmt::append(lost_data, "Forcefully awaken waiters (%u):\n", size);
for (auto cpu = head; cpu; cpu = cpu->get_next_cpu())
{
lost_data += cpu->get_name();
lost_data += '\n';
@ -313,21 +320,24 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
if (queue->type == SYS_PPU_QUEUE)
{
for (auto cpu : sq)
for (auto cpu = +queue->pq; cpu; cpu = cpu->next_cpu)
{
static_cast<ppu_thread&>(*cpu).gpr[3] = CELL_ECANCELED;
cpu->gpr[3] = CELL_ECANCELED;
queue->append(cpu);
}
atomic_storage<ppu_thread*>::release(queue->pq, nullptr);
lv2_obj::awake_all();
}
else
{
for (auto cpu : sq)
for (auto cpu = +queue->sq; cpu; cpu = cpu->next_cpu)
{
static_cast<spu_thread&>(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED);
resume_spu_thread_group_from_waiting(static_cast<spu_thread&>(*cpu));
cpu->ch_in_mbox.set_values(1, CELL_ECANCELED);
resume_spu_thread_group_from_waiting(*cpu);
}
atomic_storage<spu_thread*>::release(queue->sq, nullptr);
}
qlock.unlock();
@ -382,7 +392,7 @@ error_code sys_event_queue_tryreceive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sy
s32 count = 0;
while (queue->sq.empty() && count < size && !queue->events.empty())
while (count < size && !queue->events.empty())
{
auto& dest = event_array[count++];
const auto event = queue->events.front();
@ -425,8 +435,8 @@ error_code sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sys_e
if (queue.events.empty())
{
queue.sq.emplace_back(&ppu);
queue.sleep(ppu, timeout, true);
lv2_obj::emplace(queue.pq, &ppu);
return CELL_EBUSY;
}
@ -464,13 +474,16 @@ error_code sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sys_e
{
std::lock_guard lock_rsx(queue->mutex);
if (std::find(queue->sq.begin(), queue->sq.end(), &ppu) == queue->sq.end())
for (auto cpu = +queue->pq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
return {};
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)
@ -495,7 +508,7 @@ error_code sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sys_e
std::lock_guard lock(queue->mutex);
if (!queue->unqueue(queue->sq, &ppu))
if (!queue->unqueue(queue->pq, &ppu))
{
break;
}
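lv2_event_queue previously kept one std::deque<cpu_thread*> holding either thread kind; it now keeps two typed heads, sq for SPU waiters and pq for PPU waiters, so each list links through the correct derived-class member and the wakeup paths lose their casts. The destroy path picks whichever head matches the queue type and walks it generically via get_next_cpu(). A stand-in sketch of the split:

#include <atomic>

struct ppu_thread { std::atomic<ppu_thread*> next_cpu{nullptr}; };
struct spu_thread { std::atomic<spu_thread*> next_cpu{nullptr}; };

// One queue object, two typed waiter lists; the event paths pick the list
// matching the queue type instead of casting a mixed container's elements.
struct event_queue_sketch
{
    int type = 0; // 0 = PPU queue, 1 = SPU queue (stands in for SYS_PPU_QUEUE/SYS_SPU_QUEUE)
    std::atomic<spu_thread*> sq{nullptr}; // SPU waiters
    std::atomic<ppu_thread*> pq{nullptr}; // PPU waiters

    bool has_waiters() const { return sq.load() || pq.load(); }
};

int main()
{
    event_queue_sketch q;
    ppu_thread p;
    q.pq = &p; // a sleeping PPU receiver
    return q.has_waiters() ? 0 : 1;
}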

View file

@ -4,7 +4,10 @@
#include "Emu/Memory/vm_ptr.h"
#include <deque>
class cpu_thread;
class spu_thread;
// Event Queue Type
enum : u32
@ -89,7 +92,8 @@ struct lv2_event_queue final : public lv2_obj
shared_mutex mutex;
std::deque<lv2_event> events;
std::deque<cpu_thread*> sq;
atomic_t<spu_thread*> sq{};
atomic_t<ppu_thread*> pq{};
lv2_event_queue(u32 protocol, s32 type, s32 size, u64 name, u64 ipc_key) noexcept;

View file

@ -89,7 +89,7 @@ error_code sys_event_flag_destroy(ppu_thread& ppu, u32 id)
const auto flag = idm::withdraw<lv2_obj, lv2_event_flag>(id, [&](lv2_event_flag& flag) -> CellError
{
if (flag.waiters)
if (flag.sq)
{
return CELL_EBUSY;
}
@ -164,14 +164,13 @@ error_code sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm
return {};
}
if (flag.type == SYS_SYNC_WAITER_SINGLE && !flag.sq.empty())
if (flag.type == SYS_SYNC_WAITER_SINGLE && flag.sq)
{
return CELL_EPERM;
}
flag.waiters++;
flag.sq.emplace_back(&ppu);
flag.sleep(ppu, timeout, true);
lv2_obj::emplace(flag.sq, &ppu);
return CELL_EBUSY;
});
@ -204,13 +203,16 @@ error_code sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm
{
std::lock_guard lock(flag->mutex);
if (std::find(flag->sq.begin(), flag->sq.end(), &ppu) == flag->sq.end())
for (auto cpu = +flag->sq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
return {};
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)
@ -240,7 +242,6 @@ error_code sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm
break;
}
flag->waiters--;
ppu.gpr[3] = CELL_ETIMEDOUT;
ppu.gpr[6] = flag->pattern;
break;
@ -317,7 +318,7 @@ error_code sys_event_flag_set(cpu_thread& cpu, u32 id, u64 bitptn)
{
std::lock_guard lock(flag->mutex);
for (auto ppu : flag->sq)
for (auto ppu = +flag->sq; ppu; ppu = ppu->next_cpu)
{
if (ppu->state & cpu_flag::again)
{
@ -328,24 +329,55 @@ error_code sys_event_flag_set(cpu_thread& cpu, u32 id, u64 bitptn)
}
}
// Sort sleep queue in required order
if (flag->protocol != SYS_SYNC_FIFO)
{
std::stable_sort(flag->sq.begin(), flag->sq.end(), [](cpu_thread* a, cpu_thread* b)
{
return static_cast<ppu_thread*>(a)->prio < static_cast<ppu_thread*>(b)->prio;
});
}
// Process all waiters in single atomic op
const u32 count = flag->pattern.atomic_op([&](u64& value)
{
value |= bitptn;
u32 count = 0;
for (auto cpu : flag->sq)
if (!flag->sq)
{
auto& ppu = static_cast<ppu_thread&>(*cpu);
return count;
}
for (auto ppu = +flag->sq; ppu; ppu = ppu->next_cpu)
{
ppu->gpr[7] = 0;
}
auto first = +flag->sq;
auto get_next = [&]() -> ppu_thread*
{
if (flag->protocol != SYS_SYNC_PRIORITY)
{
return std::exchange(first, first ? +first->next_cpu : nullptr);
}
s32 prio = smax;
ppu_thread* it{};
for (auto ppu = first; ppu; ppu = ppu->next_cpu)
{
if (!ppu->gpr[7] && ppu->prio < prio)
{
it = ppu;
prio = ppu->prio;
}
}
if (it)
{
// Mark it so it won't reappear
it->gpr[7] = 1;
}
return it;
};
while (auto it = get_next())
{
auto& ppu = *it;
const u64 pattern = ppu.gpr[4];
const u64 mode = ppu.gpr[5];
@ -370,25 +402,22 @@ error_code sys_event_flag_set(cpu_thread& cpu, u32 id, u64 bitptn)
}
// Remove waiters
const auto tail = std::remove_if(flag->sq.begin(), flag->sq.end(), [&](cpu_thread* cpu)
for (auto next_cpu = &flag->sq; *next_cpu;)
{
auto& ppu = static_cast<ppu_thread&>(*cpu);
auto& ppu = *+*next_cpu;
if (ppu.gpr[3] == CELL_OK)
{
flag->waiters--;
flag->append(cpu);
return true;
next_cpu->release(+ppu.next_cpu);
ppu.next_cpu.release(nullptr);
flag->append(&ppu);
continue;
}
return false;
});
next_cpu = &ppu.next_cpu;
};
if (tail != flag->sq.end())
{
flag->sq.erase(tail, flag->sq.end());
lv2_obj::awake_all();
}
lv2_obj::awake_all();
}
return CELL_OK;
@ -432,7 +461,7 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr<u32> num)
{
std::lock_guard lock(flag->mutex);
for (auto cpu : flag->sq)
for (auto cpu = +flag->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu->state & cpu_flag::again)
{
@ -444,21 +473,18 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr<u32> num)
// Get current pattern
const u64 pattern = flag->pattern;
// Set count
value = ::size32(flag->sq);
// Signal all threads to return CELL_ECANCELED (protocol does not matter)
for (auto thread : ::as_rvalue(std::move(flag->sq)))
for (auto ppu = +flag->sq; ppu; ppu = ppu->next_cpu)
{
auto& ppu = static_cast<ppu_thread&>(*thread);
ppu->gpr[3] = CELL_ECANCELED;
ppu->gpr[6] = pattern;
ppu.gpr[3] = CELL_ECANCELED;
ppu.gpr[6] = pattern;
flag->waiters--;
flag->append(thread);
value++;
flag->append(ppu);
}
flag->sq.release(nullptr);
if (value)
{
lv2_obj::awake_all();
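sys_event_flag_set used to stable_sort the whole deque by priority before matching waiters. The allocation-free version leaves the list in FIFO order and, under SYS_SYNC_PRIORITY, repeatedly scans for the best-priority waiter not yet visited, borrowing gpr[7] as the visited mark (it is zeroed for every queued thread first, and a picked thread's nonzero mark keeps it from being selected twice). A compilable model of that selection, with gpr[7] replaced by a plain field:

#include <cassert>
#include <cstdint>

struct ppu_thread
{
    int prio;
    std::uint64_t mark = 0; // stands in for gpr[7], reused as the "visited" flag
    ppu_thread* next_cpu = nullptr;
};

// Pick the best-priority unvisited waiter without reordering the list
// (models the get_next lambda in sys_event_flag_set).
ppu_thread* get_next(ppu_thread* first)
{
    ppu_thread* best = nullptr;
    for (ppu_thread* it = first; it; it = it->next_cpu)
    {
        if (!it->mark && (!best || it->prio < best->prio))
            best = it;
    }
    if (best)
        best->mark = 1; // will not reappear on the next call
    return best;
}

int main()
{
    ppu_thread a{200}, b{100}, c{300};
    a.next_cpu = &b;
    b.next_cpu = &c;
    assert(get_next(&a) == &b); // lowest prio value wins
    assert(get_next(&a) == &a);
    assert(get_next(&a) == &c);
    assert(get_next(&a) == nullptr);
}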

View file

@ -41,9 +41,8 @@ struct lv2_event_flag final : lv2_obj
const u64 name;
shared_mutex mutex;
atomic_t<u32> waiters{0};
atomic_t<u64> pattern;
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
lv2_event_flag(u32 protocol, u64 key, s32 type, u64 name, u64 pattern) noexcept
: protocol{static_cast<u8>(protocol)}

View file

@ -65,7 +65,7 @@ error_code _sys_lwcond_destroy(ppu_thread& ppu, u32 lwcond_id)
const auto cond = idm::withdraw<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond) -> CellError
{
if (cond.waiters)
if (cond.sq)
{
return CELL_EBUSY;
}
@ -127,7 +127,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
}
}
if (cond.waiters)
if (cond.sq)
{
lv2_obj::notify_all_t notify;
@ -153,8 +153,6 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
return 0;
}
cond.waiters--;
if (mode == 2)
{
static_cast<ppu_thread*>(result)->gpr[3] = CELL_EBUSY;
@ -165,11 +163,11 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
ensure(!mutex->signaled);
std::lock_guard lock(mutex->mutex);
if (mode == 3 && !mutex->sq.empty()) [[unlikely]]
if (mode == 3 && mutex->sq) [[unlikely]]
{
// Respect ordering of the sleep queue
mutex->sq.emplace_back(result);
auto result2 = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol);
lv2_obj::emplace(mutex->sq, result);
result = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol);
if (static_cast<ppu_thread*>(result2)->state & cpu_flag::again)
{
@ -241,8 +239,6 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
fmt::throw_exception("Unknown mode (%d)", mode);
}
bool need_awake = false;
const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond) -> s32
{
lv2_lwmutex* mutex{};
@ -257,7 +253,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
}
if (cond.waiters)
if (cond.sq)
{
lv2_obj::notify_all_t notify;
@ -265,18 +261,19 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
u32 result = 0;
for (auto cpu : cond.sq)
for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return 0;
}
}
cond.waiters = 0;
while (const auto cpu = cond.schedule<ppu_thread>(cond.sq, cond.protocol))
decltype(cond.sq) sq{+cond.sq};
cond.sq.release(nullptr);
while (const auto cpu = cond.schedule<ppu_thread>(sq, cond.protocol))
{
if (mode == 2)
{
@ -292,13 +289,12 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
else
{
lv2_obj::append(cpu);
need_awake = true;
}
result++;
}
if (need_awake)
if (result && mode == 2)
{
lv2_obj::awake_all(true);
}
@ -358,13 +354,12 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
{
// Special: loading state from the point of waiting on lwmutex sleep queue
std::lock_guard lock2(mutex->mutex);
mutex->sq.emplace_back(&ppu);
lv2_obj::emplace(mutex->sq, &ppu);
}
else
{
// Add a waiter
cond.waiters++;
cond.sq.emplace_back(&ppu);
lv2_obj::emplace(cond.sq, &ppu);
}
if (!ppu.loaded_from_savestate)
@ -414,8 +409,26 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
reader_lock lock(cond->mutex);
reader_lock lock2(mutex->mutex);
const bool cond_sleep = std::find(cond->sq.begin(), cond->sq.end(), &ppu) != cond->sq.end();
const bool mutex_sleep = std::find(mutex->sq.begin(), mutex->sq.end(), &ppu) != mutex->sq.end();
bool mutex_sleep = false;
bool cond_sleep = false;
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
mutex_sleep = true;
break;
}
}
for (auto cpu = +cond->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
cond_sleep = true;
break;
}
}
if (!cond_sleep && !mutex_sleep)
{
@ -451,14 +464,24 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
if (cond->unqueue(cond->sq, &ppu))
{
cond->waiters--;
ppu.gpr[3] = CELL_ETIMEDOUT;
break;
}
reader_lock lock2(mutex->mutex);
if (std::find(mutex->sq.cbegin(), mutex->sq.cend(), &ppu) == mutex->sq.cend())
bool mutex_sleep = false;
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
mutex_sleep = true;
break;
}
}
if (!mutex_sleep)
{
break;
}

View file

@ -31,8 +31,7 @@ struct lv2_lwcond final : lv2_obj
vm::ptr<sys_lwcond_t> control;
shared_mutex mutex;
atomic_t<u32> waiters{0};
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
lv2_lwcond(u64 name, u32 lwid, u32 protocol, vm::ptr<sys_lwcond_t> control) noexcept
: name(std::bit_cast<be_t<u64>>(name))

View file

@ -72,7 +72,7 @@ error_code _sys_lwmutex_destroy(ppu_thread& ppu, u32 lwmutex_id)
std::lock_guard lock(mutex.mutex);
if (!mutex.sq.empty())
if (mutex.sq)
{
return CELL_EBUSY;
}
@ -171,8 +171,8 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
return true;
}
mutex.add_waiter(&ppu);
mutex.sleep(ppu, timeout, true);
mutex.add_waiter(&ppu);
return false;
});
@ -197,13 +197,16 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
{
std::lock_guard lock(mutex->mutex);
if (std::find(mutex->sq.begin(), mutex->sq.end(), &ppu) == mutex->sq.end())
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
return {};
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)

View file

@ -61,7 +61,7 @@ struct lv2_lwmutex final : lv2_obj
shared_mutex mutex;
atomic_t<s32> signaled{0};
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
atomic_t<s32> lwcond_waiters{0};
lv2_lwmutex(u32 protocol, vm::ptr<sys_lwmutex_t> control, u64 name) noexcept
@ -75,7 +75,8 @@ struct lv2_lwmutex final : lv2_obj
void save(utils::serial& ar);
// Add a waiter
void add_waiter(cpu_thread* cpu)
template <typename T>
void add_waiter(T* cpu)
{
const bool notify = lwcond_waiters.fetch_op([](s32& val)
{
@ -91,7 +92,7 @@ struct lv2_lwmutex final : lv2_obj
return true;
}).second;
sq.emplace_back(cpu);
lv2_obj::emplace(sq, cpu);
if (notify)
{

View file

@ -188,13 +188,16 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
{
std::lock_guard lock(mutex->mutex);
if (std::find(mutex->sq.begin(), mutex->sq.end(), &ppu) == mutex->sq.end())
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
return {};
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)

View file

@ -35,7 +35,7 @@ struct lv2_mutex final : lv2_obj
shared_mutex mutex;
atomic_t<u32> owner{0};
atomic_t<u32> lock_count{0}; // Recursive Locks
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
lv2_mutex(u32 protocol, u32 recursive, u32 adaptive, u64 key, u64 name) noexcept
: protocol{static_cast<u8>(protocol)}
@ -82,7 +82,8 @@ struct lv2_mutex final : lv2_obj
return CELL_EBUSY;
}
bool try_own(cpu_thread& cpu, u32 id)
template <typename T>
bool try_own(T& cpu, u32 id)
{
if (owner.fetch_op([&](u32& val)
{
@ -96,7 +97,7 @@ struct lv2_mutex final : lv2_obj
}
}))
{
sq.emplace_back(&cpu);
lv2_obj::emplace(sq, &cpu);
return false;
}
@ -139,7 +140,7 @@ struct lv2_mutex final : lv2_obj
return static_cast<T*>(cpu);
}
owner = cpu->id << 1 | !sq.empty();
owner = cpu->id << 1 | !!sq;
return static_cast<T*>(cpu);
}
else
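The owner word packs the holder's thread id and the wait state into one u32: bits 1 and up hold the id, bit 0 says "someone is queued". With the deque gone, !sq.empty() becomes the pointer test !!sq, and the lock-owner updates keep the same encoding. A small arithmetic check mirroring owner = cpu->id << 1 | !!sq:

#include <cassert>
#include <cstdint>

// lv2_mutex owner encoding: bit 0 = waiters present, bits 1+ = owner id.
std::uint32_t make_owner(std::uint32_t id, const void* sq_head)
{
    return id << 1 | static_cast<std::uint32_t>(sq_head != nullptr);
}

int main()
{
    int waiter;
    assert(make_owner(0x42, nullptr) == 0x84);        // queue empty: wait bit clear
    assert(make_owner(0x42, &waiter) == 0x85);        // queue non-empty: wait bit set
    assert((make_owner(0x42, &waiter) >> 1) == 0x42); // id recovered by dropping bit 0
}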

View file

@ -132,8 +132,8 @@ error_code sys_rwlock_rlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
if (_old > 0 || _old & 1)
{
rwlock.rq.emplace_back(&ppu);
rwlock.sleep(ppu, timeout, true);
lv2_obj::emplace(rwlock.rq, &ppu);
return false;
}
@ -163,12 +163,15 @@ error_code sys_rwlock_rlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
{
std::lock_guard lock(rwlock->mutex);
if (std::find(rwlock->rq.begin(), rwlock->rq.end(), &ppu) == rwlock->rq.end())
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
break;
}
@ -305,7 +308,7 @@ error_code sys_rwlock_runlock(ppu_thread& ppu, u32 rw_lock_id)
return {};
}
rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty();
rwlock->owner = cpu->id << 1 | !!rwlock->wq | !!rwlock->rq;
rwlock->awake(cpu);
}
@ -313,7 +316,7 @@ error_code sys_rwlock_runlock(ppu_thread& ppu, u32 rw_lock_id)
{
rwlock->owner = 0;
ensure(rwlock->rq.empty());
ensure(!rwlock->rq);
}
}
}
@ -361,8 +364,8 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
if (_old != 0)
{
rwlock.wq.emplace_back(&ppu);
rwlock.sleep(ppu, timeout, true);
lv2_obj::emplace(rwlock.wq, &ppu);
}
return _old;
@ -396,12 +399,15 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
{
std::lock_guard lock(rwlock->mutex);
if (std::find(rwlock->wq.begin(), rwlock->wq.end(), &ppu) == rwlock->wq.end())
for (auto cpu = +rwlock->wq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
break;
}
@ -433,23 +439,28 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
}
// If the last waiter quit the writer sleep queue, wake blocked readers
if (!rwlock->rq.empty() && rwlock->wq.empty() && rwlock->owner < 0)
if (rwlock->rq && !rwlock->wq && rwlock->owner < 0)
{
rwlock->owner.atomic_op([&](s64& owner)
{
owner -= 2 * static_cast<s64>(rwlock->rq.size()); // Add readers to value
owner &= -2; // Clear wait bit
});
s64 size = 0;
// Protocol doesn't matter here since they are all enqueued anyways
for (auto cpu : ::as_rvalue(std::move(rwlock->rq)))
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
{
size++;
rwlock->append(cpu);
}
rwlock->rq.release(nullptr);
rwlock->owner.atomic_op([&](s64& owner)
{
owner -= 2 * size; // Add readers to value
owner &= -2; // Clear wait bit
});
lv2_obj::awake_all();
}
else if (rwlock->rq.empty() && rwlock->wq.empty())
else if (!rwlock->rq && !rwlock->wq)
{
rwlock->owner &= -2;
}
@ -535,27 +546,32 @@ error_code sys_rwlock_wunlock(ppu_thread& ppu, u32 rw_lock_id)
return {};
}
rwlock->owner = cpu->id << 1 | !rwlock->wq.empty() | !rwlock->rq.empty();
rwlock->owner = cpu->id << 1 | !!rwlock->wq | !!rwlock->rq;
rwlock->awake(cpu);
}
else if (auto readers = rwlock->rq.size())
else if (rwlock->rq)
{
for (auto cpu : rwlock->rq)
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return {};
}
}
for (auto cpu : ::as_rvalue(std::move(rwlock->rq)))
s64 size = 0;
// Protocol doesn't matter here since they are all enqueued anyways
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
{
size++;
rwlock->append(cpu);
}
rwlock->owner.release(-2 * static_cast<s64>(readers));
rwlock->rq.release(nullptr);
rwlock->owner.release(-2 * static_cast<s64>(size));
lv2_obj::awake_all();
}
else
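sys_rwlock keeps its whole state in one signed 64-bit owner word: a writer stores its id shifted left by one (positive), N reader holders store -2*N (negative), and bit 0 is the wait bit. Since a linked list no longer reports its length for free, the reader-wakeup paths now count nodes while appending them to the awake list (the new size variables) and only then fold the count into owner. A worked example of that arithmetic, mirroring owner -= 2 * size; owner &= -2;:

#include <cassert>
#include <cstdint>

int main()
{
    std::int64_t owner = 1; // lock free, wait bit set: readers are queued
    std::int64_t size = 3;  // counted while draining rq into the awake list

    owner -= 2 * size;      // each reader contributes -2 to the value
    owner &= -2;            // clear the wait bit

    assert(owner == -6);    // three readers now hold the lock
    assert(!(owner & 1));   // no wait bit: the queue was fully drained
}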

View file

@ -29,8 +29,8 @@ struct lv2_rwlock final : lv2_obj
shared_mutex mutex;
atomic_t<s64> owner{0};
std::deque<cpu_thread*> rq;
std::deque<cpu_thread*> wq;
atomic_t<ppu_thread*> rq{};
atomic_t<ppu_thread*> wq{};
lv2_rwlock(u32 protocol, u64 key, u64 name) noexcept
: protocol{static_cast<u8>(protocol)}

View file

@ -129,8 +129,8 @@ error_code sys_semaphore_wait(ppu_thread& ppu, u32 sem_id, u64 timeout)
if (sema.val-- <= 0)
{
sema.sq.emplace_back(&ppu);
sema.sleep(ppu, timeout, true);
lv2_obj::emplace(sema.sq, &ppu);
return false;
}
@ -160,13 +160,16 @@ error_code sys_semaphore_wait(ppu_thread& ppu, u32 sem_id, u64 timeout)
{
std::lock_guard lock(sem->mutex);
if (std::find(sem->sq.begin(), sem->sq.end(), &ppu) == sem->sq.end())
for (auto cpu = +sem->sq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
return {};
}
}
ppu.state += cpu_flag::again;
return {};
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)
@ -280,7 +283,7 @@ error_code sys_semaphore_post(ppu_thread& ppu, u32 sem_id, s32 count)
{
std::lock_guard lock(sem->mutex);
for (auto cpu : sem->sq)
for (auto cpu = +sem->sq; cpu; cpu = cpu->next_cpu)
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
{

View file

@ -30,7 +30,7 @@ struct lv2_sema final : lv2_obj
shared_mutex mutex;
atomic_t<s32> val;
std::deque<cpu_thread*> sq;
atomic_t<ppu_thread*> sq{};
lv2_sema(u32 protocol, u64 key, u64 name, s32 max, s32 value) noexcept
: protocol{static_cast<u8>(protocol)}

View file

@ -10,7 +10,6 @@
#include "Emu/IPC.h"
#include "Emu/system_config.h"
#include <deque>
#include <thread>
// attr_protocol (waiting scheduling policy)
@ -107,60 +106,106 @@ public:
return str;
}
// Find and remove the object from the deque container
template <typename T, typename E>
static T unqueue(std::deque<T>& queue, E object)
// Find and remove the object from the linked list
template <typename T>
static T* unqueue(atomic_t<T*>& first, T* object, atomic_t<T*> T::* mem_ptr = &T::next_cpu)
{
for (auto found = queue.cbegin(), end = queue.cend(); found != end; found++)
auto it = +first;
if (it == object)
{
if (*found == object)
first.release(+it->*mem_ptr);
(it->*mem_ptr).release(nullptr);
return it;
}
for (; it;)
{
const auto next = it->*mem_ptr + 0;
if (next == object)
{
queue.erase(found);
return static_cast<T>(object);
(it->*mem_ptr).release(+next->*mem_ptr);
(next->*mem_ptr).release(nullptr);
return next;
}
it = next;
}
return {};
}
template <typename E, typename T>
static T* schedule(std::deque<T*>& queue, u32 protocol)
static E* schedule(atomic_t<T>& first, u32 protocol)
{
if (queue.empty())
auto it = static_cast<E*>(first);
if (!it)
{
return nullptr;
return it;
}
if (protocol == SYS_SYNC_FIFO)
{
const auto res = queue.front();
if (it && cpu_flag::again - it->state)
{
first.release(+it->next_cpu);
it->next_cpu.release(nullptr);
}
if (res->state.none_of(cpu_flag::again))
queue.pop_front();
return res;
return it;
}
s32 prio = 3071;
auto it = queue.cbegin();
s32 prio = it->prio;
auto found = it;
auto parent_found = &first;
for (auto found = it, end = queue.cend(); found != end; found++)
while (true)
{
const s32 _prio = static_cast<E*>(*found)->prio;
auto& node = it->next_cpu;
const auto next = static_cast<E*>(node);
if (!next)
{
break;
}
const s32 _prio = static_cast<E*>(next)->prio;
if (_prio < prio)
{
it = found;
found = next;
parent_found = &node;
prio = _prio;
}
it = next;
}
const auto res = *it;
if (cpu_flag::again - found->state)
{
parent_found->release(+found->next_cpu);
found->next_cpu.release(nullptr);
}
if (res->state.none_of(cpu_flag::again))
queue.erase(it);
return found;
}
return res;
template <typename T>
static auto emplace(atomic_t<T>& first, T object)
{
auto it = &first;
while (auto ptr = static_cast<T>(+*it))
{
it = &ptr->next_cpu;
}
it->release(object);
// Return parent
return it;
}
private:
@ -443,7 +488,10 @@ public:
return;
}
cpu->state.notify_one(cpu_flag::suspend + cpu_flag::signal);
if (cpu->state & cpu_flag::signal)
{
cpu->state.notify_one(cpu_flag::suspend + cpu_flag::signal);
}
}
}
@ -463,16 +511,10 @@ private:
static thread_local std::vector<class cpu_thread*> g_to_awake;
// Scheduler queue for active PPU threads
static std::deque<class ppu_thread*> g_ppu;
static atomic_t<class ppu_thread*> g_ppu;
// Waiting for the response from
static std::deque<class cpu_thread*> g_pending;
// Scheduler queue for timeouts (wait until -> thread)
static std::deque<std::pair<u64, class cpu_thread*>> g_waiting;
// Threads which must call lv2_obj::sleep before the scheduler starts
static std::deque<class cpu_thread*> g_to_sleep;
static u32 g_pending;
// Pending list of threads to notify
static thread_local std::add_pointer_t<class cpu_thread> g_to_notify[4];
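These three templates (unqueue, schedule, emplace) are the entire container replacement: every deque operation used by the syscalls maps onto one of them. A condensed, compilable model of schedule() on plain pointers, without the atomics and without the cpu_flag::again handling that lets a to-be-restarted thread stay queued:

#include <cassert>

struct ppu_thread
{
    int prio;
    ppu_thread* next_cpu = nullptr;
};

// FIFO pops the head; priority unlinks the earliest waiter with the lowest prio value.
ppu_thread* schedule(ppu_thread*& first, bool fifo)
{
    if (!first)
        return nullptr;

    ppu_thread** parent = &first;
    if (!fifo)
    {
        for (ppu_thread** it = &first; *it; it = &(*it)->next_cpu)
        {
            if ((*it)->prio < (*parent)->prio)
                parent = it; // strict '<' keeps FIFO order among equal priorities
        }
    }

    ppu_thread* found = *parent;
    *parent = found->next_cpu;
    found->next_cpu = nullptr;
    return found;
}

int main()
{
    ppu_thread a{200}, b{100};
    a.next_cpu = &b;
    ppu_thread* sq = &a;
    assert(schedule(sq, false) == &b); // priority protocol picks the lowest value
    assert(schedule(sq, true) == &a);  // FIFO pops the head
    assert(sq == nullptr);
}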

View file

@ -11,6 +11,7 @@
#include "sys_process.h"
#include <thread>
#include <deque>
LOG_CHANNEL(sys_timer);
@ -152,8 +153,12 @@ error_code sys_timer_create(ppu_thread& ppu, vm::ptr<u32> timer_id)
auto& thread = g_fxo->get<named_thread<lv2_timer_thread>>();
{
std::lock_guard lock(thread.mutex);
lv2_obj::unqueue(thread.timers, ptr);
thread.timers.emplace_back(std::move(ptr));
// Theoretically could have been destroyed by sys_timer_destroy by now
if (auto it = std::find(thread.timers.begin(), thread.timers.end(), ptr); it == thread.timers.end())
{
thread.timers.emplace_back(std::move(ptr));
}
}
*timer_id = idm::last_id();
@ -192,7 +197,12 @@ error_code sys_timer_destroy(ppu_thread& ppu, u32 timer_id)
auto& thread = g_fxo->get<named_thread<lv2_timer_thread>>();
std::lock_guard lock(thread.mutex);
lv2_obj::unqueue(thread.timers, std::move(timer.ptr));
if (auto it = std::find(thread.timers.begin(), thread.timers.end(), timer.ptr); it != thread.timers.end())
{
thread.timers.erase(it);
}
return CELL_OK;
}

View file

@ -109,7 +109,7 @@ public:
// sys_usbd_receive_event PPU Threads
shared_mutex mutex_sq;
std::deque<ppu_thread*> sq;
atomic_t<ppu_thread*> sq{};
static constexpr auto thread_name = "Usb Manager Thread"sv;
@ -642,7 +642,7 @@ error_code sys_usbd_finalize(ppu_thread& ppu, u32 handle)
usbh.is_init = false;
// Forcefully awake all waiters
for (auto& cpu : ::as_rvalue(std::move(usbh.sq)))
for (auto cpu = +usbh.sq; cpu; cpu = cpu->next_cpu)
{
// Special termination signal value
cpu->gpr[4] = 4;
@ -651,6 +651,8 @@ error_code sys_usbd_finalize(ppu_thread& ppu, u32 handle)
lv2_obj::awake(cpu);
}
usbh.sq.release(nullptr);
// TODO
return CELL_OK;
}
@ -857,7 +859,7 @@ error_code sys_usbd_receive_event(ppu_thread& ppu, u32 handle, vm::ptr<u64> arg1
}
lv2_obj::sleep(ppu);
usbh.sq.emplace_back(&ppu);
lv2_obj::emplace(usbh.sq, &ppu);
}
while (auto state = +ppu.state)
@ -872,14 +874,17 @@ error_code sys_usbd_receive_event(ppu_thread& ppu, u32 handle, vm::ptr<u64> arg1
{
std::lock_guard lock(usbh.mutex);
if (std::find(usbh.sq.begin(), usbh.sq.end(), &ppu) == usbh.sq.end())
for (auto cpu = +usbh.sq; cpu; cpu = cpu->next_cpu)
{
break;
if (cpu == &ppu)
{
ppu.state += cpu_flag::again;
sys_usbd.trace("sys_usbd_receive_event: aborting");
return {};
}
}
ppu.state += cpu_flag::again;
sys_usbd.trace("sys_usbd_receive_event: aborting");
return {};
break;
}
thread_ctrl::wait_on(ppu.state, state);

View file

@ -337,6 +337,14 @@ void kernel_explorer::update()
return;
}
auto show_waiters = [&](QTreeWidgetItem* tree, cpu_thread* cpu)
{
for (; cpu; cpu = cpu->get_next_cpu())
{
add_leaf(tree, qstr(fmt::format("Waiter: ID: 0x%x", cpu->id_type() == 2 ? static_cast<spu_thread*>(cpu)->lv2_id : cpu->id)));
}
};
switch (id >> 24)
{
case SYS_MEM_OBJECT:
@ -356,22 +364,33 @@ void kernel_explorer::update()
case SYS_MUTEX_OBJECT:
{
auto& mutex = static_cast<lv2_mutex&>(obj);
add_leaf(node, qstr(fmt::format(u8"Mutex 0x%08x: “%s”, %s,%s Owner: %#x, Locks: %u, Key: %#llx, Conds: %u, Wq: %zu", id, lv2_obj::name64(mutex.name), mutex.protocol,
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", mutex.owner >> 1, +mutex.lock_count, mutex.key, mutex.cond_count, mutex.sq.size())));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Mutex 0x%08x: “%s”, %s,%s Owner: %#x, Locks: %u, Key: %#llx, Conds: %u", id, lv2_obj::name64(mutex.name), mutex.protocol,
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", mutex.owner >> 1, +mutex.lock_count, mutex.key, mutex.cond_count))), mutex.sq);
break;
}
case SYS_COND_OBJECT:
{
auto& cond = static_cast<lv2_cond&>(obj);
add_leaf(node, qstr(fmt::format(u8"Cond 0x%08x: “%s”, %s, Mutex: 0x%08x, Key: %#llx, Wq: %u", id, lv2_obj::name64(cond.name), cond.mutex->protocol, cond.mtx_id, cond.key, +cond.waiters)));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Cond 0x%08x: “%s”, %s, Mutex: 0x%08x, Key: %#llx", id, lv2_obj::name64(cond.name), cond.mutex->protocol, cond.mtx_id, cond.key))), cond.sq);
break;
}
case SYS_RWLOCK_OBJECT:
{
auto& rw = static_cast<lv2_rwlock&>(obj);
const s64 val = rw.owner;
add_leaf(node, qstr(fmt::format(u8"RW Lock 0x%08x: “%s”, %s, Owner: %#x(%d), Key: %#llx, Rq: %zu, Wq: %zu", id, lv2_obj::name64(rw.name), rw.protocol,
std::max<s64>(0, val >> 1), -std::min<s64>(0, val >> 1), rw.key, rw.rq.size(), rw.wq.size())));
auto tree = add_solid_node(node, qstr(fmt::format(u8"RW Lock 0x%08x: “%s”, %s, Owner: %#x(%d), Key: %#llx", id, lv2_obj::name64(rw.name), rw.protocol,
std::max<s64>(0, val >> 1), -std::min<s64>(0, val >> 1), rw.key)));
if (auto rq = +rw.rq)
{
show_waiters(add_solid_node(tree, "Reader Waiters"), rq);
}
if (auto wq = +rw.wq)
{
show_waiters(add_solid_node(tree, "Writer Waiters"), wq);
}
break;
}
case SYS_INTR_TAG_OBJECT:
@ -397,8 +416,8 @@ void kernel_explorer::update()
case SYS_EVENT_QUEUE_OBJECT:
{
auto& eq = static_cast<lv2_event_queue&>(obj);
add_leaf(node, qstr(fmt::format(u8"Event Queue 0x%08x: “%s”, %s, %s, Key: %#llx, Events: %zu/%d, Wq: %zu", id, lv2_obj::name64(eq.name), eq.protocol,
eq.type == SYS_SPU_QUEUE ? "SPU" : "PPU", eq.key, eq.events.size(), eq.size, eq.sq.size())));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Event Queue 0x%08x: “%s”, %s, %s, Key: %#llx, Events: %zu/%d", id, lv2_obj::name64(eq.name), eq.protocol,
eq.type == SYS_SPU_QUEUE ? "SPU" : "PPU", eq.key, eq.events.size(), eq.size))), eq.type == SYS_SPU_QUEUE ? static_cast<cpu_thread*>(+eq.sq) : +eq.pq);
break;
}
case SYS_EVENT_PORT_OBJECT:
@ -494,12 +513,12 @@ void kernel_explorer::update()
}
else
{
add_leaf(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s, Signal: %#x, Wq: %zu (unmapped/invalid control data at *0x%x)", id, lv2_obj::name64(lwm.name), lwm.protocol, +lwm.signaled, lwm.sq.size(), lwm.control)));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s, Signal: %#x (unmapped/invalid control data at *0x%x)", id, lv2_obj::name64(lwm.name), lwm.protocol, +lwm.signaled, lwm.control))), lwm.sq);
break;
}
add_leaf(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s,%s Owner: %s, Locks: %u, Signal: %#x, Control: *0x%x, Wq: %zu", id, lv2_obj::name64(lwm.name), lwm.protocol,
(lwm_data.attribute & SYS_SYNC_RECURSIVE) ? " Recursive," : "", owner_str, lwm_data.recursive_count, +lwm.signaled, lwm.control, lwm.sq.size())));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s,%s Owner: %s, Locks: %u, Signal: %#x, Control: *0x%x", id, lv2_obj::name64(lwm.name), lwm.protocol,
(lwm_data.attribute & SYS_SYNC_RECURSIVE) ? " Recursive," : "", owner_str, lwm_data.recursive_count, +lwm.signaled, lwm.control))), lwm.sq);
break;
}
case SYS_TIMER_OBJECT:
@ -517,21 +536,21 @@ void kernel_explorer::update()
{
auto& sema = static_cast<lv2_sema&>(obj);
const auto val = +sema.val;
add_leaf(node, qstr(fmt::format(u8"Sema 0x%08x: “%s”, %s, Count: %d/%d, Key: %#llx, Wq: %zu", id, lv2_obj::name64(sema.name), sema.protocol,
std::max<s32>(val, 0), sema.max, sema.key, -std::min<s32>(val, 0))));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Sema 0x%08x: “%s”, %s, Count: %d/%d, Key: %#llx", id, lv2_obj::name64(sema.name), sema.protocol,
std::max<s32>(val, 0), sema.max, sema.key, -std::min<s32>(val, 0)))), sema.sq);
break;
}
case SYS_LWCOND_OBJECT:
{
auto& lwc = static_cast<lv2_lwcond&>(obj);
add_leaf(node, qstr(fmt::format(u8"LWCond 0x%08x: “%s”, %s, OG LWMutex: 0x%08x, Control: *0x%x, Wq: %zu", id, lv2_obj::name64(lwc.name), lwc.protocol, lwc.lwid, lwc.control, +lwc.waiters)));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWCond 0x%08x: “%s”, %s, OG LWMutex: 0x%08x, Control: *0x%x", id, lv2_obj::name64(lwc.name), lwc.protocol, lwc.lwid, lwc.control))), lwc.sq);
break;
}
case SYS_EVENT_FLAG_OBJECT:
{
auto& ef = static_cast<lv2_event_flag&>(obj);
add_leaf(node, qstr(fmt::format(u8"Event Flag 0x%08x: “%s”, %s, Type: 0x%x, Key: %#llx, Pattern: 0x%llx, Wq: %zu", id, lv2_obj::name64(ef.name), ef.protocol,
ef.type, ef.key, ef.pattern.load(), +ef.waiters)));
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Event Flag 0x%08x: “%s”, %s, Type: 0x%x, Key: %#llx, Pattern: 0x%llx", id, lv2_obj::name64(ef.name), ef.protocol,
ef.type, ef.key, ef.pattern.load()))), ef.sq);
break;
}
case SYS_RSXAUDIO_OBJECT:
@ -543,7 +562,7 @@ void kernel_explorer::update()
break;
}
QTreeWidgetItem* rao_obj = add_leaf(node, qstr(fmt::format(u8"RSXAudio 0x%08x: Shmem: 0x%08x", id, u32{rao.shmem})));
QTreeWidgetItem* rao_obj = add_solid_node(node, qstr(fmt::format(u8"RSXAudio 0x%08x: Shmem: 0x%08x", id, u32{rao.shmem})));
for (u64 q_idx = 0; q_idx < rao.event_queue.size(); q_idx++)
{
if (const auto eq = rao.event_queue[q_idx].lock())