LibWeb: Implement ReadableStreamPipeTo according to spec

Our existing implementation of stream piping was extremely ad-hoc. It
did nothing to handle closed or errored streams, and did not read from
or write to the streams in the manner required by the spec.

This new implementation uses a custom JS::Cell to drive the read/write
loop.
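
For context, a minimal JavaScript sketch (illustration only, not part of the
diff below) of the author-visible behaviour the rewritten pipe must produce:
an error in the source propagates forward, aborting the destination and
rejecting the pipeTo() promise with the same error. This is the kind of case
the old ad-hoc loop did not handle.

// Illustrative sketch only; uses nothing but standard web streams APIs.
const source = new ReadableStream({
    start(controller) {
        controller.error(new TypeError("source failed"));
    },
});

const destination = new WritableStream({
    abort(reason) { console.log("destination aborted with:", reason.message); },
});

source.pipeTo(destination).catch((error) => {
    // Errors propagate forward: the destination is aborted with the source's
    // stored error (preventAbort defaults to false) and pipeTo() rejects with it.
    console.log("pipeTo rejected with:", error.message);
});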
Author: Timothy Flynn, 2025-04-09 12:01:51 -04:00 (committed by Tim Flynn)
Commit: eb0a51faf0 (parent: 4010c4643a)
33 changed files with 3926 additions and 85 deletions


@@ -289,8 +289,383 @@ bool readable_stream_has_default_reader(ReadableStream const& stream)
return false;
}
// https://streams.spec.whatwg.org/#ref-for-in-parallel
class ReadableStreamPipeTo final : public JS::Cell {
GC_CELL(ReadableStreamPipeTo, JS::Cell);
GC_DECLARE_ALLOCATOR(ReadableStreamPipeTo);
public:
void process()
{
if (check_for_error_and_close_states())
return;
auto ready_promise = m_writer->ready();
if (ready_promise && WebIDL::is_promise_fulfilled(*ready_promise)) {
read_chunk();
return;
}
auto when_ready = GC::create_function(m_realm->heap(), [this](JS::Value) -> WebIDL::ExceptionOr<JS::Value> {
read_chunk();
return JS::js_undefined();
});
auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr<JS::Value> {
check_for_error_and_close_states();
return JS::js_undefined();
});
if (ready_promise)
WebIDL::react_to_promise(*ready_promise, when_ready, shutdown);
if (auto promise = m_reader->closed())
WebIDL::react_to_promise(*promise, shutdown, shutdown);
}
void set_abort_signal(GC::Ref<DOM::AbortSignal> signal, DOM::AbortSignal::AbortAlgorithmID signal_id)
{
m_signal = signal;
m_signal_id = signal_id;
}
// https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
void shutdown_with_action(GC::Ref<GC::Function<GC::Ref<WebIDL::Promise>()>> action, Optional<JS::Value> original_error = {})
{
// 1. If shuttingDown is true, abort these substeps.
if (m_shutting_down)
return;
// 2. Set shuttingDown to true.
m_shutting_down = true;
auto on_pending_writes_complete = [this, action, original_error = move(original_error)]() mutable {
HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// 4. Let p be the result of performing action.
auto promise = action->function()();
WebIDL::react_to_promise(promise,
// 5. Upon fulfillment of p, finalize, passing along originalError if it was given.
GC::create_function(heap(), [this, original_error = move(original_error)](JS::Value) mutable -> WebIDL::ExceptionOr<JS::Value> {
finish(move(original_error));
return JS::js_undefined();
}),
// 6. Upon rejection of p with reason newError, finalize with newError.
GC::create_function(heap(), [this](JS::Value new_error) -> WebIDL::ExceptionOr<JS::Value> {
finish(new_error);
return JS::js_undefined();
}));
};
// 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) {
// 1. If any chunks have been read but not yet written, write them to dest.
write_unwritten_chunks();
// 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
wait_for_pending_writes_to_complete(move(on_pending_writes_complete));
} else {
on_pending_writes_complete();
}
}
// https://streams.spec.whatwg.org/#rs-pipeTo-shutdown
void shutdown(Optional<JS::Value> error = {})
{
// 1. If shuttingDown is true, abort these substeps.
if (m_shutting_down)
return;
// 2. Set shuttingDown to true.
m_shutting_down = true;
auto on_pending_writes_complete = [this, error = move(error)]() mutable {
HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// 4. Finalize, passing along error if it was given.
finish(move(error));
};
// 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) {
// 1. If any chunks have been read but not yet written, write them to dest.
write_unwritten_chunks();
// 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
wait_for_pending_writes_to_complete(move(on_pending_writes_complete));
} else {
on_pending_writes_complete();
}
}
private:
ReadableStreamPipeTo(
GC::Ref<JS::Realm> realm,
GC::Ref<WebIDL::Promise> promise,
GC::Ref<ReadableStream> source,
GC::Ref<WritableStream> destination,
GC::Ref<ReadableStreamDefaultReader> reader,
GC::Ref<WritableStreamDefaultWriter> writer,
bool prevent_close,
bool prevent_abort,
bool prevent_cancel)
: m_realm(realm)
, m_promise(promise)
, m_source(source)
, m_destination(destination)
, m_reader(reader)
, m_writer(writer)
, m_prevent_close(prevent_close)
, m_prevent_abort(prevent_abort)
, m_prevent_cancel(prevent_cancel)
{
m_reader->set_readable_stream_pipe_to_operation({}, this);
}
virtual void visit_edges(Cell::Visitor& visitor) override
{
Base::visit_edges(visitor);
visitor.visit(m_realm);
visitor.visit(m_promise);
visitor.visit(m_source);
visitor.visit(m_destination);
visitor.visit(m_reader);
visitor.visit(m_writer);
visitor.visit(m_signal);
visitor.visit(m_pending_writes);
visitor.visit(m_unwritten_chunks);
}
void read_chunk()
{
// Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from
// reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent
// must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
if (check_for_error_and_close_states())
return;
auto when_ready = GC::create_function(heap(), [this](JS::Value value) -> WebIDL::ExceptionOr<JS::Value> {
auto& vm = this->vm();
VERIFY(value.is_object());
auto& object = value.as_object();
auto done = MUST(JS::iterator_complete(vm, object));
if (done) {
if (!check_for_error_and_close_states())
finish();
} else {
auto chunk = MUST(JS::iterator_value(vm, object));
m_unwritten_chunks.append(chunk);
write_chunk();
process();
}
return JS::js_undefined();
});
auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr<JS::Value> {
check_for_error_and_close_states();
return JS::js_undefined();
});
WebIDL::react_to_promise(m_reader->read(), when_ready, shutdown);
if (auto promise = m_writer->closed())
WebIDL::react_to_promise(*promise, shutdown, shutdown);
}
void write_chunk()
{
// Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from
// reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent
// must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
if (!m_shutting_down && check_for_error_and_close_states())
return;
auto promise = m_writer->write(m_unwritten_chunks.take_first());
WebIDL::mark_promise_as_handled(promise);
m_pending_writes.append(promise);
}
void write_unwritten_chunks()
{
while (!m_unwritten_chunks.is_empty())
write_chunk();
}
void wait_for_pending_writes_to_complete(Function<void()> on_complete)
{
auto handler = GC::create_function(heap(), [this, on_complete = move(on_complete)]() {
m_pending_writes.clear();
on_complete();
});
auto success_steps = [handler](Vector<JS::Value> const&) { handler->function()(); };
auto failure_steps = [handler](JS::Value) { handler->function()(); };
WebIDL::wait_for_all(m_realm, m_pending_writes, move(success_steps), move(failure_steps));
}
// https://streams.spec.whatwg.org/#rs-pipeTo-finalize
// We call this `finish` instead of `finalize` to avoid conflicts with GC::Cell::finalize.
void finish(Optional<JS::Value> error = {})
{
// 1. Perform ! WritableStreamDefaultWriterRelease(writer).
writable_stream_default_writer_release(m_writer);
// 2. If reader implements ReadableStreamBYOBReader, perform ! ReadableStreamBYOBReaderRelease(reader).
// 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
readable_stream_default_reader_release(m_reader);
// 4. If signal is not undefined, remove abortAlgorithm from signal.
if (m_signal)
m_signal->remove_abort_algorithm(m_signal_id);
// 5. If error was given, reject promise with error.
if (error.has_value()) {
WebIDL::reject_promise(m_realm, m_promise, *error);
}
// 6. Otherwise, resolve promise with undefined.
else {
WebIDL::resolve_promise(m_realm, m_promise, JS::js_undefined());
}
m_reader->set_readable_stream_pipe_to_operation({}, nullptr);
}
bool check_for_error_and_close_states()
{
// Error and close states must be propagated: the following conditions must be applied in order.
return m_shutting_down
|| check_for_forward_errors()
|| check_for_backward_errors()
|| check_for_forward_close()
|| check_for_backward_close();
}
bool check_for_forward_errors()
{
// 1. Errors must be propagated forward: if source.[[state]] is or becomes "errored", then
if (m_source->state() == ReadableStream::State::Errored) {
// 1. If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
// and with source.[[storedError]].
if (!m_prevent_abort) {
auto action = GC::create_function(heap(), [this]() {
return writable_stream_abort(m_destination, m_source->stored_error());
});
shutdown_with_action(action, m_source->stored_error());
}
// 2. Otherwise, shutdown with source.[[storedError]].
else {
shutdown(m_source->stored_error());
}
}
return m_shutting_down;
}
bool check_for_backward_errors()
{
// 2. Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then
if (m_destination->state() == WritableStream::State::Errored) {
// 1. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
// and with dest.[[storedError]].
if (!m_prevent_cancel) {
auto action = GC::create_function(heap(), [this]() {
return readable_stream_cancel(m_source, m_destination->stored_error());
});
shutdown_with_action(action, m_destination->stored_error());
}
// 2. Otherwise, shutdown with dest.[[storedError]].
else {
shutdown(m_destination->stored_error());
}
}
return m_shutting_down;
}
bool check_for_forward_close()
{
// 3. Closing must be propagated forward: if source.[[state]] is or becomes "closed", then
if (m_source->state() == ReadableStream::State::Closed) {
// 1. If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
if (!m_prevent_close) {
auto action = GC::create_function(heap(), [this]() {
return writable_stream_default_writer_close_with_error_propagation(m_writer);
});
shutdown_with_action(action);
}
// 2. Otherwise, shutdown.
else {
shutdown();
}
}
return m_shutting_down;
}
bool check_for_backward_close()
{
// 4. Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "closed", then
if (writable_stream_close_queued_or_in_flight(m_destination) || m_destination->state() == WritableStream::State::Closed) {
// 1. Assert: no chunks have been read or written.
// 2. Let destClosed be a new TypeError.
auto destination_closed = JS::TypeError::create(m_realm, "Destination stream was closed during piping operation"sv);
// 3. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, destClosed) and with destClosed.
if (!m_prevent_cancel) {
auto action = GC::create_function(heap(), [this, destination_closed]() {
return readable_stream_cancel(m_source, destination_closed);
});
shutdown_with_action(action, destination_closed);
}
// 4. Otherwise, shutdown with destClosed.
else {
shutdown(destination_closed);
}
}
return m_shutting_down;
}
GC::Ref<JS::Realm> m_realm;
GC::Ref<WebIDL::Promise> m_promise;
GC::Ref<ReadableStream> m_source;
GC::Ref<WritableStream> m_destination;
GC::Ref<ReadableStreamDefaultReader> m_reader;
GC::Ref<WritableStreamDefaultWriter> m_writer;
GC::Ptr<DOM::AbortSignal> m_signal;
DOM::AbortSignal::AbortAlgorithmID m_signal_id { 0 };
Vector<GC::Ref<WebIDL::Promise>> m_pending_writes;
Vector<JS::Value, 1> m_unwritten_chunks;
bool m_prevent_close { false };
bool m_prevent_abort { false };
bool m_prevent_cancel { false };
bool m_shutting_down { false };
};
GC_DEFINE_ALLOCATOR(ReadableStreamPipeTo);
// https://streams.spec.whatwg.org/#readable-stream-pipe-to
GC::Ref<WebIDL::Promise> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool, bool, bool, GC::Ptr<DOM::AbortSignal> signal)
GC::Ref<WebIDL::Promise> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool prevent_close, bool prevent_abort, bool prevent_cancel, GC::Ptr<DOM::AbortSignal> signal)
{
auto& realm = source.realm();
@@ -300,7 +675,6 @@ GC::Ref<WebIDL::Promise> readable_stream_pipe_to(ReadableStream& source, Writabl
// 4. If signal was not given, let signal be undefined.
// 5. Assert: either signal is undefined, or signal implements AbortSignal.
(void)signal;
// 6. Assert: ! IsReadableStreamLocked(source) is false.
VERIFY(!is_readable_stream_locked(source));
@@ -322,57 +696,81 @@ GC::Ref<WebIDL::Promise> readable_stream_pipe_to(ReadableStream& source, Writabl
// 11. Set source.[[disturbed]] to true.
source.set_disturbed(true);
// FIXME: 12. Let shuttingDown be false.
// 12. Let shuttingDown be false.
// NOTE: This is internal to the ReadableStreamPipeTo class.
// 13. Let promise be a new promise.
auto promise = WebIDL::create_promise(realm);
// FIXME 14. If signal is not undefined,
// 1. Let abortAlgorithm be the following steps:
// 1. Let error be signal's abort reason.
// 2. Let actions be an empty ordered set.
// 3. If preventAbort is false, append the following action to actions:
// 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error).
// 2. Otherwise, return a promise resolved with undefined.
// 4. If preventCancel is false, append the following action to actions:
// 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error).
// 2. Otherwise, return a promise resolved with undefined.
// 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error.
// 2. If signal is aborted, perform abortAlgorithm and return promise.
// 3. Add abortAlgorithm to signal.
// 15. In parallel but not really; see #905, using reader and writer, read all chunks from source and write them to
// dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not
// observable to author code, and so there is flexibility in how this is done. The following constraints apply
// regardless of the exact algorithm used:
// - Public API must not be used: while reading or writing, or performing any of the operations below, the
// JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not
// be used. Instead, the streams must be manipulated directly.
// FIXME: Currently a naive implementation that uses ReadableStreamDefaultReader::read_all_chunks() to read all chunks
// from the source and then through the callback success_steps writes those chunks to the destination.
auto chunk_steps = GC::create_function(realm.heap(), [&realm, writer](JS::Value chunk) {
auto promise = writable_stream_default_writer_write(writer, chunk);
WebIDL::resolve_promise(realm, promise, JS::js_undefined());
});
auto success_steps = GC::create_function(realm.heap(), [promise, &realm, reader, writer]() {
// Make sure we close the acquired writer.
WebIDL::resolve_promise(realm, writable_stream_default_writer_close(*writer), JS::js_undefined());
readable_stream_default_reader_release(*reader);
WebIDL::resolve_promise(realm, promise, JS::js_undefined());
});
auto failure_steps = GC::create_function(realm.heap(), [promise, &realm, reader, writer](JS::Value error) {
// Make sure we close the acquired writer.
WebIDL::resolve_promise(realm, writable_stream_default_writer_close(*writer), JS::js_undefined());
readable_stream_default_reader_release(*reader);
WebIDL::reject_promise(realm, promise, error);
});
reader->read_all_chunks(chunk_steps, success_steps, failure_steps);

auto operation = realm.heap().allocate<ReadableStreamPipeTo>(realm, promise, source, dest, reader, writer, prevent_close, prevent_abort, prevent_cancel);
// 14. If signal is not undefined,
if (signal) {
// 1. Let abortAlgorithm be the following steps:
auto abort_algorithm = [&realm, operation, source = GC::Ref { source }, dest = GC::Ref { dest }, prevent_abort, prevent_cancel, signal]() {
HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// 1. Let error be signal's abort reason.
auto error = signal->reason();
// 2. Let actions be an empty ordered set.
GC::Ptr<GC::Function<GC::Ref<WebIDL::Promise>()>> abort_destination;
GC::Ptr<GC::Function<GC::Ref<WebIDL::Promise>()>> cancel_source;
// 3. If preventAbort is false, append the following action to actions:
if (!prevent_abort) {
abort_destination = GC::create_function(realm.heap(), [&realm, dest, error]() {
// 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error).
if (dest->state() == WritableStream::State::Writable)
return writable_stream_abort(dest, error);
// 2. Otherwise, return a promise resolved with undefined.
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
});
}
// 4. If preventCancel is false, append the following action action to actions:
if (!prevent_cancel) {
cancel_source = GC::create_function(realm.heap(), [&realm, source, error]() {
// 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error).
if (source->state() == ReadableStream::State::Readable)
return readable_stream_cancel(source, error);
// 2. Otherwise, return a promise resolved with undefined.
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
});
}
// 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error.
auto action = GC::create_function(realm.heap(), [&realm, abort_destination, cancel_source]() {
GC::RootVector<GC::Ref<WebIDL::Promise>> actions(realm.heap());
if (abort_destination)
actions.append(abort_destination->function()());
if (cancel_source)
actions.append(cancel_source->function()());
return WebIDL::get_promise_for_wait_for_all(realm, actions);
});
operation->shutdown_with_action(action, error);
};
// 2. If signal is aborted, perform abortAlgorithm and return promise.
if (signal->aborted()) {
abort_algorithm();
return promise;
}
// 3. Add abortAlgorithm to signal.
auto signal_id = signal->add_abort_algorithm(move(abort_algorithm));
operation->set_abort_signal(*signal, signal_id.value());
}
// 15. In parallel (but not really; see #905), using reader and writer, read all chunks from source and write them
// to dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not
// observable to author code, and so there is flexibility in how this is done.
operation->process();
// 16. Return promise.
return promise;


@@ -66,6 +66,7 @@ void ReadableStreamDefaultReader::visit_edges(Cell::Visitor& visitor)
ReadableStreamGenericReaderMixin::visit_edges(visitor);
for (auto& request : m_read_requests)
visitor.visit(request);
visitor.visit(m_readable_stream_pipe_to_operation);
}
// https://streams.spec.whatwg.org/#read-loop
@@ -212,42 +213,6 @@ void ReadableStreamDefaultReader::read_all_bytes(GC::Ref<ReadLoopReadRequest::Su
readable_stream_default_reader_read(*this, read_request);
}
void ReadableStreamDefaultReader::read_all_chunks(GC::Ref<ReadAllOnChunkSteps> chunk_steps, GC::Ref<ReadAllOnSuccessSteps> success_steps, GC::Ref<ReadAllOnFailureSteps> failure_steps)
{
// AD-HOC: Some spec steps direct us to "read all chunks" from a stream, but there isn't an AO defined to do that.
// We implement those steps by continuously making default read requests, which is an identity transformation,
// with a custom callback to receive each chunk that is read. This is done until the controller signals
// that there are no more chunks to consume.
// This function is based on "read_all_bytes" above.
auto promise_capability = read();
WebIDL::react_to_promise(
promise_capability,
GC::create_function(heap(), [this, chunk_steps, success_steps, failure_steps](JS::Value value) -> WebIDL::ExceptionOr<JS::Value> {
auto& vm = this->vm();
VERIFY(value.is_object());
auto& value_object = value.as_object();
auto done = MUST(JS::iterator_complete(vm, value_object));
if (!done) {
auto chunk = MUST(JS::iterator_value(vm, value_object));
chunk_steps->function()(chunk);
read_all_chunks(chunk_steps, success_steps, failure_steps);
} else {
success_steps->function()();
}
return JS::js_undefined();
}),
GC::create_function(heap(), [failure_steps](JS::Value error) -> WebIDL::ExceptionOr<JS::Value> {
failure_steps->function()(error);
return JS::js_undefined();
}));
}
// FIXME: This function is a promise-based wrapper around "read all bytes". The spec changed this function to not use promises
// in https://github.com/whatwg/streams/commit/f894acdd417926a2121710803cef593e15127964 - however, it seems that the
// FileAPI blob specification has not been updated to match, see: https://github.com/w3c/FileAPI/issues/187.


@@ -20,6 +20,8 @@ struct ReadableStreamReadResult {
bool done;
};
class ReadableStreamPipeTo;
class ReadRequest : public JS::Cell {
GC_CELL(ReadRequest, JS::Cell);
@@ -91,13 +93,14 @@ public:
void read_a_chunk(Fetch::Infrastructure::IncrementalReadLoopReadRequest& read_request);
void read_all_bytes(GC::Ref<ReadLoopReadRequest::SuccessSteps>, GC::Ref<ReadLoopReadRequest::FailureSteps>);
void read_all_chunks(GC::Ref<ReadAllOnChunkSteps>, GC::Ref<ReadAllOnSuccessSteps>, GC::Ref<ReadAllOnFailureSteps>);
GC::Ref<WebIDL::Promise> read_all_bytes_deprecated();
void release_lock();
SinglyLinkedList<GC::Ref<ReadRequest>>& read_requests() { return m_read_requests; }
void set_readable_stream_pipe_to_operation(Badge<ReadableStreamPipeTo>, GC::Ptr<JS::Cell> readable_stream_pipe_to_operation) { m_readable_stream_pipe_to_operation = readable_stream_pipe_to_operation; }
private:
explicit ReadableStreamDefaultReader(JS::Realm&);
@@ -106,6 +109,8 @@ private:
virtual void visit_edges(Cell::Visitor&) override;
SinglyLinkedList<GC::Ref<ReadRequest>> m_read_requests;
GC::Ptr<JS::Cell> m_readable_stream_pipe_to_operation;
};
}


@@ -175,8 +175,14 @@ GC::Ref<Promise> upon_rejection(Promise const& promise, GC::Ref<ReactionSteps> s
void mark_promise_as_handled(Promise const& promise)
{
// To mark as handled a Promise<T> promise, set promise.[[Promise]].[[PromiseIsHandled]] to true.
auto promise_object = as<JS::Promise>(promise.promise().ptr());
promise_object->set_is_handled();
auto& promise_object = as<JS::Promise>(*promise.promise());
promise_object.set_is_handled();
}
bool is_promise_fulfilled(Promise const& promise)
{
auto const& promise_object = as<JS::Promise>(*promise.promise());
return promise_object.state() == JS::Promise::State::Fulfilled;
}
struct WaitForAllResults : JS::Cell {


@@ -29,6 +29,7 @@ GC::Ref<Promise> react_to_promise(Promise const&, GC::Ptr<ReactionSteps> on_fulf
GC::Ref<Promise> upon_fulfillment(Promise const&, GC::Ref<ReactionSteps>);
GC::Ref<Promise> upon_rejection(Promise const&, GC::Ref<ReactionSteps>);
void mark_promise_as_handled(Promise const&);
bool is_promise_fulfilled(Promise const&);
void wait_for_all(JS::Realm&, Vector<GC::Ref<Promise>> const& promises, Function<void(Vector<JS::Value> const&)> success_steps, Function<void(JS::Value)> failure_steps);
GC::Ref<Promise> get_promise_for_wait_for_all(JS::Realm&, Vector<GC::Ref<Promise>> const& promises);


@@ -1,4 +1,4 @@
Harness status: Error
Harness status: OK
Found 21 tests


@@ -1,4 +1,4 @@
Harness status: Error
Harness status: OK
Found 6 tests


@@ -0,0 +1,38 @@
Harness status: OK
Found 33 tests
33 Pass
Pass a signal argument 'null' should cause pipeTo() to reject
Pass a signal argument 'AbortSignal' should cause pipeTo() to reject
Pass a signal argument 'true' should cause pipeTo() to reject
Pass a signal argument '-1' should cause pipeTo() to reject
Pass a signal argument '[object AbortSignal]' should cause pipeTo() to reject
Pass an aborted signal should cause the writable stream to reject with an AbortError
Pass (reason: 'null') all the error objects should be the same object
Pass (reason: 'undefined') all the error objects should be the same object
Pass (reason: 'error1: error1') all the error objects should be the same object
Pass preventCancel should prevent canceling the readable
Pass preventAbort should prevent aborting the readable
Pass preventCancel and preventAbort should prevent canceling the readable and aborting the readable
Pass (reason: 'null') abort should prevent further reads
Pass (reason: 'undefined') abort should prevent further reads
Pass (reason: 'error1: error1') abort should prevent further reads
Pass (reason: 'null') all pending writes should complete on abort
Pass (reason: 'undefined') all pending writes should complete on abort
Pass (reason: 'error1: error1') all pending writes should complete on abort
Pass (reason: 'null') underlyingSource.cancel() should called when abort, even with pending pull
Pass (reason: 'undefined') underlyingSource.cancel() should called when abort, even with pending pull
Pass (reason: 'error1: error1') underlyingSource.cancel() should called when abort, even with pending pull
Pass a rejection from underlyingSource.cancel() should be returned by pipeTo()
Pass a rejection from underlyingSink.abort() should be returned by pipeTo()
Pass a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()
Pass abort signal takes priority over closed readable
Pass abort signal takes priority over errored readable
Pass abort signal takes priority over closed writable
Pass abort signal takes priority over errored writable
Pass abort should do nothing after the readable is closed
Pass abort should do nothing after the readable is errored
Pass abort should do nothing after the readable is errored, even with pending writes
Pass abort should do nothing after the writable is errored
Pass pipeTo on a teed readable byte stream should only be aborted when both branches are aborted


@@ -0,0 +1,21 @@
Harness status: OK
Found 16 tests
16 Pass
Pass Closing must be propagated backward: starts closed; preventCancel omitted; fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel omitted; rejected cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = undefined (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = null (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = false (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = 0 (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = -0 (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = NaN (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = (falsy); fulfilled cancel promise
Pass Closing must be propagated backward: starts closed; preventCancel = true (truthy)
Pass Closing must be propagated backward: starts closed; preventCancel = a (truthy)
Pass Closing must be propagated backward: starts closed; preventCancel = 1 (truthy)
Pass Closing must be propagated backward: starts closed; preventCancel = Symbol() (truthy)
Pass Closing must be propagated backward: starts closed; preventCancel = [object Object] (truthy)
Pass Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true
Pass Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true, preventClose = true


@@ -0,0 +1,35 @@
Harness status: OK
Found 30 tests
30 Pass
Pass Closing must be propagated forward: starts closed; preventClose omitted; fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose omitted; rejected close promise
Pass Closing must be propagated forward: starts closed; preventClose = undefined (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = null (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = false (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = 0 (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = -0 (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = NaN (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = (falsy); fulfilled close promise
Pass Closing must be propagated forward: starts closed; preventClose = true (truthy)
Pass Closing must be propagated forward: starts closed; preventClose = a (truthy)
Pass Closing must be propagated forward: starts closed; preventClose = 1 (truthy)
Pass Closing must be propagated forward: starts closed; preventClose = Symbol() (truthy)
Pass Closing must be propagated forward: starts closed; preventClose = [object Object] (truthy)
Pass Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true
Pass Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true, preventCancel = true
Pass Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; fulfilled close promise
Pass Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; rejected close promise
Pass Closing must be propagated forward: becomes closed asynchronously; preventClose = true
Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose omitted; fulfilled close promise
Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose omitted; rejected close promise
Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose = true
Pass Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; fulfilled close promise
Pass Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; rejected close promise
Pass Closing must be propagated forward: becomes closed after one chunk; preventClose = true
Pass Closing must be propagated forward: shutdown must not occur until the final write completes
Pass Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true
Pass Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write
Pass Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write; preventClose = true
Pass Closing must be propagated forward: erroring the writable while flushing pending writes should error pipeTo


@@ -0,0 +1,40 @@
Harness status: OK
Found 35 tests
35 Pass
Pass Errors must be propagated backward: starts errored; preventCancel omitted; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; rejected cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = undefined (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = null (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = false (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = 0 (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = -0 (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = NaN (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = (falsy); fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true (truthy)
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = a (truthy)
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = 1 (truthy)
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = Symbol() (truthy)
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = [object Object] (truthy)
Pass Errors must be propagated backward: becomes errored before piping due to write, preventCancel = true; preventAbort = true
Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, preventAbort = true, preventClose = true
Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; rejected cancel promise
Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel = true
Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = false; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = false; rejected cancel promise
Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = true
Pass Errors must be propagated backward: becomes errored after piping; preventCancel omitted; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored after piping; preventCancel omitted; rejected cancel promise
Pass Errors must be propagated backward: becomes errored after piping; preventCancel = true
Pass Errors must be propagated backward: becomes errored after piping due to last write; source is closed; preventCancel omitted (but cancel is never called)
Pass Errors must be propagated backward: becomes errored after piping due to last write; source is closed; preventCancel = true
Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = false; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = false; rejected cancel promise
Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = true
Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; fulfilled cancel promise
Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; rejected cancel promise
Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel = true
Pass Errors must be propagated backward: erroring via the controller errors once pending write completes


@@ -0,0 +1,37 @@
Harness status: OK
Found 32 tests
32 Pass
Pass Errors must be propagated forward: starts errored; preventAbort = false; fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = false; rejected abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = undefined (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = null (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = false (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = 0 (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = -0 (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = NaN (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = (falsy); fulfilled abort promise
Pass Errors must be propagated forward: starts errored; preventAbort = true (truthy)
Pass Errors must be propagated forward: starts errored; preventAbort = a (truthy)
Pass Errors must be propagated forward: starts errored; preventAbort = 1 (truthy)
Pass Errors must be propagated forward: starts errored; preventAbort = Symbol() (truthy)
Pass Errors must be propagated forward: starts errored; preventAbort = [object Object] (truthy)
Pass Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true
Pass Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true, preventClose = true
Pass Errors must be propagated forward: becomes errored while empty; preventAbort = false; fulfilled abort promise
Pass Errors must be propagated forward: becomes errored while empty; preventAbort = false; rejected abort promise
Pass Errors must be propagated forward: becomes errored while empty; preventAbort = true
Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = false; fulfilled abort promise
Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = false; rejected abort promise
Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = true
Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; fulfilled abort promise
Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; rejected abort promise
Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = true
Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = false; fulfilled abort promise
Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = false; rejected abort promise
Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = true
Pass Errors must be propagated forward: shutdown must not occur until the final write completes
Pass Errors must be propagated forward: shutdown must not occur until the final write completes; preventAbort = true
Pass Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write
Pass Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write; preventAbort = true


@@ -0,0 +1,10 @@
Harness status: OK
Found 5 tests
5 Pass
Pass Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks
Pass Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does
Pass Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable stream becomes non-empty and the writable stream starts desiring chunks
Pass Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones
Pass Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream


@@ -0,0 +1,19 @@
Harness status: OK
Found 14 tests
14 Pass
Pass Piping must lock both the ReadableStream and WritableStream
Pass Piping finishing must unlock both the ReadableStream and WritableStream
Pass pipeTo must check the brand of its ReadableStream this value
Pass pipeTo must check the brand of its WritableStream argument
Pass pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream
Pass pipeTo must fail if the WritableStream is locked, and not lock the ReadableStream
Pass Piping from a ReadableStream from which lots of chunks are synchronously readable
Pass Piping from a ReadableStream for which a chunk becomes asynchronously readable after the pipeTo
Pass an undefined rejection from pull should cause pipeTo() to reject when preventAbort is true
Pass an undefined rejection from pull should cause pipeTo() to reject when preventAbort is false
Pass an undefined rejection from write should cause pipeTo() to reject when preventCancel is true
Pass an undefined rejection from write should cause pipeTo() to reject when preventCancel is false
Pass pipeTo() should reject if an option getter grabs a writer
Pass pipeTo() promise should resolve if null is passed


@@ -0,0 +1,14 @@
Harness status: OK
Found 9 tests
9 Pass
Pass Piping from an errored readable stream to an erroring writable stream
Pass Piping from an errored readable stream to an errored writable stream
Pass Piping from an errored readable stream to an erroring writable stream; preventAbort = true
Pass Piping from an errored readable stream to an errored writable stream; preventAbort = true
Pass Piping from an errored readable stream to a closing writable stream
Pass Piping from an errored readable stream to a closed writable stream
Pass Piping from a closed readable stream to an erroring writable stream
Pass Piping from a closed readable stream to an errored writable stream
Pass Piping from a closed readable stream to a closed writable stream


@@ -1,4 +1,4 @@
Harness status: Error
Harness status: OK
Found 39 tests


@@ -1,4 +1,4 @@
Harness status: Error
Harness status: OK
Found 62 tests


@@ -0,0 +1,16 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/recording-streams.js"></script>
<script src="../resources/test-utils.js"></script>
<div id=log></div>
<script src="../../streams/piping/abort.any.js"></script>


@@ -0,0 +1,448 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
// META: script=../resources/test-utils.js
'use strict';
// Tests for the use of pipeTo with AbortSignal.
// There is some extra complexity to avoid timeouts in environments where abort is not implemented.
const error1 = new Error('error1');
error1.name = 'error1';
const error2 = new Error('error2');
error2.name = 'error2';
const errorOnPull = {
pull(controller) {
// This will cause the test to error if pipeTo abort is not implemented.
controller.error('failed to abort');
}
};
// To stop pull() being called immediately when the stream is created, we need to set highWaterMark to 0.
const hwm0 = { highWaterMark: 0 };
for (const invalidSignal of [null, 'AbortSignal', true, -1, Object.create(AbortSignal.prototype)]) {
promise_test(t => {
const rs = recordingReadableStream(errorOnPull, hwm0);
const ws = recordingWritableStream();
return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { signal: invalidSignal }), 'pipeTo should reject')
.then(() => {
assert_equals(rs.events.length, 0, 'no ReadableStream methods should have been called');
assert_equals(ws.events.length, 0, 'no WritableStream methods should have been called');
});
}, `a signal argument '${invalidSignal}' should cause pipeTo() to reject`);
}
promise_test(t => {
const rs = recordingReadableStream(errorOnPull, hwm0);
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject')
.then(() => Promise.all([
rs.getReader().closed,
promise_rejects_dom(t, 'AbortError', ws.getWriter().closed, 'writer.closed should reject')
]))
.then(() => {
assert_equals(rs.events.length, 2, 'cancel should have been called');
assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
assert_equals(rs.events[1].name, 'AbortError', 'the argument to cancel should be an AbortError');
assert_equals(rs.events[1].constructor.name, 'DOMException',
'the argument to cancel should be a DOMException');
});
}, 'an aborted signal should cause the writable stream to reject with an AbortError');
for (const reason of [null, undefined, error1]) {
promise_test(async t => {
const rs = recordingReadableStream(errorOnPull, hwm0);
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort(reason);
const pipeToPromise = rs.pipeTo(ws, { signal });
if (reason !== undefined) {
await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
} else {
await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
}
const error = await pipeToPromise.catch(e => e);
await rs.getReader().closed;
await promise_rejects_exactly(t, error, ws.getWriter().closed, 'the writable should be errored with the same object');
assert_equals(signal.reason, error, 'signal.reason should be error'),
assert_equals(rs.events.length, 2, 'cancel should have been called');
assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
assert_equals(rs.events[1], error, 'the readable should be canceled with the same object');
}, `(reason: '${reason}') all the error objects should be the same object`);
}
promise_test(t => {
const rs = recordingReadableStream(errorOnPull, hwm0);
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true }), 'pipeTo should reject')
.then(() => assert_equals(rs.events.length, 0, 'cancel should not be called'));
}, 'preventCancel should prevent canceling the readable');
promise_test(t => {
const rs = new ReadableStream(errorOnPull, hwm0);
const ws = recordingWritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventAbort: true }), 'pipeTo should reject')
.then(() => {
assert_equals(ws.events.length, 0, 'writable should not have been aborted');
return ws.getWriter().ready;
});
}, 'preventAbort should prevent aborting the readable');
promise_test(t => {
const rs = recordingReadableStream(errorOnPull, hwm0);
const ws = recordingWritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }),
'pipeTo should reject')
.then(() => {
assert_equals(rs.events.length, 0, 'cancel should not be called');
assert_equals(ws.events.length, 0, 'writable should not have been aborted');
return ws.getWriter().ready;
});
}, 'preventCancel and preventAbort should prevent canceling the readable and aborting the readable');
for (const reason of [null, undefined, error1]) {
promise_test(async t => {
const rs = new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.close();
}
});
const abortController = new AbortController();
const signal = abortController.signal;
const ws = recordingWritableStream({
write() {
abortController.abort(reason);
}
});
const pipeToPromise = rs.pipeTo(ws, { signal });
if (reason !== undefined) {
await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
} else {
await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
}
const error = await pipeToPromise.catch(e => e);
assert_equals(signal.reason, error, 'signal.reason should be error');
assert_equals(ws.events.length, 4, 'only chunk "a" should have been written');
assert_array_equals(ws.events.slice(0, 3), ['write', 'a', 'abort'], 'events should match');
assert_equals(ws.events[3], error, 'abort reason should be error');
}, `(reason: '${reason}') abort should prevent further reads`);
}
for (const reason of [null, undefined, error1]) {
promise_test(async t => {
let readController;
const rs = new ReadableStream({
start(c) {
readController = c;
c.enqueue('a');
c.enqueue('b');
}
});
const abortController = new AbortController();
const signal = abortController.signal;
let resolveWrite;
const writePromise = new Promise(resolve => {
resolveWrite = resolve;
});
const ws = recordingWritableStream({
write() {
return writePromise;
}
}, new CountQueuingStrategy({ highWaterMark: Infinity }));
const pipeToPromise = rs.pipeTo(ws, { signal });
await delay(0);
await abortController.abort(reason);
await readController.close(); // Make sure the test terminates when signal is not implemented.
await resolveWrite();
if (reason !== undefined) {
await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
} else {
await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
}
const error = await pipeToPromise.catch(e => e);
assert_equals(signal.reason, error, 'signal.reason should be error');
assert_equals(ws.events.length, 6, 'chunks "a" and "b" should have been written');
assert_array_equals(ws.events.slice(0, 5), ['write', 'a', 'write', 'b', 'abort'], 'events should match');
assert_equals(ws.events[5], error, 'abort reason should be error');
}, `(reason: '${reason}') all pending writes should complete on abort`);
}
for (const reason of [null, undefined, error1]) {
promise_test(async t => {
let rejectPull;
const pullPromise = new Promise((_, reject) => {
rejectPull = reject;
});
let rejectCancel;
const cancelPromise = new Promise((_, reject) => {
rejectCancel = reject;
});
const rs = recordingReadableStream({
async pull() {
await Promise.race([
pullPromise,
cancelPromise,
]);
},
cancel(reason) {
rejectCancel(reason);
},
});
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
const pipeToPromise = rs.pipeTo(ws, { signal });
pipeToPromise.catch(() => {}); // Prevent unhandled rejection.
await delay(0);
abortController.abort(reason);
rejectPull('should not catch pull rejection');
await delay(0);
assert_equals(rs.eventsWithoutPulls.length, 2, 'cancel should have been called');
assert_equals(rs.eventsWithoutPulls[0], 'cancel', 'first event should be cancel');
if (reason !== undefined) {
await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
} else {
await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
}
}, `(reason: '${reason}') underlyingSource.cancel() should called when abort, even with pending pull`);
}
promise_test(t => {
const rs = new ReadableStream({
pull(controller) {
controller.error('failed to abort');
},
cancel() {
return Promise.reject(error1);
}
}, hwm0);
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
}, 'a rejection from underlyingSource.cancel() should be returned by pipeTo()');
promise_test(t => {
const rs = new ReadableStream(errorOnPull, hwm0);
const ws = new WritableStream({
abort() {
return Promise.reject(error1);
}
});
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
}, 'a rejection from underlyingSink.abort() should be returned by pipeTo()');
promise_test(t => {
const events = [];
const rs = new ReadableStream({
pull(controller) {
controller.error('failed to abort');
},
cancel() {
events.push('cancel');
return Promise.reject(error1);
}
}, hwm0);
const ws = new WritableStream({
abort() {
events.push('abort');
return Promise.reject(error2);
}
});
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_exactly(t, error2, rs.pipeTo(ws, { signal }), 'pipeTo should reject')
.then(() => assert_array_equals(events, ['abort', 'cancel'], 'abort() should be called before cancel()'));
}, 'a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()');
promise_test(t => {
const rs = new ReadableStream({
start(controller) {
controller.close();
}
});
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
}, 'abort signal takes priority over closed readable');
promise_test(t => {
const rs = new ReadableStream({
start(controller) {
controller.error(error1);
}
});
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
}, 'abort signal takes priority over errored readable');
promise_test(t => {
const rs = new ReadableStream({
pull(controller) {
controller.error('failed to abort');
}
}, hwm0);
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
const writer = ws.getWriter();
return writer.close().then(() => {
writer.releaseLock();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
});
}, 'abort signal takes priority over closed writable');
promise_test(t => {
const rs = new ReadableStream({
pull(controller) {
controller.error('failed to abort');
}
}, hwm0);
const ws = new WritableStream({
start(controller) {
controller.error(error1);
}
});
const abortController = new AbortController();
const signal = abortController.signal;
abortController.abort();
return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
}, 'abort signal takes priority over errored writable');
promise_test(() => {
let readController;
const rs = new ReadableStream({
start(c) {
readController = c;
}
});
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
const pipeToPromise = rs.pipeTo(ws, { signal, preventClose: true });
readController.close();
return Promise.resolve().then(() => {
abortController.abort();
return pipeToPromise;
}).then(() => ws.getWriter().write('this should succeed'));
}, 'abort should do nothing after the readable is closed');
promise_test(t => {
let readController;
const rs = new ReadableStream({
start(c) {
readController = c;
}
});
const ws = new WritableStream();
const abortController = new AbortController();
const signal = abortController.signal;
const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
readController.error(error1);
return Promise.resolve().then(() => {
abortController.abort();
return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
}).then(() => ws.getWriter().write('this should succeed'));
}, 'abort should do nothing after the readable is errored');
promise_test(t => {
let readController;
const rs = new ReadableStream({
start(c) {
readController = c;
}
});
let resolveWrite;
const writePromise = new Promise(resolve => {
resolveWrite = resolve;
});
const ws = new WritableStream({
write() {
readController.error(error1);
return writePromise;
}
});
const abortController = new AbortController();
const signal = abortController.signal;
const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
readController.enqueue('a');
return delay(0).then(() => {
abortController.abort();
resolveWrite();
return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
}).then(() => ws.getWriter().write('this should succeed'));
}, 'abort should do nothing after the readable is errored, even with pending writes');
promise_test(t => {
const rs = recordingReadableStream({
pull(controller) {
return delay(0).then(() => controller.close());
}
});
let writeController;
const ws = new WritableStream({
start(c) {
writeController = c;
}
});
const abortController = new AbortController();
const signal = abortController.signal;
const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true });
return Promise.resolve().then(() => {
writeController.error(error1);
return Promise.resolve();
}).then(() => {
abortController.abort();
return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
}).then(() => {
assert_array_equals(rs.events, ['pull'], 'cancel should not have been called');
});
}, 'abort should do nothing after the writable is errored');
promise_test(async t => {
const rs = new ReadableStream({
pull(c) {
c.enqueue(new Uint8Array([]));
},
type: "bytes",
});
const ws = new WritableStream();
const [first, second] = rs.tee();
let aborted = false;
first.pipeTo(ws, { signal: AbortSignal.abort() }).catch(() => {
aborted = true;
});
await delay(0);
assert_false(aborted, "pipeTo should not be aborted yet");
await second.cancel();
await delay(0);
assert_true(aborted, "pipeTo should be aborted now");
}, "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted");

View file

@ -0,0 +1,15 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/close-propagation-backward.any.js"></script>

View file

@ -0,0 +1,153 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
promise_test(() => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return rs.pipeTo(ws).then(
() => assert_unreached('the promise must not fulfill'),
err => {
assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
}
);
}, 'Closing must be propagated backward: starts closed; preventCancel omitted; fulfilled cancel promise');
promise_test(t => {
// Our recording streams do not deal well with errors generated by the system, so give them some help
let recordedError;
const rs = recordingReadableStream({
cancel(cancelErr) {
recordedError = cancelErr;
throw error1;
}
});
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
assert_equals(recordedError.name, 'TypeError', 'the cancel reason must be a TypeError');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', recordedError]);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Closing must be propagated backward: starts closed; preventCancel omitted; rejected cancel promise');
for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
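// String(-0) is '0', so Object.is is used to keep '-0' distinguishable in the test names below.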
const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
promise_test(() => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return rs.pipeTo(ws, { preventCancel: falsy }).then(
() => assert_unreached('the promise must not fulfill'),
err => {
assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
}
);
}, `Closing must be propagated backward: starts closed; preventCancel = ${stringVersion} (falsy); fulfilled cancel ` +
`promise`);
}
for (const truthy of [true, 'a', 1, Symbol(), { }]) {
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: truthy })).then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return ws.getWriter().closed;
});
}, `Closing must be propagated backward: starts closed; preventCancel = ${String(truthy)} (truthy)`);
}
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: true, preventAbort: true }))
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return ws.getWriter().closed;
});
}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return promise_rejects_js(t, TypeError,
rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true }))
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return ws.getWriter().closed;
});
}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true, preventClose ' +
'= true');

View file

@ -0,0 +1,16 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/close-propagation-forward.any.js"></script>

View file

@ -0,0 +1,589 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream();
return rs.pipeTo(ws).then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Closing must be propagated forward: starts closed; preventClose omitted; fulfilled close promise');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream({
close() {
throw error1;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed)
]);
});
}, 'Closing must be propagated forward: starts closed; preventClose omitted; rejected close promise');
for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream();
return rs.pipeTo(ws, { preventClose: falsy }).then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, `Closing must be propagated forward: starts closed; preventClose = ${stringVersion} (falsy); fulfilled close ` +
`promise`);
}
for (const truthy of [true, 'a', 1, Symbol(), { }]) {
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream();
return rs.pipeTo(ws, { preventClose: truthy }).then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return rs.getReader().closed;
});
}, `Closing must be propagated forward: starts closed; preventClose = ${String(truthy)} (truthy)`);
}
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream();
return rs.pipeTo(ws, { preventClose: true, preventAbort: true }).then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return rs.getReader().closed;
});
}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true');
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.close();
}
});
const ws = recordingWritableStream();
return rs.pipeTo(ws, { preventClose: true, preventAbort: true, preventCancel: true }).then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return rs.getReader().closed;
});
}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true, preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = rs.pipeTo(ws);
t.step_timeout(() => rs.controller.close());
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; fulfilled close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
close() {
throw error1;
}
});
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.close());
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed)
]);
});
}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; rejected close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = rs.pipeTo(ws, { preventClose: true });
t.step_timeout(() => rs.controller.close());
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
return rs.getReader().closed;
});
}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = rs.pipeTo(ws);
t.step_timeout(() => rs.controller.close());
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
'preventClose omitted; fulfilled close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
close() {
throw error1;
}
}, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.close());
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed)
]);
});
}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
'preventClose omitted; rejected close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = rs.pipeTo(ws, { preventClose: true });
t.step_timeout(() => rs.controller.close());
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
return rs.getReader().closed;
});
}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
'preventClose = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = rs.pipeTo(ws);
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.close());
}, 10);
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello', 'close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; fulfilled close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
close() {
throw error1;
}
});
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.close());
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello', 'close']);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed)
]);
});
}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; rejected close promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = rs.pipeTo(ws, { preventClose: true });
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.close());
}, 10);
return pipePromise.then(value => {
assert_equals(value, undefined, 'the promise must fulfill with undefined');
})
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
return rs.getReader().closed;
});
}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose = true');
promise_test(() => {
const rs = recordingReadableStream();
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
});
let pipeComplete = false;
const pipePromise = rs.pipeTo(ws).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.close();
// Flush async events and verify that no shutdown occurs.
return flushAsyncEvents().then(() => {
assert_array_equals(ws.events, ['write', 'a']); // no 'close'
assert_equals(pipeComplete, false, 'the pipe must not be complete');
resolveWritePromise();
return pipePromise.then(() => {
assert_array_equals(ws.events, ['write', 'a', 'close']);
});
});
}, 'Closing must be propagated forward: shutdown must not occur until the final write completes');
promise_test(() => {
const rs = recordingReadableStream();
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
});
let pipeComplete = false;
const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.close();
// Flush async events and verify that no shutdown occurs.
return flushAsyncEvents().then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the chunk must have been written, but close must not have happened');
assert_equals(pipeComplete, false, 'the pipe must not be complete');
resolveWritePromise();
return pipePromise;
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the chunk must have been written, but close must not have happened');
});
}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true');
promise_test(() => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
}, new CountQueuingStrategy({ highWaterMark: 2 }));
let pipeComplete = false;
const pipePromise = rs.pipeTo(ws).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.enqueue('b');
return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the first chunk must have been written, but close must not have happened yet');
assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
rs.controller.close();
resolveWritePromise();
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'the second chunk must have been written, but close must not have happened yet');
assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
resolveWritePromise();
return pipePromise;
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close'],
'all chunks must have been written and close must have happened');
});
}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write');
promise_test(() => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
}, new CountQueuingStrategy({ highWaterMark: 2 }));
let pipeComplete = false;
const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.enqueue('b');
return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the first chunk must have been written, but close must not have happened');
assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
rs.controller.close();
resolveWritePromise();
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'the second chunk must have been written, but close must not have happened');
assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
resolveWritePromise();
return pipePromise;
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'all chunks must have been written, but close must not have happened');
});
}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write; preventClose = true');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.enqueue('a');
c.enqueue('b');
c.close();
}
});
let rejectWritePromise;
const ws = recordingWritableStream({
write() {
return new Promise((resolve, reject) => {
rejectWritePromise = reject;
});
}
}, { highWaterMark: 3 });
const pipeToPromise = rs.pipeTo(ws);
return delay(0).then(() => {
rejectWritePromise(error1);
return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
}).then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['write', 'a']);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed, 'ws should be errored')
]);
});
}, 'Closing must be propagated forward: erroring the writable while flushing pending writes should error pipeTo');

View file

@ -0,0 +1,16 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/error-propagation-backward.any.js"></script>

View file

@ -0,0 +1,630 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
const error2 = new Error('error2!');
error2.name = 'error2';
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
start() {
return Promise.reject(error1);
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: starts errored; preventCancel omitted; fulfilled cancel promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' +
'fulfilled cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
cancel() {
throw error2;
}
});
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; rejected ' +
'cancel promise');
for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: falsy }),
'pipeTo must reject with the write error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` +
`${stringVersion} (falsy); fulfilled cancel promise`);
}
for (const truthy of [true, 'a', 1, Symbol(), { }]) {
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: truthy }),
'pipeTo must reject with the write error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` +
`${String(truthy)} (truthy)`);
}
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true }),
'pipeTo must reject with the write error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, ' +
'preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
write() {
return Promise.reject(error1);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
.then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
.then(() => {
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true }),
'pipeTo must reject with the write error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
});
});
}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, ' +
'preventAbort = true, preventClose = true');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('Hello');
}
});
const ws = recordingWritableStream({
write() {
throw error1;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'Hello']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; fulfilled ' +
'cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('Hello');
},
cancel() {
throw error2;
}
});
const ws = recordingWritableStream({
write() {
throw error1;
}
});
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'Hello']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; rejected ' +
'cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('Hello');
}
});
const ws = recordingWritableStream({
write() {
throw error1;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
}
});
const ws = recordingWritableStream({
write() {
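// The recording sink logs two entries ('write', chunk) per write, so a length above 2 means this is at least the second write call.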
if (ws.events.length > 2) {
return delay(0).then(() => {
throw error1;
});
}
return undefined;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' +
'false; fulfilled cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
cancel() {
throw error2;
}
});
const ws = recordingWritableStream({
write() {
if (ws.events.length > 2) {
return delay(0).then(() => {
throw error1;
});
}
return undefined;
}
});
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' +
'false; rejected cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
}
});
const ws = recordingWritableStream({
write() {
if (ws.events.length > 2) {
return delay(0).then(() => {
throw error1;
});
}
return undefined;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
});
}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; fulfilled cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
cancel() {
throw error2;
}
});
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; rejected cancel promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
controller.close();
}
});
const ws = recordingWritableStream({
write(chunk) {
if (chunk === 'c') {
return Promise.reject(error1);
}
return undefined;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']);
});
}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' +
'preventCancel omitted (but cancel is never called)');
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
controller.close();
}
});
const ws = recordingWritableStream({
write(chunk) {
if (chunk === 'c') {
return Promise.reject(error1);
}
return undefined;
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']);
});
}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' +
'preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
'false; fulfilled cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
cancel() {
throw error2;
}
});
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
'false; rejected cancel promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => ws.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
'true');
promise_test(() => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
ws.abort(error1);
return rs.pipeTo(ws).then(
() => assert_unreached('the promise must not fulfill'),
err => {
assert_equals(err, error1, 'the promise must reject with error1');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
assert_array_equals(ws.events, ['abort', error1]);
}
);
}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; fulfilled ' +
'cancel promise');
promise_test(t => {
const rs = recordingReadableStream({
cancel() {
throw error2;
}
});
const ws = recordingWritableStream();
ws.abort(error1);
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error')
.then(() => {
return ws.getWriter().closed.then(
() => assert_unreached('the promise must not fulfill'),
err => {
assert_equals(err, error1, 'the promise must reject with error1');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
assert_array_equals(ws.events, ['abort', error1]);
}
);
});
}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; rejected ' +
'cancel promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
ws.abort(error1);
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true })).then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return flushAsyncEvents();
}
});
const pipePromise = rs.pipeTo(ws);
rs.controller.enqueue('a');
return writeCalledPromise.then(() => {
ws.controller.error(error1);
return promise_rejects_exactly(t, error1, pipePromise);
}).then(() => {
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, ['write', 'a']);
});
}, 'Errors must be propagated backward: erroring via the controller errors once pending write completes');

View file

@ -0,0 +1,16 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/error-propagation-forward.any.js"></script>

View file

@ -0,0 +1,569 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
const error2 = new Error('error2!');
error2.name = 'error2';
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: starts errored; preventAbort = false; fulfilled abort promise');
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream({
abort() {
throw error2;
}
});
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: starts errored; preventAbort = false; rejected abort promise');
for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: falsy }), 'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, `Errors must be propagated forward: starts errored; preventAbort = ${stringVersion} (falsy); fulfilled abort ` +
`promise`);
}
for (const truthy of [true, 'a', 1, Symbol(), { }]) {
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: truthy }),
'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
});
}, `Errors must be propagated forward: starts errored; preventAbort = ${String(truthy)} (truthy)`);
}
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true }),
'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true');
promise_test(t => {
const rs = recordingReadableStream({
start() {
return Promise.reject(error1);
}
});
const ws = recordingWritableStream();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true, preventClose: true }),
'pipeTo must reject with the same error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true, preventClose = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; fulfilled abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
abort() {
throw error2;
}
});
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; rejected abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
'preventAbort = false; fulfilled abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
abort() {
throw error2;
}
}, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
'preventAbort = false; rejected abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => rs.controller.error(error1), 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
'preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; fulfilled abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
abort() {
throw error2;
}
});
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; rejected abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'Hello']);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
'preventAbort = false; fulfilled abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream({
abort() {
throw error2;
}
}, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['abort', error1]);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
'preventAbort = false; rejected abort promise');
promise_test(t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the same error');
t.step_timeout(() => {
rs.controller.enqueue('Hello');
t.step_timeout(() => rs.controller.error(error1), 10);
}, 10);
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
});
}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
'preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
});
let pipeComplete = false;
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
return writeCalledPromise.then(() => {
rs.controller.error(error1);
// Flush async events and verify that no shutdown occurs.
return flushAsyncEvents();
}).then(() => {
assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
assert_equals(pipeComplete, false, 'the pipe must not be complete');
resolveWritePromise();
return pipePromise.then(() => {
assert_array_equals(ws.events, ['write', 'a', 'abort', error1]);
});
});
}, 'Errors must be propagated forward: shutdown must not occur until the final write completes');
promise_test(t => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
});
let pipeComplete = false;
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
return writeCalledPromise.then(() => {
rs.controller.error(error1);
// Flush async events and verify that no shutdown occurs.
return flushAsyncEvents();
}).then(() => {
assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
assert_equals(pipeComplete, false, 'the pipe must not be complete');
resolveWritePromise();
return pipePromise;
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
});
}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
}, new CountQueuingStrategy({ highWaterMark: 2 }));
let pipeComplete = false;
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.enqueue('b');
return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the first chunk must have been written, but abort must not have happened yet');
assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
rs.controller.error(error1);
resolveWritePromise();
return flushAsyncEvents();
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'the second chunk must have been written, but abort must not have happened yet');
assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
resolveWritePromise();
return pipePromise;
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'abort', error1],
'all chunks must have been written and abort must have happened');
});
}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write');
promise_test(t => {
const rs = recordingReadableStream();
let resolveWriteCalled;
const writeCalledPromise = new Promise(resolve => {
resolveWriteCalled = resolve;
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
resolveWriteCalled();
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
}, new CountQueuingStrategy({ highWaterMark: 2 }));
let pipeComplete = false;
const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => {
pipeComplete = true;
});
rs.controller.enqueue('a');
rs.controller.enqueue('b');
return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a'],
'the first chunk must have been written, but abort must not have happened');
assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
rs.controller.error(error1);
resolveWritePromise();
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'the second chunk must have been written, but abort must not have happened');
assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
resolveWritePromise();
return pipePromise;
}).then(() => flushAsyncEvents()).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'all chunks must have been written, but abort must not have happened');
});
}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write; preventAbort = true');

View file

@ -0,0 +1,17 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/test-utils.js"></script>
<script src="../resources/rs-utils.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/flow-control.any.js"></script>

View file

@ -0,0 +1,297 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/rs-utils.js
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
promise_test(t => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.close();
}
});
const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
const pipePromise = rs.pipeTo(ws, { preventCancel: true });
// Wait and make sure it doesn't do any reading.
return flushAsyncEvents().then(() => {
ws.controller.error(error1);
})
.then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
})
.then(() => readableStreamToArray(rs))
.then(chunksNotPreviouslyRead => {
assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
});
}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');
promise_test(() => {
const rs = recordingReadableStream({
start(controller) {
controller.enqueue('b');
controller.close();
}
});
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
if (!resolveWritePromise) {
// first write
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
return undefined;
}
});
const writer = ws.getWriter();
const firstWritePromise = writer.write('a');
assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
writer.releaseLock();
// firstWritePromise won't settle until we call resolveWritePromise.
const pipePromise = rs.pipeTo(ws);
return flushAsyncEvents().then(() => resolveWritePromise())
.then(() => Promise.all([firstWritePromise, pipePromise]))
.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
});
}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');
promise_test(() => {
const rs = recordingReadableStream();
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
if (!resolveWritePromise) {
// first write
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
return undefined;
}
});
const writer = ws.getWriter();
writer.write('a');
return flushAsyncEvents().then(() => {
assert_array_equals(ws.events, ['write', 'a']);
assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
writer.releaseLock();
const pipePromise = rs.pipeTo(ws);
rs.controller.enqueue('b');
resolveWritePromise();
rs.controller.close();
return pipePromise.then(() => {
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
});
});
}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
'stream becomes non-empty and the writable stream starts desiring chunks');
promise_test(() => {
const unreadChunks = ['b', 'c', 'd'];
const rs = recordingReadableStream({
pull(controller) {
controller.enqueue(unreadChunks.shift());
if (unreadChunks.length === 0) {
controller.close();
}
}
}, new CountQueuingStrategy({ highWaterMark: 0 }));
let resolveWritePromise;
const ws = recordingWritableStream({
write() {
if (!resolveWritePromise) {
// first write
return new Promise(resolve => {
resolveWritePromise = resolve;
});
}
return undefined;
}
}, new CountQueuingStrategy({ highWaterMark: 3 }));
const writer = ws.getWriter();
const firstWritePromise = writer.write('a');
assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
writer.releaseLock();
// firstWritePromise won't settle until we call resolveWritePromise.
const pipePromise = rs.pipeTo(ws);
return flushAsyncEvents().then(() => {
assert_array_equals(ws.events, ['write', 'a']);
assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
}).then(() => resolveWritePromise())
.then(() => Promise.all([firstWritePromise, pipePromise]))
.then(() => {
assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close']);
});
}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');
class StepTracker {
constructor() {
this.waiters = [];
this.wakers = [];
}
// Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
// promise is resolved.
waitThenAdvance(n) {
if (this.waiters[n] === undefined) {
this.waiters[n] = new Promise(resolve => {
this.wakers[n] = resolve;
});
this.waiters[n]
.then(() => flushAsyncEvents())
.then(() => {
if (this.wakers[n + 1] !== undefined) {
this.wakers[n + 1]();
}
});
}
if (n == 0) {
this.wakers[0]();
}
return this.waiters[n];
}
}
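// Minimal usage sketch for StepTracker (the identifiers below are illustrative and not
// referenced by the tests): each waitThenAdvance(n) returns a promise that resolves once
// step n is reached, and reaching step n schedules step n + 1 after async events flush,
// so registration order does not matter.
function stepTrackerUsageSketch() {
  const demo = new StepTracker();
  const reached = [];
  return Promise.all([
    demo.waitThenAdvance(2).then(() => reached.push(2)),
    demo.waitThenAdvance(1).then(() => reached.push(1)),
    demo.waitThenAdvance(0).then(() => reached.push(0))
  ]).then(() => reached); // resolves with [0, 1, 2]
}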
promise_test(() => {
const steps = new StepTracker();
const desiredSizes = [];
const rs = recordingReadableStream({
start(controller) {
steps.waitThenAdvance(1).then(() => enqueue('a'));
steps.waitThenAdvance(3).then(() => enqueue('b'));
steps.waitThenAdvance(5).then(() => enqueue('c'));
steps.waitThenAdvance(7).then(() => enqueue('d'));
steps.waitThenAdvance(11).then(() => controller.close());
function enqueue(chunk) {
controller.enqueue(chunk);
desiredSizes.push(controller.desiredSize);
}
}
});
const chunksFinishedWriting = [];
const writableStartPromise = Promise.resolve();
let writeCalled = false;
const ws = recordingWritableStream({
start() {
return writableStartPromise;
},
write(chunk) {
const waitForStep = writeCalled ? 12 : 9;
writeCalled = true;
return steps.waitThenAdvance(waitForStep).then(() => {
chunksFinishedWriting.push(chunk);
});
}
});
return writableStartPromise.then(() => {
const pipePromise = rs.pipeTo(ws);
steps.waitThenAdvance(0);
return Promise.all([
steps.waitThenAdvance(2).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');
// When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
// promise, leaving the queue empty.
assert_array_equals(desiredSizes, [1],
'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
}),
steps.waitThenAdvance(4).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');
// When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
// step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
// had size 1 (thus desiredSize of 0).
assert_array_equals(desiredSizes, [1, 0],
'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
}),
steps.waitThenAdvance(6).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');
// When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
// the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
// -1.
assert_array_equals(desiredSizes, [1, 0, -1],
'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
}),
steps.waitThenAdvance(8).then(() => {
assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');
// When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
// and 'd'.
assert_array_equals(desiredSizes, [1, 0, -1, -2],
'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
}),
steps.waitThenAdvance(10).then(() => {
assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
'at step 10, two chunks must have been written');
assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
}),
pipePromise.then(() => {
assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');
assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
'all chunks were written (and the WritableStream closed)');
})
]);
});
}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');
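// Illustrative sketch of the desiredSize bookkeeping the assertions above rely on (the
// function name is hypothetical): with the default highWaterMark of 1 and a count-based
// size, desiredSize is 1 minus the number of queued chunks, so it turns negative as soon
// as the queue backs up and signals backpressure to the source.
function desiredSizeSketch() {
  const observed = [];
  new ReadableStream({
    start(controller) {
      observed.push(controller.desiredSize); // 1: the queue is empty
      controller.enqueue('a');
      observed.push(controller.desiredSize); // 0: one chunk is queued
      controller.enqueue('b');
      observed.push(controller.desiredSize); // -1: backpressure is now being signalled
    }
  });
  return observed; // [1, 0, -1]
}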

View file

@@ -0,0 +1,15 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/general.any.js"></script>

View file

@@ -0,0 +1,212 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
'use strict';
test(() => {
const rs = new ReadableStream();
const ws = new WritableStream();
assert_false(rs.locked, 'sanity check: the ReadableStream must not start locked');
assert_false(ws.locked, 'sanity check: the WritableStream must not start locked');
rs.pipeTo(ws);
assert_true(rs.locked, 'the ReadableStream must become locked');
assert_true(ws.locked, 'the WritableStream must become locked');
}, 'Piping must lock both the ReadableStream and WritableStream');
promise_test(() => {
const rs = new ReadableStream({
start(controller) {
controller.close();
}
});
const ws = new WritableStream();
return rs.pipeTo(ws).then(() => {
assert_false(rs.locked, 'the ReadableStream must become unlocked');
assert_false(ws.locked, 'the WritableStream must become unlocked');
});
}, 'Piping finishing must unlock both the ReadableStream and WritableStream');
promise_test(t => {
const fakeRS = Object.create(ReadableStream.prototype);
const ws = new WritableStream();
return promise_rejects_js(t, TypeError, ReadableStream.prototype.pipeTo.apply(fakeRS, [ws]),
'pipeTo should reject with a TypeError');
}, 'pipeTo must check the brand of its ReadableStream this value');
promise_test(t => {
const rs = new ReadableStream();
const fakeWS = Object.create(WritableStream.prototype);
return promise_rejects_js(t, TypeError, ReadableStream.prototype.pipeTo.apply(rs, [fakeWS]),
'pipeTo should reject with a TypeError');
}, 'pipeTo must check the brand of its WritableStream argument');
promise_test(t => {
const rs = new ReadableStream();
const ws = new WritableStream();
rs.getReader();
assert_true(rs.locked, 'sanity check: the ReadableStream starts locked');
assert_false(ws.locked, 'sanity check: the WritableStream does not start locked');
return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => {
assert_false(ws.locked, 'the WritableStream must still be unlocked');
});
}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream');
promise_test(t => {
const rs = new ReadableStream();
const ws = new WritableStream();
ws.getWriter();
assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked');
assert_true(ws.locked, 'sanity check: the WritableStream starts locked');
return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => {
assert_false(rs.locked, 'the ReadableStream must still be unlocked');
});
}, 'pipeTo must fail if the WritableStream is locked, and not lock the ReadableStream');
promise_test(() => {
const CHUNKS = 10;
const rs = new ReadableStream({
start(c) {
for (let i = 0; i < CHUNKS; ++i) {
c.enqueue(i);
}
c.close();
}
});
const written = [];
const ws = new WritableStream({
write(chunk) {
written.push(chunk);
},
close() {
written.push('closed');
}
}, new CountQueuingStrategy({ highWaterMark: CHUNKS }));
return rs.pipeTo(ws).then(() => {
const targetValues = [];
for (let i = 0; i < CHUNKS; ++i) {
targetValues.push(i);
}
targetValues.push('closed');
assert_array_equals(written, targetValues, 'the correct values must be written');
// Ensure both readable and writable are closed by the time the pipe finishes.
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
// NOTE: no requirement on *when* the pipe finishes; that is left to implementations.
}, 'Piping from a ReadableStream from which lots of chunks are synchronously readable');
promise_test(t => {
let controller;
const rs = recordingReadableStream({
start(c) {
controller = c;
}
});
const ws = recordingWritableStream();
const pipePromise = rs.pipeTo(ws).then(() => {
assert_array_equals(ws.events, ['write', 'Hello', 'close']);
});
t.step_timeout(() => {
controller.enqueue('Hello');
t.step_timeout(() => controller.close(), 10);
}, 10);
return pipePromise;
}, 'Piping from a ReadableStream for which a chunk becomes asynchronously readable after the pipeTo');
for (const preventAbort of [true, false]) {
promise_test(() => {
const rs = new ReadableStream({
pull() {
return Promise.reject(undefined);
}
});
return rs.pipeTo(new WritableStream(), { preventAbort }).then(
() => assert_unreached('pipeTo promise should be rejected'),
value => assert_equals(value, undefined, 'rejection value should be undefined'));
}, `an undefined rejection from pull should cause pipeTo() to reject when preventAbort is ${preventAbort}`);
}
for (const preventCancel of [true, false]) {
promise_test(() => {
const rs = new ReadableStream({
pull(controller) {
controller.enqueue(0);
}
});
const ws = new WritableStream({
write() {
return Promise.reject(undefined);
}
});
return rs.pipeTo(ws, { preventCancel }).then(
() => assert_unreached('pipeTo promise should be rejected'),
value => assert_equals(value, undefined, 'rejection value should be undefined'));
}, `an undefined rejection from write should cause pipeTo() to reject when preventCancel is ${preventCancel}`);
}
promise_test(t => {
const rs = new ReadableStream();
const ws = new WritableStream();
return promise_rejects_js(t, TypeError, rs.pipeTo(ws, {
get preventAbort() {
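// Grabbing a writer while the options are being read locks ws, so pipeTo() must reject with a TypeError.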
ws.getWriter();
}
}), 'pipeTo should reject');
}, 'pipeTo() should reject if an option getter grabs a writer');
promise_test(t => {
const rs = new ReadableStream({
start(controller) {
controller.close();
}
});
const ws = new WritableStream();
return rs.pipeTo(ws, null);
}, 'pipeTo() promise should resolve if null is passed');
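// For reference, a sketch of the full pipeTo() options bag (the function name is
// illustrative, and it assumes AbortController is available in the test global; the
// three boolean options default to false): they suppress the close/abort/cancel
// shutdown actions respectively, and signal lets the caller abort an in-flight pipe.
function pipeToOptionsSketch(rs, ws) {
  const controller = new AbortController();
  const done = rs.pipeTo(ws, {
    preventClose: false,   // still close ws when rs closes
    preventAbort: false,   // still abort ws when rs errors
    preventCancel: false,  // still cancel rs when ws errors or closes
    signal: controller.signal
  });
  return { done, abort: reason => controller.abort(reason) };
}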

View file

@@ -0,0 +1,16 @@
<!doctype html>
<meta charset=utf-8>
<script>
self.GLOBAL = {
isWindow: function() { return true; },
isWorker: function() { return false; },
isShadowRealm: function() { return false; },
};
</script>
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>
<div id=log></div>
<script src="../../streams/piping/multiple-propagation.any.js"></script>

View file

@@ -0,0 +1,227 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
const error1 = new Error('error1!');
error1.name = 'error1';
const error2 = new Error('error2!');
error2.name = 'error2';
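// Returns a promise for a recording WritableStream that has already transitioned to the
// "errored" state with error2, waiting until the error is observable via writer.closed.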
function createErroredWritableStream(t) {
return Promise.resolve().then(() => {
const ws = recordingWritableStream({
start(c) {
c.error(error2);
}
});
const writer = ws.getWriter();
return promise_rejects_exactly(t, error2, writer.closed, 'the writable stream must be errored with error2')
.then(() => {
writer.releaseLock();
assert_array_equals(ws.events, []);
return ws;
});
});
}
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
const ws = recordingWritableStream({
start(c) {
c.error(error2);
}
});
// Trying to abort a stream that is erroring will give the writable's error
return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return Promise.all([
promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2')
]);
});
}, 'Piping from an errored readable stream to an erroring writable stream');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
return createErroredWritableStream(t)
.then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error'))
.then(() => {
assert_array_equals(rs.events, []);
return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1');
});
}, 'Piping from an errored readable stream to an errored writable stream');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
const ws = recordingWritableStream({
start(c) {
c.error(error2);
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the readable stream\'s error')
.then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return Promise.all([
promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2')
]);
});
}, 'Piping from an errored readable stream to an erroring writable stream; preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
return createErroredWritableStream(t)
.then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
'pipeTo must reject with the readable stream\'s error'))
.then(() => {
assert_array_equals(rs.events, []);
return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1');
});
}, 'Piping from an errored readable stream to an errored writable stream; preventAbort = true');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
const ws = recordingWritableStream();
const writer = ws.getWriter();
const closePromise = writer.close();
writer.releaseLock();
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['abort', error1]);
return Promise.all([
promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
promise_rejects_exactly(t, error1, ws.getWriter().closed,
'closed must reject with error1'),
promise_rejects_exactly(t, error1, closePromise,
'close() must reject with error1')
]);
});
}, 'Piping from an errored readable stream to a closing writable stream');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.error(error1);
}
});
const ws = recordingWritableStream();
const writer = ws.getWriter();
const closePromise = writer.close();
writer.releaseLock();
return flushAsyncEvents().then(() => {
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
ws.getWriter().closed,
closePromise
]);
});
});
}, 'Piping from an errored readable stream to a closed writable stream');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.close();
}
});
const ws = recordingWritableStream({
start(c) {
c.error(error1);
}
});
return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, []);
return Promise.all([
rs.getReader().closed,
promise_rejects_exactly(t, error1, ws.getWriter().closed, 'the writable stream must be errored with error1')
]);
});
}, 'Piping from a closed readable stream to an erroring writable stream');
promise_test(t => {
const rs = recordingReadableStream({
start(c) {
c.close();
}
});
return createErroredWritableStream(t)
.then(ws => promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error'))
.then(() => {
assert_array_equals(rs.events, []);
return rs.getReader().closed;
});
}, 'Piping from a closed readable stream to an errored writable stream');
promise_test(() => {
const rs = recordingReadableStream({
start(c) {
c.close();
}
});
const ws = recordingWritableStream();
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
return rs.pipeTo(ws).then(() => {
assert_array_equals(rs.events, []);
assert_array_equals(ws.events, ['close']);
return Promise.all([
rs.getReader().closed,
ws.getWriter().closed
]);
});
}, 'Piping from a closed readable stream to a closed writable stream');
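// Companion sketch for the closed-stream cases above (the function name is illustrative):
// when the source is already closed, the default behaviour is to close the destination,
// while preventClose: true leaves it open; the pipe promise fulfills either way.
function closePropagationSketch(preventClose) {
  const rs = new ReadableStream({
    start(controller) {
      controller.close();
    }
  });
  const ws = recordingWritableStream();
  // ws.events ends up as ['close'] by default, or stays [] when preventClose is true.
  return rs.pipeTo(ws, { preventClose }).then(() => ws.events);
}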