Implement named_thread_group

This commit is contained in:
Nekotekina 2020-02-29 14:57:41 +03:00
parent eb140c52a4
commit f72971f19f
4 changed files with 150 additions and 76 deletions

View file

@ -431,3 +431,87 @@ public:
}
}
};
// Group of named threads, similar to named_thread
template <class Context>
class named_thread_group final
{
using Thread = named_thread<Context>;
const u32 m_count;
Thread* m_threads;
public:
// Lambda constructor, also the implicit deduction guide candidate
named_thread_group(std::string_view name, u32 count, const Context& f)
: m_count(count)
, m_threads(nullptr)
{
if (count == 0)
{
return;
}
m_threads = static_cast<Thread*>(::operator new(sizeof(Thread) * m_count, std::align_val_t{alignof(Thread)}));
// Create all threads
for (u32 i = 0; i < m_count; i++)
{
new (static_cast<void*>(m_threads + i)) Thread(std::string(name) + std::to_string(i + 1), f);
}
}
named_thread_group(const named_thread_group&) = delete;
named_thread_group& operator=(const named_thread_group&) = delete;
// Wait for completion
void join() const
{
for (u32 i = 0; i < m_count; i++)
{
std::as_const(*std::launder(m_threads + i))();
}
}
// Join and access specific thread
auto operator[](u32 index) const
{
return std::as_const(*std::launder(m_threads + index))();
}
// Join and access specific thread
auto operator[](u32 index)
{
return (*std::launder(m_threads + index))();
}
// Dumb iterator
auto begin()
{
return std::launder(m_threads);
}
// Dumb iterator
auto end()
{
return m_threads + m_count;
}
u32 size() const
{
return m_count;
}
~named_thread_group()
{
// Destroy all threads (it should join them)
for (u32 i = 0; i < m_count; i++)
{
std::launder(m_threads + i)->~Thread();
}
::operator delete(static_cast<void*>(m_threads), std::align_val_t{alignof(Thread)});
}
};

View file

@ -394,11 +394,6 @@ void spu_cache::initialize()
atomic_t<std::size_t> fnext{};
atomic_t<u8> fail_flag{0};
// Initialize compiler instances for parallel compilation
u32 max_threads = static_cast<u32>(g_cfg.core.llvm_threads);
u32 thread_count = max_threads > 0 ? std::min(max_threads, std::thread::hardware_concurrency()) : std::thread::hardware_concurrency();
std::vector<std::unique_ptr<spu_recompiler_base>> compilers{thread_count};
if (g_cfg.core.spu_decoder == spu_decoder_type::fast || g_cfg.core.spu_decoder == spu_decoder_type::llvm)
{
if (auto compiler = spu_recompiler_base::make_llvm_recompiler(11))
@ -421,26 +416,7 @@ void spu_cache::initialize()
}
}
for (auto& compiler : compilers)
{
if (g_cfg.core.spu_decoder == spu_decoder_type::asmjit)
{
compiler = spu_recompiler_base::make_asmjit_recompiler();
}
else if (g_cfg.core.spu_decoder == spu_decoder_type::llvm)
{
compiler = spu_recompiler_base::make_llvm_recompiler();
}
else
{
compilers.clear();
break;
}
compiler->init();
}
if (!compilers.empty() && !func_list.empty())
if (g_cfg.core.spu_decoder == spu_decoder_type::asmjit || g_cfg.core.spu_decoder == spu_decoder_type::llvm)
{
// Initialize progress dialog (wait for previous progress done)
while (g_progr_ptotal)
@ -452,10 +428,25 @@ void spu_cache::initialize()
g_progr_ptotal += func_list.size();
}
std::deque<named_thread<std::function<void()>>> thread_queue;
for (std::size_t i = 0; i < compilers.size(); i++) thread_queue.emplace_back("Worker " + std::to_string(i), [&, compiler = compilers[i].get()]()
named_thread_group workers("SPU Worker ", Emu.GetMaxThreads(), [&]() -> uint
{
// Initialize compiler instances for parallel compilation
std::unique_ptr<spu_recompiler_base> compiler;
if (g_cfg.core.spu_decoder == spu_decoder_type::asmjit)
{
compiler = spu_recompiler_base::make_asmjit_recompiler();
}
else if (g_cfg.core.spu_decoder == spu_decoder_type::llvm)
{
compiler = spu_recompiler_base::make_llvm_recompiler();
}
compiler->init();
// How much every thread compiled
uint result = 0;
// Fake LS
std::vector<be_t<u32>> ls(0x10000);
@ -497,13 +488,17 @@ void spu_cache::initialize()
std::memset(ls.data() + start / 4, 0, 4 * (size0 - 1));
g_progr_pdone++;
result++;
}
return result;
});
// Join all threads
while (!thread_queue.empty())
// Join (implicitly) and print individual results
for (u32 i = 0; i < workers.size(); i++)
{
thread_queue.pop_front();
spu_log.notice("SPU Runtime: Worker %u built %u programs.", i + 1, workers[i]);
}
if (Emu.IsStopped())
@ -518,7 +513,7 @@ void spu_cache::initialize()
return;
}
if (!compilers.empty() && !func_list.empty())
if ((g_cfg.core.spu_decoder == spu_decoder_type::asmjit || g_cfg.core.spu_decoder == spu_decoder_type::llvm) && !func_list.empty())
{
spu_log.success("SPU Runtime: Built %u functions.", func_list.size());
}

View file

@ -925,9 +925,6 @@ game_boot_result Emulator::Load(const std::string& title_id, bool add_only, bool
std::vector<std::pair<std::string, u64>> file_queue;
file_queue.reserve(2000);
std::queue<named_thread<std::function<void()>>> thread_queue;
const uint max_threads = std::thread::hardware_concurrency();
// Initialize progress dialog
g_progr = "Scanning directories for SPRX libraries...";
@ -975,57 +972,46 @@ game_boot_result Emulator::Load(const std::string& title_id, bool add_only, bool
g_progr = "Compiling PPU modules";
atomic_t<u32> worker_count = 0;
atomic_t<std::size_t> fnext = 0;
for (std::size_t i = 0; i < file_queue.size(); i++)
named_thread_group workers("SPRX Worker ", GetMaxThreads(), [&]
{
const auto& path = file_queue[i].first;
sys_log.notice("Trying to load SPRX: %s", path);
// Load MSELF or SPRX
fs::file src{path};
if (file_queue[i].second == 0)
for (std::size_t func_i = fnext++; func_i < file_queue.size(); func_i = fnext++)
{
// Some files may fail to decrypt due to the lack of klic
src = decrypt_self(std::move(src));
}
const auto& path = std::as_const(file_queue)[func_i].first;
const ppu_prx_object obj = src;
sys_log.notice("Trying to load SPRX: %s", path);
if (obj == elf_error::ok)
{
if (auto prx = ppu_load_prx(obj, path))
// Load MSELF or SPRX
fs::file src{path};
if (file_queue[func_i].second == 0)
{
worker_count++;
while (worker_count > max_threads)
{
std::this_thread::sleep_for(10ms);
}
thread_queue.emplace("Worker " + std::to_string(thread_queue.size()), [_prx = std::move(prx), &worker_count]
{
ppu_initialize(*_prx);
ppu_unload_prx(*_prx);
g_progr_fdone++;
worker_count--;
});
continue;
// Some files may fail to decrypt due to the lack of klic
src = decrypt_self(std::move(src));
}
}
sys_log.error("Failed to load SPRX '%s' (%s)", path, obj.get_error());
g_progr_fdone++;
}
const ppu_prx_object obj = src;
if (obj == elf_error::ok)
{
if (auto prx = ppu_load_prx(obj, path))
{
ppu_initialize(*prx);
ppu_unload_prx(*prx);
g_progr_fdone++;
continue;
}
}
sys_log.error("Failed to load SPRX '%s' (%s)", path, obj.get_error());
g_progr_fdone++;
continue;
}
});
// Join every thread
while (!thread_queue.empty())
{
thread_queue.pop();
}
workers.join();
// Exit "process"
Emu.CallAfter([]
@ -1719,6 +1705,13 @@ std::string Emulator::GetFormattedTitle(double fps) const
return rpcs3::get_formatted_title(title_data);
}
// Number of worker threads to use for parallel compilation.
// Returns the configured g_cfg.core.llvm_threads value capped by the hardware
// thread count, or the hardware thread count itself when the setting is 0.
// The result is always at least 1.
u32 Emulator::GetMaxThreads() const
{
	// hardware_concurrency() is only a hint and may return 0 when the value is
	// not computable; never report zero workers, otherwise no work would be done
	const u32 hw_threads = std::max<u32>(std::thread::hardware_concurrency(), 1);
	const u32 max_threads = static_cast<u32>(g_cfg.core.llvm_threads);
	return max_threads > 0 ? std::min(max_threads, hw_threads) : hw_threads;
}
s32 error_code::error_report(const fmt_type_info* sup, u64 arg, const fmt_type_info* sup2, u64 arg2)
{
static thread_local std::unordered_map<std::string, std::size_t> g_tls_error_stats;

View file

@ -202,6 +202,8 @@ public:
void SetHasGui(bool has_gui) { m_has_gui = has_gui; }
std::string GetFormattedTitle(double fps) const;
u32 GetMaxThreads() const;
};
extern Emulator Emu;