LibWeb: Implement ReadableStream transfer

This commit is contained in:
Timothy Flynn 2025-05-20 17:20:22 -04:00 committed by Tim Flynn
commit 312db85a84
Notes: github-actions[bot] 2025-05-21 10:56:01 +00:00
9 changed files with 511 additions and 28 deletions

View file

@ -49,6 +49,7 @@
#include <LibWeb/HTML/ImageData.h>
#include <LibWeb/HTML/MessagePort.h>
#include <LibWeb/HTML/StructuredSerialize.h>
#include <LibWeb/Streams/ReadableStream.h>
#include <LibWeb/WebIDL/DOMException.h>
#include <LibWeb/WebIDL/ExceptionOr.h>
@ -1293,7 +1294,8 @@ static bool is_interface_exposed_on_target_realm(TransferType name, JS::Realm& r
switch (name) {
case TransferType::MessagePort:
return intrinsics.is_exposed("MessagePort"sv);
break;
case TransferType::ReadableStream:
return intrinsics.is_exposed("ReadableStream"sv);
case TransferType::Unknown:
dbgln("Unknown interface type for transfer: {}", to_underlying(name));
break;
@ -1311,6 +1313,11 @@ static WebIDL::ExceptionOr<GC::Ref<Bindings::PlatformObject>> create_transferred
TRY(message_port->transfer_receiving_steps(transfer_data_holder));
return message_port;
}
case TransferType::ReadableStream: {
auto readable_stream = target_realm.create<Streams::ReadableStream>(target_realm);
TRY(readable_stream->transfer_receiving_steps(transfer_data_holder));
return readable_stream;
}
case TransferType::ArrayBuffer:
case TransferType::ResizableArrayBuffer:
dbgln("ArrayBuffer ({}) is not a platform object.", to_underlying(name));

View file

@ -49,6 +49,7 @@ enum class TransferType : u8 {
MessagePort = 1,
ArrayBuffer = 2,
ResizableArrayBuffer = 3,
ReadableStream = 4,
};
WebIDL::ExceptionOr<SerializationRecord> structured_serialize(JS::VM& vm, JS::Value);

View file

@ -11,6 +11,8 @@
#include <LibWeb/Bindings/Intrinsics.h>
#include <LibWeb/Bindings/ReadableStreamPrototype.h>
#include <LibWeb/DOM/AbortSignal.h>
#include <LibWeb/HTML/MessagePort.h>
#include <LibWeb/HTML/Scripting/TemporaryExecutionContext.h>
#include <LibWeb/Streams/AbstractOperations.h>
#include <LibWeb/Streams/ReadableByteStreamController.h>
#include <LibWeb/Streams/ReadableStream.h>
@ -89,6 +91,22 @@ ReadableStream::ReadableStream(JS::Realm& realm)
ReadableStream::~ReadableStream() = default;
void ReadableStream::initialize(JS::Realm& realm)
{
WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStream);
Base::initialize(realm);
}
void ReadableStream::visit_edges(Cell::Visitor& visitor)
{
Base::visit_edges(visitor);
if (m_controller.has_value())
m_controller->visit([&](auto& controller) { visitor.visit(controller); });
visitor.visit(m_stored_error);
if (m_reader.has_value())
m_reader->visit([&](auto& reader) { visitor.visit(reader); });
}
// https://streams.spec.whatwg.org/#rs-locked
bool ReadableStream::locked() const
{
@ -218,22 +236,6 @@ void ReadableStream::error(JS::Value error)
});
}
void ReadableStream::initialize(JS::Realm& realm)
{
WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStream);
Base::initialize(realm);
}
void ReadableStream::visit_edges(Cell::Visitor& visitor)
{
Base::visit_edges(visitor);
if (m_controller.has_value())
m_controller->visit([&](auto& controller) { visitor.visit(controller); });
visitor.visit(m_stored_error);
if (m_reader.has_value())
m_reader->visit([&](auto& reader) { visitor.visit(reader); });
}
// https://streams.spec.whatwg.org/#readablestream-locked
bool ReadableStream::is_readable() const
{
@ -453,4 +455,62 @@ GC::Ref<ReadableStream> ReadableStream::piped_through(GC::Ref<TransformStream> t
return transform->readable();
}
// https://streams.spec.whatwg.org/#ref-for-transfer-steps
WebIDL::ExceptionOr<void> ReadableStream::transfer_steps(HTML::TransferDataHolder& data_holder)
{
auto& realm = this->realm();
auto& vm = realm.vm();
HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// 1. If ! IsReadableStreamLocked(value) is true, throw a "DataCloneError" DOMException.
if (is_readable_stream_locked(*this))
return WebIDL::DataCloneError::create(realm, "Cannot transfer locked ReadableStream"_string);
// 2. Let port1 be a new MessagePort in the current Realm.
auto port1 = HTML::MessagePort::create(realm);
// 3. Let port2 be a new MessagePort in the current Realm.
auto port2 = HTML::MessagePort::create(realm, HTML::TransferType::ReadableStream);
// 4. Entangle port1 and port2.
port1->entangle_with(port2);
// 5. Let writable be a new WritableStream in the current Realm.
auto writable = realm.create<WritableStream>(realm);
// 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
set_up_cross_realm_transform_writable(realm, writable, port1);
// 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
auto promise = readable_stream_pipe_to(*this, writable, false, false, false);
// 8. Set promise.[[PromiseIsHandled]] to true.
WebIDL::mark_promise_as_handled(promise);
// 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
auto result = MUST(HTML::structured_serialize_with_transfer(vm, port2, { { GC::Root { port2 } } }));
data_holder = move(result.transfer_data_holders.first());
return {};
}
// https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps
WebIDL::ExceptionOr<void> ReadableStream::transfer_receiving_steps(HTML::TransferDataHolder& data_holder)
{
auto& realm = this->realm();
HTML::TemporaryExecutionContext execution_context { realm, HTML::TemporaryExecutionContext::CallbacksEnabled::Yes };
// 1. Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
// 2. Let port be deserializedRecord.[[Deserialized]].
auto port = HTML::MessagePort::create(realm);
TRY(port->transfer_receiving_steps(data_holder));
// 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
set_up_cross_realm_transform_readable(realm, *this, port);
return {};
}
}

View file

@ -11,6 +11,7 @@
#include <LibJS/Forward.h>
#include <LibWeb/Bindings/PlatformObject.h>
#include <LibWeb/Bindings/ReadableStreamPrototype.h>
#include <LibWeb/Bindings/Transferable.h>
#include <LibWeb/Forward.h>
#include <LibWeb/Streams/Algorithms.h>
#include <LibWeb/Streams/QueuingStrategy.h>
@ -58,7 +59,9 @@ struct ReadableStreamPair {
};
// https://streams.spec.whatwg.org/#readablestream
class ReadableStream final : public Bindings::PlatformObject {
class ReadableStream final
: public Bindings::PlatformObject
, public Bindings::Transferable {
WEB_PLATFORM_OBJECT(ReadableStream, Bindings::PlatformObject);
GC_DECLARE_ALLOCATOR(ReadableStream);
@ -113,6 +116,11 @@ public:
GC::Ptr<WebIDL::ArrayBufferView> current_byob_request_view();
// ^Transferable
virtual WebIDL::ExceptionOr<void> transfer_steps(HTML::TransferDataHolder&) override;
virtual WebIDL::ExceptionOr<void> transfer_receiving_steps(HTML::TransferDataHolder&) override;
virtual HTML::TransferType primary_interface() const override { return HTML::TransferType::ReadableStream; }
private:
explicit ReadableStream(JS::Realm&);
@ -123,10 +131,6 @@ private:
// A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream
Optional<ReadableStreamController> m_controller;
// https://streams.spec.whatwg.org/#readablestream-detached
// A boolean flag set to true when the stream is transferred
bool m_detached { false };
// https://streams.spec.whatwg.org/#readablestream-disturbed
// A boolean flag set to true when the stream has been read from or canceled
bool m_disturbed { false };

View file

@ -2,9 +2,8 @@ Harness status: OK
Found 150 tests
140 Pass
141 Pass
9 Fail
1 Optional Feature Unsupported
Pass primitive undefined
Pass primitive null
Pass primitive true
@ -149,7 +148,7 @@ Pass A detached ArrayBuffer cannot be transferred
Pass A detached platform object cannot be transferred
Pass Transferring a non-transferable platform object fails
Pass An object whose interface is deleted from the global object must still be received
Optional Feature Unsupported A subclass instance will be received as its closest transferable superclass
Pass A subclass instance will be received as its closest transferable superclass
Pass Resizable ArrayBuffer is transferable
Fail Length-tracking TypedArray is transferable
Fail Length-tracking DataView is transferable

View file

@ -2,9 +2,8 @@ Harness status: OK
Found 150 tests
140 Pass
141 Pass
9 Fail
1 Optional Feature Unsupported
Pass primitive undefined
Pass primitive null
Pass primitive true
@ -149,7 +148,7 @@ Pass A detached ArrayBuffer cannot be transferred
Pass A detached platform object cannot be transferred
Pass Transferring a non-transferable platform object fails
Pass An object whose interface is deleted from the global object must still be received
Optional Feature Unsupported A subclass instance will be received as its closest transferable superclass
Pass A subclass instance will be received as its closest transferable superclass
Pass Resizable ArrayBuffer is transferable
Fail Length-tracking TypedArray is transferable
Fail Length-tracking DataView is transferable

View file

@ -0,0 +1,21 @@
Harness status: OK
Found 16 tests
16 Pass
Pass sending one chunk through a transferred stream should work
Pass sending ten chunks through a transferred stream should work
Pass sending ten chunks one at a time should work
Pass sending ten chunks on demand should work
Pass transferring a stream should relieve backpressure
Pass transferring a stream should add one chunk to the queue size
Pass the extra queue from transferring is counted in chunks
Pass cancel should be propagated to the original
Pass cancel should abort a pending read()
Pass stream cancel should not wait for underlying source cancel
Pass serialization should not happen until the value is read
Pass transferring a non-serializable chunk should error both sides
Pass errors should be passed through
Pass race between cancel() and error() should leave sides in different states
Pass race between cancel() and close() should be benign
Pass race between cancel() and enqueue() should be benign

View file

@ -0,0 +1,260 @@
<!DOCTYPE html>
<meta charset="utf-8">
<script src="../../resources/testharness.js"></script>
<script src="../../resources/testharnessreport.js"></script>
<script src="resources/helpers.js"></script>
<script src="../resources/recording-streams.js"></script>
<script src="../resources/test-utils.js"></script>
<script>
'use strict';
promise_test(async () => {
const rs = await createTransferredReadableStream({
start(controller) {
controller.enqueue('a');
controller.close();
}
});
const reader = rs.getReader();
{
const {value, done} = await reader.read();
assert_false(done, 'should not be done yet');
assert_equals(value, 'a', 'first chunk should be a');
}
{
const {done} = await reader.read();
assert_true(done, 'should be done now');
}
}, 'sending one chunk through a transferred stream should work');
promise_test(async () => {
let controller;
const rs = await createTransferredReadableStream({
start(c) {
controller = c;
}
});
for (let i = 0; i < 10; ++i) {
controller.enqueue(i);
}
controller.close();
const reader = rs.getReader();
for (let i = 0; i < 10; ++i) {
const {value, done} = await reader.read();
assert_false(done, 'should not be done yet');
assert_equals(value, i, 'chunk content should match index');
}
const {done} = await reader.read();
assert_true(done, 'should be done now');
}, 'sending ten chunks through a transferred stream should work');
promise_test(async () => {
let controller;
const rs = await createTransferredReadableStream({
start(c) {
controller = c;
}
});
const reader = rs.getReader();
for (let i = 0; i < 10; ++i) {
controller.enqueue(i);
const {value, done} = await reader.read();
assert_false(done, 'should not be done yet');
assert_equals(value, i, 'chunk content should match index');
}
controller.close();
const {done} = await reader.read();
assert_true(done, 'should be done now');
}, 'sending ten chunks one at a time should work');
promise_test(async () => {
let controller;
const rs = await createTransferredReadableStream({
start() {
this.counter = 0;
},
pull(controller) {
controller.enqueue(this.counter);
++this.counter;
if (this.counter === 10)
controller.close();
}
});
const reader = rs.getReader();
for (let i = 0; i < 10; ++i) {
const {value, done} = await reader.read();
assert_false(done, 'should not be done yet');
assert_equals(value, i, 'chunk content should match index');
}
const {done} = await reader.read();
assert_true(done, 'should be done now');
}, 'sending ten chunks on demand should work');
promise_test(async () => {
const rs = recordingReadableStream({}, { highWaterMark: 0 });
await delay(0);
assert_array_equals(rs.events, [], 'pull() should not have been called');
// Eat the message so it can't interfere with other tests.
addEventListener('message', () => {}, {once: true});
// The transfer is done manually to verify that it is posting the stream that
// relieves backpressure, not receiving it.
postMessage(rs, '*', [rs]);
await delay(0);
assert_array_equals(rs.events, ['pull'], 'pull() should have been called');
}, 'transferring a stream should relieve backpressure');
promise_test(async () => {
const rs = await recordingTransferredReadableStream({
pull(controller) {
controller.enqueue('a');
}
}, { highWaterMark: 2 });
await delay(0);
assert_array_equals(rs.events, ['pull', 'pull', 'pull'],
'pull() should have been called three times');
}, 'transferring a stream should add one chunk to the queue size');
promise_test(async () => {
const rs = await recordingTransferredReadableStream({
start(controller) {
controller.enqueue(new Uint8Array(1024));
controller.enqueue(new Uint8Array(1024));
}
}, new ByteLengthQueuingStrategy({highWaterMark: 512}));
await delay(0);
// At this point the queue contains 1024/512 bytes and 1/1 chunk, so it's full
// and pull() is not called.
assert_array_equals(rs.events, [], 'pull() should not have been called');
const reader = rs.getReader();
const {value, done} = await reader.read();
assert_false(done, 'we should not be done');
assert_equals(value.byteLength, 1024, 'expected chunk should be returned');
// Now the queue contains 0/512 bytes and 1/1 chunk, so pull() is called. If
// the implementation erroneously counted the extra queue space in bytes, then
// the queue would contain 1024/513 bytes and pull() wouldn't be called.
assert_array_equals(rs.events, ['pull'], 'pull() should have been called');
}, 'the extra queue from transferring is counted in chunks');
async function transferredReadableStreamWithCancelPromise() {
let resolveCancelCalled;
const cancelCalled = new Promise(resolve => {
resolveCancelCalled = resolve;
});
const rs = await recordingTransferredReadableStream({
cancel() {
resolveCancelCalled();
}
});
return { rs, cancelCalled };
}
promise_test(async () => {
const { rs, cancelCalled } = await transferredReadableStreamWithCancelPromise();
rs.cancel('message');
await cancelCalled;
assert_array_equals(rs.events, ['pull', 'cancel', 'message'],
'cancel() should have been called');
const reader = rs.getReader();
// Check the stream really got closed.
await reader.closed;
}, 'cancel should be propagated to the original');
promise_test(async () => {
const { rs, cancelCalled } = await transferredReadableStreamWithCancelPromise();
const reader = rs.getReader();
const readPromise = reader.read();
reader.cancel('done');
const { done } = await readPromise;
assert_true(done, 'should be done');
await cancelCalled;
assert_array_equals(rs.events, ['pull', 'cancel', 'done'],
'events should match');
}, 'cancel should abort a pending read()');
promise_test(async () => {
let cancelComplete = false;
const rs = await createTransferredReadableStream({
async cancel() {
await flushAsyncEvents();
cancelComplete = true;
}
});
await rs.cancel();
assert_false(cancelComplete,
'cancel() on the underlying sink should not have completed');
}, 'stream cancel should not wait for underlying source cancel');
promise_test(async t => {
const rs = await recordingTransferredReadableStream();
const reader = rs.getReader();
let serializationHappened = false;
rs.controller.enqueue({
get getter() {
serializationHappened = true;
return 'a';
}
});
await flushAsyncEvents();
assert_false(serializationHappened,
'serialization should not have happened yet');
const {value, done} = await reader.read();
assert_false(done, 'should not be done');
assert_equals(value.getter, 'a', 'getter should be a');
assert_true(serializationHappened,
'serialization should have happened');
}, 'serialization should not happen until the value is read');
promise_test(async t => {
const rs = await recordingTransferredReadableStream();
const reader = rs.getReader();
rs.controller.enqueue(new ReadableStream());
await promise_rejects_dom(t, 'DataCloneError', reader.read(),
'closed promise should reject');
assert_throws_js(TypeError, () => rs.controller.enqueue(),
'original stream should be errored');
}, 'transferring a non-serializable chunk should error both sides');
promise_test(async t => {
const rs = await createTransferredReadableStream({
start(controller) {
controller.error('foo');
}
});
const reader = rs.getReader();
return promise_rejects_exactly(t, 'foo', reader.read(),
'error should be passed through');
}, 'errors should be passed through');
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
reader.cancel();
rs.controller.error();
const {done} = await reader.read();
assert_true(done, 'should be done');
assert_throws_js(TypeError, () => rs.controller.enqueue(),
'enqueue should throw');
}, 'race between cancel() and error() should leave sides in different states');
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
reader.cancel();
rs.controller.close();
const {done} = await reader.read();
assert_true(done, 'should be done');
}, 'race between cancel() and close() should be benign');
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
reader.cancel();
rs.controller.enqueue('a');
const {done} = await reader.read();
assert_true(done, 'should be done');
}, 'race between cancel() and enqueue() should be benign');
</script>

View file

@ -0,0 +1,132 @@
'use strict';
(() => {
// Create a ReadableStream that will pass the tests in
// testTransferredReadableStream(), below.
function createOriginalReadableStream() {
return new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.close();
}
});
}
// Common tests to roughly determine that |rs| is a correctly transferred
// version of a stream created by createOriginalReadableStream().
function testTransferredReadableStream(rs) {
assert_equals(rs.constructor, ReadableStream,
'rs should be a ReadableStream in this realm');
assert_true(rs instanceof ReadableStream,
'instanceof check should pass');
// Perform a brand-check on |rs| in the process of calling getReader().
const reader = ReadableStream.prototype.getReader.call(rs);
return reader.read().then(({value, done}) => {
assert_false(done, 'done should be false');
assert_equals(value, 'a', 'value should be "a"');
return reader.read();
}).then(({done}) => {
assert_true(done, 'done should be true');
});
}
function testMessage(msg) {
assert_array_equals(msg.ports, [], 'there should be no ports in the event');
return testTransferredReadableStream(msg.data);
}
function testMessageEvent(target) {
return new Promise((resolve, reject) => {
target.addEventListener('message', ev => {
try {
resolve(testMessage(ev));
} catch (e) {
reject(e);
}
}, {once: true});
});
}
function testMessageEventOrErrorMessage(target) {
return new Promise((resolve, reject) => {
target.addEventListener('message', ev => {
if (typeof ev.data === 'string') {
// Assume it's an error message and reject with it.
reject(ev.data);
return;
}
try {
resolve(testMessage(ev));
} catch (e) {
reject(e);
}
}, {once: true});
});
}
function checkTestResults(target) {
return new Promise((resolve, reject) => {
target.onmessage = msg => {
// testharness.js sends us objects which we need to ignore.
if (typeof msg.data !== 'string')
return;
if (msg.data === 'OK') {
resolve();
} else {
reject(msg.data);
}
};
});
}
// These tests assume that a transferred ReadableStream will behave the same
// regardless of how it was transferred. This enables us to simply transfer the
// stream to ourselves.
function createTransferredReadableStream(underlyingSource) {
const original = new ReadableStream(underlyingSource);
const promise = new Promise((resolve, reject) => {
addEventListener('message', msg => {
const rs = msg.data;
if (rs instanceof ReadableStream) {
resolve(rs);
} else {
reject(new Error(`what is this thing: "${rs}"?`));
}
}, {once: true});
});
postMessage(original, '*', [original]);
return promise;
}
function recordingTransferredReadableStream(underlyingSource, strategy) {
const original = recordingReadableStream(underlyingSource, strategy);
const promise = new Promise((resolve, reject) => {
addEventListener('message', msg => {
const rs = msg.data;
if (rs instanceof ReadableStream) {
rs.events = original.events;
rs.eventsWithoutPulls = original.eventsWithoutPulls;
rs.controller = original.controller;
resolve(rs);
} else {
reject(new Error(`what is this thing: "${rs}"?`));
}
}, {once: true});
});
postMessage(original, '*', [original]);
return promise;
}
self.createOriginalReadableStream = createOriginalReadableStream;
self.testMessage = testMessage;
self.testMessageEvent = testMessageEvent;
self.testMessageEventOrErrorMessage = testMessageEventOrErrorMessage;
self.checkTestResults = checkTestResults;
self.createTransferredReadableStream = createTransferredReadableStream;
self.recordingTransferredReadableStream = recordingTransferredReadableStream;
})();