LibWeb: Add "parallel queue" and allow it as fetch task destination

Note that it's not actually executing tasks in parallel, it's still
throwing them on the HTML event loop task queue, each with its own
unique task source.

This makes our fetch implementation a lot more robust when HTTP caching
is enabled, and you can now click links on https://terminal.shop/
without hitting TODO assertions in fetch.
This commit is contained in:
Andreas Kling 2025-07-16 12:29:26 +02:00 committed by Andreas Kling
parent 9a5ef95022
commit 03256a2543
Notes: github-actions[bot] 2025-07-16 22:14:47 +00:00
10 changed files with 65 additions and 44 deletions

View file

@ -63,7 +63,7 @@ void FetchedDataReceiver::on_data_received(ReadonlyBytes bytes)
// 3. Queue a fetch task to run the following steps, with fetchParams's task destination. // 3. Queue a fetch task to run the following steps, with fetchParams's task destination.
Infrastructure::queue_fetch_task( Infrastructure::queue_fetch_task(
m_fetch_params->controller(), m_fetch_params->controller(),
m_fetch_params->task_destination().get<GC::Ref<JS::Object>>(), m_fetch_params->task_destination(),
GC::create_function(heap(), [this, bytes = MUST(ByteBuffer::copy(bytes))]() mutable { GC::create_function(heap(), [this, bytes = MUST(ByteBuffer::copy(bytes))]() mutable {
HTML::TemporaryExecutionContext execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; HTML::TemporaryExecutionContext execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };

View file

@ -89,7 +89,7 @@ WebIDL::ExceptionOr<GC::Ref<Infrastructure::FetchController>> fetch(JS::Realm& r
VERIFY(request.mode() == Infrastructure::Request::Mode::Navigate || !algorithms.process_early_hints_response()); VERIFY(request.mode() == Infrastructure::Request::Mode::Navigate || !algorithms.process_early_hints_response());
// 2. Let taskDestination be null. // 2. Let taskDestination be null.
GC::Ptr<JS::Object> task_destination; Infrastructure::TaskDestination task_destination;
// 3. Let crossOriginIsolatedCapability be false. // 3. Let crossOriginIsolatedCapability be false.
auto cross_origin_isolated_capability = HTML::CanUseCrossOriginIsolatedAPIs::No; auto cross_origin_isolated_capability = HTML::CanUseCrossOriginIsolatedAPIs::No;
@ -97,14 +97,15 @@ WebIDL::ExceptionOr<GC::Ref<Infrastructure::FetchController>> fetch(JS::Realm& r
// 4. If request's client is non-null, then: // 4. If request's client is non-null, then:
if (request.client() != nullptr) { if (request.client() != nullptr) {
// 1. Set taskDestination to request's client's global object. // 1. Set taskDestination to request's client's global object.
task_destination = request.client()->global_object(); task_destination = GC::Ref { request.client()->global_object() };
// 2. Set crossOriginIsolatedCapability to request's client's cross-origin isolated capability. // 2. Set crossOriginIsolatedCapability to request's client's cross-origin isolated capability.
cross_origin_isolated_capability = request.client()->cross_origin_isolated_capability(); cross_origin_isolated_capability = request.client()->cross_origin_isolated_capability();
} }
// FIXME: 5. If useParallelQueue is true, then set taskDestination to the result of starting a new parallel queue. // 5. If useParallelQueue is true, then set taskDestination to the result of starting a new parallel queue.
(void)use_parallel_queue; if (use_parallel_queue == UseParallelQueue::Yes)
task_destination = HTML::ParallelQueue::create();
// 6. Let timingInfo be a new fetch timing info whose start time and post-redirect start time are the coarsened // 6. Let timingInfo be a new fetch timing info whose start time and post-redirect start time are the coarsened
// shared current time given crossOriginIsolatedCapability, and render-blocking is set to request's // shared current time given crossOriginIsolatedCapability, and render-blocking is set to request's
@ -122,8 +123,7 @@ WebIDL::ExceptionOr<GC::Ref<Infrastructure::FetchController>> fetch(JS::Realm& r
// task destination is taskDestination, and cross-origin isolated capability is crossOriginIsolatedCapability. // task destination is taskDestination, and cross-origin isolated capability is crossOriginIsolatedCapability.
auto fetch_params = Infrastructure::FetchParams::create(vm, request, timing_info); auto fetch_params = Infrastructure::FetchParams::create(vm, request, timing_info);
fetch_params->set_algorithms(algorithms); fetch_params->set_algorithms(algorithms);
if (task_destination) fetch_params->set_task_destination(task_destination);
fetch_params->set_task_destination({ *task_destination });
fetch_params->set_cross_origin_isolated_capability(cross_origin_isolated_capability); fetch_params->set_cross_origin_isolated_capability(cross_origin_isolated_capability);
// 8. If request's body is a byte sequence, then set request's body to request's body as a body. // 8. If request's body is a byte sequence, then set request's body to request's body as a body.
@ -724,20 +724,14 @@ void fetch_response_handover(JS::Realm& realm, Infrastructure::FetchParams const
} }
}); });
// FIXME: Handle 'parallel queue' task destination
auto task_destination = fetch_params.task_destination().get<GC::Ref<JS::Object>>();
// 5. Queue a fetch task to run processResponseEndOfBodyTask with fetchParams's task destination. // 5. Queue a fetch task to run processResponseEndOfBodyTask with fetchParams's task destination.
Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, move(process_response_end_of_body_task)); Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), move(process_response_end_of_body_task));
}; };
// FIXME: Handle 'parallel queue' task destination
auto task_destination = fetch_params.task_destination().get<GC::Ref<JS::Object>>();
// 4. If fetchParams's process response is non-null, then queue a fetch task to run fetchParams's process response // 4. If fetchParams's process response is non-null, then queue a fetch task to run fetchParams's process response
// given response, with fetchParams's task destination. // given response, with fetchParams's task destination.
if (fetch_params.algorithms()->process_response()) { if (fetch_params.algorithms()->process_response()) {
Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, GC::create_function(vm.heap(), [&fetch_params, &response]() { Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), GC::create_function(vm.heap(), [&fetch_params, &response]() {
fetch_params.algorithms()->process_response()(response); fetch_params.algorithms()->process_response()(response);
})); }));
} }
@ -791,7 +785,7 @@ void fetch_response_handover(JS::Realm& realm, Infrastructure::FetchParams const
// 3. If internalResponse's body is null, then queue a fetch task to run processBody given null, with // 3. If internalResponse's body is null, then queue a fetch task to run processBody given null, with
// fetchParams's task destination. // fetchParams's task destination.
if (!internal_response->body()) { if (!internal_response->body()) {
Infrastructure::queue_fetch_task(fetch_params.controller(), task_destination, GC::create_function(vm.heap(), [process_body]() { Infrastructure::queue_fetch_task(fetch_params.controller(), fetch_params.task_destination(), GC::create_function(vm.heap(), [process_body]() {
process_body->function()({}); process_body->function()({});
})); }));
} }

View file

@ -68,22 +68,21 @@ void Body::fully_read(JS::Realm& realm, Web::Fetch::Infrastructure::Body::Proces
{ {
HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// FIXME: 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. // 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue.
// FIXME: Handle 'parallel queue' task destination if (task_destination.has<Empty>())
VERIFY(!task_destination.has<Empty>()); task_destination = HTML::ParallelQueue::create();
auto task_destination_object = task_destination.get<GC::Ref<JS::Object>>();
// 2. Let successSteps given a byte sequence bytes be to queue a fetch task to run processBody given bytes, with taskDestination. // 2. Let successSteps given a byte sequence bytes be to queue a fetch task to run processBody given bytes, with taskDestination.
auto success_steps = [&realm, process_body, task_destination_object](ByteBuffer bytes) { auto success_steps = [&realm, process_body, task_destination](ByteBuffer bytes) {
queue_fetch_task(*task_destination_object, GC::create_function(realm.heap(), [process_body, bytes = move(bytes)]() mutable { queue_fetch_task(task_destination, GC::create_function(realm.heap(), [process_body, bytes = move(bytes)]() mutable {
process_body->function()(move(bytes)); process_body->function()(move(bytes));
})); }));
}; };
// 3. Let errorSteps optionally given an exception exception be to queue a fetch task to run processBodyError given // 3. Let errorSteps optionally given an exception exception be to queue a fetch task to run processBodyError given
// exception, with taskDestination. // exception, with taskDestination.
auto error_steps = [&realm, process_body_error, task_destination_object](JS::Value exception) { auto error_steps = [&realm, process_body_error, task_destination](JS::Value exception) {
queue_fetch_task(*task_destination_object, GC::create_function(realm.heap(), [process_body_error, exception]() { queue_fetch_task(task_destination, GC::create_function(realm.heap(), [process_body_error, exception]() {
process_body_error->function()(exception); process_body_error->function()(exception);
})); }));
}; };
@ -107,20 +106,21 @@ void Body::incrementally_read(ProcessBodyChunkCallback process_body_chunk, Proce
{ {
HTML::TemporaryExecutionContext const execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; HTML::TemporaryExecutionContext const execution_context { m_stream->realm(), HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
VERIFY(task_destination.has<GC::Ref<JS::Object>>()); // 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue.
// FIXME: 1. If taskDestination is null, then set taskDestination to the result of starting a new parallel queue. if (task_destination.has<Empty>())
// FIXME: Handle 'parallel queue' task destination task_destination = HTML::ParallelQueue::create();
// 2. Let reader be the result of getting a reader for body's stream. // 2. Let reader be the result of getting a reader for body's stream.
// NOTE: This operation will not throw an exception. // NOTE: This operation will not throw an exception.
auto reader = MUST(m_stream->get_a_reader()); auto reader = MUST(m_stream->get_a_reader());
// 3. Perform the incrementally-read loop given reader, taskDestination, processBodyChunk, processEndOfBody, and processBodyError. // 3. Perform the incrementally-read loop given reader, taskDestination, processBodyChunk, processEndOfBody, and processBodyError.
VERIFY(!task_destination.has<Empty>());
incrementally_read_loop(reader, task_destination.get<GC::Ref<JS::Object>>(), process_body_chunk, process_end_of_body, process_body_error); incrementally_read_loop(reader, task_destination.get<GC::Ref<JS::Object>>(), process_body_chunk, process_end_of_body, process_body_error);
} }
// https://fetch.spec.whatwg.org/#incrementally-read-loop // https://fetch.spec.whatwg.org/#incrementally-read-loop
void Body::incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, GC::Ref<JS::Object> task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error) void Body::incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, TaskDestination task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error)
{ {
auto& realm = reader.realm(); auto& realm = reader.realm();
// 1. Let readRequest be the following read request: // 1. Let readRequest be the following read request:

View file

@ -46,9 +46,9 @@ public:
[[nodiscard]] GC::Ref<Body> clone(JS::Realm&); [[nodiscard]] GC::Ref<Body> clone(JS::Realm&);
void fully_read(JS::Realm&, ProcessBodyCallback process_body, ProcessBodyErrorCallback process_body_error, TaskDestination task_destination) const; void fully_read(JS::Realm&, ProcessBodyCallback process_body, ProcessBodyErrorCallback process_body_error, TaskDestination) const;
void incrementally_read(ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error, TaskDestination task_destination); void incrementally_read(ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error, TaskDestination);
void incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, GC::Ref<JS::Object> task_destination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error); void incrementally_read_loop(Streams::ReadableStreamDefaultReader& reader, TaskDestination, ProcessBodyChunkCallback process_body_chunk, ProcessEndOfBodyCallback process_end_of_body, ProcessBodyErrorCallback process_body_error);
virtual void visit_edges(JS::Cell::Visitor&) override; virtual void visit_edges(JS::Cell::Visitor&) override;

View file

@ -61,7 +61,7 @@ void IncrementalReadLoopReadRequest::on_error(JS::Value error)
})); }));
} }
IncrementalReadLoopReadRequest::IncrementalReadLoopReadRequest(GC::Ref<Body> body, GC::Ref<Streams::ReadableStreamDefaultReader> reader, GC::Ref<JS::Object> task_destination, Body::ProcessBodyChunkCallback process_body_chunk, Body::ProcessEndOfBodyCallback process_end_of_body, Body::ProcessBodyErrorCallback process_body_error) IncrementalReadLoopReadRequest::IncrementalReadLoopReadRequest(GC::Ref<Body> body, GC::Ref<Streams::ReadableStreamDefaultReader> reader, TaskDestination task_destination, Body::ProcessBodyChunkCallback process_body_chunk, Body::ProcessEndOfBodyCallback process_end_of_body, Body::ProcessBodyErrorCallback process_body_error)
: m_body(body) : m_body(body)
, m_reader(reader) , m_reader(reader)
, m_task_destination(task_destination) , m_task_destination(task_destination)
@ -76,7 +76,8 @@ void IncrementalReadLoopReadRequest::visit_edges(Visitor& visitor)
Base::visit_edges(visitor); Base::visit_edges(visitor);
visitor.visit(m_body); visitor.visit(m_body);
visitor.visit(m_reader); visitor.visit(m_reader);
visitor.visit(m_task_destination); if (auto* task_destination_object = m_task_destination.get_pointer<GC::Ref<JS::Object>>(); task_destination_object)
visitor.visit(*task_destination_object);
visitor.visit(m_process_body_chunk); visitor.visit(m_process_body_chunk);
visitor.visit(m_process_end_of_body); visitor.visit(m_process_end_of_body);
visitor.visit(m_process_body_error); visitor.visit(m_process_body_error);

View file

@ -17,7 +17,7 @@ class IncrementalReadLoopReadRequest : public Streams::ReadRequest {
GC_DECLARE_ALLOCATOR(IncrementalReadLoopReadRequest); GC_DECLARE_ALLOCATOR(IncrementalReadLoopReadRequest);
public: public:
IncrementalReadLoopReadRequest(GC::Ref<Body>, GC::Ref<Streams::ReadableStreamDefaultReader>, GC::Ref<JS::Object> task_destination, Body::ProcessBodyChunkCallback, Body::ProcessEndOfBodyCallback, Body::ProcessBodyErrorCallback); IncrementalReadLoopReadRequest(GC::Ref<Body>, GC::Ref<Streams::ReadableStreamDefaultReader>, TaskDestination, Body::ProcessBodyChunkCallback, Body::ProcessEndOfBodyCallback, Body::ProcessBodyErrorCallback);
virtual void on_chunk(JS::Value chunk) override; virtual void on_chunk(JS::Value chunk) override;
virtual void on_close() override; virtual void on_close() override;
@ -28,7 +28,7 @@ private:
GC::Ref<Body> m_body; GC::Ref<Body> m_body;
GC::Ref<Streams::ReadableStreamDefaultReader> m_reader; GC::Ref<Streams::ReadableStreamDefaultReader> m_reader;
GC::Ref<JS::Object> m_task_destination; TaskDestination m_task_destination;
Body::ProcessBodyChunkCallback m_process_body_chunk; Body::ProcessBodyChunkCallback m_process_body_chunk;
Body::ProcessEndOfBodyCallback m_process_end_of_body; Body::ProcessEndOfBodyCallback m_process_end_of_body;
Body::ProcessBodyErrorCallback m_process_body_error; Body::ProcessBodyErrorCallback m_process_body_error;

View file

@ -4,6 +4,7 @@
* SPDX-License-Identifier: BSD-2-Clause * SPDX-License-Identifier: BSD-2-Clause
*/ */
#include <LibWeb/Bindings/MainThreadVM.h>
#include <LibWeb/Fetch/Infrastructure/FetchController.h> #include <LibWeb/Fetch/Infrastructure/FetchController.h>
#include <LibWeb/Fetch/Infrastructure/Task.h> #include <LibWeb/Fetch/Infrastructure/Task.h>
#include <LibWeb/HTML/EventLoop/EventLoop.h> #include <LibWeb/HTML/EventLoop/EventLoop.h>
@ -11,21 +12,25 @@
namespace Web::Fetch::Infrastructure { namespace Web::Fetch::Infrastructure {
// https://fetch.spec.whatwg.org/#queue-a-fetch-task // https://fetch.spec.whatwg.org/#queue-a-fetch-task
HTML::TaskID queue_fetch_task(JS::Object& task_destination, GC::Ref<GC::Function<void()>> algorithm) HTML::TaskID queue_fetch_task(TaskDestination task_destination, GC::Ref<GC::Function<void()>> algorithm)
{ {
// FIXME: 1. If taskDestination is a parallel queue, then enqueue algorithm to taskDestination. VERIFY(!task_destination.has<Empty>());
// 1. If taskDestination is a parallel queue, then enqueue algorithm to taskDestination.
if (auto* parallel_queue = task_destination.get_pointer<NonnullRefPtr<HTML::ParallelQueue>>())
return (*parallel_queue)->enqueue(algorithm);
// 2. Otherwise, queue a global task on the networking task source with taskDestination and algorithm. // 2. Otherwise, queue a global task on the networking task source with taskDestination and algorithm.
return HTML::queue_global_task(HTML::Task::Source::Networking, task_destination, algorithm); return HTML::queue_global_task(HTML::Task::Source::Networking, task_destination.get<GC::Ref<JS::Object>>(), algorithm);
} }
// AD-HOC: This overload allows tracking the queued task within the fetch controller so that we may cancel queued tasks // AD-HOC: This overload allows tracking the queued task within the fetch controller so that we may cancel queued tasks
// when the spec indicates that we must stop an ongoing fetch. // when the spec indicates that we must stop an ongoing fetch.
HTML::TaskID queue_fetch_task(GC::Ref<FetchController> fetch_controller, JS::Object& task_destination, GC::Ref<GC::Function<void()>> algorithm) HTML::TaskID queue_fetch_task(GC::Ref<FetchController> fetch_controller, TaskDestination task_destination, GC::Ref<GC::Function<void()>> algorithm)
{ {
auto fetch_task_id = fetch_controller->next_fetch_task_id(); auto fetch_task_id = fetch_controller->next_fetch_task_id();
auto& heap = task_destination.heap(); auto& heap = fetch_controller->heap();
auto html_task_id = queue_fetch_task(task_destination, GC::create_function(heap, [fetch_controller, fetch_task_id, algorithm]() { auto html_task_id = queue_fetch_task(task_destination, GC::create_function(heap, [fetch_controller, fetch_task_id, algorithm]() {
fetch_controller->fetch_task_complete(fetch_task_id); fetch_controller->fetch_task_complete(fetch_task_id);
algorithm->function()(); algorithm->function()();

View file

@ -14,10 +14,9 @@
namespace Web::Fetch::Infrastructure { namespace Web::Fetch::Infrastructure {
// FIXME: 'or a parallel queue' using TaskDestination = Variant<Empty, GC::Ref<JS::Object>, NonnullRefPtr<HTML::ParallelQueue>>;
using TaskDestination = Variant<Empty, GC::Ref<JS::Object>>;
HTML::TaskID queue_fetch_task(JS::Object&, GC::Ref<GC::Function<void()>>); HTML::TaskID queue_fetch_task(TaskDestination, GC::Ref<GC::Function<void()>>);
HTML::TaskID queue_fetch_task(GC::Ref<FetchController>, JS::Object&, GC::Ref<GC::Function<void()>>); HTML::TaskID queue_fetch_task(GC::Ref<FetchController>, TaskDestination, GC::Ref<GC::Function<void()>>);
} }

View file

@ -77,4 +77,17 @@ UniqueTaskSource::~UniqueTaskSource()
s_unique_task_source_allocator.deallocate(static_cast<int>(source)); s_unique_task_source_allocator.deallocate(static_cast<int>(source));
} }
// Allocates a fresh ParallelQueue and returns it as a non-null, ref-counted handle.
// Each queue default-constructs its own UniqueTaskSource member.
NonnullRefPtr<ParallelQueue> ParallelQueue::create()
{
    // Use plain `new` rather than `new (nothrow)`: the nothrow form may return
    // nullptr on allocation failure, and dereferencing that to build the
    // NonnullRefPtr would be undefined behaviour.
    return adopt_ref(*new ParallelQueue);
}
// Schedules `algorithm` as a task on the main thread event loop's task queue,
// tagged with this queue's own task source, and returns the new task's ID.
TaskID ParallelQueue::enqueue(GC::Ref<GC::Function<void()>> algorithm)
{
    auto& main_loop = HTML::main_thread_event_loop();

    // The task is created without an associated document (nullptr).
    auto queued_task = HTML::Task::create(
        main_loop.vm(),
        m_task_source.source,
        nullptr,
        algorithm);

    main_loop.task_queue().add(queued_task);
    return queued_task->id();
}
} }

View file

@ -116,4 +116,13 @@ struct UniqueTaskSource {
Task::Source const source; Task::Source const source;
}; };
// https://html.spec.whatwg.org/multipage/infrastructure.html#parallel-queue
// A ref-counted queue of algorithms. Enqueued algorithms are not run on a
// separate thread: they are posted as tasks on the main thread event loop's
// task queue, tagged with a task source owned by this queue.
class ParallelQueue : public RefCounted<ParallelQueue> {
public:
// Creates a new queue with its own task source.
static NonnullRefPtr<ParallelQueue> create();
// Posts the given algorithm as a task using this queue's task source and
// returns the ID of the created task.
TaskID enqueue(GC::Ref<GC::Function<void()>>);
private:
// Task source used for every task enqueued through this queue.
UniqueTaskSource m_task_source;
};
} }