diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index 51d1a13a4c1..f623c42b772 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -29,41 +29,9 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l (void)drain_messages_from_peer(); handle_messages(); }); - - m_send_queue = adopt_ref(*new SendQueue); - m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { - for (;;) { - send_queue->mutex.lock(); - while (send_queue->messages.is_empty() && send_queue->running) - send_queue->condition.wait(); - - if (!send_queue->running) { - send_queue->mutex.unlock(); - break; - } - - auto message_buffer = send_queue->messages.take_first(); - send_queue->mutex.unlock(); - - if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) { - dbgln("ConnectionBase::send_thread: {}", result.error()); - continue; - } - } - return 0; - }); - m_send_thread->start(); } -ConnectionBase::~ConnectionBase() -{ - { - Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->running = false; - m_send_queue->condition.signal(); - } - (void)m_send_thread->join(); -} +ConnectionBase::~ConnectionBase() = default; bool ConnectionBase::is_open() const { @@ -87,11 +55,7 @@ ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf buffer = MUST(wrapper->encode()); } - { - Threading::MutexLocker locker(m_send_queue->mutex); - m_send_queue->messages.append(move(buffer)); - m_send_queue->condition.signal(); - } + MUST(buffer.transfer_message(m_transport)); m_responsiveness_timer->start(); return {}; diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 6a2d2f2ff59..7aae27de1e4 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -60,16 +60,6 @@ protected: Vector> m_unprocessed_messages; u32 m_local_endpoint_magic { 0 }; - - struct SendQueue : public AtomicRefCounted { - AK::SinglyLinkedList messages; - Threading::Mutex mutex; - Threading::ConditionVariable condition { mutex }; - bool running { true }; - }; - - RefPtr m_send_thread; - RefPtr m_send_queue; }; template diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 35af54f6276..5652bac21cb 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -43,16 +43,7 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) return Error::from_string_literal("Message is too large for IPC encoding"); } - auto raw_fds = Vector {}; - auto num_fds_to_transfer = m_fds.size(); - if (num_fds_to_transfer > 0) { - raw_fds.ensure_capacity(num_fds_to_transfer); - for (auto& owned_fd : m_fds) { - raw_fds.unchecked_append(owned_fd->value()); - } - } - - TRY(transport.transfer_message(m_data.span(), raw_fds)); + transport.post_message(m_data, m_fds); return {}; } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index 7d1b741225a..65333340f2a 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -19,32 +19,6 @@ namespace IPC { -class AutoCloseFileDescriptor : public RefCounted { -public: - AutoCloseFileDescriptor(int fd) - : m_fd(fd) - { - } - - ~AutoCloseFileDescriptor() - { - if (m_fd != -1) - (void)Core::System::close(m_fd); - } - - int value() const { return m_fd; } - - int take_fd() - { - int fd = m_fd; - m_fd = -1; - return fd; - } - -private: - int m_fd; -}; - class MessageBuffer { public: MessageBuffer(); diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index ca0f8c387df..61d31b60c83 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -15,14 +15,46 @@ namespace IPC { TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) - , m_socket_write_mutex(make()) - , m_fds_retained_until_received_by_peer(make>>()) + , m_fds_retained_until_received_by_peer(make>>()) { + m_send_queue = adopt_ref(*new SendQueue); + m_send_thread = Threading::Thread::construct([&socket = *m_socket, send_queue = m_send_queue]() -> intptr_t { + for (;;) { + send_queue->mutex.lock(); + while (send_queue->messages.is_empty() && send_queue->running) + send_queue->condition.wait(); + + if (!send_queue->running) { + send_queue->mutex.unlock(); + break; + } + + auto [bytes, fds] = send_queue->messages.take_first(); + send_queue->mutex.unlock(); + + if (auto result = send_message(socket, bytes, fds); result.is_error()) { + dbgln("TransportSocket::send_thread: {}", result.error()); + } + } + return 0; + }); + m_send_thread->start(); + (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); } -TransportSocket::~TransportSocket() = default; +TransportSocket::~TransportSocket() +{ + if (m_send_thread) { + { + Threading::MutexLocker locker(m_send_queue->mutex); + m_send_queue->running = false; + m_send_queue->condition.signal(); + } + (void)m_send_thread->join(); + } +} void TransportSocket::set_up_read_hook(Function hook) { @@ -62,37 +94,50 @@ struct MessageHeader { u32 fd_count { 0 }; }; -ErrorOr TransportSocket::transfer_message(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +void TransportSocket::post_message(Vector const& bytes_to_write, Vector> const& fds) const { Vector message_buffer; message_buffer.resize(sizeof(MessageHeader) + bytes_to_write.size()); MessageHeader header; header.payload_size = bytes_to_write.size(); - header.fd_count = unowned_fds.size(); + header.fd_count = fds.size(); header.type = MessageHeader::Type::Payload; memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); memcpy(message_buffer.data() + sizeof(MessageHeader), bytes_to_write.data(), bytes_to_write.size()); - return transfer(message_buffer.span(), unowned_fds); + + for (auto const& fd : fds) + m_fds_retained_until_received_by_peer->enqueue(fd); + + auto raw_fds = Vector {}; + auto num_fds_to_transfer = fds.size(); + if (num_fds_to_transfer > 0) { + raw_fds.ensure_capacity(num_fds_to_transfer); + for (auto& owned_fd : fds) { + raw_fds.unchecked_append(owned_fd->value()); + } + } + + queue_message_on_send_thread({ move(message_buffer), move(raw_fds) }); } -ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector const& unowned_fds) +void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const { - m_fds_retained_until_received_by_peer->with_locked([&unowned_fds](auto& queue) { - for (auto const& fd : unowned_fds) - queue.enqueue(MUST(File::clone_fd(fd))); - }); - - Threading::MutexLocker lock(*m_socket_write_mutex); + Threading::MutexLocker lock(m_send_queue->mutex); + m_send_queue->messages.append(move(message_to_send)); + m_send_queue->condition.signal(); +} +ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector const& unowned_fds) +{ auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; if (num_fds_to_transfer > 0) { - maybe_nwritten = m_socket->send_message(bytes_to_write, 0, unowned_fds); + maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds); if (!maybe_nwritten.is_error()) num_fds_to_transfer = 0; } else { - maybe_nwritten = m_socket->write_some(bytes_to_write); + maybe_nwritten = socket.write_some(bytes_to_write); } if (maybe_nwritten.is_error()) { @@ -102,7 +147,7 @@ ErrorOr TransportSocket::transfer(ReadonlyBytes bytes_to_write, Vector pollfds; if (pollfds.is_empty()) - pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 }); + pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 }); ErrorOr result { 0 }; do { @@ -196,20 +241,21 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib return ShouldShutdown::Yes; if (acknowledged_fd_count > 0) { - m_fds_retained_until_received_by_peer->with_locked([&acknowledged_fd_count](auto& queue) { - while (acknowledged_fd_count > 0) { - queue.dequeue(); - --acknowledged_fd_count; - } - }); + while (acknowledged_fd_count > 0) { + (void)m_fds_retained_until_received_by_peer->dequeue(); + --acknowledged_fd_count; + } } if (received_fd_count > 0) { + Vector message_buffer; + message_buffer.resize(sizeof(MessageHeader)); MessageHeader header; header.payload_size = 0; header.fd_count = received_fd_count; header.type = MessageHeader::Type::FileDescriptorAcknowledgement; - MUST(transfer({ &header, sizeof(MessageHeader) }, {})); + memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); + queue_message_on_send_thread({ move(message_buffer), {} }); } if (index < m_unprocessed_bytes.size()) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index 39c092cf827..a1e7257601d 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -8,11 +8,40 @@ #pragma once #include +#include #include +#include #include +#include namespace IPC { +class AutoCloseFileDescriptor : public RefCounted { +public: + AutoCloseFileDescriptor(int fd) + : m_fd(fd) + { + } + + ~AutoCloseFileDescriptor() + { + if (m_fd != -1) + (void)Core::System::close(m_fd); + } + + int value() const { return m_fd; } + + int take_fd() + { + int fd = m_fd; + m_fd = -1; + return fd; + } + +private: + int m_fd; +}; + class TransportSocket { AK_MAKE_NONCOPYABLE(TransportSocket); AK_MAKE_DEFAULT_MOVABLE(TransportSocket); @@ -29,7 +58,7 @@ public: void wait_until_readable(); - ErrorOr transfer_message(ReadonlyBytes, Vector const& unowned_fds); + void post_message(Vector const&, Vector> const&) const; enum class ShouldShutdown { No, @@ -47,17 +76,30 @@ public: ErrorOr clone_for_transfer(); private: - ErrorOr transfer(ReadonlyBytes, Vector const& unowned_fds); + static ErrorOr send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector const& unowned_fds); NonnullOwnPtr m_socket; - NonnullOwnPtr m_socket_write_mutex; ByteBuffer m_unprocessed_bytes; UnprocessedFileDescriptors m_unprocessed_fds; // After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer. // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file // descriptor contained in the message before the peer receives it. https://openradar.me/9477351 - NonnullOwnPtr>> m_fds_retained_until_received_by_peer; + NonnullOwnPtr>> m_fds_retained_until_received_by_peer; + + struct MessageToSend { + Vector bytes; + Vector fds; + }; + struct SendQueue : public AtomicRefCounted { + AK::SinglyLinkedList messages; + Threading::Mutex mutex; + Threading::ConditionVariable condition { mutex }; + bool running { true }; + }; + RefPtr m_send_thread; + RefPtr m_send_queue; + void queue_message_on_send_thread(MessageToSend&&) const; }; }