Skip to content

Commit

Permalink
refactor: waitUntil passed around via ALS (#733)
Browse files Browse the repository at this point in the history
Co-authored-by: conico974 <nicodorseuil@yahoo.fr>
  • Loading branch information
vicb and conico974 authored Feb 10, 2025
1 parent 50b3559 commit b59027a
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 46 deletions.
25 changes: 25 additions & 0 deletions .changeset/pink-papayas-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"@opennextjs/aws": minor
---

refactor: `waitUntil` passed around via ALS and `OpenNextHandler` signature has changed

BREAKING CHANGE: `waitUntil` is passed around via ALS to fix #713.

`globalThis.openNextWaitUntil` is no more available, you can access `waitUntil`
on the ALS context: `globalThis.__openNextAls.getStore()`

The `OpenNextHandler` signature has changed: the second parameter was a `StreamCreator`.
It was changed to be of type `OpenNextHandlerOptions` which has both a `streamCreator` key
and a `waitUntil` key.

If you use a custom wrapper, you need to update the call to the handler as follow:

```ts
// before
globalThis.openNextWaitUntil = myWaitUntil;
handler(internalEvent, myStreamCreator);

// after
handler(internalEvent, { streamCreator: myStreamCreator, waitUntil: myWaitUntil });
```
4 changes: 3 additions & 1 deletion packages/open-next/src/adapters/edge-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { InternalEvent, InternalResult } from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";
import { emptyReadableStream } from "utils/stream";

import type { OpenNextHandlerOptions } from "types/overrides";
// We import it like that so that the edge plugin can replace it
import { NextConfig } from "../adapters/config";
import { createGenericHandler } from "../core/createGenericHandler";
Expand All @@ -16,12 +17,13 @@ globalThis.__openNextAls = new AsyncLocalStorage();

const defaultHandler = async (
internalEvent: InternalEvent,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> => {
globalThis.isEdgeRuntime = true;

// We run everything in the async local storage context so that it is available in edge runtime functions
return runWithOpenNextRequestContext(
{ isISRRevalidation: false },
{ isISRRevalidation: false, waitUntil: options?.waitUntil },
async () => {
const host = internalEvent.headers.host
? `https://${internalEvent.headers.host}`
Expand Down
7 changes: 4 additions & 3 deletions packages/open-next/src/adapters/image-optimization-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {
} from "types/open-next.js";
import { emptyReadableStream, toReadableStream } from "utils/stream.js";

import type { OpenNextHandlerOptions } from "types/overrides.js";
import { createGenericHandler } from "../core/createGenericHandler.js";
import { resolveImageLoader } from "../core/resolve.js";
import { debug, error } from "./logger.js";
Expand Down Expand Up @@ -58,7 +59,7 @@ export const handler = await createGenericHandler({

export async function defaultHandler(
event: InternalEvent,
streamCreator?: StreamCreator,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> {
// Images are handled via header and query param information.
debug("handler event", event);
Expand Down Expand Up @@ -99,9 +100,9 @@ export async function defaultHandler(
downloadHandler,
);

return buildSuccessResponse(result, streamCreator, etag);
return buildSuccessResponse(result, options?.streamCreator, etag);
} catch (e: any) {
return buildFailureResponse(e, streamCreator);
return buildFailureResponse(e, options?.streamCreator);
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/open-next/src/adapters/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
} from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";

import type { OpenNextHandlerOptions } from "types/overrides";
import { debug, error } from "../adapters/logger";
import { createGenericHandler } from "../core/createGenericHandler";
import {
Expand All @@ -24,6 +25,7 @@ globalThis.__openNextAls = new AsyncLocalStorage();

const defaultHandler = async (
internalEvent: InternalEvent,
options?: OpenNextHandlerOptions,
): Promise<InternalResult | MiddlewareResult> => {
const originResolver = await resolveOriginResolver(
globalThis.openNextConfig.middleware?.originResolver,
Expand All @@ -49,7 +51,10 @@ const defaultHandler = async (

// We run everything in the async local storage context so that it is available in the external middleware
return runWithOpenNextRequestContext(
{ isISRRevalidation: internalEvent.headers["x-isr"] === "1" },
{
isISRRevalidation: internalEvent.headers["x-isr"] === "1",
waitUntil: options?.waitUntil,
},
async () => {
const result = await routingHandler(internalEvent);
if ("internalEvent" in result) {
Expand Down
15 changes: 9 additions & 6 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import type {
InternalResult,
ResolvedRoute,
RoutingResult,
StreamCreator,
} from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";

import type { OpenNextHandlerOptions } from "types/overrides";
import { debug, error, warn } from "../adapters/logger";
import { patchAsyncStorage } from "./patchAsyncStorage";
import { convertRes, createServerResponse } from "./routing/util";
Expand All @@ -29,12 +29,15 @@ patchAsyncStorage();

export async function openNextHandler(
internalEvent: InternalEvent,
responseStreaming?: StreamCreator,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> {
const initialHeaders = internalEvent.headers;
// We run everything in the async local storage context so that it is available in the middleware as well as in NextServer
return runWithOpenNextRequestContext(
{ isISRRevalidation: initialHeaders["x-isr"] === "1" },
{
isISRRevalidation: initialHeaders["x-isr"] === "1",
waitUntil: options?.waitUntil,
},
async () => {
if (initialHeaders["x-forwarded-host"]) {
initialHeaders.host = initialHeaders["x-forwarded-host"];
Expand Down Expand Up @@ -116,7 +119,7 @@ export async function openNextHandler(

if ("type" in routingResult) {
// response is used only in the streaming case
if (responseStreaming) {
if (options?.streamCreator) {
const response = createServerResponse(
{
internalEvent,
Expand All @@ -127,7 +130,7 @@ export async function openNextHandler(
initialPath: internalEvent.rawPath,
},
headers,
responseStreaming,
options.streamCreator,
);
response.statusCode = routingResult.statusCode;
response.flushHeaders();
Expand Down Expand Up @@ -171,7 +174,7 @@ export async function openNextHandler(
const res = createServerResponse(
routingResult,
overwrittenResponseHeaders,
responseStreaming,
options?.streamCreator,
);

await processRequest(req, res, preprocessedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const handler: WrapperHandler = async (handler, converter) =>
},
};

const response = await handler(internalEvent, streamCreator);
const response = await handler(internalEvent, { streamCreator });

const isUsingEdge = globalThis.isEdgeRuntime ?? false;
if (isUsingEdge) {
Expand Down
4 changes: 3 additions & 1 deletion packages/open-next/src/overrides/wrappers/aws-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ const handler: WrapperHandler =
},
};

const response = await handler(internalEvent, fakeStream);
const response = await handler(internalEvent, {
streamCreator: fakeStream,
});

return converter.convertTo(response, event);
};
Expand Down
5 changes: 3 additions & 2 deletions packages/open-next/src/overrides/wrappers/cloudflare-edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const handler: WrapperHandler<
ctx: WorkerContext,
): Promise<Response> => {
globalThis.process = process;
globalThis.openNextWaitUntil = ctx.waitUntil.bind(ctx);

// Set the environment variables
// Cloudflare suggests to not override the process.env object but instead apply the values to it
Expand Down Expand Up @@ -63,7 +62,9 @@ const handler: WrapperHandler<
}
}

const response = await handler(internalEvent);
const response = await handler(internalEvent, {
waitUntil: ctx.waitUntil.bind(ctx),
});

const result: Response = await converter.convertTo(response);

Expand Down
8 changes: 6 additions & 2 deletions packages/open-next/src/overrides/wrappers/cloudflare-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const handler: WrapperHandler<InternalEvent, InternalResult> =
ctx: any,
): Promise<Response> => {
globalThis.process = process;
globalThis.openNextWaitUntil = ctx.waitUntil.bind(ctx);

// Set the environment variables
// Cloudflare suggests to not override the process.env object but instead apply the values to it
Expand Down Expand Up @@ -75,7 +74,12 @@ const handler: WrapperHandler<InternalEvent, InternalResult> =
},
};

ctx.waitUntil(handler(internalEvent, streamCreator));
ctx.waitUntil(
handler(internalEvent, {
streamCreator,
waitUntil: ctx.waitUntil.bind(ctx),
}),
);

return promiseResponse;
};
Expand Down
12 changes: 8 additions & 4 deletions packages/open-next/src/overrides/wrappers/dummy.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import type { InternalEvent, StreamCreator } from "types/open-next";
import type { Wrapper, WrapperHandler } from "types/overrides";
import type { InternalEvent } from "types/open-next";
import type {
OpenNextHandlerOptions,
Wrapper,
WrapperHandler,
} from "types/overrides";

const dummyWrapper: WrapperHandler = async (handler, converter) => {
return async (event: InternalEvent, responseStream?: StreamCreator) => {
return await handler(event, responseStream);
return async (event: InternalEvent, options?: OpenNextHandlerOptions) => {
return await handler(event, options);
};
};

Expand Down
8 changes: 4 additions & 4 deletions packages/open-next/src/overrides/wrappers/express-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ const wrapper: WrapperHandler = async (handler, converter) => {

app.all("/_next/image", async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.writeHead(prelude.statusCode, prelude.headers);
return res;
},
};
await imageHandler(internalEvent, _res);
await imageHandler(internalEvent, { streamCreator });
});

app.all("*paths", async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.writeHead(prelude.statusCode, prelude.headers);
return res;
},
onFinish: () => {},
};
await handler(internalEvent, _res);
await handler(internalEvent, { streamCreator });
});

const server = app.listen(
Expand Down
4 changes: 2 additions & 2 deletions packages/open-next/src/overrides/wrappers/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { debug, error } from "../../adapters/logger";
const wrapper: WrapperHandler = async (handler, converter) => {
const server = createServer(async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.setHeader("Set-Cookie", prelude.cookies);
res.writeHead(prelude.statusCode, prelude.headers);
Expand All @@ -23,7 +23,7 @@ const wrapper: WrapperHandler = async (handler, converter) => {
});
res.end("OK");
} else {
await handler(internalEvent, _res);
await handler(internalEvent, { streamCreator });
}
});

Expand Down
10 changes: 2 additions & 8 deletions packages/open-next/src/types/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
} from "types/overrides";

import type { DetachedPromiseRunner } from "../utils/promise";
import type { OpenNextConfig } from "./open-next";
import type { OpenNextConfig, WaitUntil } from "./open-next";

export interface RequestData {
geo?: {
Expand Down Expand Up @@ -59,6 +59,7 @@ interface OpenNextRequestContext {
pendingPromiseRunner: DetachedPromiseRunner;
isISRRevalidation?: boolean;
mergeHeadersPriority?: "middleware" | "handler";
waitUntil?: WaitUntil;
}

declare global {
Expand Down Expand Up @@ -152,13 +153,6 @@ declare global {
*/
var __openNextAls: AsyncLocalStorage<OpenNextRequestContext>;

/**
* The function that is used to run background tasks even after the response has been sent.
* This one is defined by the wrapper function as most of them don't need or support this feature.
* If not present, all the awaiting promises will be resolved before sending the response.
*/
var openNextWaitUntil: ((promise: Promise<void>) => void) | undefined;

/**
* The entries object that contains the functions that are available in the function.
* Only available in edge runtime functions.
Expand Down
1 change: 1 addition & 0 deletions packages/open-next/src/types/open-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface StreamCreator {
onFinish?: (length: number) => void;
}

export type WaitUntil = (promise: Promise<void>) => void;
export interface DangerousOptions {
/**
* The tag cache is used for revalidateTags and revalidatePath.
Expand Down
10 changes: 9 additions & 1 deletion packages/open-next/src/types/overrides.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
Origin,
ResolvedRoute,
StreamCreator,
WaitUntil,
} from "./open-next";

// Queue
Expand Down Expand Up @@ -122,10 +123,17 @@ export type Wrapper<
edgeRuntime?: boolean;
};

export type OpenNextHandlerOptions = {
// Create a `Writeable` for streaming responses.
streamCreator?: StreamCreator;
// Extends the liftetime of the runtime after the response is returned.
waitUntil?: WaitUntil;
};

export type OpenNextHandler<
E extends BaseEventOrResult = InternalEvent,
R extends BaseEventOrResult = InternalResult,
> = (event: E, responseStream?: StreamCreator) => Promise<R>;
> = (event: E, options?: OpenNextHandlerOptions) => Promise<R>;

export type Converter<
E extends BaseEventOrResult = InternalEvent,
Expand Down
Loading

0 comments on commit b59027a

Please sign in to comment.