diff --git a/rpcs3/Emu/RSX/RSXThread.cpp b/rpcs3/Emu/RSX/RSXThread.cpp index 09e8208af2..15536d5790 100644 --- a/rpcs3/Emu/RSX/RSXThread.cpp +++ b/rpcs3/Emu/RSX/RSXThread.cpp @@ -537,15 +537,18 @@ namespace rsx m_vblank_thread.reset(); } - if (m_vertex_streaming_task.processing_threads.size() > 0) + if (m_vertex_streaming_task.available_threads > 0) { - for (auto &thr : m_vertex_streaming_task.processing_threads) + for (auto &task : m_vertex_streaming_task.worker_threads) { - thr->join(); - thr.reset(); + if (!task.worker_thread) + break; + + task.worker_thread->join(); + task.worker_thread.reset(); } - m_vertex_streaming_task.processing_threads.resize(0); + m_vertex_streaming_task.available_threads = 0; } } @@ -1131,7 +1134,8 @@ namespace rsx } } - void thread::post_vertex_stream_to_upload(gsl::span src, gsl::span dst, rsx::vertex_base_type type, u32 vector_element_count, u32 attribute_src_stride, u8 dst_stride, std::function callback) + void thread::post_vertex_stream_to_upload(gsl::span src, gsl::span dst, rsx::vertex_base_type type, u32 vector_element_count, + u32 attribute_src_stride, u8 dst_stride, u32 vertex_count, std::function callback) { upload_stream_packet packet; packet.dst_span = dst; @@ -1141,92 +1145,95 @@ namespace rsx packet.dst_stride = dst_stride; packet.vector_width = vector_element_count; packet.post_upload_func = callback; + packet.vertex_count = vertex_count; - m_vertex_streaming_task.packets.push_back(packet); - } - - void thread::start_vertex_upload_task(u32 vertex_count) - { - if (m_vertex_streaming_task.processing_threads.size() == 0) + if (m_vertex_streaming_task.available_threads == 0) { const u32 streaming_thread_count = (u32)g_cfg.video.vertex_upload_threads; - m_vertex_streaming_task.processing_threads.resize(streaming_thread_count); + m_vertex_streaming_task.available_threads = streaming_thread_count; for (u32 n = 0; n < streaming_thread_count; ++n) { - thread_ctrl::spawn(m_vertex_streaming_task.processing_threads[n], "Vertex Stream " + std::to_string(n), [this, n]() + thread_ctrl::spawn(m_vertex_streaming_task.worker_threads[n].worker_thread, "Vertex Stream " + std::to_string(n), [this, n]() { - auto &task = m_vertex_streaming_task; + auto &owner = m_vertex_streaming_task; + auto &task = m_vertex_streaming_task.worker_threads[n]; const u32 index = n; while (!Emu.IsStopped()) { - if (task.remaining_packets != 0) + if (task.thread_status.load(std::memory_order_consume) != 0) { - //Wait for me! - task.ready_threads--; - - const size_t step = task.processing_threads.size(); - const size_t job_count = task.packets.size(); - //Process every nth packet - - size_t current_job = index; - - while (true) + for (auto &packet: task.packets) { - if (current_job >= job_count) - break; - - auto &packet = task.packets[current_job]; - - write_vertex_array_data_to_buffer(packet.dst_span, packet.src_span, task.vertex_count, packet.type, packet.vector_width, packet.src_stride, packet.dst_stride); + write_vertex_array_data_to_buffer(packet.dst_span, packet.src_span, packet.vertex_count, packet.type, packet.vector_width, packet.src_stride, packet.dst_stride); if (packet.post_upload_func) - packet.post_upload_func(packet.dst_span.data(), packet.type, (u8)packet.vector_width, task.vertex_count); + packet.post_upload_func(packet.dst_span.data(), packet.type, (u8)packet.vector_width, packet.vertex_count); - task.remaining_packets--; - current_job += step; - _mm_sfence(); + owner.remaining_tasks--; } - _mm_mfence(); - - while (task.remaining_packets > 0 && !Emu.IsStopped()) - { - std::this_thread::yield(); - _mm_lfence(); - } - - task.ready_threads++; + task.packets.resize(0); + task.thread_status.store(0); _mm_sfence(); } - else - { - std::this_thread::yield(); - } + + std::this_thread::yield(); } }); } } - while (m_vertex_streaming_task.ready_threads != 0 && !Emu.IsStopped()) + //Increment job counter.. + m_vertex_streaming_task.remaining_tasks++; + + //Assign this packet to a thread + //Simple round robin based on first available thread + upload_stream_worker *best_fit = nullptr; + for (auto &worker : m_vertex_streaming_task.worker_threads) { - _mm_pause(); + if (!worker.worker_thread) + break; + + if (worker.thread_status.load(std::memory_order_consume) == 0) + { + if (worker.packets.size() == 0) + { + worker.packets.push_back(packet); + return; + } + + if (best_fit == nullptr) + best_fit = &worker; + else if (best_fit->packets.size() > worker.packets.size()) + best_fit = &worker; + } } - m_vertex_streaming_task.vertex_count = vertex_count; - m_vertex_streaming_task.ready_threads = 0; - m_vertex_streaming_task.remaining_packets = (int)m_vertex_streaming_task.packets.size(); + best_fit->packets.push_back(packet); + } + + void thread::start_vertex_upload_task() + { + for (auto &worker : m_vertex_streaming_task.worker_threads) + { + if (!worker.worker_thread) + break; + + if (worker.thread_status.load(std::memory_order_consume) == 0 && worker.packets.size() > 0) + { + worker.thread_status.store(1); + } + } } void thread::wait_for_vertex_upload_task() { - while (m_vertex_streaming_task.remaining_packets > 0 && !Emu.IsStopped()) + while (m_vertex_streaming_task.remaining_tasks.load(std::memory_order_consume) != 0 && !Emu.IsStopped()) { _mm_pause(); } - - m_vertex_streaming_task.packets.resize(0); } bool thread::vertex_upload_task_ready() @@ -1234,7 +1241,12 @@ namespace rsx if (g_cfg.video.vertex_upload_threads < 2) return false; - return (m_vertex_streaming_task.remaining_packets == 0 && m_vertex_streaming_task.ready_threads == 0); + //Not initialized + if (m_vertex_streaming_task.available_threads == 0) + return true; + + //At least two threads are available + return (m_vertex_streaming_task.remaining_tasks < (m_vertex_streaming_task.available_threads - 1)); } void thread::flip(int buffer) diff --git a/rpcs3/Emu/RSX/RSXThread.h b/rpcs3/Emu/RSX/RSXThread.h index 86257d72af..f2963640ac 100644 --- a/rpcs3/Emu/RSX/RSXThread.h +++ b/rpcs3/Emu/RSX/RSXThread.h @@ -244,25 +244,30 @@ namespace rsx gsl::span dst_span; rsx::vertex_base_type type; u32 vector_width; + u32 vertex_count; u32 src_stride; u8 dst_stride; }; + struct upload_stream_worker + { + std::shared_ptr worker_thread; + std::vector packets; + std::atomic thread_status = { 0 }; + }; + struct upload_stream_task { - std::vector packets; - std::atomic remaining_packets = { 0 }; - std::atomic ready_threads = { 0 }; - std::atomic vertex_count; - - std::vector> processing_threads; + std::array worker_threads; + int available_threads = 0; + std::atomic remaining_tasks = {0}; }; upload_stream_task m_vertex_streaming_task; void post_vertex_stream_to_upload(gsl::span src, gsl::span dst, rsx::vertex_base_type type, - u32 vector_element_count, u32 attribute_src_stride, u8 dst_stride, + u32 vector_element_count, u32 attribute_src_stride, u8 dst_stride, u32 vertex_count, std::function callback); - void start_vertex_upload_task(u32 vertex_count); + void start_vertex_upload_task(); void wait_for_vertex_upload_task(); bool vertex_upload_task_ready(); diff --git a/rpcs3/Emu/RSX/VK/VKVertexBuffers.cpp b/rpcs3/Emu/RSX/VK/VKVertexBuffers.cpp index 8c89beb2de..c15b1980ec 100644 --- a/rpcs3/Emu/RSX/VK/VKVertexBuffers.cpp +++ b/rpcs3/Emu/RSX/VK/VKVertexBuffers.cpp @@ -561,13 +561,13 @@ namespace const u32 real_element_size = vk::get_suitable_vk_size(vertex_array.type, vertex_array.attribute_size); gsl::span dest_span(dst + (memory_allocations[n] - offset_base), allocated_sizes[n]); - rsxthr->post_vertex_stream_to_upload(vertex_array.data, dest_span, vertex_array.type, vertex_array.attribute_size, vertex_array.stride, real_element_size, vk::prepare_buffer_for_writing); + rsxthr->post_vertex_stream_to_upload(vertex_array.data, dest_span, vertex_array.type, vertex_array.attribute_size, vertex_array.stride, real_element_size, vertex_count, vk::prepare_buffer_for_writing); space_remaining -= allocated_sizes[n]; n++; } - rsxthr->start_vertex_upload_task(vertex_count); + rsxthr->start_vertex_upload_task(); } } }