From cfe205807ad0b72e3ad7c11cbff499d4ff06e73e Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 17 Sep 2023 21:59:46 +0300 Subject: [PATCH 1/3] stream: improve readable webstream `pipeTo` --- lib/internal/webstreams/readablestream.js | 51 ++++++++++++++++------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 28e985875c5ddb..f26b38c6aba9f2 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1336,7 +1336,9 @@ function readableStreamPipeTo( const promise = createDeferredPromise(); - let currentWrite = PromiseResolve(); + const state = { + currentWrite: PromiseResolve(), + }; // The error here can be undefined. The rejected arg // tells us that the promise must be rejected even @@ -1353,9 +1355,9 @@ function readableStreamPipeTo( } async function waitForCurrentWrite() { - const write = currentWrite; + const write = state.currentWrite; await write; - if (write !== currentWrite) + if (write !== state.currentWrite) await waitForCurrentWrite(); } @@ -1446,20 +1448,15 @@ function readableStreamPipeTo( async function step() { if (shuttingDown) return true; + await writer[kState].ready.promise; - return new Promise((resolve, reject) => { - readableStreamDefaultReaderRead( - reader, - { - [kChunk](chunk) { - currentWrite = writableStreamDefaultWriterWrite(writer, chunk); - setPromiseHandled(currentWrite); - resolve(false); - }, - [kClose]: () => resolve(true), - [kError]: reject, - }); - }); + + const promise = createDeferredPromise(); + const readRequest = new PipeToReadableStreamReadRequest(writer, state, promise); + + readableStreamDefaultReaderRead(reader, readRequest); + + return promise.promise; } async function run() { @@ -1521,6 +1518,28 @@ function readableStreamPipeTo( return promise.promise; } +class PipeToReadableStreamReadRequest { + constructor(writer, state, promise) { + this.writer = writer; + this.state = state; + this.promise = promise; + } + + [kChunk](chunk) { + this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk); + setPromiseHandled(this.state.currentWrite); + this.promise.resolve(false); + } + + [kClose]() { + this.promise.resolve(true) + } + + [kError](error) { + this.promise.reject(error); + } +} + function readableStreamTee(stream, cloneForBranch2) { if (isReadableByteStreamController(stream[kState].controller)) { return readableByteStreamTee(stream); From 3734af5c29d16ac78e901624faf133d7daf60594 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 17 Sep 2023 22:39:38 +0300 Subject: [PATCH 2/3] stream: fix lint --- lib/internal/webstreams/readablestream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index f26b38c6aba9f2..43c9eee924087d 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -14,7 +14,6 @@ const { ObjectCreate, ObjectDefineProperties, ObjectSetPrototypeOf, - Promise, PromisePrototypeThen, PromiseResolve, PromiseReject, @@ -1452,6 +1451,7 @@ function readableStreamPipeTo( await writer[kState].ready.promise; const promise = createDeferredPromise(); + // eslint-disable-next-line no-use-before-define const readRequest = new PipeToReadableStreamReadRequest(writer, state, promise); readableStreamDefaultReaderRead(reader, readRequest); @@ -1532,7 +1532,7 @@ class PipeToReadableStreamReadRequest { } [kClose]() { - this.promise.resolve(true) + this.promise.resolve(true); } [kError](error) { From fac8ca681a0dfe07e0289de27af83cdfff833f5c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 17 Sep 2023 22:57:16 +0300 Subject: [PATCH 3/3] stream: inline var --- lib/internal/webstreams/readablestream.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 43c9eee924087d..924ffc34874111 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1452,9 +1452,7 @@ function readableStreamPipeTo( const promise = createDeferredPromise(); // eslint-disable-next-line no-use-before-define - const readRequest = new PipeToReadableStreamReadRequest(writer, state, promise); - - readableStreamDefaultReaderRead(reader, readRequest); + readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise)); return promise.promise; }