diff --git a/rpcs3/Emu/SysCalls/Modules/cellFs.cpp b/rpcs3/Emu/SysCalls/Modules/cellFs.cpp index ff605e661c..6f4a07d3b0 100644 --- a/rpcs3/Emu/SysCalls/Modules/cellFs.cpp +++ b/rpcs3/Emu/SysCalls/Modules/cellFs.cpp @@ -335,6 +335,8 @@ s32 cellFsStReadInit(u32 fd, vm::ptr ringbuf) return CELL_FS_EPERM; } + std::lock_guard lock(file->mutex); + if (!file->st_status.compare_and_swap_test(SSS_NOT_INITIALIZED, SSS_INITIALIZED)) { return CELL_FS_EBUSY; @@ -347,8 +349,10 @@ s32 cellFsStReadInit(u32 fd, vm::ptr ringbuf) const u64 alloc_size = align(file->st_ringbuf_size, file->st_ringbuf_size < 1024 * 1024 ? 64 * 1024 : 1024 * 1024); - file->st_buffer = vm::alloc(vm::cast(alloc_size, "alloc_size")); - file->st_read = 0; + file->st_buffer = vm::alloc(static_cast(alloc_size), vm::main); + file->st_read_size = 0; + file->st_total_read = 0; + file->st_copied = 0; return CELL_OK; } @@ -364,12 +368,14 @@ s32 cellFsStReadFinish(u32 fd) return CELL_FS_EBADF; // ??? } + std::lock_guard lock(file->mutex); + if (!file->st_status.compare_and_swap_test(SSS_INITIALIZED, SSS_NOT_INITIALIZED)) { return CELL_FS_ENXIO; } - vm::dealloc(file->st_buffer); + vm::dealloc(file->st_buffer, vm::main); return CELL_OK; } @@ -448,7 +454,7 @@ s32 cellFsStReadGetRegid(u32 fd, vm::ptr regid) return CELL_FS_ENXIO; } - *regid = file->st_read; + *regid = file->st_total_read - file->st_copied; return CELL_OK; } @@ -481,41 +487,54 @@ s32 cellFsStReadStart(u32 fd, u64 offset, u64 size) size = std::min(file->file->GetSize() - offset, size); file->st_thread.set_name(fmt::format("FS ST Thread[0x%x]", fd)); + file->st_read_size = size; file->st_thread.start([=]() { std::unique_lock lock(file->mutex); - u64 total_read = 0; - - u32 position = file->st_buffer; - while (file->st_status.read_relaxed() == SSS_STARTED && !Emu.IsStopped()) { - if (file->st_read < file->st_ringbuf_size && total_read < size) + // check free space in buffer and available data in stream + if (file->st_total_read - file->st_copied <= file->st_ringbuf_size - file->st_block_size && file->st_total_read < file->st_read_size) { + // get buffer position + const u32 position = vm::cast(file->st_buffer + file->st_total_read % file->st_ringbuf_size); + // read data - auto old = file->file->Seek(offset + total_read); + auto old = file->file->Seek(offset + file->st_total_read); auto res = file->file->Read(vm::get_ptr(position), file->st_block_size); file->file->Seek(old - file->st_block_size); // notify - file->st_read += res; + file->st_total_read += res; + file->cv.notify_one(); + } - // set next buffer position - position += static_cast(file->st_block_size); - position >= file->st_ringbuf_size ? position = 0 : 0; - total_read += res; + // check callback condition if set + if (file->st_callback.data.func) + { + const u64 available = file->st_total_read - file->st_copied; - // try again - continue; + if (available >= file->st_callback.data.size) + { + const auto func = file->st_callback.exchange({}).func; + + Emu.GetCallbackManager().Async([=](PPUThread& CPU) + { + func(CPU, fd, available); + }); + } } file->cv.wait_for(lock, std::chrono::milliseconds(1)); } file->st_status.compare_and_swap(SSS_STOPPED, SSS_INITIALIZED); - file->st_read = 0; + file->st_read_size = 0; + file->st_total_read = 0; + file->st_copied = 0; + file->st_callback.data = {}; }); return CELL_OK; @@ -546,6 +565,7 @@ s32 cellFsStReadStop(u32 fd) } } + file->cv.notify_all(); file->st_thread.join(); return CELL_OK; @@ -567,14 +587,25 @@ s32 cellFsStRead(u32 fd, vm::ptr buf, u64 size, vm::ptr rsize) return CELL_FS_ENXIO; } - // TODO: end point -> CELL_FS_ERANGE - + const u64 copied = file->st_copied.load(); + const u32 position = vm::cast(file->st_buffer + copied % file->st_ringbuf_size); + const u64 total_read = file->st_total_read.load(); + const u32 copy_size = vm::cast(*rsize = std::min(size, total_read - copied)); + // copy data + const u32 first_size = std::min(copy_size, file->st_ringbuf_size - (position - file->st_buffer)); + memcpy(buf.get_ptr(), vm::get_ptr(position), first_size); + memcpy((buf + first_size).get_ptr(), vm::get_ptr(file->st_buffer), copy_size - first_size); - return CELL_OK; + // notify + file->st_copied += copy_size; + file->cv.notify_one(); + + // check end of stream + return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE; } -s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr> addr, vm::ptr size) +s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr addr, vm::ptr size) { cellFs.Warning("cellFsStReadGetCurrentAddr(fd=0x%x, addr=*0x%x, size=*0x%x)", fd, addr, size); @@ -585,7 +616,26 @@ s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr> addr, vm::ptr s return CELL_FS_EBADF; } - return CELL_OK; + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED || !file->st_copyless) + { + return CELL_FS_ENXIO; + } + + const u64 copied = file->st_copied.load(); + const u32 position = vm::cast(file->st_buffer + copied % file->st_ringbuf_size); + const u64 total_read = file->st_total_read.load(); + + if ((*size = std::min(file->st_ringbuf_size - (position - file->st_buffer), total_read - copied)).data()) + { + *addr = position; + } + else + { + *addr = 0; + } + + // check end of stream + return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE; } s32 cellFsStReadPutCurrentAddr(u32 fd, vm::ptr addr, u64 size) @@ -599,7 +649,20 @@ s32 cellFsStReadPutCurrentAddr(u32 fd, vm::ptr addr, u64 size) return CELL_FS_EBADF; } - return CELL_OK; + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED || !file->st_copyless) + { + return CELL_FS_ENXIO; + } + + const u64 copied = file->st_copied.load(); + const u64 total_read = file->st_total_read.load(); + + // notify + file->st_copied += size; + file->cv.notify_one(); + + // check end of stream + return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE; } s32 cellFsStReadWait(u32 fd, u64 size) @@ -612,13 +675,33 @@ s32 cellFsStReadWait(u32 fd, u64 size) { return CELL_FS_EBADF; } + + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED) + { + return CELL_FS_ENXIO; + } + + std::unique_lock lock(file->mutex); + + while (file->st_total_read - file->st_copied < size && file->st_total_read < file->st_read_size) + { + // wait for size availability or stream end + + if (Emu.IsStopped()) + { + cellFs.Warning("cellFsStReadWait(0x%x) aborted", fd); + return CELL_OK; + } + + file->cv.wait_for(lock, std::chrono::milliseconds(1)); + } return CELL_OK; } -s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr func) +s32 cellFsStReadWaitCallback(u32 fd, u64 size, fs_st_cb_t func) { - cellFs.Todo("cellFsStReadWaitCallback(fd=0x%x, size=0x%llx, func=*0x%x)", fd, size, func); + cellFs.Warning("cellFsStReadWaitCallback(fd=0x%x, size=0x%llx, func=*0x%x)", fd, size, func); std::shared_ptr file; @@ -626,6 +709,16 @@ s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr { return CELL_FS_EBADF; } + + if (file->st_status.read_sync() == SSS_NOT_INITIALIZED) + { + return CELL_FS_ENXIO; + } + + if (!file->st_callback.compare_and_swap_test({}, { vm::cast(size, "size"), func })) + { + return CELL_FS_EIO; + } return CELL_OK; } diff --git a/rpcs3/Emu/SysCalls/lv2/sys_fs.h b/rpcs3/Emu/SysCalls/lv2/sys_fs.h index 730573bdc3..7466548234 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_fs.h +++ b/rpcs3/Emu/SysCalls/lv2/sys_fs.h @@ -154,6 +154,14 @@ enum : u32 SSS_STOPPED, }; +using fs_st_cb_t = vm::ptr; + +struct fs_st_cb_rec_t +{ + u32 size; + fs_st_cb_t func; +}; + struct fs_file_t { const std::shared_ptr file; @@ -173,13 +181,18 @@ struct fs_file_t thread_t st_thread; u32 st_buffer; - std::atomic st_read; // amount of data that can be read from the ringbuffer + u64 st_read_size; + std::atomic st_total_read; + std::atomic st_copied; + + atomic_le_t st_callback; fs_file_t(std::shared_ptr& file, s32 mode, s32 flags) : file(file) , mode(mode) , flags(flags) , st_status({ SSS_NOT_INITIALIZED }) + , st_callback({}) { } };