Multithreaded shader compilation follow-up (#7190)

* Multithreaded load pipeline entries shader compilation stage

Co-authored-by: kd-11 <15904127+kd-11@users.noreply.github.com>
This commit is contained in:
Dravonic 2020-01-06 15:59:59 -03:00 committed by kd-11
commit 94d2f97f27
2 changed files with 225 additions and 196 deletions

View file

@ -1,4 +1,4 @@
#pragma once
#pragma once
#include "Emu/RSX/RSXFragmentProgram.h"
#include "Emu/RSX/RSXVertexProgram.h"
@ -174,10 +174,12 @@ public:
};
protected:
shared_mutex m_vertex_mutex;
shared_mutex m_fragment_mutex;
shared_mutex m_pipeline_mutex;
shared_mutex m_decompiler_mutex;
size_t m_next_id = 0;
atomic_t<size_t> m_next_id = 0;
bool m_cache_miss_flag; // Set if last lookup did not find any usable cached programs
bool m_program_compiled_flag; // Set if last lookup caused program to be linked
@ -195,6 +197,11 @@ protected:
/// bool here to inform that the program was preexisting.
std::tuple<const vertex_program_type&, bool> search_vertex_program(const RSXVertexProgram& rsx_vp, bool force_load = true)
{
bool recompile = false;
vertex_program_type* new_shader;
{
reader_lock lock(m_vertex_mutex);
const auto& I = m_vertex_shader_cache.find(rsx_vp);
if (I != m_vertex_shader_cache.end())
{
@ -207,15 +214,30 @@ protected:
}
LOG_NOTICE(RSX, "VP not found in buffer!");
vertex_program_type& new_shader = m_vertex_shader_cache[rsx_vp];
backend_traits::recompile_vertex_program(rsx_vp, new_shader, m_next_id++);
return std::forward_as_tuple(new_shader, false);
lock.upgrade();
auto [it, inserted] = m_vertex_shader_cache.try_emplace(rsx_vp);
new_shader = &(it->second);
recompile = inserted;
}
if (recompile)
{
backend_traits::recompile_vertex_program(rsx_vp, *new_shader, m_next_id++);
}
return std::forward_as_tuple(*new_shader, false);
}
/// bool here to inform that the program was preexisting.
std::tuple<const fragment_program_type&, bool> search_fragment_program(const RSXFragmentProgram& rsx_fp, bool force_load = true)
{
bool recompile = false;
fragment_program_type* new_shader;
void* fragment_program_ucode_copy;
{
reader_lock lock(m_fragment_mutex);
const auto& I = m_fragment_shader_cache.find(rsx_fp);
if (I != m_fragment_shader_cache.end())
{
@ -228,18 +250,30 @@ protected:
}
LOG_NOTICE(RSX, "FP not found in buffer!");
void* fragment_program_ucode_copy = malloc(rsx_fp.ucode_length);
fragment_program_ucode_copy = malloc(rsx_fp.ucode_length);
verify("malloc() failed!" HERE), fragment_program_ucode_copy;
std::memcpy(fragment_program_ucode_copy, rsx_fp.addr, rsx_fp.ucode_length);
RSXFragmentProgram new_fp_key = rsx_fp;
new_fp_key.addr = fragment_program_ucode_copy;
fragment_program_type &new_shader = m_fragment_shader_cache[new_fp_key];
backend_traits::recompile_fragment_program(rsx_fp, new_shader, m_next_id++);
return std::forward_as_tuple(new_shader, false);
lock.upgrade();
auto [it, inserted] = m_fragment_shader_cache.try_emplace(new_fp_key);
new_shader = &(it->second);
recompile = inserted;
}
if (recompile)
{
backend_traits::recompile_fragment_program(rsx_fp, *new_shader, m_next_id++);
}
else
{
free(fragment_program_ucode_copy);
}
return std::forward_as_tuple(*new_shader, false);
}
public:
@ -322,22 +356,6 @@ public:
}
}
const vertex_program_type& get_transform_program(const RSXVertexProgram& rsx_vp) const
{
auto I = m_vertex_shader_cache.find(rsx_vp);
if (I != m_vertex_shader_cache.end())
return I->second;
fmt::throw_exception("Trying to get unknown transform program" HERE);
}
const fragment_program_type& get_shader_program(const RSXFragmentProgram& rsx_fp) const
{
auto I = m_fragment_shader_cache.find(rsx_fp);
if (I != m_fragment_shader_cache.end())
return I->second;
fmt::throw_exception("Trying to get unknown shader program" HERE);
}
// Returns 2 booleans.
// First flag hints that there is more work to do (busy hint)
// Second flag is true if at least one program has been linked successfully (sync hint)
@ -348,23 +366,32 @@ public:
// NOTE: Linking is much slower than decompilation step, so always decompile at least 1 unit
// TODO: Use try_lock instead
bool busy = false;
{
u32 count = 0;
std::lock_guard lock(m_decompiler_mutex);
std::unique_ptr<async_decompile_task_entry> decompile_task;
while (!m_decompile_queue.empty())
while (true)
{
const auto& decompile_task = m_decompile_queue.front();
if (decompile_task.is_fp)
{
search_fragment_program(decompile_task.fp);
std::lock_guard lock(m_decompiler_mutex);
if (m_decompile_queue.empty())
{
break;
}
else
{
search_vertex_program(decompile_task.vp);
decompile_task = std::make_unique<async_decompile_task_entry>(std::move(m_decompile_queue.front()));
m_decompile_queue.pop_front();
}
}
m_decompile_queue.pop_front();
if (decompile_task->is_fp)
{
search_fragment_program(decompile_task->fp);
}
else
{
search_vertex_program(decompile_task->vp);
}
if (++count >= max_decompile_count)
{
@ -374,7 +401,6 @@ public:
break;
}
}
}
async_link_task_entry* link_entry;
pipeline_key key;
@ -511,15 +537,6 @@ public:
return __null_pipeline_handle;
}
size_t get_fragment_constants_buffer_size(const RSXFragmentProgram &fragmentShader) const
{
const auto I = m_fragment_shader_cache.find(fragmentShader);
if (I != m_fragment_shader_cache.end())
return I->second.FragmentConstantOffsetCache.size() * 4 * sizeof(float);
LOG_ERROR(RSX, "Can't retrieve constant offset cache");
return 0;
}
void fill_fragment_constants_buffer(gsl::span<f32> dst_buffer, const RSXFragmentProgram &fragment_program, bool sanitize = false) const
{
const auto I = m_fragment_shader_cache.find(fragment_program);

View file

@ -2,6 +2,7 @@
#include "Utilities/VirtualMemory.h"
#include "Utilities/hash.h"
#include "Utilities/File.h"
#include "Utilities/lockless.h"
#include "Emu/Memory/vm.h"
#include "gcm_enums.h"
#include "Common/ProgramStateCache.h"
@ -385,6 +386,8 @@ namespace rsx
template <typename pipeline_storage_type, typename backend_storage>
class shaders_cache
{
using unpacked_type = lf_fifo<std::tuple<pipeline_storage_type, RSXVertexProgram, RSXFragmentProgram>, 1000>; // TODO: Determine best size
struct pipeline_data
{
u64 vertex_program_hash;
@ -417,10 +420,131 @@ namespace rsx
std::string version_prefix;
std::string root_path;
std::string pipeline_class_name;
std::mutex fpd_mutex;
std::unordered_map<u64, std::vector<u8>> fragment_program_data;
backend_storage& m_storage;
/// Builds the caption shown in the progress dialog for one preload stage.
///
/// @param index        Stage selector: 0 yields the "Loading" caption, any other value the "Compiling" caption.
/// @param processed    Number of entries handled so far (first %u).
/// @param entry_count  Total number of entries (second %u).
/// @return The formatted caption.
std::string get_message(u32 index, u32 processed, u32 entry_count)
{
	if (index == 0)
	{
		return fmt::format("Loading pipeline object %u of %u", processed, entry_count);
	}

	return fmt::format("Compiling pipeline object %u of %u", processed, entry_count);
}
/// Stage 0 of the preload: reads every cached pipeline file, unpacks it, preloads its
/// vertex/fragment programs into the backend storage and queues the unpacked entry.
///
/// Files whose size differs from sizeof(pipeline_data) are logged, deleted and skipped.
///
/// @param nb_workers      Forwarded to await_workers to choose how the worker is run.
/// @param unpacked        Output queue; one slot is claimed with push_begin() per valid entry.
/// @param directory_path  Directory containing the cached pipeline files.
/// @param entries         Directory listing; indexed by the shared progress counter.
/// @param entry_count     Number of entries to process.
/// @param dlg             Progress dialog, forwarded to await_workers.
void load_shaders(uint nb_workers, unpacked_type& unpacked, std::string& directory_path, std::vector<fs::dir_entry>& entries, u32 entry_count,
	shader_loading_dialog* dlg)
{
	// Shared cursor: each fetch-and-increment hands out the next entry index
	atomic_t<u32> processed(0);

	std::function<void(u32)> shader_load_worker = [&](u32 stop_at)
	{
		for (;;)
		{
			const u32 index = processed++;
			if (index >= stop_at || Emu.IsStopped())
			{
				break;
			}

			const fs::dir_entry dir_item = entries[index];
			const auto file_path = directory_path + "/" + dir_item.name;

			std::vector<u8> raw;
			fs::file cache_file(file_path);

			// A size mismatch means the on-disk layout does not match pipeline_data; drop the file
			if (cache_file.size() != sizeof(pipeline_data))
			{
				LOG_ERROR(RSX, "Removing cached pipeline object %s since it's not binary compatible with the current shader cache", dir_item.name.c_str());
				fs::remove_file(file_path);
				continue;
			}

			cache_file.read<u8>(raw, cache_file.size());

			auto entry = unpack(*reinterpret_cast<pipeline_data*>(raw.data()));
			m_storage.preload_programs(std::get<1>(entry), std::get<2>(entry));

			// NOTE(review): unpacked_type is declared with a fixed size of 1000 (see its TODO) - confirm behaviour beyond that
			unpacked[unpacked.push_begin()] = entry;
		}
	};

	await_workers(nb_workers, 0, shader_load_worker, processed, entry_count, dlg);
}
template <typename... Args>
void compile_shaders(uint nb_workers, unpacked_type& unpacked, u32 entry_count, shader_loading_dialog* dlg, Args&&... args)
{
atomic_t<u32> processed(0);
std::function<void(u32)> shader_comp_worker = [&](u32 stop_at)
{
u32 pos;
while (((pos = processed++) < stop_at) && !Emu.IsStopped())
{
auto& entry = unpacked[pos];
m_storage.add_pipeline_entry(std::get<1>(entry), std::get<2>(entry), std::get<0>(entry), std::forward<Args>(args)...);
}
};
await_workers(nb_workers, 1, shader_comp_worker, processed, entry_count, dlg);
}
void await_workers(uint nb_workers, u8 step, std::function<void(u32)>& worker, atomic_t<u32>& processed, u32 entry_count, shader_loading_dialog* dlg)
{
u32 processed_since_last_update = 0;
if (nb_workers == 1)
{
std::chrono::time_point<steady_clock> last_update;
// Call the worker function directly, stoping it prematurely to be able update the screen
u8 inc = 10;
u32 stop_at;
do
{
stop_at = std::min(stop_at + inc, entry_count);
worker(stop_at);
// Only update the screen at about 10fps since updating it everytime slows down the process
std::chrono::time_point<steady_clock> now = std::chrono::steady_clock::now();
processed_since_last_update += inc;
if ((std::chrono::duration_cast<std::chrono::milliseconds>(now - last_update) > 100ms) || (stop_at == entry_count))
{
dlg->update_msg(step, get_message(step, stop_at, entry_count));
dlg->inc_value(step, processed_since_last_update);
last_update = now;
processed_since_last_update = 0;
}
} while (stop_at < entry_count && !Emu.IsStopped());
}
else
{
std::vector<std::thread> worker_threads(nb_workers);
// Start workers
for (u32 i = 0; i < nb_workers; i++)
{
worker_threads[i] = std::thread(worker, entry_count);
}
u32 current_progress = 0;
u32 last_update_progress = 0;
while ((current_progress < entry_count) && !Emu.IsStopped())
{
std::this_thread::sleep_for(100ms); // Around 10fps should be good enough
current_progress = std::min(processed.load(), entry_count);
processed_since_last_update = current_progress - last_update_progress;
last_update_progress = current_progress;
if (processed_since_last_update > 0)
{
dlg->update_msg(step, get_message(step, current_progress, entry_count));
dlg->inc_value(step, processed_since_last_update);
}
}
for (std::thread& worker_thread : worker_threads)
{
worker_thread.join();
}
}
}
public:
shaders_cache(backend_storage& storage, std::string pipeline_class, std::string version_prefix_str = "v1")
@ -471,9 +595,6 @@ namespace rsx
root.rewind();
// Invalid pipeline entries to be removed
std::vector<std::string> invalid_entries;
// Progress dialog
std::unique_ptr<shader_loading_dialog> fallback_dlg;
if (!dlg)
@ -482,134 +603,22 @@ namespace rsx
dlg = fallback_dlg.get();
}
const auto getMessage = [](u32 index, u32 processed, u32 entry_count) -> std::string
{
const char* text = index == 0 ? "Loading pipeline object %u of %u" : "Compiling pipeline object %u of %u";
return fmt::format(text, processed, entry_count);
};
dlg->create("Preloading cached shaders from disk.\nPlease wait...", "Shader Compilation");
dlg->set_limit(0, entry_count);
dlg->set_limit(1, entry_count);
dlg->update_msg(0, getMessage(0, 0, entry_count));
dlg->update_msg(1, getMessage(0, 0, entry_count));
// Setup worker threads
unsigned nb_threads = std::thread::hardware_concurrency();
std::vector<std::thread> worker_threads(nb_threads);
dlg->update_msg(0, get_message(0, 0, entry_count));
dlg->update_msg(1, get_message(1, 0, entry_count));
// Preload everything needed to compile the shaders
// Can probably be parallelized too, but since it's mostly reading files it's probably not worth it
std::vector<std::tuple<pipeline_storage_type, RSXVertexProgram, RSXFragmentProgram>> unpacked;
std::chrono::time_point<steady_clock> last_update;
u32 processed_since_last_update = 0;
unpacked_type unpacked;
uint nb_workers = g_cfg.video.renderer == video_renderer::vulkan ? std::thread::hardware_concurrency() : 1;
for (u32 i = 0; (i < entry_count) && !Emu.IsStopped(); i++)
{
fs::dir_entry tmp = entries[i];
const auto filename = directory_path + "/" + tmp.name;
std::vector<u8> bytes;
fs::file f(filename);
if (f.size() != sizeof(pipeline_data))
{
LOG_ERROR(RSX, "Cached pipeline object %s is not binary compatible with the current shader cache", tmp.name.c_str());
invalid_entries.push_back(filename);
continue;
}
f.read<u8>(bytes, f.size());
auto entry = unpack(*reinterpret_cast<pipeline_data*>(bytes.data()));
m_storage.preload_programs(std::get<1>(entry), std::get<2>(entry));
unpacked.push_back(entry);
// Only update the screen at about 10fps since updating it everytime slows down the process
std::chrono::time_point<steady_clock> now = std::chrono::steady_clock::now();
processed_since_last_update++;
if ((std::chrono::duration_cast<std::chrono::milliseconds>(now - last_update) > 100ms) || (i == entry_count - 1))
{
dlg->update_msg(0, getMessage(0, i + 1, entry_count));
dlg->inc_value(0, processed_since_last_update);
last_update = now;
processed_since_last_update = 0;
}
}
load_shaders(nb_workers, unpacked, directory_path, entries, entry_count, dlg);
// Account for any invalid entries
entry_count = u32(unpacked.size());
entry_count = unpacked.size();
atomic_t<u32> processed(0);
std::function<void(u32)> shader_comp_worker = [&](u32 index)
{
u32 pos;
while (((pos = processed++) < entry_count) && !Emu.IsStopped())
{
auto& entry = unpacked[pos];
m_storage.add_pipeline_entry(std::get<1>(entry), std::get<2>(entry), std::get<0>(entry), std::forward<Args>(args)...);
}
};
if (g_cfg.video.renderer == video_renderer::vulkan)
{
// Start workers
for (u32 i = 0; i < nb_threads; i++)
{
worker_threads[i] = std::thread(shader_comp_worker, i);
}
// Wait for the workers to finish their task while updating UI
u32 current_progress = 0;
u32 last_update_progress = 0;
while ((current_progress < entry_count) && !Emu.IsStopped())
{
std::this_thread::sleep_for(100ms); // Around 10fps should be good enough
current_progress = std::min(processed.load(), entry_count);
processed_since_last_update = current_progress - last_update_progress;
last_update_progress = current_progress;
if (processed_since_last_update > 0)
{
dlg->update_msg(1, getMessage(0, current_progress, entry_count));
dlg->inc_value(1, processed_since_last_update);
}
}
// Need to join the threads to be absolutely sure shader compilation is done.
for (std::thread& worker_thread : worker_threads)
worker_thread.join();
}
else
{
u32 pos;
while (((pos = processed++) < entry_count) && !Emu.IsStopped())
{
auto& entry = unpacked[pos];
m_storage.add_pipeline_entry(std::get<1>(entry), std::get<2>(entry), std::get<0>(entry), std::forward<Args>(args)...);
// Update screen at about 10fps
std::chrono::time_point<steady_clock> now = std::chrono::steady_clock::now();
processed_since_last_update++;
if ((std::chrono::duration_cast<std::chrono::milliseconds>(now - last_update) > 100ms) || (pos == entry_count - 1))
{
dlg->update_msg(1, getMessage(0, pos + 1, entry_count));
dlg->inc_value(1, processed_since_last_update);
last_update = now;
processed_since_last_update = 0;
}
}
}
if (!invalid_entries.empty())
{
for (const auto &filename : invalid_entries)
{
fs::remove_file(filename);
}
LOG_NOTICE(RSX, "shader cache: %d entries were marked as invalid and removed", invalid_entries.size());
}
compile_shaders(nb_workers, unpacked, entry_count, dlg, std::forward<Args>(args)...);
dlg->refresh();
dlg->close();
@ -686,8 +695,11 @@ namespace rsx
f.read<u8>(data, f.size());
RSXFragmentProgram fp = {};
{
std::lock_guard<std::mutex> lock(fpd_mutex);
fragment_program_data[program_hash] = data;
fp.addr = fragment_program_data[program_hash].data();
}
fp.ucode_length = ::size32(data);
return fp;