Skip to content

Commit

Permalink
fix(edge): make trackStreamConsumed handle cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
lubieowoce committed Feb 13, 2025
1 parent 5eebc0a commit 78745e9
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 4 deletions.
76 changes: 76 additions & 0 deletions packages/next/src/server/web/web-on-close.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { DetachedPromise } from '../../lib/detached-promise'
import { trackStreamConsumed } from './web-on-close'

describe('trackStreamConsumed', () => {
it('calls onEnd when the stream finishes', async () => {
const endPromise = new DetachedPromise<void>()
const onEnd = jest.fn(endPromise.resolve)

const { stream: inputStream, controller } =
readableStreamWithController<string>()
const trackedStream = trackStreamConsumed(inputStream, onEnd)

const reader = trackedStream.getReader()
controller.enqueue('one')
controller.enqueue('two')
await reader.read()
await reader.read()
expect(onEnd).not.toHaveBeenCalled()

controller.close()

await endPromise.promise
expect(onEnd).toHaveBeenCalledTimes(1)
})

it('calls onEnd when the stream errors', async () => {
const endPromise = new DetachedPromise<void>()
const onEnd = jest.fn(endPromise.resolve)

const { stream: inputStream, controller } =
readableStreamWithController<string>()
const trackedStream = trackStreamConsumed(inputStream, onEnd)

const reader = trackedStream.getReader()
controller.enqueue('one')
controller.enqueue('two')
await reader.read()
await reader.read()
expect(onEnd).not.toHaveBeenCalled()

controller.error(new Error('kaboom'))

await endPromise.promise
expect(onEnd).toHaveBeenCalledTimes(1)
})

it('calls onEnd when the stream is cancelled', async () => {
const endPromise = new DetachedPromise<void>()
const onEnd = jest.fn(endPromise.resolve)

const { stream: inputStream, controller } =
readableStreamWithController<string>()
const trackedStream = trackStreamConsumed(inputStream, onEnd)

const reader = trackedStream.getReader()
controller.enqueue('one')
controller.enqueue('two')
await reader.read()
await reader.read()
expect(onEnd).not.toHaveBeenCalled()

await reader.cancel()
await endPromise.promise
expect(onEnd).toHaveBeenCalledTimes(1)
})
})

function readableStreamWithController<TChunk>() {
let controller: ReadableStreamDefaultController<TChunk> = undefined!
const stream = new ReadableStream<TChunk>({
start(_controller) {
controller = _controller
},
})
return { controller, stream }
}
53 changes: 49 additions & 4 deletions packages/next/src/server/web/web-on-close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,57 @@ export function trackStreamConsumed<TChunk>(
stream: ReadableStream<TChunk>,
onEnd: () => void
): ReadableStream<TChunk> {
const closePassThrough = new TransformStream<TChunk, TChunk>({
flush: () => {
return onEnd()
// NOTE:
// This needs to be robust against the stream being cancelled.
//
// Thich can happen e.g. during redirects -- we'll stream a response,
// but if we're not fast enough, the browser can disconnect before we finish
// (because it wants to navigate to the redirect location anyway)
// and we'll get cancelled with a `ResponseAborted`.
//
// Ideally, we would just do this:
//
// const closePassThrough = new TransformStream<TChunk, TChunk>({
// flush() { onEnd() },
// cancel() { onEnd() },
// })
// return stream.pipeThrough(closePassThrough)
//
// But cancellation handling via `Transformer.cancel` is only available in node >20
// so we can't use it yet, so we need to use a `ReadableStream` instead.

let calledOnEnd = false
return new ReadableStream<TChunk>({
async start(controller) {
const reader = stream.getReader()
while (true) {
try {
const { done, value } = await reader.read()
if (!done) {
controller.enqueue(value)
} else {
controller.close()
break
}
} catch (err) {
controller.error(err)
break
}
}
if (!calledOnEnd) {
calledOnEnd = true
onEnd()
}
},
cancel() {
// NOTE: apparently `cancel()` can be called even after the reader above exits,
// so we need to guard against calling the callback twice
if (!calledOnEnd) {
calledOnEnd = true
onEnd()
}
},
})
return stream.pipeThrough(closePassThrough)
}

export class CloseController {
Expand Down

0 comments on commit 78745e9

Please sign in to comment.