LibIPC: Change TransportSocket to write large messages in small chunks

Bring back d6080d1fdc, adding a previously missing check of whether the
underlying socket is closed before accessing `fd()`, which is an optional
value and is empty when the socket has been closed.
This commit is contained in:
Aliaksandr Kalenik 2025-04-09 20:54:41 +02:00
commit 58fd6f64ac
2 changed files with 98 additions and 65 deletions

View file

@ -13,26 +13,79 @@
namespace IPC { namespace IPC {
// Appends a serialized message's bytes and file descriptors to the queue
// and wakes the send thread so it can start draining.
void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
{
    Threading::MutexLocker lock(m_mutex);
    m_fds.append(fds.data(), fds.size());
    m_bytes.append(bytes.data(), bytes.size());
    // Wake a thread blocked in block_until_message_enqueued().
    m_condition.signal();
}
// Blocks until either data is available to send or the queue has been
// stopped. Returns Running::No once stop() has been called.
SendQueue::Running SendQueue::block_until_message_enqueued()
{
    Threading::MutexLocker lock(m_mutex);
    for (;;) {
        bool has_data = !m_bytes.is_empty() || !m_fds.is_empty();
        if (has_data || !m_running)
            break;
        // Condition waits may wake spuriously; the loop re-checks state.
        m_condition.wait();
    }
    if (m_running)
        return Running::Yes;
    return Running::No;
}
// Removes up to max_bytes bytes from the front of the queue, along with
// every queued file descriptor.
// NOTE(review): all fds are handed out with the first dequeued chunk —
// presumably so they travel with the first write; unsent ones come back
// via return_unsent_data_to_front_of_queue().
SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes)
{
    Threading::MutexLocker lock(m_mutex);
    size_t chunk_size = min(max_bytes, m_bytes.size());

    BytesAndFds result;
    result.bytes.append(m_bytes.data(), chunk_size);
    m_bytes.remove(0, chunk_size);

    // Moving leaves m_fds empty for the next dequeue.
    result.fds = move(m_fds);
    return result;
}
// Puts data that could not be written back at the head of the queue so it
// is retried before anything enqueued later.
void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds)
{
    Threading::MutexLocker lock(m_mutex);
    // Prepend (not append) to preserve the original send order.
    m_fds.prepend(fds.data(), fds.size());
    m_bytes.prepend(bytes.data(), bytes.size());
}
// Marks the queue as stopped and wakes any blocked waiter so the send
// thread can observe the flag and exit.
void SendQueue::stop()
{
    Threading::MutexLocker lock(m_mutex);
    m_running = false;
    // Wake block_until_message_enqueued() so it can return Running::No.
    m_condition.signal();
}
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket) TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
: m_socket(move(socket)) : m_socket(move(socket))
{ {
m_send_queue = adopt_ref(*new SendQueue); m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t { m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
for (;;) { for (;;) {
send_queue->mutex.lock(); if (send_queue->block_until_message_enqueued() == SendQueue::Running::No)
while (send_queue->messages.is_empty() && send_queue->running)
send_queue->condition.wait();
if (!send_queue->running) {
send_queue->mutex.unlock();
break; break;
auto [bytes, fds] = send_queue->dequeue(4096);
ReadonlyBytes remaining_to_send_bytes = bytes;
auto result = send_message(*m_socket, remaining_to_send_bytes, fds);
if (result.is_error()) {
dbgln("TransportSocket::send_thread: {}", result.error());
VERIFY_NOT_REACHED();
} }
auto [bytes, fds] = send_queue->messages.take_first(); if (!remaining_to_send_bytes.is_empty() || !fds.is_empty()) {
send_queue->mutex.unlock(); send_queue->return_unsent_data_to_front_of_queue(remaining_to_send_bytes, fds);
}
if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) { if (!m_socket->is_open())
dbgln("TransportSocket::send_thread: {}", result.error()); break;
{
Vector<struct pollfd, 1> pollfds;
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
} }
} }
return 0; return 0;
@ -45,11 +98,7 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
// Stops the send thread and waits for it to exit before the queue and
// socket members are torn down.
TransportSocket::~TransportSocket()
{
    // Wake the send thread (it may be blocked waiting for messages) and
    // tell it to leave its loop.
    m_send_queue->stop();
    // The send thread captures `this`, so it must be fully joined before
    // destruction completes.
    (void)m_send_thread->join();
}
@ -114,61 +163,31 @@ void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<Nonn
} }
} }
queue_message_on_send_thread({ move(message_buffer), move(raw_fds) }); m_send_queue->enqueue_message(move(message_buffer), move(raw_fds));
} }
// Writes as much of `bytes_to_write` as the socket will currently accept,
// transferring `unowned_fds` as ancillary data alongside the first
// successful write.
//
// On return, `bytes_to_write` has been advanced past whatever was written
// and `unowned_fds` is cleared once the fds were handed to the kernel, so
// the caller can re-queue any leftover data/fds. A would-block condition
// (EAGAIN/EWOULDBLOCK/EINTR) is reported as success with unsent data
// remaining; only real failures return an error.
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector<int>& unowned_fds)
{
    auto num_fds_to_transfer = unowned_fds.size();
    while (!bytes_to_write.is_empty()) {
        ErrorOr<ssize_t> maybe_nwritten = 0;
        if (num_fds_to_transfer > 0) {
            // Send the fds together with the first chunk of bytes.
            maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds);
        } else {
            maybe_nwritten = socket.write_some(bytes_to_write);
        }

        if (maybe_nwritten.is_error()) {
            // Not fatal: the caller returns the unsent remainder to the
            // queue and retries once the socket polls writable.
            if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK || error.code() == EINTR)) {
                return {};
            } else {
                return error;
            }
        }

        bytes_to_write = bytes_to_write.slice(maybe_nwritten.value());
        // Any successful write — even a partial one — has transferred the
        // ancillary fds, so they must not be sent again.
        num_fds_to_transfer = 0;
        unowned_fds.clear();
    }
    return {};
}
@ -252,7 +271,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
header.fd_count = received_fd_count; header.fd_count = received_fd_count;
header.type = MessageHeader::Type::FileDescriptorAcknowledgement; header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
queue_message_on_send_thread({ move(message_buffer), {} }); m_send_queue->enqueue_message(move(message_buffer), {});
} }
if (index < m_unprocessed_bytes.size()) { if (index < m_unprocessed_bytes.size()) {

View file

@ -42,6 +42,31 @@ private:
int m_fd; int m_fd;
}; };
// Thread-safe queue of serialized IPC bytes and file descriptors, shared
// between the thread posting messages and the dedicated send thread.
class SendQueue : public AtomicRefCounted<SendQueue> {
public:
// Whether the queue is still accepting and draining messages.
enum class Running {
No,
Yes,
};
// Blocks until data is enqueued or stop() is called; returns Running::No
// once the queue has been stopped.
Running block_until_message_enqueued();
// Marks the queue as stopped and wakes any blocked waiter.
void stop();
// Appends bytes/fds and signals the send thread.
void enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds);
struct BytesAndFds {
Vector<u8> bytes;
Vector<int> fds;
};
// Removes up to max_bytes bytes from the front of the queue, plus all
// currently queued fds.
BytesAndFds dequeue(size_t max_bytes);
// Puts data that could not be written back at the head of the queue.
void return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds);
private:
Vector<u8> m_bytes;
Vector<int> m_fds;
Threading::Mutex m_mutex;
Threading::ConditionVariable m_condition { m_mutex };
// Guarded by m_mutex; set to false by stop().
bool m_running { true };
};
class TransportSocket { class TransportSocket {
AK_MAKE_NONCOPYABLE(TransportSocket); AK_MAKE_NONCOPYABLE(TransportSocket);
AK_MAKE_NONMOVABLE(TransportSocket); AK_MAKE_NONMOVABLE(TransportSocket);
@ -68,7 +93,7 @@ public:
Vector<u8> bytes; Vector<u8> bytes;
Vector<File> fds; Vector<File> fds;
}; };
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&& schedule_shutdown); ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&&);
// Obnoxious name to make it clear that this is a dangerous operation. // Obnoxious name to make it clear that this is a dangerous operation.
ErrorOr<int> release_underlying_transport_for_transfer(); ErrorOr<int> release_underlying_transport_for_transfer();
@ -76,7 +101,7 @@ public:
ErrorOr<IPC::File> clone_for_transfer(); ErrorOr<IPC::File> clone_for_transfer();
private: private:
static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes&&, Vector<int, 1> const& unowned_fds); static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector<int>& unowned_fds);
NonnullOwnPtr<Core::LocalSocket> m_socket; NonnullOwnPtr<Core::LocalSocket> m_socket;
ByteBuffer m_unprocessed_bytes; ByteBuffer m_unprocessed_bytes;
@ -87,19 +112,8 @@ private:
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351 // descriptor contained in the message before the peer receives it. https://openradar.me/9477351
Queue<NonnullRefPtr<AutoCloseFileDescriptor>> m_fds_retained_until_received_by_peer; Queue<NonnullRefPtr<AutoCloseFileDescriptor>> m_fds_retained_until_received_by_peer;
struct MessageToSend {
Vector<u8> bytes;
Vector<int, 1> fds;
};
struct SendQueue : public AtomicRefCounted<SendQueue> {
AK::SinglyLinkedList<MessageToSend> messages;
Threading::Mutex mutex;
Threading::ConditionVariable condition { mutex };
bool running { true };
};
RefPtr<Threading::Thread> m_send_thread; RefPtr<Threading::Thread> m_send_thread;
RefPtr<SendQueue> m_send_queue; RefPtr<SendQueue> m_send_queue;
void queue_message_on_send_thread(MessageToSend&&) const;
}; };
} }