pthread: Implement condvar with waitable atomics and sleepqueue

This commit is contained in:
IndecisiveTurtle 2024-11-15 19:50:10 +02:00
parent f96a21551a
commit 4fd9ef6136
4 changed files with 247 additions and 44 deletions

View file

@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: Copyright 2024 shadPS4 Emulator Project
// SPDX-License-Identifier: GPL-2.0-or-later
#include <boost/icl/separate_interval_set.hpp>
#include <map>
#include "common/alignment.h"
#include "common/arch.h"

View file

@ -2,6 +2,8 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#include <cstring>
#include <unordered_map>
#include "common/assert.h"
#include "core/libraries/error_codes.h"
#include "core/libraries/kernel/kernel.h"
#include "core/libraries/kernel/threads/pthread.h"
@ -19,6 +21,59 @@ static constexpr PthreadCondAttr PhreadCondattrDefault = {
.c_clockid = ClockId::Realtime,
};
static std::mutex sc_lock;
static std::unordered_map<void*, SleepQueue*> sc_table;
void SleepqAdd(void* wchan, Pthread* td) {
auto [it, is_new] = sc_table.try_emplace(wchan, td->sleepqueue);
if (!is_new) {
it->second->sq_freeq.push_front(td->sleepqueue);
}
td->sleepqueue = nullptr;
td->wchan = wchan;
it->second->sq_blocked.push_back(td);
}
// Removes td from sq and hands it back a SleepQueue object.
// Returns 1 when sq still has blocked threads afterwards, 0 when it became
// empty (in which case the wchan mapping is dropped and td reclaims sq).
// Callers hold sc_lock around this.
int SleepqRemove(SleepQueue* sq, Pthread* td) {
    std::erase(sq->sq_blocked, td);
    const bool drained = sq->sq_blocked.empty();
    if (drained) {
        // Last waiter: td takes ownership of sq and the table entry goes away.
        td->sleepqueue = sq;
        sc_table.erase(td->wchan);
    } else {
        // Hand td one of the spare queues earlier waiters donated.
        td->sleepqueue = sq->sq_freeq.front();
        sq->sq_freeq.pop_front();
    }
    td->wchan = nullptr;
    return drained ? 0 : 1;
}
// Wakes every thread blocked on sq, invoking callback(td, arg) for each one
// as it is detached, and redistributes the SleepQueue objects back to the
// woken threads. Callers hold sc_lock around this.
void SleepqDrop(SleepQueue* sq, void (*callback)(Pthread*, void*), void* arg) {
    if (sq->sq_blocked.empty()) {
        return;
    }
    Pthread* td = sq->sq_blocked.front();
    // sc_table is keyed by wait channel (see SleepqAdd), not by queue pointer,
    // so the entry must be erased with the blocked thread's wchan; erasing
    // with `sq` looked up the wrong key and left a stale mapping behind.
    sc_table.erase(td->wchan);
    sq->sq_blocked.pop_front();
    callback(td, arg);
    // The first thread reclaims sq itself; the rest take queues from freeq
    // (freeq holds exactly one spare per additional waiter).
    td->sleepqueue = sq;
    td->wchan = nullptr;
    auto free_it = sq->sq_freeq.begin();
    for (Pthread* waiter : sq->sq_blocked) {
        callback(waiter, arg);
        waiter->sleepqueue = *free_it;
        waiter->wchan = nullptr;
        ++free_it;
    }
    sq->sq_blocked.clear();
    sq->sq_freeq.clear();
}
static int CondInit(PthreadCondT* cond, const PthreadCondAttrT* cond_attr, const char* name) {
auto* cvp = new PthreadCond{};
if (cvp == nullptr) {
@ -90,35 +145,61 @@ int PS4_SYSV_ABI posix_pthread_cond_destroy(PthreadCondT* cond) {
return 0;
}
int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime) {
int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, u64 usec) {
PthreadMutex* mp = *mutex;
if (int error = mp->IsOwned(g_curthread); error != 0) {
return error;
}
//_thr_testcancel(curthread);
//_thr_cancel_enter2(curthread, 0);
if (abstime) {
const auto status = cond.wait_until(*mp, abstime->TimePoint());
return status == std::cv_status::timeout ? POSIX_ETIMEDOUT : 0;
} else {
cond.wait(*mp);
return 0;
}
//_thr_cancel_leave(curthread, 0);
}
Pthread* curthread = g_curthread;
ASSERT_MSG(curthread->wchan == nullptr, "Thread was already on queue.");
// _thr_testcancel(curthread);
sc_lock.lock();
int PthreadCond::Wait(PthreadMutexT* mutex, u64 usec) {
PthreadMutex* mp = *mutex;
if (int error = mp->IsOwned(g_curthread); error != 0) {
return error;
}
/*
* set __has_user_waiters before unlocking mutex, this allows
* us to check it without locking in pthread_cond_signal().
*/
has_user_waiters = 1;
curthread->will_sleep = 1;
//_thr_testcancel(curthread);
//_thr_cancel_enter2(curthread, 0);
const auto status = cond.wait_for(*mp, std::chrono::microseconds(usec));
return status == std::cv_status::timeout ? POSIX_ETIMEDOUT : 0;
//_thr_cancel_leave(curthread, 0);
int recurse;
mp->CvUnlock(&recurse);
curthread->mutex_obj = mp;
SleepqAdd(this, curthread);
int error = 0;
for (;;) {
curthread->wake_sema.try_acquire();
sc_lock.unlock();
//_thr_cancel_enter2(curthread, 0);
int error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT;
//_thr_cancel_leave(curthread, 0);
sc_lock.lock();
if (curthread->wchan == nullptr) {
error = 0;
break;
} else if (curthread->ShouldCancel()) {
SleepQueue* sq = sc_table[this];
has_user_waiters = SleepqRemove(sq, curthread);
sc_lock.unlock();
curthread->mutex_obj = nullptr;
mp->CvLock(recurse);
return 0;
} else if (error == POSIX_ETIMEDOUT) {
SleepQueue* sq = sc_table[this];
has_user_waiters = SleepqRemove(sq, curthread);
break;
}
UNREACHABLE();
}
sc_lock.unlock();
curthread->mutex_obj = nullptr;
mp->CvLock(recurse);
return error;
}
int PS4_SYSV_ABI posix_pthread_cond_wait(PthreadCondT* cond, PthreadMutexT* mutex) {
@ -143,20 +224,102 @@ int PS4_SYSV_ABI posix_pthread_cond_reltimedwait_np(PthreadCondT* cond, PthreadM
u64 usec) {
PthreadCond* cvp{};
CHECK_AND_INIT_COND
return cvp->Wait(mutex, usec);
return cvp->Wait(mutex, THR_RELTIME, usec);
}
// Wakes at most one thread blocked on this condition variable.
// If the caller currently owns the mutex the waiter will re-acquire, the
// wakeup is deferred (recorded in the caller's defer_waiters array) and
// delivered when that mutex is unlocked, so the waiter does not wake just
// to block on the mutex immediately.
int PthreadCond::Signal() {
    Pthread* curthread = g_curthread;

    sc_lock.lock();
    auto it = sc_table.find(this);
    if (it == sc_table.end()) {
        // No sleep queue registered for this condvar => nothing is waiting.
        sc_lock.unlock();
        return 0;
    }

    SleepQueue* sq = it->second;
    // Front of sq_blocked is the longest-waiting thread (SleepqAdd appends).
    Pthread* td = sq->sq_blocked.front();
    PthreadMutex* mp = td->mutex_obj;
    has_user_waiters = SleepqRemove(sq, td);

    std::binary_semaphore* waddr = nullptr;
    if (mp->m_owner == curthread) {
        if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
            // Defer array is full: flush the pending wakeups first.
            curthread->WakeAll();
        }
        curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
        // Mark the mutex so Unlock() knows deferred wakeups are pending.
        mp->m_flags |= PthreadMutexFlags::Defered;
    } else {
        waddr = &td->wake_sema;
    }
    sc_lock.unlock();

    // Release outside sc_lock to keep the critical section short.
    if (waddr != nullptr) {
        waddr->release();
    }
    return 0;
}
// Scratch state threaded through the SleepqDrop callback during Broadcast():
// collects the wake semaphores of threads that must be released after
// sc_lock is dropped.
struct BroadcastArg {
    Pthread* curthread;                                      // thread performing the broadcast
    std::binary_semaphore* waddrs[Pthread::MaxDeferWaiters]; // pending non-deferred wakeups
    int count;                                               // number of valid entries in waddrs
};
// Wakes every thread blocked on this condition variable.
// Waiters whose mutex the caller currently owns get deferred wakeups (same
// mechanism as Signal()); all others have their semaphores collected in a
// BroadcastArg and released once sc_lock is dropped.
int PthreadCond::Broadcast() {
    BroadcastArg ba;
    ba.curthread = g_curthread;
    ba.count = 0;

    // Runs under sc_lock, once per blocked thread (see SleepqDrop).
    const auto drop_cb = [](Pthread* td, void* arg) {
        BroadcastArg* ba = reinterpret_cast<BroadcastArg*>(arg);
        Pthread* curthread = ba->curthread;
        PthreadMutex* mp = td->mutex_obj;
        if (mp->m_owner == curthread) {
            if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
                // Defer array is full: flush the pending wakeups first.
                curthread->WakeAll();
            }
            curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
            mp->m_flags |= PthreadMutexFlags::Defered;
        } else {
            if (ba->count >= Pthread::MaxDeferWaiters) {
                // Collection array is full: release what we have and reuse it.
                // NOTE(review): this releases while sc_lock is held — presumably
                // acceptable since semaphore release cannot block; confirm.
                for (int i = 0; i < ba->count; i++) {
                    ba->waddrs[i]->release();
                }
                ba->count = 0;
            }
            ba->waddrs[ba->count++] = &td->wake_sema;
        }
    };

    sc_lock.lock();
    auto it = sc_table.find(this);
    if (it == sc_table.end()) {
        // No sleep queue registered for this condvar => nothing is waiting.
        sc_lock.unlock();
        return 0;
    }
    SleepqDrop(it->second, drop_cb, &ba);
    has_user_waiters = 0;
    sc_lock.unlock();

    // Deliver the collected (non-deferred) wakeups outside the lock.
    for (int i = 0; i < ba.count; i++) {
        ba.waddrs[i]->release();
    }
    return 0;
}
// POSIX entry point: wake one waiter on *cond. CHECK_AND_INIT_COND lazily
// initializes statically-initialized condvars and sets cvp.
// The pre-refactor std::condition_variable path (notify_one + return 0) was
// left in front of the real return, making it unreachable; removed.
int PS4_SYSV_ABI posix_pthread_cond_signal(PthreadCondT* cond) {
    PthreadCond* cvp{};
    CHECK_AND_INIT_COND
    return cvp->Signal();
}
// POSIX entry point: wake all waiters on *cond. CHECK_AND_INIT_COND lazily
// initializes statically-initialized condvars and sets cvp.
// Removes the stale notify_all() call on the removed std::condition_variable
// member, and forwards Broadcast()'s result for consistency with
// posix_pthread_cond_signal (Broadcast currently always returns 0).
int PS4_SYSV_ABI posix_pthread_cond_broadcast(PthreadCondT* cond) {
    PthreadCond* cvp{};
    CHECK_AND_INIT_COND
    return cvp->Broadcast();
}

View file

@ -18,7 +18,6 @@ static std::mutex MutxStaticLock;
#define THR_MUTEX_INITIALIZER ((PthreadMutex*)NULL)
#define THR_ADAPTIVE_MUTEX_INITIALIZER ((PthreadMutex*)1)
#define THR_MUTEX_DESTROYED ((PthreadMutex*)2)
#define THR_MUTEX_RELTIME (const OrbisKernelTimespec*)-1
#define CPU_SPINWAIT __asm__ volatile("pause")
@ -138,7 +137,7 @@ int PthreadMutex::SelfTryLock() {
int PthreadMutex::SelfLock(const OrbisKernelTimespec* abstime, u64 usec) {
const auto DoSleep = [&] {
if (abstime == THR_MUTEX_RELTIME) {
if (abstime == THR_RELTIME) {
std::this_thread::sleep_for(std::chrono::microseconds(usec));
return POSIX_ETIMEDOUT;
} else {
@ -225,11 +224,11 @@ int PthreadMutex::Lock(const OrbisKernelTimespec* abstime, u64 usec) {
if (abstime == nullptr) {
m_lock.lock();
} else if (abstime != THR_MUTEX_RELTIME &&
(abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) [[unlikely]] {
} else if (abstime != THR_RELTIME && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000))
[[unlikely]] {
ret = POSIX_EINVAL;
} else {
if (THR_MUTEX_RELTIME) {
if (THR_RELTIME) {
ret = m_lock.try_lock_for(std::chrono::microseconds(usec)) ? 0 : POSIX_ETIMEDOUT;
} else {
ret = m_lock.try_lock_until(abstime->TimePoint()) ? 0 : POSIX_ETIMEDOUT;
@ -269,7 +268,7 @@ int PS4_SYSV_ABI posix_pthread_mutex_timedlock(PthreadMutexT* mutex,
// Orbis extension: lock *mutex, giving up after a relative timeout of usec
// microseconds (THR_RELTIME sentinel selects the relative-timeout path in
// Lock()). The dead duplicate return that still used the deleted
// THR_MUTEX_RELTIME macro has been removed.
int PS4_SYSV_ABI posix_pthread_mutex_reltimedlock_np(PthreadMutexT* mutex, u64 usec) {
    CHECK_AND_INIT_MUTEX
    return (*mutex)->Lock(THR_RELTIME, usec);
}
int PthreadMutex::Unlock() {
@ -284,8 +283,15 @@ int PthreadMutex::Unlock() {
if (Type() == PthreadMutexType::Recursive && m_count > 0) [[unlikely]] {
m_count--;
} else {
int defered = True(m_flags & PthreadMutexFlags::Defered);
m_flags &= ~PthreadMutexFlags::Defered;
curthread->Dequeue(this);
m_lock.unlock();
if (curthread->will_sleep == 0 && defered) {
curthread->WakeAll();
}
}
return 0;
}

View file

@ -3,7 +3,6 @@
#pragma once
#include <condition_variable>
#include <forward_list>
#include <list>
#include <mutex>
@ -61,22 +60,28 @@ struct PthreadMutex : public ListBaseHook {
return static_cast<PthreadMutexType>(m_flags & PthreadMutexFlags::TypeMask);
}
// BasicLockable interface (lets std:: lock helpers and condvar wait use this
// mutex directly); forwards to the untimed Lock() path.
void lock() {
    Lock(nullptr);
}
// BasicLockable counterpart of lock(); ignores Unlock()'s error code.
void unlock() {
    Unlock();
}
int SelfTryLock();
int SelfLock(const OrbisKernelTimespec* abstime, u64 usec);
int TryLock();
int Lock(const OrbisKernelTimespec* abstime, u64 usec = 0);
// Re-acquires the mutex after a condvar sleep and restores the recursion
// count that CvUnlock() saved. Returns Lock()'s error code (0 on success).
int CvLock(int recurse) {
    if (const int ret = Lock(nullptr); ret != 0) {
        return ret;
    }
    m_count = recurse;
    return 0;
}
int Unlock();
// Fully releases the mutex for a condvar wait, stashing the current
// recursion count in *recurse so CvLock() can restore it afterwards.
int CvUnlock(int* recurse) {
    const int saved_count = m_count;
    m_count = 0;
    *recurse = saved_count;
    return Unlock();
}
bool IsOwned(Pthread* curthread) const;
};
using PthreadMutexT = PthreadMutex*;
@ -111,15 +116,16 @@ enum class ClockId : u32 {
};
// Condition variable backed by per-thread wake semaphores and a global
// sleep-queue table (see condvar.cpp).
// The old two-parameter Wait(mutex, abstime) declaration has been removed:
// together with the defaulted Wait(mutex, abstime, usec = 0) it made every
// two-argument call ambiguous; the defaulted overload subsumes it.
struct PthreadCond {
    std::condition_variable_any cond;
    u32 has_user_waiters; // set before unlocking the mutex in Wait() so
                          // Signal() can check it without locking
    u32 has_kern_waiters;
    u32 flags;
    ClockId clock_id;
    std::string name;

    int Wait(PthreadMutexT* mutex, u64 usec);
    int Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, u64 usec = 0);
    int Signal();
    int Broadcast();
};
using PthreadCondT = PthreadCond*;
@ -247,8 +253,11 @@ struct SchedParam {
int sched_priority;
};
#define THR_RELTIME (const OrbisKernelTimespec*)-1
struct Pthread {
static constexpr u32 ThrMagic = 0xd09ba115U;
static constexpr u32 MaxDeferWaiters = 50;
std::atomic<long> tid;
std::mutex lock;
@ -296,6 +305,8 @@ struct Pthread {
PthreadMutex* mutex_obj;
bool will_sleep;
bool has_user_waiters;
int nwaiter_defer;
std::binary_semaphore* defer_waiters[MaxDeferWaiters];
bool InCritical() const noexcept {
return locklevel > 0 || critical_count > 0;
@ -309,6 +320,28 @@ struct Pthread {
return cancel_pending && cancel_enable && no_cancel == 0;
}
// Releases every wakeup that Signal()/Broadcast() deferred onto this thread
// (in recorded order) and resets the deferred count.
void WakeAll() {
    const int pending = nwaiter_defer;
    for (int idx = 0; idx != pending; ++idx) {
        defer_waiters[idx]->release();
    }
    nwaiter_defer = 0;
}
// Blocks this thread on its wake semaphore.
// abstime == THR_RELTIME: relative timeout of usec microseconds;
// non-null abstime: absolute deadline; null: wait forever.
// Returns true if woken by a release, false on timeout.
bool Sleep(const OrbisKernelTimespec* abstime, u64 usec) {
    will_sleep = 0;
    if (nwaiter_defer > 0) {
        // Deliver wakeups deferred onto us before we block ourselves.
        WakeAll();
    }
    if (abstime == THR_RELTIME) {
        return wake_sema.try_acquire_for(std::chrono::microseconds(usec));
    }
    if (abstime != nullptr) {
        return wake_sema.try_acquire_until(abstime->TimePoint());
    }
    wake_sema.acquire();
    return true;
}
void Enqueue(PthreadMutex* mutex) {
mutex->m_owner = this;
// mutexq.push_back(*mutex);