From cb4e020de5b95647dcead2c503c7480b22cd3b83 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Fri, 15 Dec 2023 22:30:47 +0100 Subject: [PATCH 01/20] stream: fix code style --- lib/internal/webstreams/transformstream.js | 39 +++++++++++----------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 3dc85b79a8273e..9fd1b36a45914c 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -668,25 +668,26 @@ function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { reason); transformStreamDefaultControllerClearAlgorithms(controller); - PromisePrototypeThen(cancelPromise, - () => { - if (writable[kState].state === 'errored') - reject(writable[kState].storedError); - else { - writableStreamDefaultControllerErrorIfNeeded( - writable[kState].controller, - reason); - transformStreamUnblockWrite(stream); - resolve(); - } - }, - (error) => { - writableStreamDefaultControllerErrorIfNeeded( - writable[kState].controller, - error); - transformStreamUnblockWrite(stream); - reject(error); - }, + PromisePrototypeThen( + cancelPromise, + () => { + if (writable[kState].state === 'errored') + reject(writable[kState].storedError); + else { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + reason); + transformStreamUnblockWrite(stream); + resolve(); + } + }, + (error) => { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + error); + transformStreamUnblockWrite(stream); + reject(error); + }, ); return controller[kState].finishPromise; From 46022b87c83c89a406c5d62b675f58f763f9d560 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 18:14:46 +0100 Subject: [PATCH 02/20] stream: avoid PromiseResolve in ensureIsPromise --- lib/internal/webstreams/util.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 1979c55667b167..b31a0d3380a36e 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -9,7 +9,7 @@ const { MathMax, NumberIsNaN, PromisePrototypeThen, - PromiseResolve, + Promise, PromiseReject, ReflectGet, Symbol, @@ -183,7 +183,7 @@ function enqueueValueWithSize(controller, value, size) { function ensureIsPromise(fn, thisArg, ...args) { try { const value = FunctionPrototypeCall(fn, thisArg, ...args); - return isPromise(value) ? value : PromiseResolve(value); + return new Promise((r) => r(value)); } catch (error) { return PromiseReject(error); } From 62fc1e142f533bbf93c626ee8b46a90892c561db Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 18:30:10 +0100 Subject: [PATCH 03/20] stream: fix handling sync errors from source cancel and sink abort --- lib/internal/webstreams/readablestream.js | 16 +++++----------- lib/internal/webstreams/writablestream.js | 7 ++----- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 4af209349341d1..9785899e2de47a 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1991,10 +1991,7 @@ function readableStreamCancel(stream, reason) { } return PromisePrototypeThen( - ensureIsPromise( - stream[kState].controller[kCancel], - stream[kState].controller, - reason), + stream[kState].controller[kCancel](reason), () => {}); } @@ -2389,12 +2386,9 @@ function readableStreamDefaultControllerError(controller, error) { function readableStreamDefaultControllerCancelSteps(controller, reason) { resetQueue(controller); - try { - const result = controller[kState].cancelAlgorithm(reason); - return result; - } finally { - readableStreamDefaultControllerClearAlgorithms(controller); - } + const result = ensureIsPromise(controller[kState].cancelAlgorithm, controller, reason); + readableStreamDefaultControllerClearAlgorithms(controller); + return result; } function readableStreamDefaultControllerPullSteps(controller, readRequest) { @@ -3126,7 +3120,7 @@ function readableByteStreamControllerError(controller, error) { function readableByteStreamControllerCancelSteps(controller, reason) { readableByteStreamControllerClearPendingPullIntos(controller); resetQueue(controller); - const result = controller[kState].cancelAlgorithm(reason); + const result = ensureIsPromise(controller[kState].cancelAlgorithm, controller, reason); readableByteStreamControllerClearAlgorithms(controller); return result; } diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 6dd7bc65566db6..9c88f5a5a9f9c8 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -539,7 +539,7 @@ class WritableStreamDefaultController { } [kAbort](reason) { - const result = this[kState].abortAlgorithm(reason); + const result = ensureIsPromise(this[kState].abortAlgorithm, this, reason); writableStreamDefaultControllerClearAlgorithms(this); return result; } @@ -907,10 +907,7 @@ function writableStreamFinishErroring(stream) { return; } PromisePrototypeThen( - ensureIsPromise( - stream[kState].controller[kAbort], - stream[kState].controller, - abortRequest.reason), + stream[kState].controller[kAbort](abortRequest.reason), () => { abortRequest.abort.resolve?.(); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); From 19e8940b351a946fd2607160ce836052c2730d3c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:14:59 +0100 Subject: [PATCH 04/20] stream: check for promise only when constructing from source/sink/transformer --- lib/internal/webstreams/readablestream.js | 17 ++++----- lib/internal/webstreams/transformstream.js | 43 ++++++++-------------- lib/internal/webstreams/writablestream.js | 14 ++++--- 3 files changed, 32 insertions(+), 42 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 9785899e2de47a..ab4ad2de9beac9 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -2356,7 +2356,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - ensureIsPromise(controller[kState].pullAlgorithm, controller), + controller[kState].pullAlgorithm(), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -2386,7 +2386,7 @@ function readableStreamDefaultControllerError(controller, error) { function readableStreamDefaultControllerCancelSteps(controller, reason) { resetQueue(controller); - const result = ensureIsPromise(controller[kState].cancelAlgorithm, controller, reason); + const result = controller[kState].cancelAlgorithm(reason); readableStreamDefaultControllerClearAlgorithms(controller); return result; } @@ -2462,11 +2462,10 @@ function setupReadableStreamDefaultControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - FunctionPrototypeBind(pull, source, controller) : + () => ensureIsPromise(pull, source, controller) : nonOpPull; - const cancelAlgorithm = cancel ? - FunctionPrototypeBind(cancel, source) : + (reason) => ensureIsPromise(cancel, source, reason) : nonOpCancel; setupReadableStreamDefaultController( @@ -3094,7 +3093,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - ensureIsPromise(controller[kState].pullAlgorithm, controller), + controller[kState].pullAlgorithm(), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -3120,7 +3119,7 @@ function readableByteStreamControllerError(controller, error) { function readableByteStreamControllerCancelSteps(controller, reason) { readableByteStreamControllerClearPendingPullIntos(controller); resetQueue(controller); - const result = ensureIsPromise(controller[kState].cancelAlgorithm, controller, reason); + const result = controller[kState].cancelAlgorithm(reason); readableByteStreamControllerClearAlgorithms(controller); return result; } @@ -3261,10 +3260,10 @@ function setupReadableByteStreamControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - FunctionPrototypeBind(pull, source, controller) : + () => ensureIsPromise(pull, source, controller) : nonOpPull; const cancelAlgorithm = cancel ? - FunctionPrototypeBind(cancel, source) : + (reason) => ensureIsPromise(cancel, source, reason) : nonOpCancel; if (autoAllocateChunkSize === 0) { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 9fd1b36a45914c..b75e8fc5761f72 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -463,15 +463,18 @@ function setupTransformStreamDefaultControllerFromTransformer( stream, transformer) { const controller = new TransformStreamDefaultController(kSkipThrow); - const transform = transformer?.transform || defaultTransformAlgorithm; - const flush = transformer?.flush || nonOpFlush; - const cancel = transformer?.cancel || nonOpCancel; - const transformAlgorithm = - FunctionPrototypeBind(transform, transformer); - const flushAlgorithm = - FunctionPrototypeBind(flush, transformer); - const cancelAlgorithm = - FunctionPrototypeBind(cancel, transformer); + const transform = transformer?.transform; + const flush = transformer?.flush; + const cancel = transformer?.cancel; + const transformAlgorithm = transform ? + (chunk) => ensureIsPromise(transform, transformer, chunk, controller) : + defaultTransformAlgorithm; + const flushAlgorithm = flush ? + () => ensureIsPromise(flush, transformer, controller) : + nonOpFlush; + const cancelAlgorithm = cancel ? + (reason) => ensureIsPromise(cancel, transformer, reason) : + nonOpCancel; setupTransformStreamDefaultController( stream, @@ -519,11 +522,7 @@ function transformStreamDefaultControllerError(controller, error) { async function transformStreamDefaultControllerPerformTransform(controller, chunk) { try { - return await ensureIsPromise( - controller[kState].transformAlgorithm, - controller, - chunk, - controller); + return await controller[kState].transformAlgorithm(chunk); } catch (error) { transformStreamError(controller[kState].stream, error); throw error; @@ -584,10 +583,7 @@ async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const cancelPromise = ensureIsPromise( - controller[kState].cancelAlgorithm, - controller, - reason); + const cancelPromise = controller[kState].cancelAlgorithm(reason); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( @@ -620,11 +616,7 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) { } const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const flushPromise = - ensureIsPromise( - controller[kState].flushAlgorithm, - controller, - controller); + const flushPromise = controller[kState].flushAlgorithm(); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( flushPromise, @@ -662,10 +654,7 @@ function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const cancelPromise = ensureIsPromise( - controller[kState].cancelAlgorithm, - controller, - reason); + const cancelPromise = controller[kState].cancelAlgorithm(reason); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 9c88f5a5a9f9c8..77e21e2b3b6abf 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -539,7 +539,7 @@ class WritableStreamDefaultController { } [kAbort](reason) { - const result = ensureIsPromise(this[kState].abortAlgorithm, this, reason); + const result = this[kState].abortAlgorithm(reason); writableStreamDefaultControllerClearAlgorithms(this); return result; } @@ -1111,7 +1111,7 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { writableStreamMarkFirstWriteRequestInFlight(stream); PromisePrototypeThen( - ensureIsPromise(writeAlgorithm, controller, chunk, controller), + writeAlgorithm(chunk), () => { writableStreamFinishInFlightWrite(stream); const { @@ -1144,7 +1144,7 @@ function writableStreamDefaultControllerProcessClose(controller) { writableStreamMarkCloseRequestInFlight(stream); dequeueValue(controller); assert(!queue.length); - const sinkClosePromise = ensureIsPromise(closeAlgorithm, controller); + const sinkClosePromise = closeAlgorithm(); writableStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( sinkClosePromise, @@ -1243,12 +1243,14 @@ function setupWritableStreamDefaultControllerFromSink( FunctionPrototypeBind(start, sink, controller) : nonOpStart; const writeAlgorithm = write ? - FunctionPrototypeBind(write, sink) : + (chunk) => ensureIsPromise(write, sink, chunk, controller) : nonOpWrite; const closeAlgorithm = close ? - FunctionPrototypeBind(close, sink) : nonOpCancel; + () => ensureIsPromise(close, sink) : + nonOpCancel; const abortAlgorithm = abort ? - FunctionPrototypeBind(abort, sink) : nonOpCancel; + (reason) => ensureIsPromise(abort, sink, reason) : + nonOpCancel; setupWritableStreamDefaultController( stream, controller, From ab33b45ec7529120dbc798fe18e0aa87d9168998 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:28:35 +0100 Subject: [PATCH 05/20] stream: fix TeeReadableStream --- lib/internal/webstreams/readablestream.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index ab4ad2de9beac9..e2e3b319841748 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1233,23 +1233,25 @@ function TeeReadableStream(start, pull, cancel) { this[kType] = 'ReadableStream'; this[kState] = { disturbed: false, + reader: undefined, state: 'readable', storedError: undefined, stream: undefined, transfer: { writable: undefined, - port: undefined, + port1: undefined, + port2: undefined, promise: undefined, }, }; this[kIsClosedPromise] = createDeferredPromise(); - setupReadableStreamDefaultControllerFromSource( + const controller = new ReadableStreamDefaultController(kSkipThrow); + setupReadableStreamDefaultController( this, - ObjectCreate(null, { - start: { __proto__: null, value: start }, - pull: { __proto__: null, value: pull }, - cancel: { __proto__: null, value: cancel }, - }), + controller, + start, + pull, + cancel, 1, () => 1); } From 463f49a4017be43259d561a135622aa49cb773a6 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:29:58 +0100 Subject: [PATCH 06/20] stream: rename TeeReadableStream to InternalReadableStream --- lib/internal/webstreams/readablestream.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index e2e3b319841748..8e0ad753135b65 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1228,7 +1228,7 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name), }); -function TeeReadableStream(start, pull, cancel) { +function InternalReadableStream(start, pull, cancel) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; this[kState] = { @@ -1256,15 +1256,15 @@ function TeeReadableStream(start, pull, cancel) { () => 1); } -ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype); -ObjectSetPrototypeOf(TeeReadableStream, ReadableStream); +ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(InternalReadableStream, ReadableStream); -function createTeeReadableStream(start, pull, cancel) { - const tee = new TeeReadableStream(start, pull, cancel); +function createReadableStream(start, pull, cancel) { + const stream = new InternalReadableStream(start, pull, cancel); - // For spec compliance the Tee must be a ReadableStream - tee.constructor = ReadableStream; - return tee; + // For spec compliance the InternalReadableStream must be a ReadableStream + stream.constructor = ReadableStream; + return stream; } const isReadableStream = @@ -1654,9 +1654,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { } branch1 = - createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm); + createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm); branch2 = - createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); + createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); PromisePrototypeThen( reader[kState].close.promise, From 36324d5d6cbc56c62c5675ecbab7bf8d9c7179ac Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:30:41 +0100 Subject: [PATCH 07/20] stream: use internal ReadableStream for teeing byte streams --- lib/internal/webstreams/readablestream.js | 53 ++++++++++++++++++----- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8e0ad753135b65..9925adfee8edf8 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1267,6 +1267,45 @@ function createReadableStream(start, pull, cancel) { return stream; } +function InternalReadableByteStream(start, pull, cancel) { + markTransferMode(this, false, true); + this[kType] = 'ReadableStream'; + this[kState] = { + disturbed: false, + reader: undefined, + state: 'readable', + storedError: undefined, + stream: undefined, + transfer: { + writable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; + this[kIsClosedPromise] = createDeferredPromise(); + const controller = new ReadableByteStreamController(kSkipThrow); + setupReadableByteStreamController( + this, + controller, + start, + pull, + cancel, + 0, + undefined); +} + +ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream); + +function createReadableByteStream(start, pull, cancel) { + const stream = new InternalReadableByteStream(start, pull, cancel); + + // For spec compliance the InternalReadableByteStream must be a ReadableStream + stream.constructor = ReadableStream; + return stream; +} + const isReadableStream = isBrandCheck('ReadableStream'); const isReadableByteStreamController = @@ -1933,16 +1972,10 @@ function readableByteStreamTee(stream) { return cancelDeferred.promise; } - branch1 = new ReadableStream({ - type: 'bytes', - pull: pull1Algorithm, - cancel: cancel1Algorithm, - }); - branch2 = new ReadableStream({ - type: 'bytes', - pull: pull2Algorithm, - cancel: cancel2Algorithm, - }); + branch1 = + createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm); + branch2 = + createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm); forwardReaderError(reader); From f0b10365df0d8d8659fa274cf71491bf085f9d2a Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:34:53 +0100 Subject: [PATCH 08/20] stream: use internal ReadableStream for ReadableStream.from() --- lib/internal/webstreams/readablestream.js | 26 ++++++++++------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 9925adfee8edf8..4e9437b6c45ee9 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1228,7 +1228,7 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name), }); -function InternalReadableStream(start, pull, cancel) { +function InternalReadableStream(start, pull, cancel, highWaterMark, size) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; this[kState] = { @@ -1252,15 +1252,15 @@ function InternalReadableStream(start, pull, cancel) { start, pull, cancel, - 1, - () => 1); + highWaterMark, + size); } ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype); ObjectSetPrototypeOf(InternalReadableStream, ReadableStream); -function createReadableStream(start, pull, cancel) { - const stream = new InternalReadableStream(start, pull, cancel); +function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) { + const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size); // For spec compliance the InternalReadableStream must be a ReadableStream stream.constructor = ReadableStream; @@ -1358,16 +1358,12 @@ function readableStreamFromIterable(iterable) { }); } - stream = new ReadableStream({ - start: startAlgorithm, - pull: pullAlgorithm, - cancel: cancelAlgorithm, - }, { - size() { - return 1; - }, - highWaterMark: 0, - }); + stream = createReadableStream( + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + 0, + ); return stream; } From 3846bfed16a387bc62a077e6ffefb06112a00dff Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Mon, 4 Dec 2023 23:46:54 +0100 Subject: [PATCH 09/20] stream: use internal streams for TransformStream --- lib/internal/webstreams/readablestream.js | 2 + lib/internal/webstreams/transformstream.js | 48 ++++++--------- lib/internal/webstreams/writablestream.js | 69 ++++++++++++++++++++++ 3 files changed, 90 insertions(+), 29 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 4e9437b6c45ee9..14023165677879 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -3391,4 +3391,6 @@ module.exports = { readableByteStreamControllerPullSteps, setupReadableByteStreamController, setupReadableByteStreamControllerFromSource, + createReadableStream, + createReadableByteStream, }; diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index b75e8fc5761f72..f5b1846546486b 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -51,6 +51,7 @@ const { const { ReadableStream, + createReadableStream, readableStreamDefaultControllerCanCloseOrEnqueue, readableStreamDefaultControllerClose, readableStreamDefaultControllerEnqueue, @@ -61,6 +62,7 @@ const { const { WritableStream, + createWritableStream, writableStreamDefaultControllerErrorIfNeeded, } = require('internal/webstreams/writablestream'); @@ -360,36 +362,24 @@ function initializeTransformStream( readableHighWaterMark, readableSizeAlgorithm) { - const writable = new WritableStream({ - __proto__: null, - start() { return startPromise.promise; }, - write(chunk) { - return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); - }, - abort(reason) { - return transformStreamDefaultSinkAbortAlgorithm(stream, reason); - }, - close() { - return transformStreamDefaultSinkCloseAlgorithm(stream); - }, - }, { - highWaterMark: writableHighWaterMark, - size: writableSizeAlgorithm, - }); + const startAlgorithm = () => startPromise.promise; - const readable = new ReadableStream({ - __proto__: null, - start() { return startPromise.promise; }, - pull() { - return transformStreamDefaultSourcePullAlgorithm(stream); - }, - cancel(reason) { - return transformStreamDefaultSourceCancelAlgorithm(stream, reason); - }, - }, { - highWaterMark: readableHighWaterMark, - size: readableSizeAlgorithm, - }); + const writable = createWritableStream( + startAlgorithm, + (chunk) => transformStreamDefaultSinkWriteAlgorithm(stream, chunk), + () => transformStreamDefaultSinkCloseAlgorithm(stream), + (reason) => transformStreamDefaultSinkAbortAlgorithm(stream, reason), + writableHighWaterMark, + writableSizeAlgorithm, + ); + + const readable = createReadableStream( + startAlgorithm, + () => transformStreamDefaultSourcePullAlgorithm(stream), + (reason) => transformStreamDefaultSourceCancelAlgorithm(stream, reason), + readableHighWaterMark, + readableSizeAlgorithm, + ); stream[kState] = { readable, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 77e21e2b3b6abf..d8557969762d0f 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -581,6 +581,74 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name), }); +function InternalWritableStream(start, write, close, abort, highWaterMark, size) { + markTransferMode(this, false, true); + this[kType] = 'WritableStream'; + this[kState] = { + close: createDeferredPromise(), + closeRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightWriteRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightCloseRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + pendingAbortRequest: { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }, + backpressure: false, + controller: undefined, + state: 'writable', + storedError: undefined, + writeRequests: [], + writer: undefined, + transfer: { + readable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; + this[kIsClosedPromise] = createDeferredPromise(); + + const controller = new WritableStreamDefaultController(kSkipThrow); + setupWritableStreamDefaultController( + this, + controller, + start, + write, + close, + abort, + highWaterMark, + size + ) +} + +ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); +ObjectSetPrototypeOf(InternalWritableStream, WritableStream); + +function createWritableStream(start, write, close, abort, highWaterMark, size) { + const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size); + + // For spec compliance the InternalWritableStream must be a WritableStream + stream.constructor = WritableStream; + return stream; +} + const isWritableStream = isBrandCheck('WritableStream'); const isWritableStreamDefaultWriter = @@ -1360,4 +1428,5 @@ module.exports = { writableStreamDefaultControllerAdvanceQueueIfNeeded, setupWritableStreamDefaultControllerFromSink, setupWritableStreamDefaultController, + createWritableStream, }; From 0d45e6f3ff0ff723a16ffebb203ab2a87e6ab4db Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Fri, 15 Dec 2023 22:49:15 +0100 Subject: [PATCH 10/20] stream: add helpers to create internal state --- lib/internal/webstreams/readablestream.js | 72 +++-------- lib/internal/webstreams/writablestream.js | 148 ++++++---------------- 2 files changed, 58 insertions(+), 162 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 14023165677879..35deea7f345582 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -251,19 +251,7 @@ class ReadableStream { markTransferMode(this, false, true); if (source === null) throw new ERR_INVALID_ARG_VALUE('source', 'Object', source); - this[kState] = { - disturbed: false, - reader: undefined, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); this[kControllerErrorFunction] = () => {}; @@ -647,19 +635,7 @@ ObjectDefineProperties(ReadableStream, { function InternalTransferredReadableStream() { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - reader: undefined, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); } @@ -1231,19 +1207,7 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, { function InternalReadableStream(start, pull, cancel, highWaterMark, size) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - reader: undefined, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); const controller = new ReadableStreamDefaultController(kSkipThrow); setupReadableStreamDefaultController( @@ -1270,19 +1234,7 @@ function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () function InternalReadableByteStream(start, pull, cancel) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - reader: undefined, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); const controller = new ReadableByteStreamController(kSkipThrow); setupReadableByteStreamController( @@ -1319,6 +1271,22 @@ const isReadableStreamBYOBReader = // ---- ReadableStream Implementation +function createReadableStreamState() { + return { + disturbed: false, + reader: undefined, + state: 'readable', + storedError: undefined, + stream: undefined, + transfer: { + writable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; +} + function readableStreamFromIterable(iterable) { let stream; const iteratorRecord = getIterator(iterable, 'async'); diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index d8557969762d0f..ce031da0f7381f 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -160,45 +160,7 @@ class WritableStream { if (type !== undefined) throw new ERR_INVALID_ARG_VALUE.RangeError('type', type); - this[kState] = { - close: createDeferredPromise(), - closeRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, - backpressure: false, - controller: undefined, - state: 'writable', - storedError: undefined, - writeRequests: [], - writer: undefined, - transfer: { - readable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createWritableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); this[kControllerErrorFunction] = () => {}; @@ -330,45 +292,7 @@ ObjectDefineProperties(WritableStream.prototype, { function InternalTransferredWritableStream() { markTransferMode(this, false, true); this[kType] = 'WritableStream'; - this[kState] = { - close: createDeferredPromise(), - closeRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, - backpressure: false, - controller: undefined, - state: 'writable', - storedError: undefined, - writeRequests: [], - writer: undefined, - transfer: { - readable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createWritableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); } @@ -584,7 +508,42 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, { function InternalWritableStream(start, write, close, abort, highWaterMark, size) { markTransferMode(this, false, true); this[kType] = 'WritableStream'; - this[kState] = { + this[kState] = createWritableStreamState(); + this[kIsClosedPromise] = createDeferredPromise(); + + const controller = new WritableStreamDefaultController(kSkipThrow); + setupWritableStreamDefaultController( + this, + controller, + start, + write, + close, + abort, + highWaterMark, + size + ) +} + +ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); +ObjectSetPrototypeOf(InternalWritableStream, WritableStream); + +function createWritableStream(start, write, close, abort, highWaterMark = 1, size = () => 1) { + const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size); + + // For spec compliance the InternalWritableStream must be a WritableStream + stream.constructor = WritableStream; + return stream; +} + +const isWritableStream = + isBrandCheck('WritableStream'); +const isWritableStreamDefaultWriter = + isBrandCheck('WritableStreamDefaultWriter'); +const isWritableStreamDefaultController = + isBrandCheck('WritableStreamDefaultController'); + +function createWritableStreamState() { + return { close: createDeferredPromise(), closeRequest: { promise: undefined, @@ -623,39 +582,8 @@ function InternalWritableStream(start, write, close, abort, highWaterMark, size) promise: undefined, }, }; - this[kIsClosedPromise] = createDeferredPromise(); - - const controller = new WritableStreamDefaultController(kSkipThrow); - setupWritableStreamDefaultController( - this, - controller, - start, - write, - close, - abort, - highWaterMark, - size - ) } -ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); -ObjectSetPrototypeOf(InternalWritableStream, WritableStream); - -function createWritableStream(start, write, close, abort, highWaterMark, size) { - const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size); - - // For spec compliance the InternalWritableStream must be a WritableStream - stream.constructor = WritableStream; - return stream; -} - -const isWritableStream = - isBrandCheck('WritableStream'); -const isWritableStreamDefaultWriter = - isBrandCheck('WritableStreamDefaultWriter'); -const isWritableStreamDefaultController = - isBrandCheck('WritableStreamDefaultController'); - function isWritableStreamLocked(stream) { return stream[kState].writer !== undefined; } From 5df4f2e2e2d6661d489b3edfabaa975709cadb3c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Fri, 15 Dec 2023 23:32:24 +0100 Subject: [PATCH 11/20] stream: remove unused field in internal state --- lib/internal/webstreams/readablestream.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 35deea7f345582..dca68c10d93aa0 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1277,7 +1277,6 @@ function createReadableStreamState() { reader: undefined, state: 'readable', storedError: undefined, - stream: undefined, transfer: { writable: undefined, port1: undefined, From 15b30ca6c923a26a1842255c18db12a5c57e18b5 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 5 Dec 2023 00:33:43 +0100 Subject: [PATCH 12/20] stream: fix validating callbacks --- lib/internal/webstreams/readablestream.js | 14 +++++++------- lib/internal/webstreams/transformstream.js | 13 ++++++------- lib/internal/webstreams/util.js | 6 ++++++ lib/internal/webstreams/writablestream.js | 10 +++++----- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index dca68c10d93aa0..0fdba30dc51e87 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -95,9 +95,9 @@ const { AsyncIterator, cloneAsUint8Array, copyArrayBuffer, + createPromiseCallback, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, @@ -2354,7 +2354,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - controller[kState].pullAlgorithm(), + controller[kState].pullAlgorithm(controller), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -2460,10 +2460,10 @@ function setupReadableStreamDefaultControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - () => ensureIsPromise(pull, source, controller) : + createPromiseCallback('source.pull', pull, source) : nonOpPull; const cancelAlgorithm = cancel ? - (reason) => ensureIsPromise(cancel, source, reason) : + createPromiseCallback('source.cancel', cancel, source) : nonOpCancel; setupReadableStreamDefaultController( @@ -3091,7 +3091,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - controller[kState].pullAlgorithm(), + controller[kState].pullAlgorithm(controller), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -3258,10 +3258,10 @@ function setupReadableByteStreamControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - () => ensureIsPromise(pull, source, controller) : + createPromiseCallback('source.pull', pull, source, controller) : nonOpPull; const cancelAlgorithm = cancel ? - (reason) => ensureIsPromise(cancel, source, reason) : + createPromiseCallback('source.cancel', cancel, source) : nonOpCancel; if (autoAllocateChunkSize === 0) { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index f5b1846546486b..5cf1bfd4499e8f 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -1,7 +1,6 @@ 'use strict'; const { - FunctionPrototypeBind, FunctionPrototypeCall, ObjectDefineProperties, ObjectSetPrototypeOf, @@ -38,8 +37,8 @@ const { } = require('internal/worker/js_transferable'); const { + createPromiseCallback, customInspect, - ensureIsPromise, extractHighWaterMark, extractSizeAlgorithm, isBrandCheck, @@ -457,13 +456,13 @@ function setupTransformStreamDefaultControllerFromTransformer( const flush = transformer?.flush; const cancel = transformer?.cancel; const transformAlgorithm = transform ? - (chunk) => ensureIsPromise(transform, transformer, chunk, controller) : + createPromiseCallback('transformer.transform', transform, transformer) : defaultTransformAlgorithm; const flushAlgorithm = flush ? - () => ensureIsPromise(flush, transformer, controller) : + createPromiseCallback('transformer.flush', flush, transformer) : nonOpFlush; const cancelAlgorithm = cancel ? - (reason) => ensureIsPromise(cancel, transformer, reason) : + createPromiseCallback('transformer.cancel', cancel, transformer) : nonOpCancel; setupTransformStreamDefaultController( @@ -512,7 +511,7 @@ function transformStreamDefaultControllerError(controller, error) { async function transformStreamDefaultControllerPerformTransform(controller, chunk) { try { - return await controller[kState].transformAlgorithm(chunk); + return await controller[kState].transformAlgorithm(chunk, controller); } catch (error) { transformStreamError(controller[kState].stream, error); throw error; @@ -606,7 +605,7 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) { } const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const flushPromise = controller[kState].flushAlgorithm(); + const flushPromise = controller[kState].flushAlgorithm(controller); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( flushPromise, diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index b31a0d3380a36e..0bafebaf2cdd61 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -189,6 +189,11 @@ function ensureIsPromise(fn, thisArg, ...args) { } } +function createPromiseCallback(name, fn, thisArg) { + validateFunction(fn, name); + return (...args) => ensureIsPromise(fn, thisArg, ...args); +} + function isPromisePending(promise) { if (promise === undefined) return false; const details = getPromiseDetails(promise); @@ -273,6 +278,7 @@ module.exports = { ArrayBufferViewGetByteLength, ArrayBufferViewGetByteOffset, AsyncIterator, + createPromiseCallback, cloneAsUint8Array, copyArrayBuffer, customInspect, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index ce031da0f7381f..54668f0a38fb66 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -49,9 +49,9 @@ const { } = require('internal/worker/js_transferable'); const { + createPromiseCallback, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, @@ -1107,7 +1107,7 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { writableStreamMarkFirstWriteRequestInFlight(stream); PromisePrototypeThen( - writeAlgorithm(chunk), + writeAlgorithm(chunk, controller), () => { writableStreamFinishInFlightWrite(stream); const { @@ -1239,13 +1239,13 @@ function setupWritableStreamDefaultControllerFromSink( FunctionPrototypeBind(start, sink, controller) : nonOpStart; const writeAlgorithm = write ? - (chunk) => ensureIsPromise(write, sink, chunk, controller) : + createPromiseCallback('sink.write', write, sink) : nonOpWrite; const closeAlgorithm = close ? - () => ensureIsPromise(close, sink) : + createPromiseCallback('sink.close', close, sink) : nonOpCancel; const abortAlgorithm = abort ? - (reason) => ensureIsPromise(abort, sink, reason) : + createPromiseCallback('sink.abort', abort, sink) : nonOpCancel; setupWritableStreamDefaultController( stream, From 0bd501f541cf45ca4e447827fd77b345dd3003b6 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Fri, 15 Dec 2023 23:30:21 +0100 Subject: [PATCH 13/20] test: update expectations for streams wpt --- test/wpt/status/streams.json | 7 ------- 1 file changed, 7 deletions(-) diff --git a/test/wpt/status/streams.json b/test/wpt/status/streams.json index 8d6a4c6d2fe27b..3b6e0ce6429f9d 100644 --- a/test/wpt/status/streams.json +++ b/test/wpt/status/streams.json @@ -59,12 +59,5 @@ }, "readable-streams/read-task-handling.window.js": { "skip": "Browser-specific test" - }, - "transform-streams/cancel.any.js": { - "fail": { - "expected": [ - "readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()" - ] - } } } From 112a71713286a179b106f80d80759de9264c179c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 00:24:23 +0100 Subject: [PATCH 14/20] stream: set __proto__ to null for internal state --- lib/internal/webstreams/readablestream.js | 2 ++ lib/internal/webstreams/transformstream.js | 5 +++++ lib/internal/webstreams/writablestream.js | 7 +++++++ 3 files changed, 14 insertions(+) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 0fdba30dc51e87..be8fc752c70db7 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1273,11 +1273,13 @@ const isReadableStreamBYOBReader = function createReadableStreamState() { return { + __proto__: null, disturbed: false, reader: undefined, state: 'readable', storedError: undefined, transfer: { + __proto__: null, writable: undefined, port1: undefined, port2: undefined, diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 5cf1bfd4499e8f..1d39f3e8c3a6ca 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -252,10 +252,12 @@ function InternalTransferredTransformStream() { markTransferMode(this, false, true); this[kType] = 'TransformStream'; this[kState] = { + __proto__: null, readable: undefined, writable: undefined, backpressure: undefined, backpressureChange: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, @@ -381,11 +383,13 @@ function initializeTransformStream( ); stream[kState] = { + __proto__: null, readable, writable, controller: undefined, backpressure: undefined, backpressureChange: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, @@ -440,6 +444,7 @@ function setupTransformStreamDefaultController( assert(isTransformStream(stream)); assert(stream[kState].controller === undefined); controller[kState] = { + __proto__: null, stream, transformAlgorithm, flushAlgorithm, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 54668f0a38fb66..24263c3d4b42a6 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -544,24 +544,30 @@ const isWritableStreamDefaultController = function createWritableStreamState() { return { + __proto__: null, close: createDeferredPromise(), closeRequest: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, }, inFlightWriteRequest: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, }, inFlightCloseRequest: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, }, pendingAbortRequest: { + __proto__: null, abort: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, @@ -576,6 +582,7 @@ function createWritableStreamState() { writeRequests: [], writer: undefined, transfer: { + __proto__: null, readable: undefined, port1: undefined, port2: undefined, From b9a13b528e6330cd7d58e5bf9943909c2eb8a22a Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 00:29:05 +0100 Subject: [PATCH 15/20] fixup! simplify ensureIsPromise --- lib/internal/webstreams/util.js | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 0bafebaf2cdd61..baafed8b536553 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -9,8 +9,6 @@ const { MathMax, NumberIsNaN, PromisePrototypeThen, - Promise, - PromiseReject, ReflectGet, Symbol, SymbolAsyncIterator, @@ -31,10 +29,6 @@ const { detachArrayBuffer, } = internalBinding('buffer'); -const { - isPromise, -} = require('internal/util/types'); - const { inspect, } = require('util'); @@ -180,13 +174,8 @@ function enqueueValueWithSize(controller, value, size) { controller[kState].queueTotalSize += size; } -function ensureIsPromise(fn, thisArg, ...args) { - try { - const value = FunctionPrototypeCall(fn, thisArg, ...args); - return new Promise((r) => r(value)); - } catch (error) { - return PromiseReject(error); - } +async function ensureIsPromise(fn, thisArg, ...args) { + return await FunctionPrototypeCall(fn, thisArg, ...args); } function createPromiseCallback(name, fn, thisArg) { From 162cfdc0d99568962e045865603f1889c4cfd631 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 00:30:15 +0100 Subject: [PATCH 16/20] fixup! remove unused imports --- lib/internal/webstreams/readablestream.js | 1 - lib/internal/webstreams/transformstream.js | 2 -- 2 files changed, 3 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index be8fc752c70db7..35656216680875 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -11,7 +11,6 @@ const { FunctionPrototypeCall, MathMin, NumberIsInteger, - ObjectCreate, ObjectDefineProperties, ObjectSetPrototypeOf, Promise, diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 1d39f3e8c3a6ca..3d839483b68492 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -49,7 +49,6 @@ const { } = require('internal/webstreams/util'); const { - ReadableStream, createReadableStream, readableStreamDefaultControllerCanCloseOrEnqueue, readableStreamDefaultControllerClose, @@ -60,7 +59,6 @@ const { } = require('internal/webstreams/readablestream'); const { - WritableStream, createWritableStream, writableStreamDefaultControllerErrorIfNeeded, } = require('internal/webstreams/writablestream'); From a889e2c7eec96c4e9afaeaa78dbb5b12e5b8c74d Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 00:32:41 +0100 Subject: [PATCH 17/20] fixup! whitespace --- lib/internal/webstreams/transformstream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 3d839483b68492..b418fd2cb5347c 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -373,7 +373,7 @@ function initializeTransformStream( ); const readable = createReadableStream( - startAlgorithm, + startAlgorithm, () => transformStreamDefaultSourcePullAlgorithm(stream), (reason) => transformStreamDefaultSourceCancelAlgorithm(stream, reason), readableHighWaterMark, From 16a9cc7113e0b768085c5a5bb3eb90fdea6330fa Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 00:40:33 +0100 Subject: [PATCH 18/20] fixup! code style --- lib/internal/webstreams/writablestream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 24263c3d4b42a6..954bc6c2cc93d8 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -520,8 +520,8 @@ function InternalWritableStream(start, write, close, abort, highWaterMark, size) close, abort, highWaterMark, - size - ) + size, + ); } ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); From 11e6f94bd6740e7f3880711edb3727508afe8ab5 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 11:39:10 +0100 Subject: [PATCH 19/20] fixup! fix timing of ensureIsPromise --- lib/internal/webstreams/util.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index baafed8b536553..9b87207b59428e 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -174,8 +174,10 @@ function enqueueValueWithSize(controller, value, size) { controller[kState].queueTotalSize += size; } +// This implements "invoke a callback function type" for callback functions that return a promise. +// See https://webidl.spec.whatwg.org/#es-invoking-callback-functions async function ensureIsPromise(fn, thisArg, ...args) { - return await FunctionPrototypeCall(fn, thisArg, ...args); + return FunctionPrototypeCall(fn, thisArg, ...args); } function createPromiseCallback(name, fn, thisArg) { From 0c0e3c184c600ca98937c7ca760e312179be89eb Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 16 Dec 2023 11:40:58 +0100 Subject: [PATCH 20/20] fixup! rename to invokePromiseCallback --- lib/internal/webstreams/util.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 9b87207b59428e..e862b3ffe25724 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -176,13 +176,13 @@ function enqueueValueWithSize(controller, value, size) { // This implements "invoke a callback function type" for callback functions that return a promise. // See https://webidl.spec.whatwg.org/#es-invoking-callback-functions -async function ensureIsPromise(fn, thisArg, ...args) { +async function invokePromiseCallback(fn, thisArg, ...args) { return FunctionPrototypeCall(fn, thisArg, ...args); } function createPromiseCallback(name, fn, thisArg) { validateFunction(fn, name); - return (...args) => ensureIsPromise(fn, thisArg, ...args); + return (...args) => invokePromiseCallback(fn, thisArg, ...args); } function isPromisePending(promise) { @@ -274,11 +274,11 @@ module.exports = { copyArrayBuffer, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, lazyTransfer, + invokePromiseCallback, isBrandCheck, isPromisePending, isViewedArrayBufferDetached,