From 03256a2543e6bc424501bfe28a9d79b5d55bd1c1 Mon Sep 17 00:00:00 2001 From: Andreas Kling Date: Wed, 16 Jul 2025 12:29:26 +0200 Subject: [PATCH] LibWeb: Add "parallel queue" and allow it as fetch task destination Note that it's not actually executing tasks in parallel, it's still throwing them on the HTML event loop task queue, each with its own unique task source. This makes our fetch implementation a lot more robust when HTTP caching is enabled, and you can now click links on https://terminal.shop/ without hitting TODO assertions in fetch. --- .../Fetch/Fetching/FetchedDataReceiver.cpp | 2 +- Libraries/LibWeb/Fetch/Fetching/Fetching.cpp | 24 +++++++------------ .../Fetch/Infrastructure/HTTP/Bodies.cpp | 24 +++++++++---------- .../LibWeb/Fetch/Infrastructure/HTTP/Bodies.h | 6 ++--- .../IncrementalReadLoopReadRequest.cpp | 5 ++-- .../IncrementalReadLoopReadRequest.h | 4 ++-- .../LibWeb/Fetch/Infrastructure/Task.cpp | 15 ++++++++---- Libraries/LibWeb/Fetch/Infrastructure/Task.h | 7 +++--- Libraries/LibWeb/HTML/EventLoop/Task.cpp | 13 ++++++++++ Libraries/LibWeb/HTML/EventLoop/Task.h | 9 +++++++ 10 files changed, 65 insertions(+), 44 deletions(-) diff --git a/Libraries/LibWeb/Fetch/Fetching/FetchedDataReceiver.cpp b/Libraries/LibWeb/Fetch/Fetching/FetchedDataReceiver.cpp index d689f0023f8..d3d6bdcd284 100644 --- a/Libraries/LibWeb/Fetch/Fetching/FetchedDataReceiver.cpp +++ b/Libraries/LibWeb/Fetch/Fetching/FetchedDataReceiver.cpp @@ -63,7 +63,7 @@ void FetchedDataReceiver::on_data_received(ReadonlyBytes bytes) // 3. Queue a fetch task to run the following steps, with fetchParams’s task destination. Infrastructure::queue_fetch_task( m_fetch_params->controller(), - m_fetch_params->task_destination().get>(), + m_fetch_params->task_destination(), GC::create_function(heap(), [this, bytes = MUST(ByteBuffer::copy(bytes))]() mutable { HTML::TemporaryExecutionContext execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; diff --git a/Libraries/LibWeb/Fetch/Fetching/Fetching.cpp b/Libraries/LibWeb/Fetch/Fetching/Fetching.cpp index dc617d32ada..389c3dc15fd 100644 --- a/Libraries/LibWeb/Fetch/Fetching/Fetching.cpp +++ b/Libraries/LibWeb/Fetch/Fetching/Fetching.cpp @@ -89,7 +89,7 @@ WebIDL::ExceptionOr> fetch(JS::Realm& r VERIFY(request.mode() == Infrastructure::Request::Mode::Navigate || !algorithms.process_early_hints_response()); // 2. Let taskDestination be null. - GC::Ptr task_destination; + Infrastructure::TaskDestination task_destination; // 3. Let crossOriginIsolatedCapability be false. auto cross_origin_isolated_capability = HTML::CanUseCrossOriginIsolatedAPIs::No; @@ -97,14 +97,15 @@ WebIDL::ExceptionOr> fetch(JS::Realm& r // 4. If request’s client is non-null, then: if (request.client() != nullptr) { // 1. Set taskDestination to request’s client’s global object. - task_destination = request.client()->global_object(); + task_destination = GC::Ref { request.client()->global_object() }; // 2. Set crossOriginIsolatedCapability to request’s client’s cross-origin isolated capability. cross_origin_isolated_capability = request.client()->cross_origin_isolated_capability(); } - // FIXME: 5. If useParallelQueue is true, then set taskDestination to the result of starting a new parallel queue. - (void)use_parallel_queue; + // 5. If useParallelQueue is true, then set taskDestination to the result of starting a new parallel queue. + if (use_parallel_queue == UseParallelQueue::Yes) + task_destination = HTML::ParallelQueue::create(); // 6. Let timingInfo be a new fetch timing info whose start time and post-redirect start time are the coarsened // shared current time given crossOriginIsolatedCapability, and render-blocking is set to request’s @@ -122,8 +123,7 @@ WebIDL::ExceptionOr> fetch(JS::Realm& r // task destination is taskDestination, and cross-origin isolated capability is crossOriginIsolatedCapability. auto fetch_params = Infrastructure::FetchParams::create(vm, request, timing_info); fetch_params->set_algorithms(algorithms); - if (task_destination) - fetch_params->set_task_destination({ *task_destination }); + fetch_params->set_task_destination(task_destination); fetch_params->set_cross_origin_isolated_capability(cross_origin_isolated_capability); // 8. If request’s body is a byte sequence, then set request’s body to request’s body as a body. @@ -724,20 +724,14 @@ void fetch_response_handover(JS::Realm& realm, Infrastructure::FetchParams const } }); - // FIXME: Handle 'parallel queue' task destination - auto task_destination = fetch_params.task_destination().get>(); - // 5. Queue a fetch task to run processResponseEndOfBodyTask with fetchParams’s task destination. - Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, move(process_response_end_of_body_task)); + Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), move(process_response_end_of_body_task)); }; - // FIXME: Handle 'parallel queue' task destination - auto task_destination = fetch_params.task_destination().get>(); - // 4. If fetchParams’s process response is non-null, then queue a fetch task to run fetchParams’s process response // given response, with fetchParams’s task destination. if (fetch_params.algorithms()->process_response()) { - Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, GC::create_function(vm.heap(), [&fetch_params, &response]() { + Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), GC::create_function(vm.heap(), [&fetch_params, &response]() { fetch_params.algorithms()->process_response()(response); })); } @@ -791,7 +785,7 @@ void fetch_response_handover(JS::Realm& realm, Infrastructure::FetchParams const // 3. If internalResponse's body is null, then queue a fetch task to run processBody given null, with // fetchParams’s task destination. if (!internal_response->body()) { - Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, GC::create_function(vm.heap(), [process_body]() { + Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), GC::create_function(vm.heap(), [process_body]() { process_body->function()({}); })); } diff --git a/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.cpp b/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.cpp index 3da29a57dfe..73d5a2d8b7f 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.cpp +++ b/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.cpp @@ -68,22 +68,21 @@ void Body::fully_read(JS::Realm& realm, Web::Fetch::Infrastructure::Body::Proces { HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; - // FIXME: 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. - // FIXME: Handle 'parallel queue' task destination - VERIFY(!task_destination.has()); - auto task_destination_object = task_destination.get>(); + // 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. + if (task_destination.has()) + task_destination = HTML::ParallelQueue::create(); // 2. Let successSteps given a byte sequence bytes be to queue a fetch task to run processBody given bytes, with taskDestination. - auto success_steps = [&realm, process_body, task_destination_object](ByteBuffer bytes) { - queue_fetch_task(*task_destination_object, GC::create_function(realm.heap(), [process_body, bytes = move(bytes)]() mutable { + auto success_steps = [&realm, process_body, task_destination](ByteBuffer bytes) { + queue_fetch_task(task_destination, GC::create_function(realm.heap(), [process_body, bytes = move(bytes)]() mutable { process_body->function()(move(bytes)); })); }; // 3. Let errorSteps optionally given an exception exception be to queue a fetch task to run processBodyError given // exception, with taskDestination. - auto error_steps = [&realm, process_body_error, task_destination_object](JS::Value exception) { - queue_fetch_task(*task_destination_object, GC::create_function(realm.heap(), [process_body_error, exception]() { + auto error_steps = [&realm, process_body_error, task_destination](JS::Value exception) { + queue_fetch_task(task_destination, GC::create_function(realm.heap(), [process_body_error, exception]() { process_body_error->function()(exception); })); }; @@ -107,20 +106,21 @@ void Body::incrementally_read(ProcessBodyChunkCallback process_body_chunk, Proce { HTML::TemporaryExecutionContext const execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; - VERIFY(task_destination.has>()); - // FIXME: 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. - // FIXME: Handle 'parallel queue' task destination + // 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. + if (task_destination.has()) + task_destination = HTML::ParallelQueue::create(); // 2. Let reader be the result of getting a reader for body’s stream. // NOTE: This operation will not throw an exception. auto reader = MUST(m_stream->get_a_reader()); // 3. Perform the incrementally-read loop given reader, taskDestination, processBodyChunk, processEndOfBody, and processBodyError. + VERIFY(!task_destination.has()); incrementally_read_loop(reader, task_destination.get>(), process_body_chunk, process_end_of_body, process_body_error); } // https://fetch.spec.whatwg.org/#incrementally-read-loop -void Body::incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, GC::Ref task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error) +void Body::incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, TaskDestination task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error) { auto& realm = reader.realm(); // 1. Let readRequest be the following read request: diff --git a/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.h b/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.h index c291a81ddb7..5f6fff2e388 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.h +++ b/Libraries/LibWeb/Fetch/Infrastructure/HTTP/Bodies.h @@ -46,9 +46,9 @@ public: [[nodiscard]] GC::Ref clone(JS::Realm&); - void fully_read(JS::Realm&, ProcessBodyCallback process_body, ProcessBodyErrorCallback process_body_error, TaskDestination task_destination) const; - void incrementally_read(ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error, TaskDestination task_destination); - void incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, GC::Ref task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error); + void fully_read(JS::Realm&, ProcessBodyCallback process_body, ProcessBodyErrorCallback process_body_error, TaskDestination) const; + void incrementally_read(ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error, TaskDestination); + void incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, TaskDestination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error); virtual void visit_edges(JS::Cell::Visitor&) override; diff --git a/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.cpp b/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.cpp index 5c937a9603f..9caa68fd24a 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.cpp +++ b/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.cpp @@ -61,7 +61,7 @@ void IncrementalReadLoopReadRequest::on_error(JS::Value error) })); } -IncrementalReadLoopReadRequest::IncrementalReadLoopReadRequest(GC::Ref body, GC::Ref reader, GC::Ref task_destination, Body::ProcessBodyChunkCallback process_body_chunk, Body::ProcessEndOfBodyCallback process_end_of_body, Body::ProcessBodyErrorCallback process_body_error) +IncrementalReadLoopReadRequest::IncrementalReadLoopReadRequest(GC::Ref body, GC::Ref reader, TaskDestination task_destination, Body::ProcessBodyChunkCallback process_body_chunk, Body::ProcessEndOfBodyCallback process_end_of_body, Body::ProcessBodyErrorCallback process_body_error) : m_body(body) , m_reader(reader) , m_task_destination(task_destination) @@ -76,7 +76,8 @@ void IncrementalReadLoopReadRequest::visit_edges(Visitor& visitor) Base::visit_edges(visitor); visitor.visit(m_body); visitor.visit(m_reader); - visitor.visit(m_task_destination); + if (auto* task_destination_object = m_task_destination.get_pointer>(); task_destination_object) + visitor.visit(*task_destination_object); visitor.visit(m_process_body_chunk); visitor.visit(m_process_end_of_body); visitor.visit(m_process_body_error); diff --git a/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.h b/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.h index 81332df4226..621cab5328d 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.h +++ b/Libraries/LibWeb/Fetch/Infrastructure/IncrementalReadLoopReadRequest.h @@ -17,7 +17,7 @@ class IncrementalReadLoopReadRequest : public Streams::ReadRequest { GC_DECLARE_ALLOCATOR(IncrementalReadLoopReadRequest); public: - IncrementalReadLoopReadRequest(GC::Ref, GC::Ref, GC::Ref task_destination, Body::ProcessBodyChunkCallback, Body::ProcessEndOfBodyCallback, Body::ProcessBodyErrorCallback); + IncrementalReadLoopReadRequest(GC::Ref, GC::Ref, TaskDestination, Body::ProcessBodyChunkCallback, Body::ProcessEndOfBodyCallback, Body::ProcessBodyErrorCallback); virtual void on_chunk(JS::Value chunk) override; virtual void on_close() override; @@ -28,7 +28,7 @@ private: GC::Ref m_body; GC::Ref m_reader; - GC::Ref m_task_destination; + TaskDestination m_task_destination; Body::ProcessBodyChunkCallback m_process_body_chunk; Body::ProcessEndOfBodyCallback m_process_end_of_body; Body::ProcessBodyErrorCallback m_process_body_error; diff --git a/Libraries/LibWeb/Fetch/Infrastructure/Task.cpp b/Libraries/LibWeb/Fetch/Infrastructure/Task.cpp index c28a6c1d6d2..e20b0b75cf4 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/Task.cpp +++ b/Libraries/LibWeb/Fetch/Infrastructure/Task.cpp @@ -4,6 +4,7 @@ * SPDX-License-Identifier: BSD-2-Clause */ +#include #include #include #include @@ -11,21 +12,25 @@ namespace Web::Fetch::Infrastructure { // https://fetch.spec.whatwg.org/#queue-a-fetch-task -HTML::TaskID queue_fetch_task(JS::Object& task_destination, GC::Ref> algorithm) +HTML::TaskID queue_fetch_task(TaskDestination task_destination, GC::Ref> algorithm) { - // FIXME: 1. If taskDestination is a parallel queue, then enqueue algorithm to taskDestination. + VERIFY(!task_destination.has()); + + // 1. If taskDestination is a parallel queue, then enqueue algorithm to taskDestination. + if (auto* parallel_queue = task_destination.get_pointer>()) + return (*parallel_queue)->enqueue(algorithm); // 2. Otherwise, queue a global task on the networking task source with taskDestination and algorithm. - return HTML::queue_global_task(HTML::Task::Source::Networking, task_destination, algorithm); + return HTML::queue_global_task(HTML::Task::Source::Networking, task_destination.get>(), algorithm); } // AD-HOC: This overload allows tracking the queued task within the fetch controller so that we may cancel queued tasks // when the spec indicates that we must stop an ongoing fetch. -HTML::TaskID queue_fetch_task(GC::Ref fetch_controller, JS::Object& task_destination, GC::Ref> algorithm) +HTML::TaskID queue_fetch_task(GC::Ref fetch_controller, TaskDestination task_destination, GC::Ref> algorithm) { auto fetch_task_id = fetch_controller->next_fetch_task_id(); - auto& heap = task_destination.heap(); + auto& heap = fetch_controller->heap(); auto html_task_id = queue_fetch_task(task_destination, GC::create_function(heap, [fetch_controller, fetch_task_id, algorithm]() { fetch_controller->fetch_task_complete(fetch_task_id); algorithm->function()(); diff --git a/Libraries/LibWeb/Fetch/Infrastructure/Task.h b/Libraries/LibWeb/Fetch/Infrastructure/Task.h index 031637273df..d2b6385e6e5 100644 --- a/Libraries/LibWeb/Fetch/Infrastructure/Task.h +++ b/Libraries/LibWeb/Fetch/Infrastructure/Task.h @@ -14,10 +14,9 @@ namespace Web::Fetch::Infrastructure { -// FIXME: 'or a parallel queue' -using TaskDestination = Variant>; +using TaskDestination = Variant, NonnullRefPtr>; -HTML::TaskID queue_fetch_task(JS::Object&, GC::Ref>); -HTML::TaskID queue_fetch_task(GC::Ref, JS::Object&, GC::Ref>); +HTML::TaskID queue_fetch_task(TaskDestination, GC::Ref>); +HTML::TaskID queue_fetch_task(GC::Ref, TaskDestination, GC::Ref>); } diff --git a/Libraries/LibWeb/HTML/EventLoop/Task.cpp b/Libraries/LibWeb/HTML/EventLoop/Task.cpp index a93f617074e..89c9a30248c 100644 --- a/Libraries/LibWeb/HTML/EventLoop/Task.cpp +++ b/Libraries/LibWeb/HTML/EventLoop/Task.cpp @@ -77,4 +77,17 @@ UniqueTaskSource::~UniqueTaskSource() s_unique_task_source_allocator.deallocate(static_cast(source)); } +NonnullRefPtr ParallelQueue::create() +{ + return adopt_ref(*new (nothrow) ParallelQueue); +} + +TaskID ParallelQueue::enqueue(GC::Ref> algorithm) +{ + auto& event_loop = HTML::main_thread_event_loop(); + auto task = HTML::Task::create(event_loop.vm(), m_task_source.source, nullptr, algorithm); + event_loop.task_queue().add(task); + return task->id(); +} + } diff --git a/Libraries/LibWeb/HTML/EventLoop/Task.h b/Libraries/LibWeb/HTML/EventLoop/Task.h index e2bd1a68707..83b00d7ea17 100644 --- a/Libraries/LibWeb/HTML/EventLoop/Task.h +++ b/Libraries/LibWeb/HTML/EventLoop/Task.h @@ -116,4 +116,13 @@ struct UniqueTaskSource { Task::Source const source; }; +class ParallelQueue : public RefCounted { +public: + static NonnullRefPtr create(); + TaskID enqueue(GC::Ref>); + +private: + UniqueTaskSource m_task_source; +}; + }