diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp index 9237577253..9710b85c5d 100644 --- a/rpcs3/Emu/CPU/CPUThread.cpp +++ b/rpcs3/Emu/CPU/CPUThread.cpp @@ -73,8 +73,6 @@ void CPUThread::DumpInformation() { return{}; } - - return "unknown function"; } case CPU_THREAD_SPU: diff --git a/rpcs3/Emu/SysCalls/Callback.cpp b/rpcs3/Emu/SysCalls/Callback.cpp index 02c460d975..b8e319ac73 100644 --- a/rpcs3/Emu/SysCalls/Callback.cpp +++ b/rpcs3/Emu/SysCalls/Callback.cpp @@ -9,31 +9,36 @@ void CallbackManager::Register(const std::function& func) { - std::lock_guard lock(m_mutex); - - m_cb_list.push_back([=](CPUThread& CPU) -> s32 { - assert(CPU.GetType() == CPU_THREAD_PPU); - return func(static_cast(CPU)); - }); + std::lock_guard lock(m_mutex); + + m_cb_list.push_back([=](CPUThread& CPU) -> s32 + { + assert(CPU.GetType() == CPU_THREAD_PPU); + return func(static_cast(CPU)); + }); + } } void CallbackManager::Async(const std::function& func) { - std::lock_guard lock(m_mutex); - - m_async_list.push_back([=](CPUThread& CPU) { - assert(CPU.GetType() == CPU_THREAD_PPU); - func(static_cast(CPU)); - }); + std::lock_guard lock(m_mutex); - m_cb_thread->Notify(); + m_async_list.push_back([=](CPUThread& CPU) + { + assert(CPU.GetType() == CPU_THREAD_PPU); + func(static_cast(CPU)); + }); + } + + m_cv.notify_one(); } bool CallbackManager::Check(CPUThread& CPU, s32& result) { std::function func; + { std::lock_guard lock(m_mutex); @@ -44,15 +49,7 @@ bool CallbackManager::Check(CPUThread& CPU, s32& result) } } - if (func) - { - result = func(CPU); - return true; - } - else - { - return false; - } + return func ? result = func(CPU), true : false; } void CallbackManager::Init() @@ -86,26 +83,27 @@ void CallbackManager::Init() { SetCurrentNamedThread(&*m_cb_thread); + std::unique_lock lock(m_mutex); + while (!Emu.IsStopped()) { std::function func; - { - std::lock_guard lock(m_mutex); - if (m_async_list.size()) - { - func = m_async_list[0]; - m_async_list.erase(m_async_list.begin()); - } + if (m_async_list.size()) + { + func = m_async_list[0]; + m_async_list.erase(m_async_list.begin()); } if (func) { + lock.unlock(); func(*m_cb_thread); + lock.lock(); continue; } - m_cb_thread->WaitForAnySignal(); + m_cv.wait_for(lock, std::chrono::milliseconds(1)); } }); } diff --git a/rpcs3/Emu/SysCalls/Callback.h b/rpcs3/Emu/SysCalls/Callback.h index a1250a8acb..2b1cfe59d8 100644 --- a/rpcs3/Emu/SysCalls/Callback.h +++ b/rpcs3/Emu/SysCalls/Callback.h @@ -8,6 +8,8 @@ typedef void(PauseResumeCB)(bool is_paused); class CallbackManager { std::mutex m_mutex; + std::condition_variable m_cv; + std::vector> m_cb_list; std::vector> m_async_list; std::shared_ptr m_cb_thread; diff --git a/rpcs3/Emu/SysCalls/Modules/cellFs.cpp b/rpcs3/Emu/SysCalls/Modules/cellFs.cpp index ab5c8ad85c..ff605e661c 100644 --- a/rpcs3/Emu/SysCalls/Modules/cellFs.cpp +++ b/rpcs3/Emu/SysCalls/Modules/cellFs.cpp @@ -15,31 +15,6 @@ extern Module cellFs; -struct CellFsDirectoryEntry -{ - CellFsStat attribute; - CellFsDirent entry_name; -}; - -struct FsRingBufferConfig -{ - CellFsRingBuffer m_ring_buffer; // Currently unused after assignment - u32 m_buffer; - u64 m_fs_status; - u64 m_regid; - u32 m_alloc_mem_size; - u32 m_current_addr; - - FsRingBufferConfig() - : m_fs_status(CELL_FS_ST_NOT_INITIALIZED) - , m_regid(0) - , m_alloc_mem_size(0) - , m_current_addr(0) - , m_ring_buffer() { } - -} fs_config; - - s32 cellFsOpen(vm::ptr path, s32 flags, vm::ptr fd, vm::ptr arg, u64 size) { cellFs.Warning("cellFsOpen(path=*0x%x, flags=%#o, fd=*0x%x, arg=*0x%x, size=0x%llx) -> sys_fs_open()", path, flags, fd, arg, size); @@ -280,6 +255,8 @@ s32 cellFsReadWithOffset(u32 fd, u64 offset, vm::ptr buf, u64 buffer_size, return CELL_FS_EBADF; } + std::lock_guard lock(file->mutex); + const auto old_position = file->file->Tell(); file->file->Seek(offset); @@ -309,6 +286,8 @@ s32 cellFsWriteWithOffset(u32 fd, u64 offset, vm::ptr buf, u64 data_ return CELL_FS_EBADF; } + std::lock_guard lock(file->mutex); + const auto old_position = file->file->Tell(); file->file->Seek(offset); @@ -325,27 +304,51 @@ s32 cellFsWriteWithOffset(u32 fd, u64 offset, vm::ptr buf, u64 data_ return CELL_OK; } -s32 cellFsStReadInit(u32 fd, vm::ptr ringbuf) +s32 cellFsStReadInit(u32 fd, vm::ptr ringbuf) { cellFs.Warning("cellFsStReadInit(fd=0x%x, ringbuf=*0x%x)", fd, ringbuf); + if (ringbuf->copy.data() & ~se32(CELL_FS_ST_COPYLESS)) + { + return CELL_FS_EINVAL; + } + + if (ringbuf->block_size.data() & se64(0xfff)) // check if a multiple of sector size + { + return CELL_FS_EINVAL; + } + + if (ringbuf->ringbuf_size % ringbuf->block_size) // check if a multiple of block_size + { + return CELL_FS_EINVAL; + } + std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } + + if (file->flags & CELL_FS_O_WRONLY) + { + return CELL_FS_EPERM; + } - fs_config.m_ring_buffer = *ringbuf; + if (!file->st_status.compare_and_swap_test(SSS_NOT_INITIALIZED, SSS_INITIALIZED)) + { + return CELL_FS_EBUSY; + } - // If the size is less than 1MB - if (ringbuf->ringbuf_size < 0x40000000) - fs_config.m_alloc_mem_size = (((u32)ringbuf->ringbuf_size + 64 * 1024 - 1) / (64 * 1024)) * (64 * 1024); - else - fs_config.m_alloc_mem_size = (((u32)ringbuf->ringbuf_size + 1024 * 1024 - 1) / (1024 * 1024)) * (1024 * 1024); + file->st_ringbuf_size = ringbuf->ringbuf_size; + file->st_block_size = ringbuf->ringbuf_size; + file->st_trans_rate = ringbuf->transfer_rate; + file->st_copyless = ringbuf->copy.data() == se32(CELL_FS_ST_COPYLESS); - // alloc memory - fs_config.m_buffer = (u32)Memory.Alloc(fs_config.m_alloc_mem_size, 1024); - memset(vm::get_ptr(fs_config.m_buffer), 0, fs_config.m_alloc_mem_size); + const u64 alloc_size = align(file->st_ringbuf_size, file->st_ringbuf_size < 1024 * 1024 ? 64 * 1024 : 1024 * 1024); - fs_config.m_fs_status = CELL_FS_ST_INITIALIZED; + file->st_buffer = vm::alloc(vm::cast(alloc_size, "alloc_size")); + file->st_read = 0; return CELL_OK; } @@ -355,11 +358,18 @@ s32 cellFsStReadFinish(u32 fd) cellFs.Warning("cellFsStReadFinish(fd=0x%x)", fd); std::shared_ptr file; - if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; - Memory.Free(fs_config.m_buffer); - fs_config.m_fs_status = CELL_FS_ST_NOT_INITIALIZED; + if (!Emu.GetIdManager().GetIDData(fd, file)) + { + return CELL_FS_EBADF; // ??? + } + + if (!file->st_status.compare_and_swap_test(SSS_INITIALIZED, SSS_NOT_INITIALIZED)) + { + return CELL_FS_ENXIO; + } + + vm::dealloc(file->st_buffer); return CELL_OK; } @@ -369,13 +379,22 @@ s32 cellFsStReadGetRingBuf(u32 fd, vm::ptr ringbuf) cellFs.Warning("cellFsStReadGetRingBuf(fd=0x%x, ringbuf=*0x%x)", fd, ringbuf); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } - *ringbuf = fs_config.m_ring_buffer; + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED) + { + return CELL_FS_ENXIO; + } + + ringbuf->ringbuf_size = file->st_ringbuf_size; + ringbuf->block_size = file->st_block_size; + ringbuf->transfer_rate = file->st_trans_rate; + ringbuf->copy = file->st_copyless ? CELL_FS_ST_COPYLESS : CELL_FS_ST_COPY; - cellFs.Warning("*** fs stream config: block_size=0x%llx, copy=0x%x, ringbuf_size=0x%llx, transfer_rate=0x%llx", - ringbuf->block_size, ringbuf->copy, ringbuf->ringbuf_size, ringbuf->transfer_rate); return CELL_OK; } @@ -384,10 +403,31 @@ s32 cellFsStReadGetStatus(u32 fd, vm::ptr status) cellFs.Warning("cellFsStReadGetRingBuf(fd=0x%x, status=*0x%x)", fd, status); std::shared_ptr file; - if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; - *status = fs_config.m_fs_status; + if (!Emu.GetIdManager().GetIDData(fd, file)) + { + return CELL_FS_EBADF; + } + + switch (file->st_status.read_sync()) + { + case SSS_INITIALIZED: + case SSS_STOPPED: + { + *status = CELL_FS_ST_INITIALIZED | CELL_FS_ST_STOP; + break; + } + case SSS_STARTED: + { + *status = CELL_FS_ST_INITIALIZED | CELL_FS_ST_PROGRESS; + break; + } + default: + { + *status = CELL_FS_ST_NOT_INITIALIZED | CELL_FS_ST_STOP; + break; + } + } return CELL_OK; } @@ -397,24 +437,86 @@ s32 cellFsStReadGetRegid(u32 fd, vm::ptr regid) cellFs.Warning("cellFsStReadGetRingBuf(fd=0x%x, regid=*0x%x)", fd, regid); std::shared_ptr file; - if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; - *regid = fs_config.m_regid; + if (!Emu.GetIdManager().GetIDData(fd, file)) + { + return CELL_FS_EBADF; + } + + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED) + { + return CELL_FS_ENXIO; + } + + *regid = file->st_read; return CELL_OK; } s32 cellFsStReadStart(u32 fd, u64 offset, u64 size) { - cellFs.Todo("cellFsStReadStart(fd=0x%x, offset=0x%llx, size=0x%llx)", fd, offset, size); + cellFs.Warning("cellFsStReadStart(fd=0x%x, offset=0x%llx, size=0x%llx)", fd, offset, size); std::shared_ptr file; - if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; - fs_config.m_current_addr = fs_config.m_buffer + (u32)offset; - fs_config.m_fs_status = CELL_FS_ST_PROGRESS; + if (!Emu.GetIdManager().GetIDData(fd, file)) + { + return CELL_FS_EBADF; + } + + switch (auto status = file->st_status.compare_and_swap(SSS_INITIALIZED, SSS_STARTED)) + { + case SSS_NOT_INITIALIZED: + { + return CELL_FS_ENXIO; + } + + case SSS_STARTED: + { + return CELL_FS_EBUSY; + } + } + + offset = std::min(file->file->GetSize(), offset); + size = std::min(file->file->GetSize() - offset, size); + + file->st_thread.set_name(fmt::format("FS ST Thread[0x%x]", fd)); + + file->st_thread.start([=]() + { + std::unique_lock lock(file->mutex); + + u64 total_read = 0; + + u32 position = file->st_buffer; + + while (file->st_status.read_relaxed() == SSS_STARTED && !Emu.IsStopped()) + { + if (file->st_read < file->st_ringbuf_size && total_read < size) + { + // read data + auto old = file->file->Seek(offset + total_read); + auto res = file->file->Read(vm::get_ptr(position), file->st_block_size); + file->file->Seek(old - file->st_block_size); + + // notify + file->st_read += res; + + // set next buffer position + position += static_cast(file->st_block_size); + position >= file->st_ringbuf_size ? position = 0 : 0; + total_read += res; + + // try again + continue; + } + + file->cv.wait_for(lock, std::chrono::milliseconds(1)); + } + + file->st_status.compare_and_swap(SSS_STOPPED, SSS_INITIALIZED); + file->st_read = 0; + }); return CELL_OK; } @@ -424,10 +526,27 @@ s32 cellFsStReadStop(u32 fd) cellFs.Warning("cellFsStReadStop(fd=0x%x)", fd); std::shared_ptr file; - if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; - fs_config.m_fs_status = CELL_FS_ST_STOP; + if (!Emu.GetIdManager().GetIDData(fd, file)) + { + return CELL_FS_EBADF; + } + + switch (auto status = file->st_status.compare_and_swap(SSS_STARTED, SSS_STOPPED)) + { + case SSS_NOT_INITIALIZED: + { + return CELL_FS_ENXIO; + } + + case SSS_INITIALIZED: + case SSS_STOPPED: + { + return CELL_OK; + } + } + + file->st_thread.join(); return CELL_OK; } @@ -437,60 +556,76 @@ s32 cellFsStRead(u32 fd, vm::ptr buf, u64 size, vm::ptr rsize) cellFs.Warning("cellFsStRead(fd=0x%x, buf=*0x%x, size=0x%llx, rsize=*0x%x)", fd, buf, size, rsize); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } - // TODO: use ringbuffer (fs_config) - fs_config.m_regid += size; + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED || file->st_copyless) + { + return CELL_FS_ENXIO; + } - if (file->file->Eof()) - return CELL_FS_ERANGE; + // TODO: end point -> CELL_FS_ERANGE - *rsize = file->file->Read(buf.get_ptr(), size); + return CELL_OK; } s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr> addr, vm::ptr size) { - cellFs.Todo("cellFsStReadGetCurrentAddr(fd=0x%x, addr=*0x%x, size=*0x%x)", fd, addr, size); + cellFs.Warning("cellFsStReadGetCurrentAddr(fd=0x%x, addr=*0x%x, size=*0x%x)", fd, addr, size); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } return CELL_OK; } s32 cellFsStReadPutCurrentAddr(u32 fd, vm::ptr addr, u64 size) { - cellFs.Todo("cellFsStReadPutCurrentAddr(fd=0x%x, addr=*0x%x, size=0x%llx)", fd, addr, size); + cellFs.Warning("cellFsStReadPutCurrentAddr(fd=0x%x, addr=*0x%x, size=0x%llx)", fd, addr, size); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } return CELL_OK; } s32 cellFsStReadWait(u32 fd, u64 size) { - cellFs.Todo("cellFsStReadWait(fd=0x%x, size=0x%llx)", fd, size); + cellFs.Warning("cellFsStReadWait(fd=0x%x, size=0x%llx)", fd, size); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } return CELL_OK; } -s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr func) +s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr func) { cellFs.Todo("cellFsStReadWaitCallback(fd=0x%x, size=0x%llx, func=*0x%x)", fd, size, func); std::shared_ptr file; + if (!Emu.GetIdManager().GetIDData(fd, file)) - return CELL_ESRCH; + { + return CELL_FS_EBADF; + } return CELL_OK; } @@ -634,14 +769,10 @@ s32 cellFsSdataOpenByFd(u32 mself_fd, s32 flags, vm::ptr sdata_fd, u64 offs return CELL_OK; } -std::mutex g_fs_aio_mutex; - using fs_aio_cb_t = vm::ptr xaio, s32 error, s32 xid, u64 size)>; void fsAio(vm::ptr aio, bool write, s32 xid, fs_aio_cb_t func) { - std::lock_guard lock(g_fs_aio_mutex); - cellFs.Notice("FS AIO Request(%d): fd=0x%x, offset=0x%llx, buf=*0x%x, size=0x%llx, user_data=0x%llx", xid, aio->fd, aio->offset, aio->buf, aio->size, aio->user_data); s32 error = CELL_OK; @@ -655,6 +786,8 @@ void fsAio(vm::ptr aio, bool write, s32 xid, fs_aio_cb_t func) } else { + std::lock_guard lock(file->mutex); + const auto old_position = file->file->Tell(); file->file->Seek(aio->offset); @@ -704,7 +837,9 @@ s32 cellFsAioRead(vm::ptr aio, vm::ptr id, fs_aio_cb_t func) // TODO: detect mount point and send AIO request to the AIO thread of this mount point - thread_t("FS AIO Read Thread", std::bind(fsAio, aio, false, (*id = ++g_fs_aio_id), func)).detach(); + const s32 xid = (*id = ++g_fs_aio_id); + + thread_t("FS AIO Read Thread", std::bind(fsAio, aio, false, xid, func)).detach(); return CELL_OK; } @@ -720,7 +855,9 @@ s32 cellFsAioWrite(vm::ptr aio, vm::ptr id, fs_aio_cb_t func) // TODO: detect mount point and send AIO request to the AIO thread of this mount point - thread_t("FS AIO Write Thread", std::bind(fsAio, aio, true, (*id = ++g_fs_aio_id), func)).detach(); + const s32 xid = (*id = ++g_fs_aio_id); + + thread_t("FS AIO Write Thread", std::bind(fsAio, aio, true, xid, func)).detach(); return CELL_OK; } diff --git a/rpcs3/Emu/SysCalls/Modules/cellFs.h b/rpcs3/Emu/SysCalls/Modules/cellFs.h index d45512d5ff..c5cc965795 100644 --- a/rpcs3/Emu/SysCalls/Modules/cellFs.h +++ b/rpcs3/Emu/SysCalls/Modules/cellFs.h @@ -1,5 +1,11 @@ #pragma once +struct CellFsDirectoryEntry +{ + CellFsStat attribute; + CellFsDirent entry_name; +}; + // CellFsRingBuffer.copy enum : s32 { @@ -15,7 +21,7 @@ struct CellFsRingBuffer be_t copy; }; -// cellFsSt(Read|Write)GetStatus status +// cellFsStReadGetStatus status enum : u64 { CELL_FS_ST_INITIALIZED = 0x0001, diff --git a/rpcs3/Emu/SysCalls/lv2/sys_fs.cpp b/rpcs3/Emu/SysCalls/lv2/sys_fs.cpp index 635cf6b615..ab057c40b2 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_fs.cpp +++ b/rpcs3/Emu/SysCalls/lv2/sys_fs.cpp @@ -142,6 +142,8 @@ s32 sys_fs_read(u32 fd, vm::ptr buf, u64 nbytes, vm::ptr nread) return CELL_FS_EBADF; } + std::lock_guard lock(file->mutex); + *nread = file->file->Read(buf.get_ptr(), nbytes); return CELL_OK; @@ -158,7 +160,9 @@ s32 sys_fs_write(u32 fd, vm::ptr buf, u64 nbytes, vm::ptr nwrit return CELL_FS_EBADF; } - // TODO: return CELL_FS_EBUSY if locked + // TODO: return CELL_FS_EBUSY if locked by stream + + std::lock_guard lock(file->mutex); *nwrite = file->file->Write(buf.get_ptr(), nbytes); @@ -471,6 +475,7 @@ s32 sys_fs_lseek(u32 fd, s64 offset, s32 whence, vm::ptr pos) if (!Emu.GetIdManager().GetIDData(fd, file)) return CELL_ESRCH; + std::lock_guard lock(file->mutex); *pos = file->file->Seek(offset, seek_mode); return CELL_OK; } diff --git a/rpcs3/Emu/SysCalls/lv2/sys_fs.h b/rpcs3/Emu/SysCalls/lv2/sys_fs.h index 4b7b602464..730573bdc3 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_fs.h +++ b/rpcs3/Emu/SysCalls/lv2/sys_fs.h @@ -1,4 +1,5 @@ #pragma once +#include "Utilities/Thread.h" #pragma pack(push, 4) @@ -144,16 +145,41 @@ struct CellFsUtimbuf #pragma pack(pop) +// Stream Support Status (st_status) +enum : u32 +{ + SSS_NOT_INITIALIZED = 0, + SSS_INITIALIZED, + SSS_STARTED, + SSS_STOPPED, +}; + struct fs_file_t { const std::shared_ptr file; const s32 mode; const s32 flags; + std::mutex mutex; + std::condition_variable cv; + + atomic_le_t st_status; + + u64 st_ringbuf_size; + u64 st_block_size; + u64 st_trans_rate; + bool st_copyless; + + thread_t st_thread; + + u32 st_buffer; + std::atomic st_read; // amount of data that can be read from the ringbuffer + fs_file_t(std::shared_ptr& file, s32 mode, s32 flags) : file(file) , mode(mode) , flags(flags) + , st_status({ SSS_NOT_INITIALIZED }) { } };