/* * Copyright (c) 2025, Tim Flynn * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include #include namespace Web::Streams::Detail { class ReadableStreamPipeToReadRequest final : public ReadRequest { GC_CELL(ReadableStreamPipeToReadRequest, ReadRequest); GC_DECLARE_ALLOCATOR(ReadableStreamPipeToReadRequest); using OnChunk = GC::Ref>; using OnComplete = GC::Ref>; // This has a return value just for compatibility with WebIDL::react_to_promise. using OnError = GC::Ref(JS::Value)>>; public: virtual void on_chunk(JS::Value chunk) override { m_on_chunk->function()(chunk); } virtual void on_close() override { m_on_complete->function()(); } virtual void on_error(JS::Value error) override { MUST(m_on_error->function()(error)); } private: ReadableStreamPipeToReadRequest(OnChunk on_chunk, OnComplete on_complete, OnError on_error) : m_on_chunk(on_chunk) , m_on_complete(on_complete) , m_on_error(on_error) { } virtual void visit_edges(Visitor& visitor) override { Base::visit_edges(visitor); visitor.visit(m_on_chunk); visitor.visit(m_on_complete); visitor.visit(m_on_error); } OnChunk m_on_chunk; OnComplete m_on_complete; OnError m_on_error; }; GC_DEFINE_ALLOCATOR(ReadableStreamPipeTo); GC_DEFINE_ALLOCATOR(ReadableStreamPipeToReadRequest); // https://streams.spec.whatwg.org/#ref-for-in-parallel ReadableStreamPipeTo::ReadableStreamPipeTo( GC::Ref realm, GC::Ref promise, GC::Ref source, GC::Ref destination, GC::Ref reader, GC::Ref writer, bool prevent_close, bool prevent_abort, bool prevent_cancel) : m_realm(realm) , m_promise(promise) , m_source(source) , m_destination(destination) , m_reader(reader) , m_writer(writer) , m_prevent_close(prevent_close) , m_prevent_abort(prevent_abort) , m_prevent_cancel(prevent_cancel) { m_reader->set_readable_stream_pipe_to_operation({}, this); } void ReadableStreamPipeTo::visit_edges(Cell::Visitor& visitor) { Base::visit_edges(visitor); visitor.visit(m_realm); visitor.visit(m_promise); visitor.visit(m_source); visitor.visit(m_destination); visitor.visit(m_reader); visitor.visit(m_writer); visitor.visit(m_signal); visitor.visit(m_pending_writes); visitor.visit(m_unwritten_chunks); } void ReadableStreamPipeTo::process() { if (check_for_error_and_close_states()) return; auto ready_promise = m_writer->ready(); if (ready_promise && WebIDL::is_promise_fulfilled(*ready_promise)) { read_chunk(); return; } auto when_ready = GC::create_function(m_realm->heap(), [this](JS::Value) -> WebIDL::ExceptionOr { read_chunk(); return JS::js_undefined(); }); auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { check_for_error_and_close_states(); return JS::js_undefined(); }); if (ready_promise) WebIDL::react_to_promise(*ready_promise, when_ready, shutdown); if (auto promise = m_reader->closed()) WebIDL::react_to_promise(*promise, shutdown, shutdown); } void ReadableStreamPipeTo::set_abort_signal(GC::Ref signal, DOM::AbortSignal::AbortSignal::AbortAlgorithmID signal_id) { m_signal = signal; m_signal_id = signal_id; } // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action void ReadableStreamPipeTo::shutdown_with_action(GC::Ref()>> action, Optional original_error) { // 1. If shuttingDown is true, abort these substeps. if (m_shutting_down) return; // 2. Set shuttingDown to true. m_shutting_down = true; auto on_pending_writes_complete = [this, action, original_error = move(original_error)]() mutable { HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; // 4. Let p be the result of performing action. auto promise = action->function()(); WebIDL::react_to_promise(promise, // 5. Upon fulfillment of p, finalize, passing along originalError if it was given. GC::create_function(heap(), [this, original_error = move(original_error)](JS::Value) mutable -> WebIDL::ExceptionOr { finish(move(original_error)); return JS::js_undefined(); }), // 6. Upon rejection of p with reason newError, finalize with newError. GC::create_function(heap(), [this](JS::Value new_error) -> WebIDL::ExceptionOr { finish(new_error); return JS::js_undefined(); })); }; // 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) { // 1. If any chunks have been read but not yet written, write them to dest. write_unwritten_chunks(); // 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled). wait_for_pending_writes_to_complete(move(on_pending_writes_complete)); } else { on_pending_writes_complete(); } } // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown void ReadableStreamPipeTo::shutdown(Optional error) { // 1. If shuttingDown is true, abort these substeps. if (m_shutting_down) return; // 2. Set shuttingDown to true. m_shutting_down = true; auto on_pending_writes_complete = [this, error = move(error)]() mutable { HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; // 4. Finalize, passing along error if it was given. finish(move(error)); }; // 3. If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, if (m_destination->state() == WritableStream::State::Writable && !writable_stream_close_queued_or_in_flight(m_destination)) { // 1. If any chunks have been read but not yet written, write them to dest. write_unwritten_chunks(); // 2. Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled). wait_for_pending_writes_to_complete(move(on_pending_writes_complete)); } else { on_pending_writes_complete(); } } void ReadableStreamPipeTo::read_chunk() { // Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from // reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent // must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown. if (check_for_error_and_close_states()) return; auto on_chunk = GC::create_function(heap(), [this](JS::Value chunk) { m_unwritten_chunks.append(chunk); if (check_for_error_and_close_states()) return; HTML::queue_a_microtask(nullptr, GC::create_function(m_realm->heap(), [this]() { HTML::TemporaryExecutionContext execution_context { m_realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes }; write_chunk(); process(); })); }); auto on_complete = GC::create_function(heap(), [this]() { if (!check_for_error_and_close_states()) finish(); }); auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { check_for_error_and_close_states(); return JS::js_undefined(); }); auto read_request = heap().allocate(on_chunk, on_complete, shutdown); readable_stream_default_reader_read(m_reader, read_request); if (auto promise = m_writer->closed()) WebIDL::react_to_promise(*promise, shutdown, shutdown); } void ReadableStreamPipeTo::write_chunk() { // Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from // reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent // must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown. if (!m_shutting_down && check_for_error_and_close_states()) return; auto promise = writable_stream_default_writer_write(m_writer, m_unwritten_chunks.take_first()); WebIDL::mark_promise_as_handled(promise); m_pending_writes.append(promise); } void ReadableStreamPipeTo::write_unwritten_chunks() { while (!m_unwritten_chunks.is_empty()) write_chunk(); } void ReadableStreamPipeTo::wait_for_pending_writes_to_complete(Function on_complete) { auto handler = GC::create_function(heap(), [this, on_complete = move(on_complete)]() { m_pending_writes.clear(); on_complete(); }); auto success_steps = [handler](Vector const&) { handler->function()(); }; auto failure_steps = [handler](JS::Value) { handler->function()(); }; WebIDL::wait_for_all(m_realm, m_pending_writes, move(success_steps), move(failure_steps)); } // https://streams.spec.whatwg.org/#rs-pipeTo-finalize // We call this `finish` instead of `finalize` to avoid conflicts with GC::Cell::finalize. void ReadableStreamPipeTo::finish(Optional error) { // 1. Perform ! WritableStreamDefaultWriterRelease(writer). writable_stream_default_writer_release(m_writer); // 2. If reader implements ReadableStreamBYOBReader, perform ! ReadableStreamBYOBReaderRelease(reader). // 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader). readable_stream_default_reader_release(m_reader); // 4. If signal is not undefined, remove abortAlgorithm from signal. if (m_signal) m_signal->remove_abort_algorithm(m_signal_id); // 5. If error was given, reject promise with error. if (error.has_value()) { WebIDL::reject_promise(m_realm, m_promise, *error); } // 6. Otherwise, resolve promise with undefined. else { WebIDL::resolve_promise(m_realm, m_promise, JS::js_undefined()); } m_reader->set_readable_stream_pipe_to_operation({}, nullptr); } bool ReadableStreamPipeTo::check_for_error_and_close_states() { // Error and close states must be propagated: the following conditions must be applied in order. return m_shutting_down || check_for_forward_errors() || check_for_backward_errors() || check_for_forward_close() || check_for_backward_close(); } bool ReadableStreamPipeTo::check_for_forward_errors() { // 1. Errors must be propagated forward: if source.[[state]] is or becomes "errored", then if (m_source->state() == ReadableStream::State::Errored) { // 1. If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]]) // and with source.[[storedError]]. if (!m_prevent_abort) { auto action = GC::create_function(heap(), [this]() { return writable_stream_abort(m_destination, m_source->stored_error()); }); shutdown_with_action(action, m_source->stored_error()); } // 2. Otherwise, shutdown with source.[[storedError]]. else { shutdown(m_source->stored_error()); } } return m_shutting_down; } bool ReadableStreamPipeTo::check_for_backward_errors() { // 2. Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then if (m_destination->state() == WritableStream::State::Errored) { // 1. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]]) // and with dest.[[storedError]]. if (!m_prevent_cancel) { auto action = GC::create_function(heap(), [this]() { return readable_stream_cancel(m_source, m_destination->stored_error()); }); shutdown_with_action(action, m_destination->stored_error()); } // 2. Otherwise, shutdown with dest.[[storedError]]. else { shutdown(m_destination->stored_error()); } } return m_shutting_down; } bool ReadableStreamPipeTo::check_for_forward_close() { // 3. Closing must be propagated forward: if source.[[state]] is or becomes "closed", then if (m_source->state() == ReadableStream::State::Closed) { // 1. If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer). if (!m_prevent_close) { auto action = GC::create_function(heap(), [this]() { return writable_stream_default_writer_close_with_error_propagation(m_writer); }); shutdown_with_action(action); } // 2. Otherwise, shutdown. else { shutdown(); } } return m_shutting_down; } bool ReadableStreamPipeTo::check_for_backward_close() { // 4. Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "closed", then if (writable_stream_close_queued_or_in_flight(m_destination) || m_destination->state() == WritableStream::State::Closed) { // 1. Assert: no chunks have been read or written. // 2. Let destClosed be a new TypeError. auto destination_closed = JS::TypeError::create(m_realm, "Destination stream was closed during piping operation"sv); // 3. If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, destClosed) and with destClosed. if (!m_prevent_cancel) { auto action = GC::create_function(heap(), [this, destination_closed]() { return readable_stream_cancel(m_source, destination_closed); }); shutdown_with_action(action, destination_closed); } // 4. Otherwise, shutdown with destClosed. else { shutdown(destination_closed); } } return m_shutting_down; } }