mirror of
https://github.com/LadybirdBrowser/ladybird.git
synced 2025-04-23 21:15:14 +00:00
LibIPC+Meta: Keep message buffer alive until acknowledged by peer
This change ensures that instead of immediately deallocating the message buffer after sending, we retain it in an acknowledgement wait queue until an acknowledgement is received from the peer. This is necessary to handle a behavior of the macOS kernel, which may prematurely garbage-collect file descriptors contained within the message buffer before the peer receives them. The acknowledgement mechanism assumes messages are received in the same order they were sent so, each acknowledgement message simply indicates the count of successfully received messages, specifying how many entries can safely be removed from the acknowledgement wait queue.
This commit is contained in:
parent
15e2c78e9a
commit
c3121c9d8a
Notes:
github-actions[bot]
2025-04-05 21:15:26 +00:00
Author: https://github.com/kalenikaliaksandr Commit: https://github.com/LadybirdBrowser/ladybird/commit/c3121c9d8ad Pull-request: https://github.com/LadybirdBrowser/ladybird/pull/4230
5 changed files with 121 additions and 16 deletions
|
@ -14,10 +14,11 @@
|
|||
|
||||
namespace IPC {
|
||||
|
||||
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic)
|
||||
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 local_endpoint_magic, u32 peer_endpoint_magic)
|
||||
: m_local_stub(local_stub)
|
||||
, m_transport(move(transport))
|
||||
, m_local_endpoint_magic(local_endpoint_magic)
|
||||
, m_peer_endpoint_magic(peer_endpoint_magic)
|
||||
{
|
||||
m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
|
||||
|
||||
|
@ -29,21 +30,27 @@ ConnectionBase::ConnectionBase(IPC::Stub& local_stub, Transport transport, u32 l
|
|||
});
|
||||
|
||||
m_send_queue = adopt_ref(*new SendQueue);
|
||||
m_send_thread = Threading::Thread::construct([this, queue = m_send_queue]() -> intptr_t {
|
||||
m_acknowledgement_wait_queue = adopt_ref(*new AcknowledgementWaitQueue);
|
||||
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue, acknowledgement_wait_queue = m_acknowledgement_wait_queue]() -> intptr_t {
|
||||
for (;;) {
|
||||
queue->mutex.lock();
|
||||
while (queue->messages.is_empty() && queue->running)
|
||||
queue->condition.wait();
|
||||
send_queue->mutex.lock();
|
||||
while (send_queue->messages.is_empty() && send_queue->running)
|
||||
send_queue->condition.wait();
|
||||
|
||||
if (!queue->running) {
|
||||
queue->mutex.unlock();
|
||||
if (!send_queue->running) {
|
||||
send_queue->mutex.unlock();
|
||||
break;
|
||||
}
|
||||
|
||||
auto message = queue->messages.take_first();
|
||||
queue->mutex.unlock();
|
||||
auto [message_buffer, needs_acknowledgement] = send_queue->messages.take_first();
|
||||
send_queue->mutex.unlock();
|
||||
|
||||
if (auto result = message.transfer_message(m_transport); result.is_error()) {
|
||||
if (needs_acknowledgement == MessageNeedsAcknowledgement::Yes) {
|
||||
Threading::MutexLocker lock(acknowledgement_wait_queue->mutex);
|
||||
acknowledgement_wait_queue->messages.append(message_buffer);
|
||||
}
|
||||
|
||||
if (auto result = message_buffer.transfer_message(m_transport); result.is_error()) {
|
||||
dbgln("ConnectionBase::send_thread: {}", result.error());
|
||||
continue;
|
||||
}
|
||||
|
@ -73,7 +80,7 @@ ErrorOr<void> ConnectionBase::post_message(Message const& message)
|
|||
return post_message(message.endpoint_magic(), TRY(message.encode()));
|
||||
}
|
||||
|
||||
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
|
||||
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer, MessageNeedsAcknowledgement needs_acknowledgement)
|
||||
{
|
||||
// NOTE: If this connection is being shut down, but has not yet been destroyed,
|
||||
// the socket will be closed. Don't try to send more messages.
|
||||
|
@ -87,7 +94,7 @@ ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buf
|
|||
|
||||
{
|
||||
Threading::MutexLocker locker(m_send_queue->mutex);
|
||||
m_send_queue->messages.append(move(buffer));
|
||||
m_send_queue->messages.append({ move(buffer), needs_acknowledgement });
|
||||
m_send_queue->condition.signal();
|
||||
}
|
||||
|
||||
|
@ -218,6 +225,8 @@ OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32
|
|||
void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
|
||||
{
|
||||
u32 message_size = 0;
|
||||
u32 pending_ack_count = 0;
|
||||
u32 received_ack_count = 0;
|
||||
for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
|
||||
memcpy(&message_size, bytes.data() + index, sizeof(message_size));
|
||||
if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
|
||||
|
@ -232,9 +241,19 @@ void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
|
|||
m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
|
||||
auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds);
|
||||
VERIFY(parsed_message);
|
||||
VERIFY(parsed_message->message_id() != Acknowledgement::MESSAGE_ID);
|
||||
pending_ack_count++;
|
||||
m_unprocessed_messages.append(parsed_message.release_nonnull());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (message->message_id() == Acknowledgement::MESSAGE_ID) {
|
||||
VERIFY(message->endpoint_magic() == m_local_endpoint_magic);
|
||||
received_ack_count += static_cast<Acknowledgement*>(message.ptr())->ack_count();
|
||||
continue;
|
||||
}
|
||||
|
||||
pending_ack_count++;
|
||||
m_unprocessed_messages.append(message.release_nonnull());
|
||||
continue;
|
||||
}
|
||||
|
@ -243,6 +262,17 @@ void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
|
|||
dbgln("{:hex-dump}", remaining_bytes);
|
||||
break;
|
||||
}
|
||||
|
||||
if (received_ack_count > 0) {
|
||||
Threading::MutexLocker lock(m_acknowledgement_wait_queue->mutex);
|
||||
for (size_t i = 0; i < received_ack_count; ++i)
|
||||
m_acknowledgement_wait_queue->messages.take_first();
|
||||
}
|
||||
|
||||
if (is_open() && pending_ack_count > 0) {
|
||||
auto acknowledgement = Acknowledgement::create(m_peer_endpoint_magic, pending_ack_count);
|
||||
MUST(post_message(m_peer_endpoint_magic, MUST(acknowledgement->encode()), MessageNeedsAcknowledgement::No));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include <LibCore/EventReceiver.h>
|
||||
#include <LibIPC/File.h>
|
||||
#include <LibIPC/Forward.h>
|
||||
#include <LibIPC/Message.h>
|
||||
#include <LibIPC/Transport.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
#include <LibThreading/ConditionVariable.h>
|
||||
|
@ -26,9 +27,14 @@ class ConnectionBase : public Core::EventReceiver {
|
|||
public:
|
||||
virtual ~ConnectionBase() override;
|
||||
|
||||
enum class MessageNeedsAcknowledgement {
|
||||
No,
|
||||
Yes,
|
||||
};
|
||||
|
||||
[[nodiscard]] bool is_open() const;
|
||||
ErrorOr<void> post_message(Message const&);
|
||||
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer);
|
||||
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer, MessageNeedsAcknowledgement = MessageNeedsAcknowledgement::Yes);
|
||||
|
||||
void shutdown();
|
||||
virtual void die() { }
|
||||
|
@ -36,7 +42,7 @@ public:
|
|||
Transport& transport() { return m_transport; }
|
||||
|
||||
protected:
|
||||
explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic);
|
||||
explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic, u32 peer_endpoint_magic);
|
||||
|
||||
virtual void may_have_become_unresponsive() { }
|
||||
virtual void did_become_responsive() { }
|
||||
|
@ -62,23 +68,38 @@ protected:
|
|||
ByteBuffer m_unprocessed_bytes;
|
||||
|
||||
u32 m_local_endpoint_magic { 0 };
|
||||
u32 m_peer_endpoint_magic { 0 };
|
||||
|
||||
struct MessageToSend {
|
||||
MessageBuffer buffer;
|
||||
MessageNeedsAcknowledgement needs_acknowledgement { MessageNeedsAcknowledgement::Yes };
|
||||
};
|
||||
|
||||
struct SendQueue : public AtomicRefCounted<SendQueue> {
|
||||
AK::SinglyLinkedList<MessageBuffer> messages;
|
||||
AK::SinglyLinkedList<MessageToSend> messages;
|
||||
Threading::Mutex mutex;
|
||||
Threading::ConditionVariable condition { mutex };
|
||||
bool running { true };
|
||||
};
|
||||
|
||||
// After a message is sent, it is moved to the acknowledgement wait queue until an acknowledgement is received from the peer.
|
||||
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
|
||||
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
|
||||
struct AcknowledgementWaitQueue : public AtomicRefCounted<AcknowledgementWaitQueue> {
|
||||
AK::SinglyLinkedList<MessageBuffer> messages;
|
||||
Threading::Mutex mutex;
|
||||
};
|
||||
|
||||
RefPtr<Threading::Thread> m_send_thread;
|
||||
RefPtr<SendQueue> m_send_queue;
|
||||
RefPtr<AcknowledgementWaitQueue> m_acknowledgement_wait_queue;
|
||||
};
|
||||
|
||||
template<typename LocalEndpoint, typename PeerEndpoint>
|
||||
class Connection : public ConnectionBase {
|
||||
public:
|
||||
Connection(IPC::Stub& local_stub, Transport transport)
|
||||
: ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic())
|
||||
: ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic(), PeerEndpoint::static_magic())
|
||||
{
|
||||
}
|
||||
|
||||
|
|
|
@ -111,4 +111,32 @@ ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> LargeMessageWrapper::decode(u32 endp
|
|||
return make<LargeMessageWrapper>(endpoint_magic, wrapped_message_data, move(wrapped_fds));
|
||||
}
|
||||
|
||||
Acknowledgement::Acknowledgement(u32 endpoint_magic, u32 ack_count)
|
||||
: m_endpoint_magic(endpoint_magic)
|
||||
, m_ack_count(ack_count)
|
||||
{
|
||||
}
|
||||
|
||||
NonnullOwnPtr<Acknowledgement> Acknowledgement::create(u32 endpoint_magic, u32 ack_count)
|
||||
{
|
||||
return make<Acknowledgement>(endpoint_magic, ack_count);
|
||||
}
|
||||
|
||||
ErrorOr<MessageBuffer> Acknowledgement::encode() const
|
||||
{
|
||||
MessageBuffer buffer;
|
||||
Encoder stream { buffer };
|
||||
TRY(stream.encode(m_endpoint_magic));
|
||||
TRY(stream.encode(MESSAGE_ID));
|
||||
TRY(stream.encode(m_ack_count));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
ErrorOr<NonnullOwnPtr<Acknowledgement>> Acknowledgement::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files)
|
||||
{
|
||||
Decoder decoder { stream, files };
|
||||
auto ack_count = TRY(decoder.decode<u32>());
|
||||
return make<Acknowledgement>(endpoint_magic, ack_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -119,4 +119,28 @@ private:
|
|||
Vector<File> m_wrapped_fds;
|
||||
};
|
||||
|
||||
class Acknowledgement : public Message {
|
||||
public:
|
||||
~Acknowledgement() override = default;
|
||||
|
||||
static constexpr int MESSAGE_ID = 0xFFFFFFFF;
|
||||
|
||||
static NonnullOwnPtr<Acknowledgement> create(u32 endpoint_magic, u32 ack_count);
|
||||
|
||||
u32 endpoint_magic() const override { return m_endpoint_magic; }
|
||||
int message_id() const override { return MESSAGE_ID; }
|
||||
char const* message_name() const override { return "Acknowledgement"; }
|
||||
ErrorOr<MessageBuffer> encode() const override;
|
||||
|
||||
static ErrorOr<NonnullOwnPtr<Acknowledgement>> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files);
|
||||
|
||||
u32 ack_count() const { return m_ack_count; }
|
||||
|
||||
Acknowledgement(u32 endpoint_magic, u32 number_of_acknowledged_messages);
|
||||
|
||||
private:
|
||||
u32 m_endpoint_magic { 0 };
|
||||
u32 m_ack_count { 0 };
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -760,6 +760,8 @@ public:
|
|||
generator.append(R"~~~(
|
||||
case (int)IPC::LargeMessageWrapper::MESSAGE_ID:
|
||||
return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files));
|
||||
case (int)IPC::Acknowledgement::MESSAGE_ID:
|
||||
return TRY(IPC::Acknowledgement::decode(message_endpoint_magic, stream, files));
|
||||
)~~~");
|
||||
|
||||
generator.append(R"~~~(
|
||||
|
|
Loading…
Add table
Reference in a new issue