LibWeb: Implement ReadableStream's async iterator

This commit is contained in:
Timothy Flynn 2025-04-12 13:19:05 -04:00
parent e16072cef4
commit 4e804d85af
8 changed files with 261 additions and 43 deletions

View file

@ -743,6 +743,7 @@ set(SOURCES
Streams/GenericTransformStream.cpp
Streams/ReadableByteStreamController.cpp
Streams/ReadableStream.cpp
Streams/ReadableStreamAsyncIterator.cpp
Streams/ReadableStreamBYOBReader.cpp
Streams/ReadableStreamBYOBRequest.cpp
Streams/ReadableStreamDefaultController.cpp

View file

@ -786,6 +786,7 @@ class ByteLengthQueuingStrategy;
class CountQueuingStrategy;
class ReadableByteStreamController;
class ReadableStream;
class ReadableStreamAsyncIterator;
class ReadableStreamBYOBReader;
class ReadableStreamBYOBRequest;
class ReadableStreamDefaultController;

View file

@ -24,8 +24,13 @@ dictionary ReadableStreamGetReaderOptions {
ReadableStreamReaderMode mode;
};
// https://streams.spec.whatwg.org/#dictdef-readablestreamiteratoroptions
dictionary ReadableStreamIteratorOptions {
boolean preventCancel = false;
};
// https://streams.spec.whatwg.org/#readablestream
[Exposed=*, Transferable]
[Exposed=*, Transferable, DefinesAsyncIteratorReturn]
interface ReadableStream {
constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});
@ -39,7 +44,7 @@ interface ReadableStream {
Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
sequence<ReadableStream> tee();
// FIXME: async iterable<any>(optional ReadableStreamIteratorOptions options = {});
async iterable<any>(optional ReadableStreamIteratorOptions options = {});
};
typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

View file

@ -0,0 +1,169 @@
/*
* Copyright (c) 2025, Tim Flynn <trflynn89@ladybird.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include <LibWeb/Bindings/Intrinsics.h>
#include <LibWeb/Bindings/ReadableStreamAsyncIteratorPrototype.h>
#include <LibWeb/Streams/AbstractOperations.h>
#include <LibWeb/Streams/ReadableStream.h>
#include <LibWeb/Streams/ReadableStreamAsyncIterator.h>
#include <LibWeb/Streams/ReadableStreamDefaultReader.h>
namespace Web::Bindings {
template<>
void Intrinsics::create_web_prototype_and_constructor<ReadableStreamAsyncIteratorPrototype>(JS::Realm& realm)
{
auto prototype = realm.create<ReadableStreamAsyncIteratorPrototype>(realm);
m_prototypes.set("ReadableStreamAsyncIterator"_fly_string, prototype);
}
}
namespace Web::Streams {
GC_DEFINE_ALLOCATOR(ReadableStreamAsyncIterator);
// https://streams.spec.whatwg.org/#ref-for-asynchronous-iterator-initialization-steps
WebIDL::ExceptionOr<GC::Ref<ReadableStreamAsyncIterator>> ReadableStreamAsyncIterator::create(JS::Realm& realm, JS::Object::PropertyKind kind, ReadableStream& stream, ReadableStreamIteratorOptions options)
{
// 1. Let reader be ? AcquireReadableStreamDefaultReader(stream).
// 2. Set iterators reader to reader.
auto reader = TRY(acquire_readable_stream_default_reader(stream));
// 3. Let preventCancel be args[0]["preventCancel"].
// 4. Set iterators prevent cancel to preventCancel.
auto prevent_cancel = options.prevent_cancel;
return realm.create<ReadableStreamAsyncIterator>(realm, kind, reader, prevent_cancel);
}
ReadableStreamAsyncIterator::ReadableStreamAsyncIterator(JS::Realm& realm, JS::Object::PropertyKind kind, GC::Ref<ReadableStreamDefaultReader> reader, bool prevent_cancel)
: AsyncIterator(realm, kind)
, m_reader(reader)
, m_prevent_cancel(prevent_cancel)
{
}
ReadableStreamAsyncIterator::~ReadableStreamAsyncIterator() = default;
void ReadableStreamAsyncIterator::initialize(JS::Realm& realm)
{
Base::initialize(realm);
WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStreamAsyncIterator);
}
void ReadableStreamAsyncIterator::visit_edges(JS::Cell::Visitor& visitor)
{
Base::visit_edges(visitor);
visitor.visit(m_reader);
}
class ReadableStreamAsyncIteratorReadRequest final : public ReadRequest {
GC_CELL(ReadableStreamAsyncIteratorReadRequest, ReadRequest);
GC_DECLARE_ALLOCATOR(ReadableStreamAsyncIteratorReadRequest);
public:
ReadableStreamAsyncIteratorReadRequest(JS::Realm& realm, ReadableStreamDefaultReader& reader, WebIDL::Promise& promise)
: m_realm(realm)
, m_reader(reader)
, m_promise(promise)
{
}
// chunk steps, given chunk
virtual void on_chunk(JS::Value chunk) override
{
// 1. Resolve promise with chunk.
WebIDL::resolve_promise(m_realm, m_promise, chunk);
}
// close steps
virtual void on_close() override
{
// 1. Perform ! ReadableStreamDefaultReaderRelease(reader).
readable_stream_default_reader_release(m_reader);
// 2. Resolve promise with end of iteration.
WebIDL::resolve_promise(m_realm, m_promise, JS::js_special_empty_value());
}
// error steps, given e
virtual void on_error(JS::Value error) override
{
// 1. Perform ! ReadableStreamDefaultReaderRelease(reader).
readable_stream_default_reader_release(m_reader);
// 2. Reject promise with e.
WebIDL::reject_promise(m_realm, m_promise, error);
}
private:
virtual void visit_edges(Visitor& visitor) override
{
Base::visit_edges(visitor);
visitor.visit(m_realm);
visitor.visit(m_reader);
visitor.visit(m_promise);
}
GC::Ref<JS::Realm> m_realm;
GC::Ref<ReadableStreamDefaultReader> m_reader;
GC::Ref<WebIDL::Promise> m_promise;
};
GC_DEFINE_ALLOCATOR(ReadableStreamAsyncIteratorReadRequest);
// https://streams.spec.whatwg.org/#ref-for-dfn-get-the-next-iteration-result
GC::Ref<WebIDL::Promise> ReadableStreamAsyncIterator::next_iteration_result(JS::Realm& realm)
{
// 1. Let reader be iterators reader.
// 2. Assert: reader.[[stream]] is not undefined.
VERIFY(m_reader->stream());
// 3. Let promise be a new promise.
auto promise = WebIDL::create_promise(realm);
// 4. Let readRequest be a new read request with the following items:
auto read_request = heap().allocate<ReadableStreamAsyncIteratorReadRequest>(realm, m_reader, promise);
// 5. Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
readable_stream_default_reader_read(m_reader, read_request);
// 6. Return promise.
return promise;
}
// https://streams.spec.whatwg.org/#ref-for-asynchronous-iterator-return
GC::Ref<WebIDL::Promise> ReadableStreamAsyncIterator::iterator_return(JS::Realm& realm, JS::Value arg)
{
// 1. Let reader be iterators reader.
// 2. Assert: reader.[[stream]] is not undefined.
VERIFY(m_reader->stream());
// 3. Assert: reader.[[readRequests]] is empty, as the async iterator machinery guarantees that any previous calls
// to next() have settled before this is called.
VERIFY(m_reader->read_requests().is_empty());
// 4. If iterators prevent cancel is false:
if (!m_prevent_cancel) {
// 1. Let result be ! ReadableStreamReaderGenericCancel(reader, arg).
auto result = readable_stream_reader_generic_cancel(m_reader, arg);
// 2. Perform ! ReadableStreamDefaultReaderRelease(reader).
readable_stream_default_reader_release(m_reader);
// 3. Return result.
return result;
}
// 5. Perform ! ReadableStreamDefaultReaderRelease(reader).
readable_stream_default_reader_release(m_reader);
// 6. Return a promise resolved with undefined.
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
}
}

View file

@ -0,0 +1,42 @@
/*
* Copyright (c) 2025, Tim Flynn <trflynn89@ladybird.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <LibWeb/Bindings/PlatformObject.h>
#include <LibWeb/Forward.h>
#include <LibWeb/WebIDL/AsyncIterator.h>
namespace Web::Streams {
// https://streams.spec.whatwg.org/#dictdef-readablestreamiteratoroptions
struct ReadableStreamIteratorOptions {
bool prevent_cancel { false };
};
class ReadableStreamAsyncIterator final : public WebIDL::AsyncIterator {
WEB_PLATFORM_OBJECT(ReadableStreamAsyncIterator, WebIDL::AsyncIterator);
GC_DECLARE_ALLOCATOR(ReadableStreamAsyncIterator);
public:
static WebIDL::ExceptionOr<GC::Ref<ReadableStreamAsyncIterator>> create(JS::Realm&, JS::Object::PropertyKind, ReadableStream&, ReadableStreamIteratorOptions);
virtual ~ReadableStreamAsyncIterator() override;
private:
ReadableStreamAsyncIterator(JS::Realm&, JS::Object::PropertyKind, GC::Ref<ReadableStreamDefaultReader>, bool prevent_cancel);
virtual void initialize(JS::Realm&) override;
virtual void visit_edges(Cell::Visitor&) override;
virtual GC::Ref<WebIDL::Promise> next_iteration_result(JS::Realm&) override;
virtual GC::Ref<WebIDL::Promise> iterator_return(JS::Realm&, JS::Value) override;
GC::Ref<ReadableStreamDefaultReader> m_reader;
bool m_prevent_cancel { false };
};
}

View file

@ -297,7 +297,7 @@ libweb_js_bindings(ServiceWorker/ServiceWorkerRegistration)
libweb_js_bindings(Streams/ByteLengthQueuingStrategy)
libweb_js_bindings(Streams/CountQueuingStrategy)
libweb_js_bindings(Streams/ReadableByteStreamController)
libweb_js_bindings(Streams/ReadableStream)
libweb_js_bindings(Streams/ReadableStream ASYNC_ITERABLE)
libweb_js_bindings(Streams/ReadableStreamBYOBReader)
libweb_js_bindings(Streams/ReadableStreamBYOBRequest)
libweb_js_bindings(Streams/ReadableStreamDefaultController)

View file

@ -2,8 +2,7 @@ Harness status: OK
Found 229 tests
228 Pass
1 Fail
229 Pass
Pass idl_test setup
Pass idl_test validation
Pass ReadableStreamDefaultReader includes ReadableStreamGenericReader: member names are unique
@ -21,7 +20,7 @@ Pass ReadableStream interface: operation getReader(optional ReadableStreamGetRea
Pass ReadableStream interface: operation pipeThrough(ReadableWritablePair, optional StreamPipeOptions)
Pass ReadableStream interface: operation pipeTo(WritableStream, optional StreamPipeOptions)
Pass ReadableStream interface: operation tee()
Fail ReadableStream interface: async iterable<any>
Pass ReadableStream interface: async iterable<any>
Pass ReadableStream must be primary interface of new ReadableStream()
Pass Stringification of new ReadableStream()
Pass ReadableStream interface: new ReadableStream() must inherit property "from(any)" with the proper type

View file

@ -2,45 +2,46 @@ Harness status: OK
Found 41 tests
41 Fail
Fail Async iterator instances should have the correct list of properties
Fail Async-iterating a push source
Fail Async-iterating a pull source
Fail Async-iterating a push source with undefined values
Fail Async-iterating a pull source with undefined values
Fail Async-iterating a pull source manually
Fail Async-iterating an errored stream throws
Fail Async-iterating a closed stream never executes the loop body, but works fine
Fail Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function
Fail Async-iterating a partially consumed stream
36 Pass
5 Fail
Pass Async iterator instances should have the correct list of properties
Pass Async-iterating a push source
Pass Async-iterating a pull source
Pass Async-iterating a push source with undefined values
Pass Async-iterating a pull source with undefined values
Pass Async-iterating a pull source manually
Pass Async-iterating an errored stream throws
Pass Async-iterating a closed stream never executes the loop body, but works fine
Pass Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function
Pass Async-iterating a partially consumed stream
Fail Cancellation behavior when throwing inside loop body; preventCancel = false
Fail Cancellation behavior when throwing inside loop body; preventCancel = true
Pass Cancellation behavior when throwing inside loop body; preventCancel = true
Fail Cancellation behavior when breaking inside loop body; preventCancel = false
Fail Cancellation behavior when breaking inside loop body; preventCancel = true
Pass Cancellation behavior when breaking inside loop body; preventCancel = true
Fail Cancellation behavior when returning inside loop body; preventCancel = false
Fail Cancellation behavior when returning inside loop body; preventCancel = true
Fail Cancellation behavior when manually calling return(); preventCancel = false
Fail Cancellation behavior when manually calling return(); preventCancel = true
Fail next() rejects if the stream errors
Fail return() does not rejects if the stream has not errored yet
Fail return() rejects if the stream has errored
Fail next() that succeeds; next() that reports an error; next()
Fail next() that succeeds; next() that reports an error(); next() [no awaiting]
Fail next() that succeeds; next() that reports an error(); return()
Fail next() that succeeds; next() that reports an error(); return() [no awaiting]
Fail next() that succeeds; return()
Fail next() that succeeds; return() [no awaiting]
Fail return(); next()
Fail return(); next() [no awaiting]
Fail return(); next() with delayed cancel()
Fail return(); next() with delayed cancel() [no awaiting]
Fail return(); return()
Fail return(); return() [no awaiting]
Fail values() throws if there's already a lock
Fail Acquiring a reader after exhaustively async-iterating a stream
Fail Acquiring a reader after return()ing from a stream that errors
Pass Cancellation behavior when returning inside loop body; preventCancel = true
Pass Cancellation behavior when manually calling return(); preventCancel = false
Pass Cancellation behavior when manually calling return(); preventCancel = true
Pass next() rejects if the stream errors
Pass return() does not rejects if the stream has not errored yet
Pass return() rejects if the stream has errored
Pass next() that succeeds; next() that reports an error; next()
Pass next() that succeeds; next() that reports an error(); next() [no awaiting]
Pass next() that succeeds; next() that reports an error(); return()
Pass next() that succeeds; next() that reports an error(); return() [no awaiting]
Pass next() that succeeds; return()
Pass next() that succeeds; return() [no awaiting]
Pass return(); next()
Pass return(); next() [no awaiting]
Pass return(); next() with delayed cancel()
Pass return(); next() with delayed cancel() [no awaiting]
Pass return(); return()
Pass return(); return() [no awaiting]
Pass values() throws if there's already a lock
Pass Acquiring a reader after exhaustively async-iterating a stream
Pass Acquiring a reader after return()ing from a stream that errors
Fail Acquiring a reader after partially async-iterating a stream
Fail Acquiring a reader and reading the remaining chunks after partially async-iterating a stream with preventCancel = true
Fail return() should unlock the stream synchronously when preventCancel = false
Fail return() should unlock the stream synchronously when preventCancel = true
Fail close() while next() is pending
Pass return() should unlock the stream synchronously when preventCancel = false
Pass return() should unlock the stream synchronously when preventCancel = true
Pass close() while next() is pending