cellFsSt* update

This commit is contained in:
Nekotekina 2015-03-16 16:15:52 +03:00
parent 63b97d6817
commit 7ce45a3bae
2 changed files with 133 additions and 27 deletions

View file

@ -335,6 +335,8 @@ s32 cellFsStReadInit(u32 fd, vm::ptr<const CellFsRingBuffer> ringbuf)
return CELL_FS_EPERM;
}
std::lock_guard<std::mutex> lock(file->mutex);
if (!file->st_status.compare_and_swap_test(SSS_NOT_INITIALIZED, SSS_INITIALIZED))
{
return CELL_FS_EBUSY;
@ -347,8 +349,10 @@ s32 cellFsStReadInit(u32 fd, vm::ptr<const CellFsRingBuffer> ringbuf)
const u64 alloc_size = align(file->st_ringbuf_size, file->st_ringbuf_size < 1024 * 1024 ? 64 * 1024 : 1024 * 1024);
file->st_buffer = vm::alloc(vm::cast(alloc_size, "alloc_size"));
file->st_read = 0;
file->st_buffer = vm::alloc(static_cast<u32>(alloc_size), vm::main);
file->st_read_size = 0;
file->st_total_read = 0;
file->st_copied = 0;
return CELL_OK;
}
@ -364,12 +368,14 @@ s32 cellFsStReadFinish(u32 fd)
return CELL_FS_EBADF; // ???
}
std::lock_guard<std::mutex> lock(file->mutex);
if (!file->st_status.compare_and_swap_test(SSS_INITIALIZED, SSS_NOT_INITIALIZED))
{
return CELL_FS_ENXIO;
}
vm::dealloc(file->st_buffer);
vm::dealloc(file->st_buffer, vm::main);
return CELL_OK;
}
@ -448,7 +454,7 @@ s32 cellFsStReadGetRegid(u32 fd, vm::ptr<u64> regid)
return CELL_FS_ENXIO;
}
*regid = file->st_read;
*regid = file->st_total_read - file->st_copied;
return CELL_OK;
}
@ -481,41 +487,54 @@ s32 cellFsStReadStart(u32 fd, u64 offset, u64 size)
size = std::min<u64>(file->file->GetSize() - offset, size);
file->st_thread.set_name(fmt::format("FS ST Thread[0x%x]", fd));
file->st_read_size = size;
file->st_thread.start([=]()
{
std::unique_lock<std::mutex> lock(file->mutex);
u64 total_read = 0;
u32 position = file->st_buffer;
while (file->st_status.read_relaxed() == SSS_STARTED && !Emu.IsStopped())
{
if (file->st_read < file->st_ringbuf_size && total_read < size)
// check free space in buffer and available data in stream
if (file->st_total_read - file->st_copied <= file->st_ringbuf_size - file->st_block_size && file->st_total_read < file->st_read_size)
{
// get buffer position
const u32 position = vm::cast(file->st_buffer + file->st_total_read % file->st_ringbuf_size);
// read data
auto old = file->file->Seek(offset + total_read);
auto old = file->file->Seek(offset + file->st_total_read);
auto res = file->file->Read(vm::get_ptr(position), file->st_block_size);
file->file->Seek(old - file->st_block_size);
// notify
file->st_read += res;
file->st_total_read += res;
file->cv.notify_one();
}
// set next buffer position
position += static_cast<u32>(file->st_block_size);
position >= file->st_ringbuf_size ? position = 0 : 0;
total_read += res;
// check callback condition if set
if (file->st_callback.data.func)
{
const u64 available = file->st_total_read - file->st_copied;
// try again
continue;
if (available >= file->st_callback.data.size)
{
const auto func = file->st_callback.exchange({}).func;
Emu.GetCallbackManager().Async([=](PPUThread& CPU)
{
func(CPU, fd, available);
});
}
}
file->cv.wait_for(lock, std::chrono::milliseconds(1));
}
file->st_status.compare_and_swap(SSS_STOPPED, SSS_INITIALIZED);
file->st_read = 0;
file->st_read_size = 0;
file->st_total_read = 0;
file->st_copied = 0;
file->st_callback.data = {};
});
return CELL_OK;
@ -546,6 +565,7 @@ s32 cellFsStReadStop(u32 fd)
}
}
file->cv.notify_all();
file->st_thread.join();
return CELL_OK;
@ -567,14 +587,25 @@ s32 cellFsStRead(u32 fd, vm::ptr<u8> buf, u64 size, vm::ptr<u64> rsize)
return CELL_FS_ENXIO;
}
// TODO: end point -> CELL_FS_ERANGE
const u64 copied = file->st_copied.load();
const u32 position = vm::cast(file->st_buffer + copied % file->st_ringbuf_size);
const u64 total_read = file->st_total_read.load();
const u32 copy_size = vm::cast(*rsize = std::min<u64>(size, total_read - copied));
// copy data
const u32 first_size = std::min<u32>(copy_size, file->st_ringbuf_size - (position - file->st_buffer));
memcpy(buf.get_ptr(), vm::get_ptr(position), first_size);
memcpy((buf + first_size).get_ptr(), vm::get_ptr(file->st_buffer), copy_size - first_size);
return CELL_OK;
// notify
file->st_copied += copy_size;
file->cv.notify_one();
// check end of stream
return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE;
}
s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr<vm::ptr<u8>> addr, vm::ptr<u64> size)
s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr<u32> addr, vm::ptr<u64> size)
{
cellFs.Warning("cellFsStReadGetCurrentAddr(fd=0x%x, addr=*0x%x, size=*0x%x)", fd, addr, size);
@ -585,7 +616,26 @@ s32 cellFsStReadGetCurrentAddr(u32 fd, vm::ptr<vm::ptr<u8>> addr, vm::ptr<u64> s
return CELL_FS_EBADF;
}
return CELL_OK;
if (file->st_status.read_sync() == SSS_NOT_INITIALIZED || !file->st_copyless)
{
return CELL_FS_ENXIO;
}
const u64 copied = file->st_copied.load();
const u32 position = vm::cast(file->st_buffer + copied % file->st_ringbuf_size);
const u64 total_read = file->st_total_read.load();
if ((*size = std::min<u64>(file->st_ringbuf_size - (position - file->st_buffer), total_read - copied)).data())
{
*addr = position;
}
else
{
*addr = 0;
}
// check end of stream
return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE;
}
s32 cellFsStReadPutCurrentAddr(u32 fd, vm::ptr<u8> addr, u64 size)
@ -599,7 +649,20 @@ s32 cellFsStReadPutCurrentAddr(u32 fd, vm::ptr<u8> addr, u64 size)
return CELL_FS_EBADF;
}
return CELL_OK;
if (file->st_status.read_sync() == SSS_NOT_INITIALIZED || !file->st_copyless)
{
return CELL_FS_ENXIO;
}
const u64 copied = file->st_copied.load();
const u64 total_read = file->st_total_read.load();
// notify
file->st_copied += size;
file->cv.notify_one();
// check end of stream
return total_read < file->st_read_size ? CELL_OK : CELL_FS_ERANGE;
}
s32 cellFsStReadWait(u32 fd, u64 size)
@ -612,13 +675,33 @@ s32 cellFsStReadWait(u32 fd, u64 size)
{
return CELL_FS_EBADF;
}
if (file->st_status.read_sync() == SSS_NOT_INITIALIZED)
{
return CELL_FS_ENXIO;
}
std::unique_lock<std::mutex> lock(file->mutex);
while (file->st_total_read - file->st_copied < size && file->st_total_read < file->st_read_size)
{
// wait for size availability or stream end
if (Emu.IsStopped())
{
cellFs.Warning("cellFsStReadWait(0x%x) aborted", fd);
return CELL_OK;
}
file->cv.wait_for(lock, std::chrono::milliseconds(1));
}
return CELL_OK;
}
s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr<void(u32 xfd, u64 xsize)> func)
s32 cellFsStReadWaitCallback(u32 fd, u64 size, fs_st_cb_t func)
{
cellFs.Todo("cellFsStReadWaitCallback(fd=0x%x, size=0x%llx, func=*0x%x)", fd, size, func);
cellFs.Warning("cellFsStReadWaitCallback(fd=0x%x, size=0x%llx, func=*0x%x)", fd, size, func);
std::shared_ptr<fs_file_t> file;
@ -626,6 +709,16 @@ s32 cellFsStReadWaitCallback(u32 fd, u64 size, vm::ptr<void(u32 xfd, u64 xsize)>
{
return CELL_FS_EBADF;
}
if (file->st_status.read_sync() == SSS_NOT_INITIALIZED)
{
return CELL_FS_ENXIO;
}
if (!file->st_callback.compare_and_swap_test({}, { vm::cast(size, "size"), func }))
{
return CELL_FS_EIO;
}
return CELL_OK;
}

View file

@ -154,6 +154,14 @@ enum : u32
SSS_STOPPED,
};
using fs_st_cb_t = vm::ptr<void(u32 xfd, u64 xsize)>;
struct fs_st_cb_rec_t
{
u32 size;
fs_st_cb_t func;
};
struct fs_file_t
{
const std::shared_ptr<vfsStream> file;
@ -173,13 +181,18 @@ struct fs_file_t
thread_t st_thread;
u32 st_buffer;
std::atomic<u64> st_read; // amount of data that can be read from the ringbuffer
u64 st_read_size;
std::atomic<u64> st_total_read;
std::atomic<u64> st_copied;
atomic_le_t<fs_st_cb_rec_t> st_callback;
fs_file_t(std::shared_ptr<vfsStream>& file, s32 mode, s32 flags)
: file(file)
, mode(mode)
, flags(flags)
, st_status({ SSS_NOT_INITIALIZED })
, st_callback({})
{
}
};