LibIPC: Make TransportSocketWindows responsible for reading entire

messages. Port of a371f84 to Windows.
This commit is contained in:
stasoid 2025-05-17 21:31:11 +05:00 committed by Andrew Kaster
commit 8af2a49b5c
Notes: github-actions[bot] 2025-06-17 21:38:09 +00:00
4 changed files with 58 additions and 22 deletions

View file

@ -109,7 +109,7 @@ ErrorOr<void> ConnectionBase::drain_messages_from_peer()
deferred_invoke([this] { deferred_invoke([this] {
handle_messages(); handle_messages();
}); });
} else if (schedule_shutdown == TransportSocket::ShouldShutdown::Yes) { } else if (schedule_shutdown == Transport::ShouldShutdown::Yes) {
deferred_invoke([this] { deferred_invoke([this] {
shutdown(); shutdown();
}); });

View file

@ -17,7 +17,6 @@ using MessageSizeType = u32;
MessageBuffer::MessageBuffer() MessageBuffer::MessageBuffer()
{ {
m_data.resize(sizeof(MessageSizeType));
} }
ErrorOr<void> MessageBuffer::extend_data_capacity(size_t capacity) ErrorOr<void> MessageBuffer::extend_data_capacity(size_t capacity)
@ -59,15 +58,10 @@ ErrorOr<void> MessageBuffer::append_file_descriptor(int handle)
ErrorOr<void> MessageBuffer::transfer_message(Transport& transport) ErrorOr<void> MessageBuffer::transfer_message(Transport& transport)
{ {
Checked<MessageSizeType> checked_message_size { m_data.size() }; Checked<MessageSizeType> checked_message_size { m_data.size() };
checked_message_size -= sizeof(MessageSizeType);
if (checked_message_size.has_overflow()) if (checked_message_size.has_overflow())
return Error::from_string_literal("Message is too large for IPC encoding"); return Error::from_string_literal("Message is too large for IPC encoding");
MessageSizeType const message_size = checked_message_size.value(); TRY(transport.transfer_message(m_data, m_handle_offsets));
m_data.span().overwrite(0, reinterpret_cast<u8 const*>(&message_size), sizeof(message_size));
TRY(transport.transfer(m_data.span(), m_handle_offsets));
return {}; return {};
} }

View file

@ -100,10 +100,26 @@ ErrorOr<void> TransportSocketWindows::duplicate_handles(Bytes bytes, Vector<size
return {}; return {};
} }
ErrorOr<void> TransportSocketWindows::transfer(Bytes bytes_to_write, Vector<size_t> const& handle_offsets) struct MessageHeader {
{ u32 size { 0 };
TRY(duplicate_handles(bytes_to_write, handle_offsets)); };
ErrorOr<void> TransportSocketWindows::transfer_message(ReadonlyBytes bytes, Vector<size_t> const& handle_offsets)
{
Vector<u8> message_buffer;
message_buffer.resize(sizeof(MessageHeader) + bytes.size());
MessageHeader header;
header.size = bytes.size();
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
memcpy(message_buffer.data() + sizeof(MessageHeader), bytes.data(), bytes.size());
TRY(duplicate_handles({ message_buffer.data() + sizeof(MessageHeader), bytes.size() }, handle_offsets));
return transfer(message_buffer.span());
}
ErrorOr<void> TransportSocketWindows::transfer(ReadonlyBytes bytes_to_write)
{
while (!bytes_to_write.is_empty()) { while (!bytes_to_write.is_empty()) {
ErrorOr<size_t> maybe_nwritten = m_socket->write_some(bytes_to_write); ErrorOr<size_t> maybe_nwritten = m_socket->write_some(bytes_to_write);
@ -132,9 +148,9 @@ ErrorOr<void> TransportSocketWindows::transfer(Bytes bytes_to_write, Vector<size
return {}; return {};
} }
TransportSocketWindows::ReadResult TransportSocketWindows::read_as_much_as_possible_without_blocking(Function<void()> schedule_shutdown) TransportSocketWindows::ShouldShutdown TransportSocketWindows::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
{ {
ReadResult result; auto should_shutdown = ShouldShutdown::No;
while (is_open()) { while (is_open()) {
@ -146,7 +162,7 @@ TransportSocketWindows::ReadResult TransportSocketWindows::read_as_much_as_possi
if (error.code() == EWOULDBLOCK) if (error.code() == EWOULDBLOCK)
break; break;
if (error.code() == ECONNRESET) { if (error.code() == ECONNRESET) {
schedule_shutdown(); should_shutdown = ShouldShutdown::Yes;
break; break;
} }
VERIFY_NOT_REACHED(); VERIFY_NOT_REACHED();
@ -154,14 +170,33 @@ TransportSocketWindows::ReadResult TransportSocketWindows::read_as_much_as_possi
auto bytes_read = maybe_bytes_read.release_value(); auto bytes_read = maybe_bytes_read.release_value();
if (bytes_read.is_empty()) { if (bytes_read.is_empty()) {
schedule_shutdown(); should_shutdown = ShouldShutdown::Yes;
break; break;
} }
result.bytes.append(bytes_read.data(), bytes_read.size()); m_unprocessed_bytes.append(bytes_read.data(), bytes_read.size());
} }
return result; size_t index = 0;
while (index + sizeof(MessageHeader) <= m_unprocessed_bytes.size()) {
MessageHeader header;
memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader));
if (header.size + sizeof(MessageHeader) > m_unprocessed_bytes.size() - index)
break;
Message message;
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.size);
callback(move(message));
index += header.size + sizeof(MessageHeader);
}
if (index < m_unprocessed_bytes.size()) {
auto remaining_bytes = MUST(ByteBuffer::copy(m_unprocessed_bytes.span().slice(index)));
m_unprocessed_bytes = move(remaining_bytes);
} else {
m_unprocessed_bytes.clear();
}
return should_shutdown;
} }
ErrorOr<int> TransportSocketWindows::release_underlying_transport_for_transfer() ErrorOr<int> TransportSocketWindows::release_underlying_transport_for_transfer()

View file

@ -7,6 +7,7 @@
#pragma once #pragma once
#include <AK/Queue.h>
#include <LibCore/Socket.h> #include <LibCore/Socket.h>
#include <LibIPC/File.h> #include <LibIPC/File.h>
@ -26,13 +27,17 @@ public:
void wait_until_readable(); void wait_until_readable();
ErrorOr<void> transfer(Bytes, Vector<size_t> const& handle_offsets); ErrorOr<void> transfer_message(ReadonlyBytes, Vector<size_t> const& handle_offsets);
struct [[nodiscard]] ReadResult { enum class ShouldShutdown {
Vector<u8> bytes; No,
Vector<int> fds; // always empty, present to avoid OS #ifdefs in Connection.cpp Yes,
}; };
ReadResult read_as_much_as_possible_without_blocking(Function<void()> schedule_shutdown); struct Message {
Vector<u8> bytes;
Queue<File> fds; // always empty, present to avoid OS #ifdefs in Connection.cpp
};
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&&);
// Obnoxious name to make it clear that this is a dangerous operation. // Obnoxious name to make it clear that this is a dangerous operation.
ErrorOr<int> release_underlying_transport_for_transfer(); ErrorOr<int> release_underlying_transport_for_transfer();
@ -41,9 +46,11 @@ public:
private: private:
ErrorOr<void> duplicate_handles(Bytes, Vector<size_t> const& handle_offsets); ErrorOr<void> duplicate_handles(Bytes, Vector<size_t> const& handle_offsets);
ErrorOr<void> transfer(ReadonlyBytes);
private: private:
NonnullOwnPtr<Core::LocalSocket> m_socket; NonnullOwnPtr<Core::LocalSocket> m_socket;
ByteBuffer m_unprocessed_bytes;
int m_peer_pid = -1; int m_peer_pid = -1;
}; };