From 8af2a49b5c0d351165965dae3af6e839981ac074 Mon Sep 17 00:00:00 2001 From: stasoid Date: Sat, 17 May 2025 21:31:11 +0500 Subject: [PATCH] LibIPC: Make TransportSocketWindows responsible for reading entire messages. Port of a371f84 to Windows. --- Libraries/LibIPC/Connection.cpp | 2 +- Libraries/LibIPC/MessageWindows.cpp | 8 +--- Libraries/LibIPC/TransportSocketWindows.cpp | 53 +++++++++++++++++---- Libraries/LibIPC/TransportSocketWindows.h | 17 +++++-- 4 files changed, 58 insertions(+), 22 deletions(-) diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index 8f15685393d..c6a2a306c82 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -109,7 +109,7 @@ ErrorOr ConnectionBase::drain_messages_from_peer() deferred_invoke([this] { handle_messages(); }); - } else if (schedule_shutdown == TransportSocket::ShouldShutdown::Yes) { + } else if (schedule_shutdown == Transport::ShouldShutdown::Yes) { deferred_invoke([this] { shutdown(); }); diff --git a/Libraries/LibIPC/MessageWindows.cpp b/Libraries/LibIPC/MessageWindows.cpp index e15126c2a97..3eb3dbc7197 100644 --- a/Libraries/LibIPC/MessageWindows.cpp +++ b/Libraries/LibIPC/MessageWindows.cpp @@ -17,7 +17,6 @@ using MessageSizeType = u32; MessageBuffer::MessageBuffer() { - m_data.resize(sizeof(MessageSizeType)); } ErrorOr MessageBuffer::extend_data_capacity(size_t capacity) @@ -59,15 +58,10 @@ ErrorOr MessageBuffer::append_file_descriptor(int handle) ErrorOr MessageBuffer::transfer_message(Transport& transport) { Checked checked_message_size { m_data.size() }; - checked_message_size -= sizeof(MessageSizeType); - if (checked_message_size.has_overflow()) return Error::from_string_literal("Message is too large for IPC encoding"); - MessageSizeType const message_size = checked_message_size.value(); - m_data.span().overwrite(0, reinterpret_cast(&message_size), sizeof(message_size)); - - TRY(transport.transfer(m_data.span(), m_handle_offsets)); + TRY(transport.transfer_message(m_data, m_handle_offsets)); return {}; } diff --git a/Libraries/LibIPC/TransportSocketWindows.cpp b/Libraries/LibIPC/TransportSocketWindows.cpp index e2c16a6dc2a..d840dff08d6 100644 --- a/Libraries/LibIPC/TransportSocketWindows.cpp +++ b/Libraries/LibIPC/TransportSocketWindows.cpp @@ -100,10 +100,26 @@ ErrorOr TransportSocketWindows::duplicate_handles(Bytes bytes, Vector TransportSocketWindows::transfer(Bytes bytes_to_write, Vector const& handle_offsets) -{ - TRY(duplicate_handles(bytes_to_write, handle_offsets)); +struct MessageHeader { + u32 size { 0 }; +}; +ErrorOr TransportSocketWindows::transfer_message(ReadonlyBytes bytes, Vector const& handle_offsets) +{ + Vector message_buffer; + message_buffer.resize(sizeof(MessageHeader) + bytes.size()); + MessageHeader header; + header.size = bytes.size(); + memcpy(message_buffer.data(), &header, sizeof(MessageHeader)); + memcpy(message_buffer.data() + sizeof(MessageHeader), bytes.data(), bytes.size()); + + TRY(duplicate_handles({ message_buffer.data() + sizeof(MessageHeader), bytes.size() }, handle_offsets)); + + return transfer(message_buffer.span()); +} + +ErrorOr TransportSocketWindows::transfer(ReadonlyBytes bytes_to_write) +{ while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = m_socket->write_some(bytes_to_write); @@ -132,9 +148,9 @@ ErrorOr TransportSocketWindows::transfer(Bytes bytes_to_write, Vector schedule_shutdown) +TransportSocketWindows::ShouldShutdown TransportSocketWindows::read_as_many_messages_as_possible_without_blocking(Function&& callback) { - ReadResult result; + auto should_shutdown = ShouldShutdown::No; while (is_open()) { @@ -146,7 +162,7 @@ TransportSocketWindows::ReadResult TransportSocketWindows::read_as_much_as_possi if (error.code() == EWOULDBLOCK) break; if (error.code() == ECONNRESET) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } VERIFY_NOT_REACHED(); @@ -154,14 +170,33 @@ TransportSocketWindows::ReadResult TransportSocketWindows::read_as_much_as_possi auto bytes_read = maybe_bytes_read.release_value(); if (bytes_read.is_empty()) { - schedule_shutdown(); + should_shutdown = ShouldShutdown::Yes; break; } - result.bytes.append(bytes_read.data(), bytes_read.size()); + m_unprocessed_bytes.append(bytes_read.data(), bytes_read.size()); } - return result; + size_t index = 0; + while (index + sizeof(MessageHeader) <= m_unprocessed_bytes.size()) { + MessageHeader header; + memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader)); + if (header.size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index) + break; + Message message; + message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size); + callback(move(message)); + index += header.size + sizeof(MessageHeader); + } + + if (index < m_unprocessed_bytes.size()) { + auto remaining_bytes = MUST(ByteBuffer::copy(m_unprocessed_bytes.span().slice(index))); + m_unprocessed_bytes = move(remaining_bytes); + } else { + m_unprocessed_bytes.clear(); + } + + return should_shutdown; } ErrorOr TransportSocketWindows::release_underlying_transport_for_transfer() diff --git a/Libraries/LibIPC/TransportSocketWindows.h b/Libraries/LibIPC/TransportSocketWindows.h index 3c4102b759c..dcac37f02fd 100644 --- a/Libraries/LibIPC/TransportSocketWindows.h +++ b/Libraries/LibIPC/TransportSocketWindows.h @@ -7,6 +7,7 @@ #pragma once +#include #include #include @@ -26,13 +27,17 @@ public: void wait_until_readable(); - ErrorOr transfer(Bytes, Vector const& handle_offsets); + ErrorOr transfer_message(ReadonlyBytes, Vector const& handle_offsets); - struct [[nodiscard]] ReadResult { - Vector bytes; - Vector fds; // always empty, present to avoid OS #ifdefs in Connection.cpp + enum class ShouldShutdown { + No, + Yes, }; - ReadResult read_as_much_as_possible_without_blocking(Function schedule_shutdown); + struct Message { + Vector bytes; + Queue fds; // always empty, present to avoid OS #ifdefs in Connection.cpp + }; + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&&); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -41,9 +46,11 @@ public: private: ErrorOr duplicate_handles(Bytes, Vector const& handle_offsets); + ErrorOr transfer(ReadonlyBytes); private: NonnullOwnPtr m_socket; + ByteBuffer m_unprocessed_bytes; int m_peer_pid = -1; };