LibWeb: Implement ReadableStreamPipeTo reads with less observability

The spec states:

    Public API must not be used: while reading or writing, or performing
    any of the operations below, the JavaScript-modifiable reader,
    writer, and stream APIs (i.e. methods on the appropriate prototypes)
    must not be used. Instead, the streams must be manipulated directly.

This migrates the default request request we were using to a custom read
request which does not involve extra promises.

I'm not sure about an analogous change with the way we write chunks to
the receiving end. There isn't a "WriteRequest" utility to be used here,
and no matter what AO we use, promises will be involved. Our current
implementation at least does not seem to affect any tests.
This commit is contained in:
Timothy Flynn 2025-04-13 09:51:09 -04:00
parent c19b777886
commit 8d275fbf14
2 changed files with 63 additions and 24 deletions

View file

@ -289,6 +289,55 @@ bool readable_stream_has_default_reader(ReadableStream const& stream)
return false;
}
class ReadableStreamPipeToReadRequest final : public ReadRequest {
GC_CELL(ReadableStreamPipeToReadRequest, ReadRequest);
GC_DECLARE_ALLOCATOR(ReadableStreamPipeToReadRequest);
using OnChunk = GC::Ref<GC::Function<void(JS::Value)>>;
using OnComplete = GC::Ref<GC::Function<void()>>;
// This has a return value just for compatibility with WebIDL::react_to_promise.
using OnError = GC::Ref<GC::Function<WebIDL::ExceptionOr<JS::Value>(JS::Value)>>;
public:
virtual void on_chunk(JS::Value chunk) override
{
m_on_chunk->function()(chunk);
}
virtual void on_close() override
{
m_on_complete->function()();
}
virtual void on_error(JS::Value error) override
{
MUST(m_on_error->function()(error));
}
private:
ReadableStreamPipeToReadRequest(OnChunk on_chunk, OnComplete on_complete, OnError on_error)
: m_on_chunk(on_chunk)
, m_on_complete(on_complete)
, m_on_error(on_error)
{
}
virtual void visit_edges(Visitor& visitor) override
{
Base::visit_edges(visitor);
visitor.visit(m_on_chunk);
visitor.visit(m_on_complete);
visitor.visit(m_on_error);
}
OnChunk m_on_chunk;
OnComplete m_on_complete;
OnError m_on_error;
};
GC_DEFINE_ALLOCATOR(ReadableStreamPipeToReadRequest);
// https://streams.spec.whatwg.org/#ref-for-in-parallel
class ReadableStreamPipeTo final : public JS::Cell {
GC_CELL(ReadableStreamPipeTo, JS::Cell);
@ -446,26 +495,15 @@ private:
if (check_for_error_and_close_states())
return;
auto when_ready = GC::create_function(heap(), [this](JS::Value value) -> WebIDL::ExceptionOr<JS::Value> {
auto& vm = this->vm();
auto on_chunk = GC::create_function(heap(), [this](JS::Value chunk) {
m_unwritten_chunks.append(chunk);
write_chunk();
process();
});
VERIFY(value.is_object());
auto& object = value.as_object();
auto done = MUST(JS::iterator_complete(vm, object));
if (done) {
if (!check_for_error_and_close_states())
finish();
} else {
auto chunk = MUST(JS::iterator_value(vm, object));
m_unwritten_chunks.append(chunk);
write_chunk();
process();
}
return JS::js_undefined();
auto on_complete = GC::create_function(heap(), [this]() {
if (!check_for_error_and_close_states())
finish();
});
auto shutdown = GC::create_function(heap(), [this](JS::Value) -> WebIDL::ExceptionOr<JS::Value> {
@ -473,7 +511,8 @@ private:
return JS::js_undefined();
});
WebIDL::react_to_promise(m_reader->read(), when_ready, shutdown);
auto read_request = heap().allocate<ReadableStreamPipeToReadRequest>(on_chunk, on_complete, shutdown);
readable_stream_default_reader_read(m_reader, read_request);
if (auto promise = m_writer->closed())
WebIDL::react_to_promise(*promise, shutdown, shutdown);
@ -487,7 +526,7 @@ private:
if (!m_shutting_down && check_for_error_and_close_states())
return;
auto promise = m_writer->write(m_unwritten_chunks.take_first());
auto promise = writable_stream_default_writer_write(m_writer, m_unwritten_chunks.take_first());
WebIDL::mark_promise_as_handled(promise);
m_pending_writes.append(promise);

View file

@ -2,6 +2,6 @@ Harness status: OK
Found 2 tests
2 Fail
Fail piping should not be observable
Fail tee should not be observable
2 Pass
Pass piping should not be observable
Pass tee should not be observable