LibCore: Port SharedCircularQueue to Windows

This commit is contained in:
stasoid 2024-11-30 13:15:18 +05:00 committed by Andrew Kaster
commit 777dcdf6d0
Notes: github-actions[bot] 2024-12-09 00:26:43 +00:00

View file

@ -1,33 +1,17 @@
/* /*
* Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org> * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
* Copyright (c) 2024, stasoid <stasoid@yahoo.com>
* *
* SPDX-License-Identifier: BSD-2-Clause * SPDX-License-Identifier: BSD-2-Clause
*/ */
#pragma once #pragma once
#include <AK/Assertions.h>
#include <AK/Atomic.h>
#include <AK/BuiltinWrappers.h> #include <AK/BuiltinWrappers.h>
#include <AK/ByteString.h> #include <AK/ByteString.h>
#include <AK/Debug.h> #include <AK/Debug.h>
#include <AK/Error.h>
#include <AK/Format.h>
#include <AK/Function.h> #include <AK/Function.h>
#include <AK/NonnullRefPtr.h>
#include <AK/NumericLimits.h>
#include <AK/Platform.h>
#include <AK/RefCounted.h>
#include <AK/RefPtr.h>
#include <AK/Types.h>
#include <AK/Variant.h>
#include <AK/Weakable.h>
#include <LibCore/AnonymousBuffer.h> #include <LibCore/AnonymousBuffer.h>
#include <LibCore/System.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <sys/mman.h>
namespace Core { namespace Core {
@ -64,14 +48,17 @@ public:
// Allocates a new circular queue in shared memory. // Allocates a new circular queue in shared memory.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create() static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
{ {
auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC)); auto anon_buf = TRY(AnonymousBuffer::create_with_size(sizeof(SharedMemorySPCQ)));
return create_internal(fd, true); auto shared_queue = new (anon_buf.data<void>()) SharedMemorySPCQ;
return create_internal(anon_buf, shared_queue);
} }
// Uses an existing circular queue from given shared memory. // Uses an existing circular queue from given shared memory.
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd) static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
{ {
return create_internal(fd, false); auto anon_buf = TRY(AnonymousBuffer::create_from_anon_fd(fd, sizeof(SharedMemorySPCQ)));
auto shared_queue = (SharedMemorySPCQ*)anon_buf.data<void>();
return create_internal(anon_buf, shared_queue);
} }
constexpr size_t size() const { return Size; } constexpr size_t size() const { return Size; }
@ -84,7 +71,7 @@ public:
return head - tail; return head - tail;
} }
ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; } ALWAYS_INLINE constexpr int fd() const { return m_queue->fd(); }
ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); } ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); } ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
@ -174,42 +161,35 @@ private:
alignas(ValueType) Array<ValueType, Size> m_data; alignas(ValueType) Array<ValueType, Size> m_data;
}; };
class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> { class RefCountedSharedMemorySPCQ
: public RefCounted<RefCountedSharedMemorySPCQ>
, public AnonymousBuffer {
friend class SharedSingleProducerCircularQueue; friend class SharedSingleProducerCircularQueue;
public: public:
SharedMemorySPCQ* m_queue; SharedMemorySPCQ* m_queue;
void* m_raw;
int m_fd;
~RefCountedSharedMemorySPCQ() ~RefCountedSharedMemorySPCQ()
{ {
MUST(System::close(m_fd)); dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, m_queue);
MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
} }
private: private:
RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd) RefCountedSharedMemorySPCQ(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue)
: m_queue(queue) : AnonymousBuffer(anon_buf)
, m_raw(reinterpret_cast<void*>(queue)) , m_queue(shared_queue)
, m_fd(fd)
{ {
} }
}; };
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new) static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue)
{ {
auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);
SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);
if (!shared_queue) if (!shared_queue)
return Error::from_string_literal("Unexpected error when creating shared queue from raw memory"); return Error::from_string_literal("Unexpected error when creating shared queue from raw memory");
auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", anon_buf.fd());
return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) }; dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, shared_queue);
auto ref_counted = new (nothrow) RefCountedSharedMemorySPCQ(anon_buf, shared_queue);
return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*ref_counted) };
} }
SharedSingleProducerCircularQueue(ByteString name, RefPtr<RefCountedSharedMemorySPCQ> queue) SharedSingleProducerCircularQueue(ByteString name, RefPtr<RefCountedSharedMemorySPCQ> queue)