From 3c6010c663dddb78aa84b85e7a27307c4841a9dd Mon Sep 17 00:00:00 2001 From: Timothy Flynn Date: Sun, 13 Apr 2025 09:51:09 -0400 Subject: [PATCH] LibWeb: Implement ReadableStreamPipeTo reads with less observability The spec states: Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly. This migrates the default request request we were using to a custom read request which does not involve extra promises. I'm not sure about an analogous change with the way we write chunks to the receiving end. There isn't a "WriteRequest" utility to be used here, and no matter what AO we use, promises will be involved. Our current implementation at least does not seem to affect any tests. --- .../LibWeb/Streams/AbstractOperations.cpp | 81 ++++++++++++++----- .../streams/piping/then-interception.any.txt | 6 +- 2 files changed, 63 insertions(+), 24 deletions(-) diff --git a/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Libraries/LibWeb/Streams/AbstractOperations.cpp index dc3dba99711..64307311722 100644 --- a/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -289,6 +289,55 @@ bool readable_stream_has_default_reader(ReadableStream const& stream) return false; } +class ReadableStreamPipeToReadRequest final : public ReadRequest { + GC_CELL(ReadableStreamPipeToReadRequest, ReadRequest); + GC_DECLARE_ALLOCATOR(ReadableStreamPipeToReadRequest); + + using OnChunk = GC::Ref>; + using OnComplete = GC::Ref>; + + // This has a return value just for compatibility with WebIDL::react_to_promise. + using OnError = GC::Ref(JS::Value)>>; + +public: + virtual void on_chunk(JS::Value chunk) override + { + m_on_chunk->function()(chunk); + } + + virtual void on_close() override + { + m_on_complete->function()(); + } + + virtual void on_error(JS::Value error) override + { + MUST(m_on_error->function()(error)); + } + +private: + ReadableStreamPipeToReadRequest(OnChunk on_chunk, OnComplete on_complete, OnError on_error) + : m_on_chunk(on_chunk) + , m_on_complete(on_complete) + , m_on_error(on_error) + { + } + + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_on_chunk); + visitor.visit(m_on_complete); + visitor.visit(m_on_error); + } + + OnChunk m_on_chunk; + OnComplete m_on_complete; + OnError m_on_error; +}; + +GC_DEFINE_ALLOCATOR(ReadableStreamPipeToReadRequest); + // https://streams.spec.whatwg.org/#ref-for-in-parallel class ReadableStreamPipeTo final : public JS::Cell { GC_CELL(ReadableStreamPipeTo, JS::Cell); @@ -446,26 +495,15 @@ private: if (check_for_error_and_close_states()) return; - auto when_ready = GC::create_function(heap(), [this](JS::Value value) -> WebIDL::ExceptionOr { - auto& vm = this->vm(); + auto on_chunk = GC::create_function(heap(), [this](JS::Value chunk) { + m_unwritten_chunks.append(chunk); + write_chunk(); + process(); + }); - VERIFY(value.is_object()); - auto& object = value.as_object(); - - auto done = MUST(JS::iterator_complete(vm, object)); - - if (done) { - if (!check_for_error_and_close_states()) - finish(); - } else { - auto chunk = MUST(JS::iterator_value(vm, object)); - m_unwritten_chunks.append(chunk); - - write_chunk(); - process(); - } - - return JS::js_undefined(); + auto on_complete = GC::create_function(heap(), [this]() { + if (!check_for_error_and_close_states()) + finish(); }); auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr { @@ -473,7 +511,8 @@ private: return JS::js_undefined(); }); - WebIDL::react_to_promise(m_reader->read(), when_ready, shutdown); + auto read_request = heap().allocate(on_chunk, on_complete, shutdown); + readable_stream_default_reader_read(m_reader, read_request); if (auto promise = m_writer->closed()) WebIDL::react_to_promise(*promise, shutdown, shutdown); @@ -487,7 +526,7 @@ private: if (!m_shutting_down && check_for_error_and_close_states()) return; - auto promise = m_writer->write(m_unwritten_chunks.take_first()); + auto promise = writable_stream_default_writer_write(m_writer, m_unwritten_chunks.take_first()); WebIDL::mark_promise_as_handled(promise); m_pending_writes.append(promise); diff --git a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt index 1419b8486e2..682f56caaf0 100644 --- a/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt +++ b/Tests/LibWeb/Text/expected/wpt-import/streams/piping/then-interception.any.txt @@ -2,6 +2,6 @@ Harness status: OK Found 2 tests -2 Fail -Fail piping should not be observable -Fail tee should not be observable \ No newline at end of file +2 Pass +Pass piping should not be observable +Pass tee should not be observable \ No newline at end of file