diff --git a/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Libraries/LibWeb/Streams/AbstractOperations.cpp index dc3dba99711..64307311722 100644 --- a/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -289,6 +289,55 @@ bool readable_stream_has_default_reader(ReadableStream const& stream) return false; } +class ReadableStreamPipeToReadRequest final : public ReadRequest { + GC_CELL(ReadableStreamPipeToReadRequest, ReadRequest); + GC_DECLARE_ALLOCATOR(ReadableStreamPipeToReadRequest); + + using OnChunk = GC::Ref>; + using OnComplete = GC::Ref>; + + // This has a return value just for compatibility with WebIDL::react_to_promise. + using OnError = GC::Ref(JS::Value)>>; + +public: + virtual void on_chunk(JS::Value chunk) override + { + m_on_chunk->function()(chunk); + } + + virtual void on_close() override + { + m_on_complete->function()(); + } + + virtual void on_error(JS::Value error) override + { + MUST(m_on_error->function()(error)); + } + +private: + ReadableStreamPipeToReadRequest(OnChunk on_chunk, OnComplete on_complete, OnError on_error) + : m_on_chunk(on_chunk) + , m_on_complete(on_complete) + , m_on_error(on_error) + { + } + + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_on_chunk); + visitor.visit(m_on_complete); + visitor.visit(m_on_error); + } + + OnChunk m_on_chunk; + OnComplete m_on_complete; + OnError m_on_error; +}; + +GC_DEFINE_ALLOCATOR(ReadableStreamPipeToReadRequest); + // https://streams.spec.whatwg.org/#ref-for-in-parallel class ReadableStreamPipeTo final : public JS::Cell { GC_CELL(ReadableStreamPipeTo, JS::Cell); @@ -446,26 +495,15 @@ private: if (check_for_error_and_close_states()) return; - auto when_ready = GC::create_function(heap(), [this](JS::Value value) -> WebIDL::ExceptionOr { - auto& vm = this->vm(); + auto on_chunk = GC::create_function(heap(), [this](JS::Value chunk) { + m_unwritten_chunks.append(chunk); + write_chunk(); + process(); + }); - VERIFY(value.is_object()); - auto& object = value.as_object(); - - auto done = MUST(JS::iterator_complete(vm, object)); - - if (done) { - if (!check_for_error_and_close_states()) - finish(); - } else { - auto chunk = MUST(JS::iterator_value(vm, object)); - m_unwritten_chunks.append(chunk); - - write_chunk(); - process(); - } - - return JS::js_undefined(); + auto on_complete = GC::create_function(heap(), [this]() { + if (!check_for_error_and_close_states()) + finish(); }); auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { @@ -473,7 +511,8 @@ private: return JS::js_undefined(); }); - WebIDL::react_to_promise(m_reader->read(), when_ready, shutdown); + auto read_request = heap().allocate(on_chunk, on_complete, shutdown); + readable_stream_default_reader_read(m_reader, read_request); if (auto promise = m_writer->closed()) WebIDL::react_to_promise(*promise, shutdown, shutdown); @@ -487,7 +526,7 @@ private: if (!m_shutting_down && check_for_error_and_close_states()) return; - auto promise = m_writer->write(m_unwritten_chunks.take_first()); + auto promise = writable_stream_default_writer_write(m_writer, m_unwritten_chunks.take_first()); WebIDL::mark_promise_as_handled(promise); m_pending_writes.append(promise); diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt index 1419b8486e2..682f56caaf0 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt @@ -2,6 +2,6 @@ Harness status: OK Found 2 tests -2 Fail -Fail piping should not be observable -Fail tee should not be observable \ No newline at end of file +2 Pass +Pass piping should not be observable +Pass tee should not be observable \ No newline at end of file