cellAtracXdec: use lv2 mutexes + conds

This commit is contained in:
capriots 2025-01-12 12:25:06 +01:00 committed by Elad
parent d1ccadbac2
commit ae670c35f7
2 changed files with 192 additions and 64 deletions

View file

@ -270,35 +270,75 @@ void AtracXdecContext::exec(ppu_thread& ppu)
decoder.init_avcodec();
}
switch (savestate)
{
case atracxdec_state::initial: break;
case atracxdec_state::waiting_for_cmd: goto label1_wait_for_cmd_state;
case atracxdec_state::checking_run_thread_1: goto label2_check_run_thread_1_state;
case atracxdec_state::executing_cmd: goto label3_execute_cmd_state;
case atracxdec_state::waiting_for_output: goto label4_wait_for_output_state;
case atracxdec_state::checking_run_thread_2: goto label5_check_run_thread_2_state;
case atracxdec_state::decoding: goto label6_decode_state;
}
for (;;cmd_counter++)
{
cellAtracXdec.trace("Command counter: %llu, waiting for next command...", cmd_counter);
if (!skip_getting_command)
for (;;)
{
lv2_obj::sleep(ppu);
std::lock_guard lock{queue_mutex};
savestate = atracxdec_state::initial;
while (cmd_queue.empty() && !ppu.is_stopped())
{
lv2_obj::sleep(ppu);
queue_not_empty.wait(queue_mutex, 20000);
}
ensure(sys_mutex_lock(ppu, queue_mutex, 0) == CELL_OK);
if (ppu.is_stopped())
{
ppu.state += cpu_flag::again;
return;
}
cmd_queue.pop(cmd);
if (!run_thread)
if (ppu.state & cpu_flag::again)
{
return;
}
if (!cmd_queue.empty())
{
break;
}
savestate = atracxdec_state::waiting_for_cmd;
label1_wait_for_cmd_state:
ensure(sys_cond_wait(ppu, queue_not_empty, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
return;
}
ensure(sys_mutex_unlock(ppu, queue_mutex) == CELL_OK);
}
cmd_queue.pop(cmd);
ensure(sys_mutex_unlock(ppu, queue_mutex) == CELL_OK);
savestate = atracxdec_state::checking_run_thread_1;
label2_check_run_thread_1_state:
ensure(sys_mutex_lock(ppu, run_thread_mutex, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
return;
}
if (!run_thread)
{
ensure(sys_mutex_unlock(ppu, run_thread_mutex) == CELL_OK);
return;
}
ensure(sys_mutex_unlock(ppu, run_thread_mutex) == CELL_OK);
savestate = atracxdec_state::executing_cmd;
label3_execute_cmd_state:
cellAtracXdec.trace("Command type: %d", static_cast<u32>(cmd.type.get()));
switch (cmd.type)
@ -327,8 +367,6 @@ void AtracXdecContext::exec(ppu_thread& ppu)
}
case AtracXdecCmdType::end_seq:
{
skip_getting_command = true;
// Block savestate creation during callbacks
std::unique_lock savestate_lock{g_fxo->get<hle_locks_t>(), std::try_to_lock};
@ -338,41 +376,59 @@ void AtracXdecContext::exec(ppu_thread& ppu)
return;
}
skip_getting_command = false;
// Doesn't do anything else
notify_seq_done.cbFunc(ppu, notify_seq_done.cbArg);
break;
}
case AtracXdecCmdType::decode_au:
{
skip_getting_command = true;
ensure(!!cmd.au_start_addr); // Not checked on LLE
cellAtracXdec.trace("Waiting for output to be consumed...");
lv2_obj::sleep(ppu);
std::unique_lock output_mutex_lock{output_mutex};
ensure(sys_mutex_lock(ppu, output_mutex, 0) == CELL_OK);
while (output_locked && !ppu.is_stopped())
if (ppu.state & cpu_flag::again)
{
lv2_obj::sleep(ppu);
output_consumed.wait(output_mutex, 20000);
return;
}
if (ppu.is_stopped())
while (output_locked)
{
savestate = atracxdec_state::waiting_for_output;
label4_wait_for_output_state:
ensure(sys_cond_wait(ppu, output_consumed, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
return;
}
}
cellAtracXdec.trace("Output consumed");
savestate = atracxdec_state::checking_run_thread_2;
label5_check_run_thread_2_state:
ensure(sys_mutex_lock(ppu, run_thread_mutex, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return;
}
if (!run_thread)
{
ensure(sys_mutex_unlock(ppu, run_thread_mutex) == CELL_OK);
ensure(sys_mutex_unlock(ppu, output_mutex) == CELL_OK);
return;
}
cellAtracXdec.trace("Output consumed");
ensure(sys_mutex_unlock(ppu, run_thread_mutex) == CELL_OK);
savestate = atracxdec_state::decoding;
label6_decode_state:
u32 error = CELL_OK;
@ -578,14 +634,12 @@ void AtracXdecContext::exec(ppu_thread& ppu)
return;
}
skip_getting_command = false;
// au_done and pcm_out callbacks are always called after a decode command, even if an error occurred
// The output always has to be consumed as well
notify_au_done.cbFunc(ppu, cmd.pcm_handle, notify_au_done.cbArg);
output_locked = true;
output_mutex_lock.unlock();
ensure(sys_mutex_unlock(ppu, output_mutex) == CELL_OK);
const u32 output_size = decoded_samples_num * (decoder.bw_pcm & 0x7fu) * decoder.nch_out;
@ -614,29 +668,46 @@ void AtracXdecContext::exec(ppu_thread& ppu)
template <AtracXdecCmdType type>
error_code AtracXdecContext::send_command(ppu_thread& ppu, auto&&... args)
{
ppu.state += cpu_flag::wait;
auto& savestate = *ppu.optional_savestate_state;
const bool signal = savestate.try_read<bool>().second;
savestate.clear();
if (!signal)
{
std::lock_guard lock{queue_mutex};
ensure(sys_mutex_lock(ppu, queue_mutex, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
return {};
}
if constexpr (type == AtracXdecCmdType::close)
{
// Close command is only sent if the queue is empty on LLE
if (!cmd_queue.empty())
{
ensure(sys_mutex_unlock(ppu, queue_mutex) == CELL_OK);
return {};
}
}
if (cmd_queue.full())
{
ensure(sys_mutex_unlock(ppu, queue_mutex) == CELL_OK);
return CELL_ADEC_ERROR_ATX_BUSY;
}
cmd_queue.emplace(std::forward<AtracXdecCmdType>(type), std::forward<decltype(args)>(args)...);
ensure(sys_mutex_unlock(ppu, queue_mutex) == CELL_OK);
}
queue_not_empty.notify_one();
ensure(sys_cond_signal(ppu, queue_not_empty) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
savestate(true);
}
return CELL_OK;
}
@ -699,6 +770,29 @@ error_code _CellAdecCoreOpOpenExt_atracx(ppu_thread& ppu, vm::ptr<AtracXdecConte
write_to_ptr(handle.get_ptr(), AtracXdecContext(notifyAuDone, notifyAuDoneArg, notifyPcmOut, notifyPcmOutArg, notifyError, notifyErrorArg, notifySeqDone, notifySeqDoneArg,
vm::bptr<u8>::make(handle.addr() + utils::align(static_cast<u32>(sizeof(AtracXdecContext)), 0x80) + ATXDEC_SPURS_STRUCTS_SIZE)));
const vm::var<sys_mutex_attribute_t> mutex_attr{{ SYS_SYNC_PRIORITY, SYS_SYNC_NOT_RECURSIVE, SYS_SYNC_NOT_PROCESS_SHARED, SYS_SYNC_NOT_ADAPTIVE, 0, 0, 0, { "_atd001"_u64 } }};
const vm::var<sys_cond_attribute_t> cond_attr{{ SYS_SYNC_NOT_PROCESS_SHARED, 0, 0, { "_atd002"_u64 } }};
ensure(sys_mutex_create(ppu, handle.ptr(&AtracXdecContext::queue_mutex), mutex_attr) == CELL_OK);
ensure(sys_cond_create(ppu, handle.ptr(&AtracXdecContext::queue_not_empty), handle->queue_mutex, cond_attr) == CELL_OK);
mutex_attr->name_u64 = "_atd003"_u64;
cond_attr->name_u64 = "_atd004"_u64;
ensure(sys_mutex_create(ppu, handle.ptr(&AtracXdecContext::run_thread_mutex), mutex_attr) == CELL_OK);
ensure(sys_cond_create(ppu, handle.ptr(&AtracXdecContext::run_thread_cond), handle->run_thread_mutex, cond_attr) == CELL_OK);
mutex_attr->name_u64 = "_atd005"_u64;
cond_attr->name_u64 = "_atd006"_u64;
ensure(sys_mutex_create(ppu, handle.ptr(&AtracXdecContext::output_mutex), mutex_attr) == CELL_OK);
ensure(sys_cond_create(ppu, handle.ptr(&AtracXdecContext::output_consumed), handle->output_mutex, cond_attr) == CELL_OK);
ensure(sys_mutex_lock(ppu, handle->output_mutex, 0) == CELL_OK);
handle->output_locked = false;
ensure(sys_cond_signal(ppu, handle->output_consumed) == CELL_OK);
ensure(sys_mutex_unlock(ppu, handle->output_mutex) == CELL_OK);
const vm::var<char[]> _name = vm::make_str("HLE ATRAC3plus decoder");
const auto entry = g_fxo->get<ppu_function_manager>().func_addr(FIND_FUNC(atracXdecEntry));
ppu_execute<&sys_ppu_thread_create>(ppu, handle.ptr(&AtracXdecContext::thread_id), entry, handle.addr(), +res->ppuThreadPriority, +res->ppuThreadStackSize, SYS_PPU_THREAD_CREATE_JOINABLE, +_name);
@ -725,29 +819,32 @@ error_code _CellAdecCoreOpClose_atracx(ppu_thread& ppu, vm::ptr<AtracXdecContext
return {};
}
ppu.state += cpu_flag::wait;
cellAtracXdec.notice("_CellAdecCoreOpClose_atracx(handle=*0x%x)", handle);
ensure(!!handle); // Not checked on LLE
ensure(sys_mutex_lock(ppu, handle->run_thread_mutex, 0) == CELL_OK);
handle->run_thread = false;
ensure(sys_mutex_unlock(ppu, handle->run_thread_mutex) == CELL_OK);
handle->send_command<AtracXdecCmdType::close>(ppu);
{
std::lock_guard lock{handle->output_mutex};
handle->output_locked = false;
}
ensure(sys_mutex_lock(ppu, handle->output_mutex, 0) == CELL_OK);
handle->output_locked = false;
ensure(sys_mutex_unlock(ppu, handle->output_mutex) == CELL_OK);
ensure(sys_cond_signal(ppu, handle->output_consumed) == CELL_OK);
handle->output_consumed.notify_one();
vm::var<u64> thread_ret;
ensure(sys_ppu_thread_join(ppu, static_cast<u32>(handle->thread_id), +thread_ret) == CELL_OK);
if (vm::var<u64> ret; sys_ppu_thread_join(ppu, static_cast<u32>(handle->thread_id), +ret) != CELL_OK)
{
// Other thread already closed the decoder
return CELL_ADEC_ERROR_FATAL;
}
error_code ret = sys_cond_destroy(ppu, handle->queue_not_empty);
ret = ret ? ret : sys_cond_destroy(ppu, handle->run_thread_cond);
ret = ret ? ret : sys_cond_destroy(ppu, handle->output_consumed);
ret = ret ? ret : sys_mutex_destroy(ppu, handle->queue_mutex);
ret = ret ? ret : sys_mutex_destroy(ppu, handle->run_thread_mutex);
ret = ret ? ret : sys_mutex_destroy(ppu, handle->output_mutex);
return CELL_OK;
return ret != CELL_OK ? static_cast<error_code>(CELL_ADEC_ERROR_FATAL) : CELL_OK;
}
error_code _CellAdecCoreOpStartSeq_atracx(ppu_thread& ppu, vm::ptr<AtracXdecContext> handle, vm::cptr<CellAdecParamAtracX> atracxParam)
@ -808,15 +905,35 @@ error_code _CellAdecCoreOpRealign_atracx(vm::ptr<AtracXdecContext> handle, vm::p
error_code _CellAdecCoreOpReleasePcm_atracx(ppu_thread& ppu, vm::ptr<AtracXdecContext> handle, s32 pcmHandle, vm::cptr<void> outBuffer)
{
ppu.state += cpu_flag::wait;
cellAtracXdec.trace("_CellAdecCoreOpReleasePcm_atracx(handle=*0x%x, pcmHandle=%d, outBuffer=*0x%x)", handle, pcmHandle, outBuffer);
ensure(!!handle); // Not checked on LLE
std::lock_guard lock{handle->output_mutex};
handle->output_locked = false;
handle->output_consumed.notify_one();
auto& savestate = *ppu.optional_savestate_state;
const bool signal = savestate.try_read<bool>().second;
savestate.clear();
if (!signal)
{
ensure(sys_mutex_lock(ppu, handle->output_mutex, 0) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
return {};
}
handle->output_locked = false;
}
ensure(sys_cond_signal(ppu, handle->output_consumed) == CELL_OK);
if (ppu.state & cpu_flag::again)
{
savestate(true);
return {};
}
ensure(sys_mutex_unlock(ppu, handle->output_mutex) == CELL_OK);
return CELL_OK;
}

View file

@ -20,7 +20,6 @@ constexpr int averror_eof = AVERROR_EOF; // Workaround for old-style-cast error
#pragma GCC diagnostic pop
#endif
#include "Utilities/cond.h"
#include "cellPamf.h"
#include "cellAdec.h"
@ -215,16 +214,28 @@ struct AtracXdecDecoder
CHECK_SIZE(AtracXdecDecoder, 0xa8);
// HLE exclusive, for savestates
enum class atracxdec_state : u8
{
initial,
waiting_for_cmd,
checking_run_thread_1,
executing_cmd,
waiting_for_output,
checking_run_thread_2,
decoding
};
struct AtracXdecContext
{
be_t<u64> thread_id; // sys_ppu_thread_t
shared_mutex queue_mutex; // sys_mutex_t
cond_variable queue_not_empty; // sys_cond_t
be_t<u32> queue_mutex; // sys_mutex_t
be_t<u32> queue_not_empty; // sys_cond_t
AdecCmdQueue<AtracXdecCmd> cmd_queue;
shared_mutex output_mutex; // sys_mutex_t
cond_variable output_consumed; // sys_cond_t
be_t<u32> output_mutex; // sys_mutex_t
be_t<u32> output_consumed; // sys_cond_t
be_t<u32> output_locked = false;
be_t<u32> run_thread_mutex; // sys_mutex_t
@ -239,10 +250,10 @@ struct AtracXdecContext
const vm::bptr<u8> work_mem;
// HLE exclusive
u64 cmd_counter = 0; // For debugging
AtracXdecCmd cmd; // For savestates; if savestate was created while processing a decode command, we need to save the current command
b8 skip_getting_command = false; // For savestates; skips getting a new command from the queue
b8 skip_next_frame; // Needed to emulate behavior of LLE SPU program, it doesn't output the first frame after a sequence reset or error
u64 cmd_counter = 0; // For debugging
AtracXdecCmd cmd; // For savestates; if savestate was created while processing a decode command, we need to save the current command
atracxdec_state savestate{}; // For savestates
b8 skip_next_frame; // Needed to emulate behavior of LLE SPU program, it doesn't output the first frame after a sequence reset or error
u8 spurs_stuff[58]; // 120 bytes on LLE, pointers to CellSpurs, CellSpursTaskset, etc.