diff --git a/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Libraries/LibWeb/Streams/AbstractOperations.cpp index e34b1ebfbd8..dc3dba99711 100644 --- a/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -289,8 +289,383 @@ bool readable_stream_has_default_reader(ReadableStream const& stream) return false; } +// https://streams.spec.whatwg.org/#ref-for-in-parallel +class ReadableStreamPipeTo final : public JS::Cell { + GC_CELL(ReadableStreamPipeTo, JS::Cell); + GC_DECLARE_ALLOCATOR(ReadableStreamPipeTo); + +public: + void process() + { + if (check_for_error_and_close_states()) + return; + + auto ready_promise = m_writer->ready(); + + if (ready_promise && WebIDL::is_promise_fulfilled(*ready_promise)) { + read_chunk(); + return; + } + + auto when_ready = GC::create_function(m_realm->heap(), [this](JS::Value) -> WebIDL::ExceptionOr { + read_chunk(); + return JS::js_undefined(); + }); + + auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { + check_for_error_and_close_states(); + return JS::js_undefined(); + }); + + if (ready_promise) + WebIDL::react_to_promise(*ready_promise, when_ready, shutdown); + if (auto promise = m_reader->closed()) + WebIDL::react_to_promise(*promise, shutdown, shutdown); + } + + void set_abort_signal(GC::Ref signal, DOM::AbortSignal::AbortSignal::AbortAlgorithmID signal_id) + { + m_signal = signal; + m_signal_id = signal_id; + } + + // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action + void shutdown_with_action(GC::Ref()>> action, Optional original_error = {}) + { + // 1. If shuttingDown is true, abort these substeps. + if (m_shutting_down) + return; + + // 2. Set shuttingDown to true. + m_shutting_down = true; + + auto on_pending_writes_complete = [this, action, original_error = move(original_error)]() mutable { + HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; + + // 4. Let p be the result of performing action. + auto promise = action->function()(); + + WebIDL::react_to_promise(promise, + // 5. Upon fulfillment of p, finalize, passing along originalError if it was given. + GC::create_function(heap(), [this, original_error = move(original_error)](JS::Value) mutable -> WebIDL::ExceptionOr { + finish(move(original_error)); + return JS::js_undefined(); + }), + + // 6. Upon rejection of p with reason newError, finalize with newError. + GC::create_function(heap(), [this](JS::Value new_error) -> WebIDL::ExceptionOr { + finish(new_error); + return JS::js_undefined(); + })); + }; + + // 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, + if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) { + // 1. If any chunks have been read but not yet written, write them to dest. + write_unwritten_chunks(); + + // 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled). + wait_for_pending_writes_to_complete(move(on_pending_writes_complete)); + } else { + on_pending_writes_complete(); + } + } + + // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown + void shutdown(Optional error = {}) + { + // 1. If shuttingDown is true, abort these substeps. + if (m_shutting_down) + return; + + // 2. Set shuttingDown to true. + m_shutting_down = true; + + auto on_pending_writes_complete = [this, error = move(error)]() mutable { + HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; + + // 4. Finalize, passing along error if it was given. + finish(move(error)); + }; + + // 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, + if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) { + // 1. If any chunks have been read but not yet written, write them to dest. + write_unwritten_chunks(); + + // 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled). + wait_for_pending_writes_to_complete(move(on_pending_writes_complete)); + } else { + on_pending_writes_complete(); + } + } + +private: + ReadableStreamPipeTo( + GC::Ref realm, + GC::Ref promise, + GC::Ref source, + GC::Ref destination, + GC::Ref reader, + GC::Ref writer, + bool prevent_close, + bool prevent_abort, + bool prevent_cancel) + : m_realm(realm) + , m_promise(promise) + , m_source(source) + , m_destination(destination) + , m_reader(reader) + , m_writer(writer) + , m_prevent_close(prevent_close) + , m_prevent_abort(prevent_abort) + , m_prevent_cancel(prevent_cancel) + { + m_reader->set_readable_stream_pipe_to_operation({}, this); + } + + virtual void visit_edges(Cell::Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_realm); + visitor.visit(m_promise); + visitor.visit(m_source); + visitor.visit(m_destination); + visitor.visit(m_reader); + visitor.visit(m_writer); + visitor.visit(m_signal); + visitor.visit(m_pending_writes); + visitor.visit(m_unwritten_chunks); + } + + void read_chunk() + { + // Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from + // reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent + // must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown. + if (check_for_error_and_close_states()) + return; + + auto when_ready = GC::create_function(heap(), [this](JS::Value value) -> WebIDL::ExceptionOr { + auto& vm = this->vm(); + + VERIFY(value.is_object()); + auto& object = value.as_object(); + + auto done = MUST(JS::iterator_complete(vm, object)); + + if (done) { + if (!check_for_error_and_close_states()) + finish(); + } else { + auto chunk = MUST(JS::iterator_value(vm, object)); + m_unwritten_chunks.append(chunk); + + write_chunk(); + process(); + } + + return JS::js_undefined(); + }); + + auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { + check_for_error_and_close_states(); + return JS::js_undefined(); + }); + + WebIDL::react_to_promise(m_reader->read(), when_ready, shutdown); + + if (auto promise = m_writer->closed()) + WebIDL::react_to_promise(*promise, shutdown, shutdown); + } + + void write_chunk() + { + // Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from + // reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent + // must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown. + if (!m_shutting_down && check_for_error_and_close_states()) + return; + + auto promise = m_writer->write(m_unwritten_chunks.take_first()); + WebIDL::mark_promise_as_handled(promise); + + m_pending_writes.append(promise); + } + + void write_unwritten_chunks() + { + while (!m_unwritten_chunks.is_empty()) + write_chunk(); + } + + void wait_for_pending_writes_to_complete(Function on_complete) + { + auto handler = GC::create_function(heap(), [this, on_complete = move(on_complete)]() { + m_pending_writes.clear(); + on_complete(); + }); + + auto success_steps = [handler](Vector const&) { handler->function()(); }; + auto failure_steps = [handler](JS::Value) { handler->function()(); }; + + WebIDL::wait_for_all(m_realm, m_pending_writes, move(success_steps), move(failure_steps)); + } + + // https://streams.spec.whatwg.org/#rs-pipeTo-finalize + // We call this `finish` instead of `finalize` to avoid conflicts with GC::Cell::finalize. + void finish(Optional error = {}) + { + // 1. Perform ! WritableStreamDefaultWriterRelease(writer). + writable_stream_default_writer_release(m_writer); + + // 2. If reader implements ReadableStreamBYOBReader, perform ! ReadableStreamBYOBReaderRelease(reader). + // 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader). + readable_stream_default_reader_release(m_reader); + + // 4. If signal is not undefined, remove abortAlgorithm from signal. + if (m_signal) + m_signal->remove_abort_algorithm(m_signal_id); + + // 5. If error was given, reject promise with error. + if (error.has_value()) { + WebIDL::reject_promise(m_realm, m_promise, *error); + } + // 6. Otherwise, resolve promise with undefined. + else { + WebIDL::resolve_promise(m_realm, m_promise, JS::js_undefined()); + } + + m_reader->set_readable_stream_pipe_to_operation({}, nullptr); + } + + bool check_for_error_and_close_states() + { + // Error and close states must be propagated: the following conditions must be applied in order. + return m_shutting_down + || check_for_forward_errors() + || check_for_backward_errors() + || check_for_forward_close() + || check_for_backward_close(); + } + + bool check_for_forward_errors() + { + // 1. Errors must be propagated forward: if source.[[state]] is or becomes "errored", then + if (m_source->state() == ReadableStream::State::Errored) { + // 1. If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]]) + // and with source.[[storedError]]. + if (!m_prevent_abort) { + auto action = GC::create_function(heap(), [this]() { + return writable_stream_abort(m_destination, m_source->stored_error()); + }); + + shutdown_with_action(action, m_source->stored_error()); + } + // 2. Otherwise, shutdown with source.[[storedError]]. + else { + shutdown(m_source->stored_error()); + } + } + + return m_shutting_down; + } + + bool check_for_backward_errors() + { + // 2. Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then + if (m_destination->state() == WritableStream::State::Errored) { + // 1. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]]) + // and with dest.[[storedError]]. + if (!m_prevent_cancel) { + auto action = GC::create_function(heap(), [this]() { + return readable_stream_cancel(m_source, m_destination->stored_error()); + }); + + shutdown_with_action(action, m_destination->stored_error()); + } + // 2. Otherwise, shutdown with dest.[[storedError]]. + else { + shutdown(m_destination->stored_error()); + } + } + + return m_shutting_down; + } + + bool check_for_forward_close() + { + // 3. Closing must be propagated forward: if source.[[state]] is or becomes "closed", then + if (m_source->state() == ReadableStream::State::Closed) { + // 1. If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer). + if (!m_prevent_close) { + auto action = GC::create_function(heap(), [this]() { + return writable_stream_default_writer_close_with_error_propagation(m_writer); + }); + + shutdown_with_action(action); + } + // 2. Otherwise, shutdown. + else { + shutdown(); + } + } + + return m_shutting_down; + } + + bool check_for_backward_close() + { + // 4. Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "closed", then + if (writable_stream_close_queued_or_in_flight(m_destination) || m_destination->state() == WritableStream::State::Closed) { + // 1. Assert: no chunks have been read or written. + + // 2. Let destClosed be a new TypeError. + auto destination_closed = JS::TypeError::create(m_realm, "Destination stream was closed during piping operation"sv); + + // 3. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, destClosed) and with destClosed. + if (!m_prevent_cancel) { + auto action = GC::create_function(heap(), [this, destination_closed]() { + return readable_stream_cancel(m_source, destination_closed); + }); + + shutdown_with_action(action, destination_closed); + } + // 4. Otherwise, shutdown with destClosed. + else { + shutdown(destination_closed); + } + } + + return m_shutting_down; + } + + GC::Ref m_realm; + GC::Ref m_promise; + + GC::Ref m_source; + GC::Ref m_destination; + + GC::Ref m_reader; + GC::Ref m_writer; + + GC::Ptr m_signal; + DOM::AbortSignal::AbortAlgorithmID m_signal_id { 0 }; + + Vector> m_pending_writes; + Vector m_unwritten_chunks; + + bool m_prevent_close { false }; + bool m_prevent_abort { false }; + bool m_prevent_cancel { false }; + + bool m_shutting_down { false }; +}; + +GC_DEFINE_ALLOCATOR(ReadableStreamPipeTo); + // https://streams.spec.whatwg.org/#readable-stream-pipe-to -GC::Ref readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool, bool, bool, GC::Ptr signal) +GC::Ref readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool prevent_close, bool prevent_abort, bool prevent_cancel, GC::Ptr signal) { auto& realm = source.realm(); @@ -300,7 +675,6 @@ GC::Ref readable_stream_pipe_to(ReadableStream& source, Writabl // 4. If signal was not given, let signal be undefined. // 5. Assert: either signal is undefined, or signal implements AbortSignal. - (void)signal; // 6. Assert: ! IsReadableStreamLocked(source) is false. VERIFY(!is_readable_stream_locked(source)); @@ -322,57 +696,81 @@ GC::Ref readable_stream_pipe_to(ReadableStream& source, Writabl // 11. Set source.[[disturbed]] to true. source.set_disturbed(true); - // FIXME: 12. Let shuttingDown be false. + // 12. Let shuttingDown be false. + // NOTE: This is internal to the ReadableStreamPipeTo class. // 13. Let promise be a new promise. auto promise = WebIDL::create_promise(realm); - // FIXME 14. If signal is not undefined, - // 1. Let abortAlgorithm be the following steps: - // 1. Let error be signal’s abort reason. - // 2. Let actions be an empty ordered set. - // 3. If preventAbort is false, append the following action to actions: - // 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error). - // 2. Otherwise, return a promise resolved with undefined. - // 4. If preventCancel is false, append the following action to actions: - // 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error). - // 2. Otherwise, return a promise resolved with undefined. - // 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error. - // 2. If signal is aborted, perform abortAlgorithm and return promise. - // 3. Add abortAlgorithm to signal. + auto operation = realm.heap().allocate(realm, promise, source, dest, reader, writer, prevent_close, prevent_abort, prevent_cancel); - // 15. In parallel but not really; see #905, using reader and writer, read all chunks from source and write them to - // dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not - // observable to author code, and so there is flexibility in how this is done. The following constraints apply - // regardless of the exact algorithm used: - // - Public API must not be used: while reading or writing, or performing any of the operations below, the - // JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not - // be used. Instead, the streams must be manipulated directly. + // 14. If signal is not undefined, + if (signal) { + // 1. Let abortAlgorithm be the following steps: + auto abort_algorithm = [&realm, operation, source = GC::Ref { source }, dest = GC::Ref { dest }, prevent_abort, prevent_cancel, signal]() { + HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; - // FIXME: Currently a naive implementation that uses ReadableStreamDefaultReader::read_all_chunks() to read all chunks - // from the source and then through the callback success_steps writes those chunks to the destination. - auto chunk_steps = GC::create_function(realm.heap(), [&realm, writer](JS::Value chunk) { - auto promise = writable_stream_default_writer_write(writer, chunk); - WebIDL::resolve_promise(realm, promise, JS::js_undefined()); - }); + // 1. Let error be signal’s abort reason. + auto error = signal->reason(); - auto success_steps = GC::create_function(realm.heap(), [promise, &realm, reader, writer]() { - // Make sure we close the acquired writer. - WebIDL::resolve_promise(realm, writable_stream_default_writer_close(*writer), JS::js_undefined()); - readable_stream_default_reader_release(*reader); + // 2. Let actions be an empty ordered set. + GC::Ptr()>> abort_destination; + GC::Ptr()>> cancel_source; - WebIDL::resolve_promise(realm, promise, JS::js_undefined()); - }); + // 3. If preventAbort is false, append the following action to actions: + if (!prevent_abort) { + abort_destination = GC::create_function(realm.heap(), [&realm, dest, error]() { + // 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error). + if (dest->state() == WritableStream::State::Writable) + return writable_stream_abort(dest, error); - auto failure_steps = GC::create_function(realm.heap(), [promise, &realm, reader, writer](JS::Value error) { - // Make sure we close the acquired writer. - WebIDL::resolve_promise(realm, writable_stream_default_writer_close(*writer), JS::js_undefined()); - readable_stream_default_reader_release(*reader); + // 2. Otherwise, return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + }); + } - WebIDL::reject_promise(realm, promise, error); - }); + // 4. If preventCancel is false, append the following action action to actions: + if (!prevent_cancel) { + cancel_source = GC::create_function(realm.heap(), [&realm, source, error]() { + // 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error). + if (source->state() == ReadableStream::State::Readable) + return readable_stream_cancel(source, error); - reader->read_all_chunks(chunk_steps, success_steps, failure_steps); + // 2. Otherwise, return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + }); + } + + // 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error. + auto action = GC::create_function(realm.heap(), [&realm, abort_destination, cancel_source]() { + GC::RootVector> actions(realm.heap()); + + if (abort_destination) + actions.append(abort_destination->function()()); + if (cancel_source) + actions.append(cancel_source->function()()); + + return WebIDL::get_promise_for_wait_for_all(realm, actions); + }); + + operation->shutdown_with_action(action, error); + }; + + // 2. If signal is aborted, perform abortAlgorithm and return promise. + if (signal->aborted()) { + abort_algorithm(); + return promise; + } + + // 3. Add abortAlgorithm to signal. + auto signal_id = signal->add_abort_algorithm(move(abort_algorithm)); + operation->set_abort_signal(*signal, signal_id.value()); + } + + // 15. In parallel (but not really; see #905), using reader and writer, read all chunks from source and write them + // to dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not + // observable to author code, and so there is flexibility in how this is done. + operation->process(); // 16. Return promise. return promise; diff --git a/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.cpp b/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.cpp index 8ab80faecb1..1aa1bde0560 100644 --- a/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.cpp +++ b/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.cpp @@ -66,6 +66,7 @@ void ReadableStreamDefaultReader::visit_edges(Cell::Visitor& visitor) ReadableStreamGenericReaderMixin::visit_edges(visitor); for (auto& request : m_read_requests) visitor.visit(request); + visitor.visit(m_readable_stream_pipe_to_operation); } // https://streams.spec.whatwg.org/#read-loop @@ -212,42 +213,6 @@ void ReadableStreamDefaultReader::read_all_bytes(GC::Ref chunk_steps, GC::Ref success_steps, GC::Ref failure_steps) -{ - // AD-HOC: Some spec steps direct us to "read all chunks" from a stream, but there isn't an AO defined to do that. - // We implement those steps by continuously making default read requests, which is an identity transformation, - // with a custom callback to receive each chunk that is read. This is done until the controller signals - // that there are no more chunks to consume. - // This function is based on "read_all_bytes" above. - auto promise_capability = read(); - - WebIDL::react_to_promise( - promise_capability, - GC::create_function(heap(), [this, chunk_steps, success_steps, failure_steps](JS::Value value) -> WebIDL::ExceptionOr { - auto& vm = this->vm(); - - VERIFY(value.is_object()); - auto& value_object = value.as_object(); - - auto done = MUST(JS::iterator_complete(vm, value_object)); - - if (!done) { - auto chunk = MUST(JS::iterator_value(vm, value_object)); - chunk_steps->function()(chunk); - - read_all_chunks(chunk_steps, success_steps, failure_steps); - } else { - success_steps->function()(); - } - - return JS::js_undefined(); - }), - GC::create_function(heap(), [failure_steps](JS::Value error) -> WebIDL::ExceptionOr { - failure_steps->function()(error); - return JS::js_undefined(); - })); -} - // FIXME: This function is a promise-based wrapper around "read all bytes". The spec changed this function to not use promises // in https://github.com/whatwg/streams/commit/f894acdd417926a2121710803cef593e15127964 - however, it seems that the // FileAPI blob specification has not been updated to match, see: https://github.com/w3c/FileAPI/issues/187. diff --git a/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.h b/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.h index 8e37bf476e7..de71cb17c39 100644 --- a/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.h +++ b/Libraries/LibWeb/Streams/ReadableStreamDefaultReader.h @@ -20,6 +20,8 @@ struct ReadableStreamReadResult { bool done; }; +class ReadableStreamPipeTo; + class ReadRequest : public JS::Cell { GC_CELL(ReadRequest, JS::Cell); @@ -91,13 +93,14 @@ public: void read_a_chunk(Fetch::Infrastructure::IncrementalReadLoopReadRequest& read_request); void read_all_bytes(GC::Ref, GC::Ref); - void read_all_chunks(GC::Ref, GC::Ref, GC::Ref); GC::Ref read_all_bytes_deprecated(); void release_lock(); SinglyLinkedList>& read_requests() { return m_read_requests; } + void set_readable_stream_pipe_to_operation(Badge, GC::Ptr readable_stream_pipe_to_operation) { m_readable_stream_pipe_to_operation = readable_stream_pipe_to_operation; } + private: explicit ReadableStreamDefaultReader(JS::Realm&); @@ -106,6 +109,8 @@ private: virtual void visit_edges(Cell::Visitor&) override; SinglyLinkedList> m_read_requests; + + GC::Ptr m_readable_stream_pipe_to_operation; }; } diff --git a/Libraries/LibWeb/WebIDL/Promise.cpp b/Libraries/LibWeb/WebIDL/Promise.cpp index c2bbb0ccbb6..df5733d9db0 100644 --- a/Libraries/LibWeb/WebIDL/Promise.cpp +++ b/Libraries/LibWeb/WebIDL/Promise.cpp @@ -175,8 +175,14 @@ GC::Ref upon_rejection(Promise const& promise, GC::Ref s void mark_promise_as_handled(Promise const& promise) { // To mark as handled a Promise promise, set promise.[[Promise]].[[PromiseIsHandled]] to true. - auto promise_object = as(promise.promise().ptr()); - promise_object->set_is_handled(); + auto& promise_object = as(*promise.promise()); + promise_object.set_is_handled(); +} + +bool is_promise_fulfilled(Promise const& promise) +{ + auto const& promise_object = as(*promise.promise()); + return promise_object.state() == JS::Promise::State::Fulfilled; } struct WaitForAllResults : JS::Cell { diff --git a/Libraries/LibWeb/WebIDL/Promise.h b/Libraries/LibWeb/WebIDL/Promise.h index 82e2fa30b5d..6fdfbb49799 100644 --- a/Libraries/LibWeb/WebIDL/Promise.h +++ b/Libraries/LibWeb/WebIDL/Promise.h @@ -29,6 +29,7 @@ GC::Ref react_to_promise(Promise const&, GC::Ptr on_fulf GC::Ref upon_fulfillment(Promise const&, GC::Ref); GC::Ref upon_rejection(Promise const&, GC::Ref); void mark_promise_as_handled(Promise const&); +bool is_promise_fulfilled(Promise const&); void wait_for_all(JS::Realm&, Vector> const& promises, Function const&)> success_steps, Function failure_steps); GC::Ref get_promise_for_wait_for_all(JS::Realm&, Vector> const& promises); diff --git a/Tests/LibWeb/Text/expected/wpt-import/compression/compression-bad-chunks.tentative.any.txt b/Tests/LibWeb/Text/expected/wpt-import/compression/compression-bad-chunks.tentative.any.txt index 2c8da675484..8ce0f65441d 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/compression/compression-bad-chunks.tentative.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/compression/compression-bad-chunks.tentative.any.txt @@ -1,4 +1,4 @@ -Harness status: Error +Harness status: OK Found 21 tests diff --git a/Tests/LibWeb/Text/expected/wpt-import/encoding/streams/encode-bad-chunks.any.txt b/Tests/LibWeb/Text/expected/wpt-import/encoding/streams/encode-bad-chunks.any.txt index db36b3a6af6..ea1b1e6c0d0 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/encoding/streams/encode-bad-chunks.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/encoding/streams/encode-bad-chunks.any.txt @@ -1,4 +1,4 @@ -Harness status: Error +Harness status: OK Found 6 tests diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/abort.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/abort.any.txt new file mode 100644 index 00000000000..9ceba4b0d72 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/abort.any.txt @@ -0,0 +1,38 @@ +Harness status: OK + +Found 33 tests + +33 Pass +Pass a signal argument 'null' should cause pipeTo() to reject +Pass a signal argument 'AbortSignal' should cause pipeTo() to reject +Pass a signal argument 'true' should cause pipeTo() to reject +Pass a signal argument '-1' should cause pipeTo() to reject +Pass a signal argument '[object AbortSignal]' should cause pipeTo() to reject +Pass an aborted signal should cause the writable stream to reject with an AbortError +Pass (reason: 'null') all the error objects should be the same object +Pass (reason: 'undefined') all the error objects should be the same object +Pass (reason: 'error1: error1') all the error objects should be the same object +Pass preventCancel should prevent canceling the readable +Pass preventAbort should prevent aborting the readable +Pass preventCancel and preventAbort should prevent canceling the readable and aborting the readable +Pass (reason: 'null') abort should prevent further reads +Pass (reason: 'undefined') abort should prevent further reads +Pass (reason: 'error1: error1') abort should prevent further reads +Pass (reason: 'null') all pending writes should complete on abort +Pass (reason: 'undefined') all pending writes should complete on abort +Pass (reason: 'error1: error1') all pending writes should complete on abort +Pass (reason: 'null') underlyingSource.cancel() should called when abort, even with pending pull +Pass (reason: 'undefined') underlyingSource.cancel() should called when abort, even with pending pull +Pass (reason: 'error1: error1') underlyingSource.cancel() should called when abort, even with pending pull +Pass a rejection from underlyingSource.cancel() should be returned by pipeTo() +Pass a rejection from underlyingSink.abort() should be returned by pipeTo() +Pass a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel() +Pass abort signal takes priority over closed readable +Pass abort signal takes priority over errored readable +Pass abort signal takes priority over closed writable +Pass abort signal takes priority over errored writable +Pass abort should do nothing after the readable is closed +Pass abort should do nothing after the readable is errored +Pass abort should do nothing after the readable is errored, even with pending writes +Pass abort should do nothing after the writable is errored +Pass pipeTo on a teed readable byte stream should only be aborted when both branches are aborted \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-backward.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-backward.any.txt new file mode 100644 index 00000000000..94f417830b1 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-backward.any.txt @@ -0,0 +1,21 @@ +Harness status: OK + +Found 16 tests + +16 Pass +Pass Closing must be propagated backward: starts closed; preventCancel omitted; fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel omitted; rejected cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = undefined (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = null (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = false (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = 0 (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = -0 (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = NaN (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = (falsy); fulfilled cancel promise +Pass Closing must be propagated backward: starts closed; preventCancel = true (truthy) +Pass Closing must be propagated backward: starts closed; preventCancel = a (truthy) +Pass Closing must be propagated backward: starts closed; preventCancel = 1 (truthy) +Pass Closing must be propagated backward: starts closed; preventCancel = Symbol() (truthy) +Pass Closing must be propagated backward: starts closed; preventCancel = [object Object] (truthy) +Pass Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true +Pass Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true, preventClose = true \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-forward.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-forward.any.txt new file mode 100644 index 00000000000..8911034e974 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/close-propagation-forward.any.txt @@ -0,0 +1,35 @@ +Harness status: OK + +Found 30 tests + +30 Pass +Pass Closing must be propagated forward: starts closed; preventClose omitted; fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose omitted; rejected close promise +Pass Closing must be propagated forward: starts closed; preventClose = undefined (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = null (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = false (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = 0 (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = -0 (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = NaN (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = (falsy); fulfilled close promise +Pass Closing must be propagated forward: starts closed; preventClose = true (truthy) +Pass Closing must be propagated forward: starts closed; preventClose = a (truthy) +Pass Closing must be propagated forward: starts closed; preventClose = 1 (truthy) +Pass Closing must be propagated forward: starts closed; preventClose = Symbol() (truthy) +Pass Closing must be propagated forward: starts closed; preventClose = [object Object] (truthy) +Pass Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true +Pass Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true, preventCancel = true +Pass Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; fulfilled close promise +Pass Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; rejected close promise +Pass Closing must be propagated forward: becomes closed asynchronously; preventClose = true +Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose omitted; fulfilled close promise +Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose omitted; rejected close promise +Pass Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; preventClose = true +Pass Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; fulfilled close promise +Pass Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; rejected close promise +Pass Closing must be propagated forward: becomes closed after one chunk; preventClose = true +Pass Closing must be propagated forward: shutdown must not occur until the final write completes +Pass Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true +Pass Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write +Pass Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write; preventClose = true +Pass Closing must be propagated forward: erroring the writable while flushing pending writes should error pipeTo \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-backward.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-backward.any.txt new file mode 100644 index 00000000000..47969a0fed1 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-backward.any.txt @@ -0,0 +1,40 @@ +Harness status: OK + +Found 35 tests + +35 Pass +Pass Errors must be propagated backward: starts errored; preventCancel omitted; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; rejected cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = undefined (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = null (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = false (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = 0 (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = -0 (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = NaN (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = (falsy); fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true (truthy) +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = a (truthy) +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = 1 (truthy) +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = Symbol() (truthy) +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = [object Object] (truthy) +Pass Errors must be propagated backward: becomes errored before piping due to write, preventCancel = true; preventAbort = true +Pass Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, preventAbort = true, preventClose = true +Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; rejected cancel promise +Pass Errors must be propagated backward: becomes errored during piping due to write; preventCancel = true +Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = false; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = false; rejected cancel promise +Pass Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = true +Pass Errors must be propagated backward: becomes errored after piping; preventCancel omitted; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored after piping; preventCancel omitted; rejected cancel promise +Pass Errors must be propagated backward: becomes errored after piping; preventCancel = true +Pass Errors must be propagated backward: becomes errored after piping due to last write; source is closed; preventCancel omitted (but cancel is never called) +Pass Errors must be propagated backward: becomes errored after piping due to last write; source is closed; preventCancel = true +Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = false; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = false; rejected cancel promise +Pass Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = true +Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; fulfilled cancel promise +Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; rejected cancel promise +Pass Errors must be propagated backward: becomes errored before piping via abort; preventCancel = true +Pass Errors must be propagated backward: erroring via the controller errors once pending write completes \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-forward.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-forward.any.txt new file mode 100644 index 00000000000..1406d4298aa --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/error-propagation-forward.any.txt @@ -0,0 +1,37 @@ +Harness status: OK + +Found 32 tests + +32 Pass +Pass Errors must be propagated forward: starts errored; preventAbort = false; fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = false; rejected abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = undefined (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = null (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = false (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = 0 (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = -0 (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = NaN (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = (falsy); fulfilled abort promise +Pass Errors must be propagated forward: starts errored; preventAbort = true (truthy) +Pass Errors must be propagated forward: starts errored; preventAbort = a (truthy) +Pass Errors must be propagated forward: starts errored; preventAbort = 1 (truthy) +Pass Errors must be propagated forward: starts errored; preventAbort = Symbol() (truthy) +Pass Errors must be propagated forward: starts errored; preventAbort = [object Object] (truthy) +Pass Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true +Pass Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true, preventClose = true +Pass Errors must be propagated forward: becomes errored while empty; preventAbort = false; fulfilled abort promise +Pass Errors must be propagated forward: becomes errored while empty; preventAbort = false; rejected abort promise +Pass Errors must be propagated forward: becomes errored while empty; preventAbort = true +Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = false; fulfilled abort promise +Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = false; rejected abort promise +Pass Errors must be propagated forward: becomes errored while empty; dest never desires chunks; preventAbort = true +Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; fulfilled abort promise +Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; rejected abort promise +Pass Errors must be propagated forward: becomes errored after one chunk; preventAbort = true +Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = false; fulfilled abort promise +Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = false; rejected abort promise +Pass Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; preventAbort = true +Pass Errors must be propagated forward: shutdown must not occur until the final write completes +Pass Errors must be propagated forward: shutdown must not occur until the final write completes; preventAbort = true +Pass Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write +Pass Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write; preventAbort = true \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/flow-control.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/flow-control.any.txt new file mode 100644 index 00000000000..eaa579cda26 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/flow-control.any.txt @@ -0,0 +1,10 @@ +Harness status: OK + +Found 5 tests + +5 Pass +Pass Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks +Pass Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does +Pass Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable stream becomes non-empty and the writable stream starts desiring chunks +Pass Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones +Pass Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/general.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/general.any.txt new file mode 100644 index 00000000000..8baf203c193 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/general.any.txt @@ -0,0 +1,19 @@ +Harness status: OK + +Found 14 tests + +14 Pass +Pass Piping must lock both the ReadableStream and WritableStream +Pass Piping finishing must unlock both the ReadableStream and WritableStream +Pass pipeTo must check the brand of its ReadableStream this value +Pass pipeTo must check the brand of its WritableStream argument +Pass pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream +Pass pipeTo must fail if the WritableStream is locked, and not lock the ReadableStream +Pass Piping from a ReadableStream from which lots of chunks are synchronously readable +Pass Piping from a ReadableStream for which a chunk becomes asynchronously readable after the pipeTo +Pass an undefined rejection from pull should cause pipeTo() to reject when preventAbort is true +Pass an undefined rejection from pull should cause pipeTo() to reject when preventAbort is false +Pass an undefined rejection from write should cause pipeTo() to reject when preventCancel is true +Pass an undefined rejection from write should cause pipeTo() to reject when preventCancel is false +Pass pipeTo() should reject if an option getter grabs a writer +Pass pipeTo() promise should resolve if null is passed \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/multiple-propagation.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/multiple-propagation.any.txt new file mode 100644 index 00000000000..212f4762aa8 --- /dev/null +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/multiple-propagation.any.txt @@ -0,0 +1,14 @@ +Harness status: OK + +Found 9 tests + +9 Pass +Pass Piping from an errored readable stream to an erroring writable stream +Pass Piping from an errored readable stream to an errored writable stream +Pass Piping from an errored readable stream to an erroring writable stream; preventAbort = true +Pass Piping from an errored readable stream to an errored writable stream; preventAbort = true +Pass Piping from an errored readable stream to a closing writable stream +Pass Piping from an errored readable stream to a closed writable stream +Pass Piping from a closed readable stream to an erroring writable stream +Pass Piping from a closed readable stream to an errored writable stream +Pass Piping from a closed readable stream to a closed writable stream \ No newline at end of file diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/readable-byte-streams/tee.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/readable-byte-streams/tee.any.txt index ea4f794f5cf..d0d6f306af7 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/streams/readable-byte-streams/tee.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/readable-byte-streams/tee.any.txt @@ -1,4 +1,4 @@ -Harness status: Error +Harness status: OK Found 39 tests diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/writable-streams/aborting.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/writable-streams/aborting.any.txt index 3d42dd06767..db9690d7e18 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/streams/writable-streams/aborting.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/writable-streams/aborting.any.txt @@ -1,4 +1,4 @@ -Harness status: Error +Harness status: OK Found 62 tests diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.html new file mode 100644 index 00000000000..f98677456d1 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.html @@ -0,0 +1,16 @@ + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.js new file mode 100644 index 00000000000..e813b017769 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/abort.any.js @@ -0,0 +1,448 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/recording-streams.js +// META: script=../resources/test-utils.js +'use strict'; + +// Tests for the use of pipeTo with AbortSignal. +// There is some extra complexity to avoid timeouts in environments where abort is not implemented. + +const error1 = new Error('error1'); +error1.name = 'error1'; +const error2 = new Error('error2'); +error2.name = 'error2'; + +const errorOnPull = { + pull(controller) { + // This will cause the test to error if pipeTo abort is not implemented. + controller.error('failed to abort'); + } +}; + +// To stop pull() being called immediately when the stream is created, we need to set highWaterMark to 0. +const hwm0 = { highWaterMark: 0 }; + +for (const invalidSignal of [null, 'AbortSignal', true, -1, Object.create(AbortSignal.prototype)]) { + promise_test(t => { + const rs = recordingReadableStream(errorOnPull, hwm0); + const ws = recordingWritableStream(); + return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { signal: invalidSignal }), 'pipeTo should reject') + .then(() => { + assert_equals(rs.events.length, 0, 'no ReadableStream methods should have been called'); + assert_equals(ws.events.length, 0, 'no WritableStream methods should have been called'); + }); + }, `a signal argument '${invalidSignal}' should cause pipeTo() to reject`); +} + +promise_test(t => { + const rs = recordingReadableStream(errorOnPull, hwm0); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject') + .then(() => Promise.all([ + rs.getReader().closed, + promise_rejects_dom(t, 'AbortError', ws.getWriter().closed, 'writer.closed should reject') + ])) + .then(() => { + assert_equals(rs.events.length, 2, 'cancel should have been called'); + assert_equals(rs.events[0], 'cancel', 'first event should be cancel'); + assert_equals(rs.events[1].name, 'AbortError', 'the argument to cancel should be an AbortError'); + assert_equals(rs.events[1].constructor.name, 'DOMException', + 'the argument to cancel should be a DOMException'); + }); +}, 'an aborted signal should cause the writable stream to reject with an AbortError'); + +for (const reason of [null, undefined, error1]) { + promise_test(async t => { + const rs = recordingReadableStream(errorOnPull, hwm0); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(reason); + const pipeToPromise = rs.pipeTo(ws, { signal }); + if (reason !== undefined) { + await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); + } else { + await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); + } + const error = await pipeToPromise.catch(e => e); + await rs.getReader().closed; + await promise_rejects_exactly(t, error, ws.getWriter().closed, 'the writable should be errored with the same object'); + assert_equals(signal.reason, error, 'signal.reason should be error'), + assert_equals(rs.events.length, 2, 'cancel should have been called'); + assert_equals(rs.events[0], 'cancel', 'first event should be cancel'); + assert_equals(rs.events[1], error, 'the readable should be canceled with the same object'); + }, `(reason: '${reason}') all the error objects should be the same object`); +} + +promise_test(t => { + const rs = recordingReadableStream(errorOnPull, hwm0); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true }), 'pipeTo should reject') + .then(() => assert_equals(rs.events.length, 0, 'cancel should not be called')); +}, 'preventCancel should prevent canceling the readable'); + +promise_test(t => { + const rs = new ReadableStream(errorOnPull, hwm0); + const ws = recordingWritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventAbort: true }), 'pipeTo should reject') + .then(() => { + assert_equals(ws.events.length, 0, 'writable should not have been aborted'); + return ws.getWriter().ready; + }); +}, 'preventAbort should prevent aborting the readable'); + +promise_test(t => { + const rs = recordingReadableStream(errorOnPull, hwm0); + const ws = recordingWritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }), + 'pipeTo should reject') + .then(() => { + assert_equals(rs.events.length, 0, 'cancel should not be called'); + assert_equals(ws.events.length, 0, 'writable should not have been aborted'); + return ws.getWriter().ready; + }); +}, 'preventCancel and preventAbort should prevent canceling the readable and aborting the readable'); + +for (const reason of [null, undefined, error1]) { + promise_test(async t => { + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + const ws = recordingWritableStream({ + write() { + abortController.abort(reason); + } + }); + const pipeToPromise = rs.pipeTo(ws, { signal }); + if (reason !== undefined) { + await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); + } else { + await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); + } + const error = await pipeToPromise.catch(e => e); + assert_equals(signal.reason, error, 'signal.reason should be error'); + assert_equals(ws.events.length, 4, 'only chunk "a" should have been written'); + assert_array_equals(ws.events.slice(0, 3), ['write', 'a', 'abort'], 'events should match'); + assert_equals(ws.events[3], error, 'abort reason should be error'); + }, `(reason: '${reason}') abort should prevent further reads`); +} + +for (const reason of [null, undefined, error1]) { + promise_test(async t => { + let readController; + const rs = new ReadableStream({ + start(c) { + readController = c; + c.enqueue('a'); + c.enqueue('b'); + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + let resolveWrite; + const writePromise = new Promise(resolve => { + resolveWrite = resolve; + }); + const ws = recordingWritableStream({ + write() { + return writePromise; + } + }, new CountQueuingStrategy({ highWaterMark: Infinity })); + const pipeToPromise = rs.pipeTo(ws, { signal }); + await delay(0); + await abortController.abort(reason); + await readController.close(); // Make sure the test terminates when signal is not implemented. + await resolveWrite(); + if (reason !== undefined) { + await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); + } else { + await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); + } + const error = await pipeToPromise.catch(e => e); + assert_equals(signal.reason, error, 'signal.reason should be error'); + assert_equals(ws.events.length, 6, 'chunks "a" and "b" should have been written'); + assert_array_equals(ws.events.slice(0, 5), ['write', 'a', 'write', 'b', 'abort'], 'events should match'); + assert_equals(ws.events[5], error, 'abort reason should be error'); + }, `(reason: '${reason}') all pending writes should complete on abort`); +} + +for (const reason of [null, undefined, error1]) { + promise_test(async t => { + let rejectPull; + const pullPromise = new Promise((_, reject) => { + rejectPull = reject; + }); + let rejectCancel; + const cancelPromise = new Promise((_, reject) => { + rejectCancel = reject; + }); + const rs = recordingReadableStream({ + async pull() { + await Promise.race([ + pullPromise, + cancelPromise, + ]); + }, + cancel(reason) { + rejectCancel(reason); + }, + }); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal }); + pipeToPromise.catch(() => {}); // Prevent unhandled rejection. + await delay(0); + abortController.abort(reason); + rejectPull('should not catch pull rejection'); + await delay(0); + assert_equals(rs.eventsWithoutPulls.length, 2, 'cancel should have been called'); + assert_equals(rs.eventsWithoutPulls[0], 'cancel', 'first event should be cancel'); + if (reason !== undefined) { + await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); + } else { + await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); + } + }, `(reason: '${reason}') underlyingSource.cancel() should called when abort, even with pending pull`); +} + +promise_test(t => { + const rs = new ReadableStream({ + pull(controller) { + controller.error('failed to abort'); + }, + cancel() { + return Promise.reject(error1); + } + }, hwm0); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject'); +}, 'a rejection from underlyingSource.cancel() should be returned by pipeTo()'); + +promise_test(t => { + const rs = new ReadableStream(errorOnPull, hwm0); + const ws = new WritableStream({ + abort() { + return Promise.reject(error1); + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject'); +}, 'a rejection from underlyingSink.abort() should be returned by pipeTo()'); + +promise_test(t => { + const events = []; + const rs = new ReadableStream({ + pull(controller) { + controller.error('failed to abort'); + }, + cancel() { + events.push('cancel'); + return Promise.reject(error1); + } + }, hwm0); + const ws = new WritableStream({ + abort() { + events.push('abort'); + return Promise.reject(error2); + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_exactly(t, error2, rs.pipeTo(ws, { signal }), 'pipeTo should reject') + .then(() => assert_array_equals(events, ['abort', 'cancel'], 'abort() should be called before cancel()')); +}, 'a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()'); + +promise_test(t => { + const rs = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); +}, 'abort signal takes priority over closed readable'); + +promise_test(t => { + const rs = new ReadableStream({ + start(controller) { + controller.error(error1); + } + }); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); +}, 'abort signal takes priority over errored readable'); + +promise_test(t => { + const rs = new ReadableStream({ + pull(controller) { + controller.error('failed to abort'); + } + }, hwm0); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + const writer = ws.getWriter(); + return writer.close().then(() => { + writer.releaseLock(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); + }); +}, 'abort signal takes priority over closed writable'); + +promise_test(t => { + const rs = new ReadableStream({ + pull(controller) { + controller.error('failed to abort'); + } + }, hwm0); + const ws = new WritableStream({ + start(controller) { + controller.error(error1); + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + abortController.abort(); + return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); +}, 'abort signal takes priority over errored writable'); + +promise_test(() => { + let readController; + const rs = new ReadableStream({ + start(c) { + readController = c; + } + }); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventClose: true }); + readController.close(); + return Promise.resolve().then(() => { + abortController.abort(); + return pipeToPromise; + }).then(() => ws.getWriter().write('this should succeed')); +}, 'abort should do nothing after the readable is closed'); + +promise_test(t => { + let readController; + const rs = new ReadableStream({ + start(c) { + readController = c; + } + }); + const ws = new WritableStream(); + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true }); + readController.error(error1); + return Promise.resolve().then(() => { + abortController.abort(); + return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); + }).then(() => ws.getWriter().write('this should succeed')); +}, 'abort should do nothing after the readable is errored'); + +promise_test(t => { + let readController; + const rs = new ReadableStream({ + start(c) { + readController = c; + } + }); + let resolveWrite; + const writePromise = new Promise(resolve => { + resolveWrite = resolve; + }); + const ws = new WritableStream({ + write() { + readController.error(error1); + return writePromise; + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true }); + readController.enqueue('a'); + return delay(0).then(() => { + abortController.abort(); + resolveWrite(); + return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); + }).then(() => ws.getWriter().write('this should succeed')); +}, 'abort should do nothing after the readable is errored, even with pending writes'); + +promise_test(t => { + const rs = recordingReadableStream({ + pull(controller) { + return delay(0).then(() => controller.close()); + } + }); + let writeController; + const ws = new WritableStream({ + start(c) { + writeController = c; + } + }); + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true }); + return Promise.resolve().then(() => { + writeController.error(error1); + return Promise.resolve(); + }).then(() => { + abortController.abort(); + return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); + }).then(() => { + assert_array_equals(rs.events, ['pull'], 'cancel should not have been called'); + }); +}, 'abort should do nothing after the writable is errored'); + +promise_test(async t => { + const rs = new ReadableStream({ + pull(c) { + c.enqueue(new Uint8Array([])); + }, + type: "bytes", + }); + const ws = new WritableStream(); + const [first, second] = rs.tee(); + + let aborted = false; + first.pipeTo(ws, { signal: AbortSignal.abort() }).catch(() => { + aborted = true; + }); + await delay(0); + assert_true(!aborted, "pipeTo should not resolve yet"); + await second.cancel(); + await delay(0); + assert_true(aborted, "pipeTo should be aborted now"); +}, "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted"); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.html new file mode 100644 index 00000000000..38e4080afff --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.html @@ -0,0 +1,15 @@ + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.js new file mode 100644 index 00000000000..25bd475ed13 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-backward.any.js @@ -0,0 +1,153 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +promise_test(() => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return rs.pipeTo(ws).then( + () => assert_unreached('the promise must not fulfill'), + err => { + assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError'); + + assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + } + ); + +}, 'Closing must be propagated backward: starts closed; preventCancel omitted; fulfilled cancel promise'); + +promise_test(t => { + + // Our recording streams do not deal well with errors generated by the system, so give them some help + let recordedError; + const rs = recordingReadableStream({ + cancel(cancelErr) { + recordedError = cancelErr; + throw error1; + } + }); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => { + assert_equals(recordedError.name, 'TypeError', 'the cancel reason must be a TypeError'); + + assert_array_equals(rs.eventsWithoutPulls, ['cancel', recordedError]); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Closing must be propagated backward: starts closed; preventCancel omitted; rejected cancel promise'); + +for (const falsy of [undefined, null, false, +0, -0, NaN, '']) { + const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy); + + promise_test(() => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return rs.pipeTo(ws, { preventCancel: falsy }).then( + () => assert_unreached('the promise must not fulfill'), + err => { + assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError'); + + assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + } + ); + + }, `Closing must be propagated backward: starts closed; preventCancel = ${stringVersion} (falsy); fulfilled cancel ` + + `promise`); +} + +for (const truthy of [true, 'a', 1, Symbol(), { }]) { + promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: truthy })).then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return ws.getWriter().closed; + }); + + }, `Closing must be propagated backward: starts closed; preventCancel = ${String(truthy)} (truthy)`); +} + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: true, preventAbort: true })) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return ws.getWriter().closed; + }); + +}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return promise_rejects_js(t, TypeError, + rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true })) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return ws.getWriter().closed; + }); + +}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true, preventClose ' + + '= true'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.html new file mode 100644 index 00000000000..eea048b3cac --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.html @@ -0,0 +1,16 @@ + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.js new file mode 100644 index 00000000000..0ec94f80abf --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/close-propagation-forward.any.js @@ -0,0 +1,589 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/test-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream(); + + return rs.pipeTo(ws).then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Closing must be propagated forward: starts closed; preventClose omitted; fulfilled close promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream({ + close() { + throw error1; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed) + ]); + }); + +}, 'Closing must be propagated forward: starts closed; preventClose omitted; rejected close promise'); + +for (const falsy of [undefined, null, false, +0, -0, NaN, '']) { + const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy); + + promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream(); + + return rs.pipeTo(ws, { preventClose: falsy }).then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + + }, `Closing must be propagated forward: starts closed; preventClose = ${stringVersion} (falsy); fulfilled close ` + + `promise`); +} + +for (const truthy of [true, 'a', 1, Symbol(), { }]) { + promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream(); + + return rs.pipeTo(ws, { preventClose: truthy }).then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return rs.getReader().closed; + }); + + }, `Closing must be propagated forward: starts closed; preventClose = ${String(truthy)} (truthy)`); +} + +promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream(); + + return rs.pipeTo(ws, { preventClose: true, preventAbort: true }).then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return rs.getReader().closed; + }); + +}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true'); + +promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.close(); + } + }); + + const ws = recordingWritableStream(); + + return rs.pipeTo(ws, { preventClose: true, preventAbort: true, preventCancel: true }).then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return rs.getReader().closed; + }); + +}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true, preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = rs.pipeTo(ws); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; fulfilled close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + close() { + throw error1; + } + }); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed) + ]); + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; rejected close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = rs.pipeTo(ws, { preventClose: true }); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + + return rs.getReader().closed; + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = rs.pipeTo(ws); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' + + 'preventClose omitted; fulfilled close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + close() { + throw error1; + } + }, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed) + ]); + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' + + 'preventClose omitted; rejected close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = rs.pipeTo(ws, { preventClose: true }); + + t.step_timeout(() => rs.controller.close()); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + + return rs.getReader().closed; + }); + +}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' + + 'preventClose = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = rs.pipeTo(ws); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.close()); + }, 10); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello', 'close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; fulfilled close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + close() { + throw error1; + } + }); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.close()); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello', 'close']); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed) + ]); + }); + +}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; rejected close promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = rs.pipeTo(ws, { preventClose: true }); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.close()); + }, 10); + + return pipePromise.then(value => { + assert_equals(value, undefined, 'the promise must fulfill with undefined'); + }) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + + return rs.getReader().closed; + }); + +}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose = true'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }); + + let pipeComplete = false; + const pipePromise = rs.pipeTo(ws).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.close(); + + // Flush async events and verify that no shutdown occurs. + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a']); // no 'close' + assert_equals(pipeComplete, false, 'the pipe must not be complete'); + + resolveWritePromise(); + + return pipePromise.then(() => { + assert_array_equals(ws.events, ['write', 'a', 'close']); + }); + }); + +}, 'Closing must be propagated forward: shutdown must not occur until the final write completes'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }); + + let pipeComplete = false; + const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.close(); + + // Flush async events and verify that no shutdown occurs. + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the chunk must have been written, but close must not have happened'); + assert_equals(pipeComplete, false, 'the pipe must not be complete'); + + resolveWritePromise(); + + return pipePromise; + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the chunk must have been written, but close must not have happened'); + }); + +}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }, new CountQueuingStrategy({ highWaterMark: 2 })); + + let pipeComplete = false; + const pipePromise = rs.pipeTo(ws).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.enqueue('b'); + + return writeCalledPromise.then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the first chunk must have been written, but close must not have happened yet'); + assert_false(pipeComplete, 'the pipe should not complete while the first write is pending'); + + rs.controller.close(); + resolveWritePromise(); + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'the second chunk must have been written, but close must not have happened yet'); + assert_false(pipeComplete, 'the pipe should not complete while the second write is pending'); + + resolveWritePromise(); + return pipePromise; + }).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close'], + 'all chunks must have been written and close must have happened'); + }); + +}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }, new CountQueuingStrategy({ highWaterMark: 2 })); + + let pipeComplete = false; + const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.enqueue('b'); + + return writeCalledPromise.then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the first chunk must have been written, but close must not have happened'); + assert_false(pipeComplete, 'the pipe should not complete while the first write is pending'); + + rs.controller.close(); + resolveWritePromise(); + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'the second chunk must have been written, but close must not have happened'); + assert_false(pipeComplete, 'the pipe should not complete while the second write is pending'); + + resolveWritePromise(); + return pipePromise; + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'all chunks must have been written, but close must not have happened'); + }); + +}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write; preventClose = true'); + + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.enqueue('a'); + c.enqueue('b'); + c.close(); + } + }); + let rejectWritePromise; + const ws = recordingWritableStream({ + write() { + return new Promise((resolve, reject) => { + rejectWritePromise = reject; + }); + } + }, { highWaterMark: 3 }); + const pipeToPromise = rs.pipeTo(ws); + return delay(0).then(() => { + rejectWritePromise(error1); + return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); + }).then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['write', 'a']); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed, 'ws should be errored') + ]); + }); +}, 'Closing must be propagated forward: erroring the writable while flushing pending writes should error pipeTo'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.html new file mode 100644 index 00000000000..aa92999ebf1 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.html @@ -0,0 +1,16 @@ + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.js new file mode 100644 index 00000000000..f786469d6c1 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-backward.any.js @@ -0,0 +1,630 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/test-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +const error2 = new Error('error2!'); +error2.name = 'error2'; + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + start() { + return Promise.reject(error1); + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: starts errored; preventCancel omitted; fulfilled cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + +}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' + + 'fulfilled cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + +}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; rejected ' + + 'cancel promise'); + +for (const falsy of [undefined, null, false, +0, -0, NaN, '']) { + const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy); + + promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: falsy }), + 'pipeTo must reject with the write error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + + }, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` + + `${stringVersion} (falsy); fulfilled cancel promise`); +} + +for (const truthy of [true, 'a', 1, Symbol(), { }]) { + promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: truthy }), + 'pipeTo must reject with the write error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + + }, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` + + `${String(truthy)} (truthy)`); +} + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true }), + 'pipeTo must reject with the write error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + +}, 'Errors must be propagated backward: becomes errored before piping due to write, preventCancel = true; ' + + 'preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + write() { + return Promise.reject(error1); + } + }); + + const writer = ws.getWriter(); + + return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error') + .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error')) + .then(() => { + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true }), + 'pipeTo must reject with the write error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + }); + +}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, ' + + 'preventAbort = true, preventClose = true'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('Hello'); + } + }); + + const ws = recordingWritableStream({ + write() { + throw error1; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; fulfilled ' + + 'cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('Hello'); + }, + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream({ + write() { + throw error1; + } + }); + + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; rejected ' + + 'cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('Hello'); + } + }); + + const ws = recordingWritableStream({ + write() { + throw error1; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + } + }); + + const ws = recordingWritableStream({ + write() { + if (ws.events.length > 2) { + return delay(0).then(() => { + throw error1; + }); + } + return undefined; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' + + 'false; fulfilled cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + }, + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream({ + write() { + if (ws.events.length > 2) { + return delay(0).then(() => { + throw error1; + }); + } + return undefined; + } + }); + + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' + + 'false; rejected cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + } + }); + + const ws = recordingWritableStream({ + write() { + if (ws.events.length > 2) { + return delay(0).then(() => { + throw error1; + }); + } + return undefined; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b']); + }); + +}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; fulfilled cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; rejected cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + controller.close(); + } + }); + + const ws = recordingWritableStream({ + write(chunk) { + if (chunk === 'c') { + return Promise.reject(error1); + } + return undefined; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']); + }); + +}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' + + 'preventCancel omitted (but cancel is never called)'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + controller.close(); + } + }); + + const ws = recordingWritableStream({ + write(chunk) { + if (chunk === 'c') { + return Promise.reject(error1); + } + return undefined; + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']); + }); + +}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' + + 'preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' + + 'false; fulfilled cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' + + 'false; rejected cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => ws.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' + + 'true'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + ws.abort(error1); + + return rs.pipeTo(ws).then( + () => assert_unreached('the promise must not fulfill'), + err => { + assert_equals(err, error1, 'the promise must reject with error1'); + + assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]); + assert_array_equals(ws.events, ['abort', error1]); + } + ); + +}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; fulfilled ' + + 'cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + cancel() { + throw error2; + } + }); + + const ws = recordingWritableStream(); + + ws.abort(error1); + + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error') + .then(() => { + return ws.getWriter().closed.then( + () => assert_unreached('the promise must not fulfill'), + err => { + assert_equals(err, error1, 'the promise must reject with error1'); + + assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]); + assert_array_equals(ws.events, ['abort', error1]); + } + ); + }); + +}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; rejected ' + + 'cancel promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + ws.abort(error1); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true })).then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + return flushAsyncEvents(); + } + }); + + const pipePromise = rs.pipeTo(ws); + + rs.controller.enqueue('a'); + + return writeCalledPromise.then(() => { + ws.controller.error(error1); + + return promise_rejects_exactly(t, error1, pipePromise); + }).then(() => { + assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]); + assert_array_equals(ws.events, ['write', 'a']); + }); + +}, 'Errors must be propagated backward: erroring via the controller errors once pending write completes'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.html new file mode 100644 index 00000000000..dcf0443d4c9 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.html @@ -0,0 +1,16 @@ + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.js new file mode 100644 index 00000000000..e9260f9ea22 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/error-propagation-forward.any.js @@ -0,0 +1,569 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/test-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +const error2 = new Error('error2!'); +error2.name = 'error2'; + +promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: starts errored; preventAbort = false; fulfilled abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream({ + abort() { + throw error2; + } + }); + + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: starts errored; preventAbort = false; rejected abort promise'); + +for (const falsy of [undefined, null, false, +0, -0, NaN, '']) { + const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy); + + promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: falsy }), 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + + }, `Errors must be propagated forward: starts errored; preventAbort = ${stringVersion} (falsy); fulfilled abort ` + + `promise`); +} + +for (const truthy of [true, 'a', 1, Symbol(), { }]) { + promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: truthy }), + 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + }); + + }, `Errors must be propagated forward: starts errored; preventAbort = ${String(truthy)} (truthy)`); +} + + +promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true }), + 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true'); + +promise_test(t => { + + const rs = recordingReadableStream({ + start() { + return Promise.reject(error1); + } + }); + + const ws = recordingWritableStream(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true, preventClose: true }), + 'pipeTo must reject with the same error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true, preventClose = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; fulfilled abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + abort() { + throw error2; + } + }); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; rejected abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' + + 'preventAbort = false; fulfilled abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + abort() { + throw error2; + } + }, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' + + 'preventAbort = false; rejected abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => rs.controller.error(error1), 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' + + 'preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; fulfilled abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + abort() { + throw error2; + } + }); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; rejected abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'Hello']); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' + + 'preventAbort = false; fulfilled abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream({ + abort() { + throw error2; + } + }, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['abort', error1]); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' + + 'preventAbort = false; rejected abort promise'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the same error'); + + t.step_timeout(() => { + rs.controller.enqueue('Hello'); + t.step_timeout(() => rs.controller.error(error1), 10); + }, 10); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }); + +}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' + + 'preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }); + + let pipeComplete = false; + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + + return writeCalledPromise.then(() => { + rs.controller.error(error1); + + // Flush async events and verify that no shutdown occurs. + return flushAsyncEvents(); + }).then(() => { + assert_array_equals(ws.events, ['write', 'a']); // no 'abort' + assert_equals(pipeComplete, false, 'the pipe must not be complete'); + + resolveWritePromise(); + + return pipePromise.then(() => { + assert_array_equals(ws.events, ['write', 'a', 'abort', error1]); + }); + }); + +}, 'Errors must be propagated forward: shutdown must not occur until the final write completes'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }); + + let pipeComplete = false; + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + + return writeCalledPromise.then(() => { + rs.controller.error(error1); + + // Flush async events and verify that no shutdown occurs. + return flushAsyncEvents(); + }).then(() => { + assert_array_equals(ws.events, ['write', 'a']); // no 'abort' + assert_equals(pipeComplete, false, 'the pipe must not be complete'); + + resolveWritePromise(); + return pipePromise; + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a']); // no 'abort' + }); + +}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; preventAbort = true'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }, new CountQueuingStrategy({ highWaterMark: 2 })); + + let pipeComplete = false; + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.enqueue('b'); + + return writeCalledPromise.then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the first chunk must have been written, but abort must not have happened yet'); + assert_false(pipeComplete, 'the pipe should not complete while the first write is pending'); + + rs.controller.error(error1); + resolveWritePromise(); + return flushAsyncEvents(); + }).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'the second chunk must have been written, but abort must not have happened yet'); + assert_false(pipeComplete, 'the pipe should not complete while the second write is pending'); + + resolveWritePromise(); + return pipePromise; + }).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'abort', error1], + 'all chunks must have been written and abort must have happened'); + }); + +}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write'); + +promise_test(t => { + + const rs = recordingReadableStream(); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + resolveWriteCalled(); + + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + }, new CountQueuingStrategy({ highWaterMark: 2 })); + + let pipeComplete = false; + const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => { + pipeComplete = true; + }); + + rs.controller.enqueue('a'); + rs.controller.enqueue('b'); + + return writeCalledPromise.then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a'], + 'the first chunk must have been written, but abort must not have happened'); + assert_false(pipeComplete, 'the pipe should not complete while the first write is pending'); + + rs.controller.error(error1); + resolveWritePromise(); + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'the second chunk must have been written, but abort must not have happened'); + assert_false(pipeComplete, 'the pipe should not complete while the second write is pending'); + + resolveWritePromise(); + return pipePromise; + }).then(() => flushAsyncEvents()).then(() => { + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'all chunks must have been written, but abort must not have happened'); + }); + +}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write; preventAbort = true'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.html new file mode 100644 index 00000000000..8afa4be6159 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.html @@ -0,0 +1,17 @@ + + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.js new file mode 100644 index 00000000000..e2318da375a --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/flow-control.any.js @@ -0,0 +1,297 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/test-utils.js +// META: script=../resources/rs-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = rs.pipeTo(ws, { preventCancel: true }); + + // Wait and make sure it doesn't do any reading. + return flushAsyncEvents().then(() => { + ws.controller.error(error1); + }) + .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error')) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }) + .then(() => readableStreamToArray(rs)) + .then(chunksNotPreviouslyRead => { + assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']); + }); + +}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks'); + +promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('b'); + controller.close(); + } + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }); + + const writer = ws.getWriter(); + const firstWritePromise = writer.write('a'); + assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0'); + writer.releaseLock(); + + // firstWritePromise won't settle until we call resolveWritePromise. + + const pipePromise = rs.pipeTo(ws); + + return flushAsyncEvents().then(() => resolveWritePromise()) + .then(() => Promise.all([firstWritePromise, pipePromise])) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']); + }); + +}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }); + + const writer = ws.getWriter(); + writer.write('a'); + + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a']); + assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0'); + writer.releaseLock(); + + const pipePromise = rs.pipeTo(ws); + + rs.controller.enqueue('b'); + resolveWritePromise(); + rs.controller.close(); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']); + }); + }); + +}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' + + 'stream becomes non-empty and the writable stream starts desiring chunks'); + +promise_test(() => { + const unreadChunks = ['b', 'c', 'd']; + + const rs = recordingReadableStream({ + pull(controller) { + controller.enqueue(unreadChunks.shift()); + if (unreadChunks.length === 0) { + controller.close(); + } + } + }, new CountQueuingStrategy({ highWaterMark: 0 })); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }, new CountQueuingStrategy({ highWaterMark: 3 })); + + const writer = ws.getWriter(); + const firstWritePromise = writer.write('a'); + assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2'); + writer.releaseLock(); + + // firstWritePromise won't settle until we call resolveWritePromise. + + const pipePromise = rs.pipeTo(ws); + + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a']); + assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached'); + }).then(() => resolveWritePromise()) + .then(() => Promise.all([firstWritePromise, pipePromise])) + .then(() => { + assert_array_equals(rs.events, ['pull', 'pull', 'pull']); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']); + }); + +}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones'); + +class StepTracker { + constructor() { + this.waiters = []; + this.wakers = []; + } + + // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the + // promise is resolved. + waitThenAdvance(n) { + if (this.waiters[n] === undefined) { + this.waiters[n] = new Promise(resolve => { + this.wakers[n] = resolve; + }); + this.waiters[n] + .then(() => flushAsyncEvents()) + .then(() => { + if (this.wakers[n + 1] !== undefined) { + this.wakers[n + 1](); + } + }); + } + if (n == 0) { + this.wakers[0](); + } + return this.waiters[n]; + } +} + +promise_test(() => { + const steps = new StepTracker(); + const desiredSizes = []; + const rs = recordingReadableStream({ + start(controller) { + steps.waitThenAdvance(1).then(() => enqueue('a')); + steps.waitThenAdvance(3).then(() => enqueue('b')); + steps.waitThenAdvance(5).then(() => enqueue('c')); + steps.waitThenAdvance(7).then(() => enqueue('d')); + steps.waitThenAdvance(11).then(() => controller.close()); + + function enqueue(chunk) { + controller.enqueue(chunk); + desiredSizes.push(controller.desiredSize); + } + } + }); + + const chunksFinishedWriting = []; + const writableStartPromise = Promise.resolve(); + let writeCalled = false; + const ws = recordingWritableStream({ + start() { + return writableStartPromise; + }, + write(chunk) { + const waitForStep = writeCalled ? 12 : 9; + writeCalled = true; + return steps.waitThenAdvance(waitForStep).then(() => { + chunksFinishedWriting.push(chunk); + }); + } + }); + + return writableStartPromise.then(() => { + const pipePromise = rs.pipeTo(ws); + steps.waitThenAdvance(0); + + return Promise.all([ + steps.waitThenAdvance(2).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written'); + + // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request + // promise, leaving the queue empty. + assert_array_equals(desiredSizes, [1], + 'at step 2, the desiredSize at the last enqueue (step 1) must have been 1'); + assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1'); + }), + + steps.waitThenAdvance(4).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written'); + + // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at + // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue + // had size 1 (thus desiredSize of 0). + assert_array_equals(desiredSizes, [1, 0], + 'at step 4, the desiredSize at the last enqueue (step 3) must have been 0'); + assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0'); + }), + + steps.waitThenAdvance(6).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written'); + + // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until + // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of + // -1. + assert_array_equals(desiredSizes, [1, 0, -1], + 'at step 6, the desiredSize at the last enqueue (step 5) must have been -1'); + assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1'); + }), + + steps.waitThenAdvance(8).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written'); + + // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c', + // and 'd'. + assert_array_equals(desiredSizes, [1, 0, -1, -2], + 'at step 8, the desiredSize at the last enqueue (step 7) must have been -2'); + assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2'); + }), + + steps.waitThenAdvance(10).then(() => { + assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing'); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'at step 10, two chunks must have been written'); + + assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1'); + }), + + pipePromise.then(() => { + assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source'); + assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing'); + + assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream'); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'], + 'all chunks were written (and the WritableStream closed)'); + }) + ]); + }); +}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.html new file mode 100644 index 00000000000..7bdc8bf6ad8 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.html @@ -0,0 +1,15 @@ + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.js new file mode 100644 index 00000000000..f051d8102c2 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/general.any.js @@ -0,0 +1,212 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/recording-streams.js +'use strict'; + +test(() => { + + const rs = new ReadableStream(); + const ws = new WritableStream(); + + assert_false(rs.locked, 'sanity check: the ReadableStream must not start locked'); + assert_false(ws.locked, 'sanity check: the WritableStream must not start locked'); + + rs.pipeTo(ws); + + assert_true(rs.locked, 'the ReadableStream must become locked'); + assert_true(ws.locked, 'the WritableStream must become locked'); + +}, 'Piping must lock both the ReadableStream and WritableStream'); + +promise_test(() => { + + const rs = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + const ws = new WritableStream(); + + return rs.pipeTo(ws).then(() => { + assert_false(rs.locked, 'the ReadableStream must become unlocked'); + assert_false(ws.locked, 'the WritableStream must become unlocked'); + }); + +}, 'Piping finishing must unlock both the ReadableStream and WritableStream'); + +promise_test(t => { + + const fakeRS = Object.create(ReadableStream.prototype); + const ws = new WritableStream(); + + return promise_rejects_js(t, TypeError, ReadableStream.prototype.pipeTo.apply(fakeRS, [ws]), + 'pipeTo should reject with a TypeError'); + +}, 'pipeTo must check the brand of its ReadableStream this value'); + +promise_test(t => { + + const rs = new ReadableStream(); + const fakeWS = Object.create(WritableStream.prototype); + + return promise_rejects_js(t, TypeError, ReadableStream.prototype.pipeTo.apply(rs, [fakeWS]), + 'pipeTo should reject with a TypeError'); + +}, 'pipeTo must check the brand of its WritableStream argument'); + +promise_test(t => { + + const rs = new ReadableStream(); + const ws = new WritableStream(); + + rs.getReader(); + + assert_true(rs.locked, 'sanity check: the ReadableStream starts locked'); + assert_false(ws.locked, 'sanity check: the WritableStream does not start locked'); + + return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => { + assert_false(ws.locked, 'the WritableStream must still be unlocked'); + }); + +}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream'); + +promise_test(t => { + + const rs = new ReadableStream(); + const ws = new WritableStream(); + + ws.getWriter(); + + assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked'); + assert_true(ws.locked, 'sanity check: the WritableStream starts locked'); + + return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => { + assert_false(rs.locked, 'the ReadableStream must still be unlocked'); + }); + +}, 'pipeTo must fail if the WritableStream is locked, and not lock the ReadableStream'); + +promise_test(() => { + + const CHUNKS = 10; + + const rs = new ReadableStream({ + start(c) { + for (let i = 0; i < CHUNKS; ++i) { + c.enqueue(i); + } + c.close(); + } + }); + + const written = []; + const ws = new WritableStream({ + write(chunk) { + written.push(chunk); + }, + close() { + written.push('closed'); + } + }, new CountQueuingStrategy({ highWaterMark: CHUNKS })); + + return rs.pipeTo(ws).then(() => { + const targetValues = []; + for (let i = 0; i < CHUNKS; ++i) { + targetValues.push(i); + } + targetValues.push('closed'); + + assert_array_equals(written, targetValues, 'the correct values must be written'); + + // Ensure both readable and writable are closed by the time the pipe finishes. + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + + // NOTE: no requirement on *when* the pipe finishes; that is left to implementations. + +}, 'Piping from a ReadableStream from which lots of chunks are synchronously readable'); + +promise_test(t => { + + let controller; + const rs = recordingReadableStream({ + start(c) { + controller = c; + } + }); + + const ws = recordingWritableStream(); + + const pipePromise = rs.pipeTo(ws).then(() => { + assert_array_equals(ws.events, ['write', 'Hello', 'close']); + }); + + t.step_timeout(() => { + controller.enqueue('Hello'); + t.step_timeout(() => controller.close(), 10); + }, 10); + + return pipePromise; + +}, 'Piping from a ReadableStream for which a chunk becomes asynchronously readable after the pipeTo'); + +for (const preventAbort of [true, false]) { + promise_test(() => { + + const rs = new ReadableStream({ + pull() { + return Promise.reject(undefined); + } + }); + + return rs.pipeTo(new WritableStream(), { preventAbort }).then( + () => assert_unreached('pipeTo promise should be rejected'), + value => assert_equals(value, undefined, 'rejection value should be undefined')); + + }, `an undefined rejection from pull should cause pipeTo() to reject when preventAbort is ${preventAbort}`); +} + +for (const preventCancel of [true, false]) { + promise_test(() => { + + const rs = new ReadableStream({ + pull(controller) { + controller.enqueue(0); + } + }); + + const ws = new WritableStream({ + write() { + return Promise.reject(undefined); + } + }); + + return rs.pipeTo(ws, { preventCancel }).then( + () => assert_unreached('pipeTo promise should be rejected'), + value => assert_equals(value, undefined, 'rejection value should be undefined')); + + }, `an undefined rejection from write should cause pipeTo() to reject when preventCancel is ${preventCancel}`); +} + +promise_test(t => { + const rs = new ReadableStream(); + const ws = new WritableStream(); + return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { + get preventAbort() { + ws.getWriter(); + } + }), 'pipeTo should reject'); +}, 'pipeTo() should reject if an option getter grabs a writer'); + +promise_test(t => { + const rs = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + const ws = new WritableStream(); + + return rs.pipeTo(ws, null); +}, 'pipeTo() promise should resolve if null is passed'); diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.html b/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.html new file mode 100644 index 00000000000..b71f29922b3 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.html @@ -0,0 +1,16 @@ + + + + + + + + +
+ diff --git a/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.js b/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.js new file mode 100644 index 00000000000..9be828a2326 --- /dev/null +++ b/Tests/LibWeb/Text/input/wpt-import/streams/piping/multiple-propagation.any.js @@ -0,0 +1,227 @@ +// META: global=window,worker,shadowrealm +// META: script=../resources/test-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +const error2 = new Error('error2!'); +error2.name = 'error2'; + +function createErroredWritableStream(t) { + return Promise.resolve().then(() => { + const ws = recordingWritableStream({ + start(c) { + c.error(error2); + } + }); + + const writer = ws.getWriter(); + return promise_rejects_exactly(t, error2, writer.closed, 'the writable stream must be errored with error2') + .then(() => { + writer.releaseLock(); + assert_array_equals(ws.events, []); + return ws; + }); + }); +} + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + const ws = recordingWritableStream({ + start(c) { + c.error(error2); + } + }); + + // Trying to abort a stream that is erroring will give the writable's error + return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return Promise.all([ + promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'), + promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2') + ]); + }); + +}, 'Piping from an errored readable stream to an erroring writable stream'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + + return createErroredWritableStream(t) + .then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error')) + .then(() => { + assert_array_equals(rs.events, []); + + return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'); + }); +}, 'Piping from an errored readable stream to an errored writable stream'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + const ws = recordingWritableStream({ + start(c) { + c.error(error2); + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the readable stream\'s error') + .then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return Promise.all([ + promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'), + promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2') + ]); + }); + +}, 'Piping from an errored readable stream to an erroring writable stream; preventAbort = true'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + return createErroredWritableStream(t) + .then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }), + 'pipeTo must reject with the readable stream\'s error')) + .then(() => { + assert_array_equals(rs.events, []); + + return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'); + }); + +}, 'Piping from an errored readable stream to an errored writable stream; preventAbort = true'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + const closePromise = writer.close(); + writer.releaseLock(); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['abort', error1]); + + return Promise.all([ + promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'), + promise_rejects_exactly(t, error1, ws.getWriter().closed, + 'closed must reject with error1'), + promise_rejects_exactly(t, error1, closePromise, + 'close() must reject with error1') + ]); + }); + +}, 'Piping from an errored readable stream to a closing writable stream'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.error(error1); + } + }); + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + const closePromise = writer.close(); + writer.releaseLock(); + + return flushAsyncEvents().then(() => { + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'), + ws.getWriter().closed, + closePromise + ]); + }); + }); + +}, 'Piping from an errored readable stream to a closed writable stream'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.close(); + } + }); + const ws = recordingWritableStream({ + start(c) { + c.error(error1); + } + }); + + return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, []); + + return Promise.all([ + rs.getReader().closed, + promise_rejects_exactly(t, error1, ws.getWriter().closed, 'the writable stream must be errored with error1') + ]); + }); + +}, 'Piping from a closed readable stream to an erroring writable stream'); + +promise_test(t => { + const rs = recordingReadableStream({ + start(c) { + c.close(); + } + }); + return createErroredWritableStream(t) + .then(ws => promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error')) + .then(() => { + assert_array_equals(rs.events, []); + + return rs.getReader().closed; + }); + +}, 'Piping from a closed readable stream to an errored writable stream'); + +promise_test(() => { + const rs = recordingReadableStream({ + start(c) { + c.close(); + } + }); + const ws = recordingWritableStream(); + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + return rs.pipeTo(ws).then(() => { + assert_array_equals(rs.events, []); + assert_array_equals(ws.events, ['close']); + + return Promise.all([ + rs.getReader().closed, + ws.getWriter().closed + ]); + }); + +}, 'Piping from a closed readable stream to a closed writable stream');