From 50c4b03418420bbfaa5284b94b456374a92c84ac Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 6 Aug 2024 14:20:30 +0200 Subject: [PATCH 1/4] feat: allow access to all peers --- src/adapters/bun.ts | 41 ++++++++++++++++++++---------- src/adapters/cloudflare-durable.ts | 21 ++++++++++++--- src/adapters/cloudflare.ts | 24 +++++++++++++++-- src/adapters/deno.ts | 32 ++++++++++++----------- src/adapters/node.ts | 23 ++++++++++++----- src/adapters/uws.ts | 37 +++++++++++++++++---------- src/peer.ts | 24 ++++++++++++----- src/types.ts | 8 +++--- test/_utils.ts | 16 +++++++----- test/adapters/node.test.ts | 9 ++++++- test/adapters/uws.test.ts | 11 +++++--- test/fixture/_shared.ts | 22 +++++++++++++++- test/fixture/bun.ts | 6 ++++- test/fixture/cloudflare-durable.ts | 7 ++++- test/fixture/cloudflare.ts | 11 +++++--- test/fixture/deno.ts | 13 +++++++--- test/tests.ts | 22 ++++++++++++++++ 17 files changed, 243 insertions(+), 84 deletions(-) diff --git a/src/adapters/bun.ts b/src/adapters/bun.ts index f5efdac..32498bc 100644 --- a/src/adapters/bun.ts +++ b/src/adapters/bun.ts @@ -3,11 +3,15 @@ import type { WebSocketHandler, ServerWebSocket, Server } from "bun"; import { Message } from "../message"; import { Peer } from "../peer"; -import { AdapterOptions, defineWebSocketAdapter } from "../types"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types"; import { AdapterHookable } from "../hooks"; import { toBufferLike } from "../_utils"; -export interface BunAdapter { +export interface BunAdapter extends AdapterInstance { websocket: WebSocketHandler; handleUpgrade(req: Request, server: Server): Promise; } @@ -15,7 +19,7 @@ export interface BunAdapter { export interface BunOptions extends AdapterOptions {} type ContextData = { - _peer?: Peer; + _peer?: BunPeer; request?: Request; requestUrl?: string; server?: Server; @@ -24,7 +28,9 @@ type ContextData = { export default defineWebSocketAdapter( (options = {}) => { const hooks = new AdapterHookable(options); + const peers = new Set(); return { + peers, async handleUpgrade(request, server) { const res = await hooks.callHook("upgrade", request); if (res instanceof Response) { @@ -38,34 +44,37 @@ export default defineWebSocketAdapter( } satisfies ContextData, headers: res?.headers, }); - return upgradeOK - ? undefined - : new Response("Upgrade failed", { status: 500 }); + if (!upgradeOK) { + return new Response("Upgrade failed", { status: 500 }); + } }, websocket: { message: (ws, message) => { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callHook("message", peer, new Message(message)); }, open: (ws) => { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); + peers.add(peer); + hooks.callAdapterHook("bun:open", peer, ws); hooks.callHook("open", peer); }, close: (ws) => { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); + peers.delete(peer); hooks.callAdapterHook("bun:close", peer, ws); hooks.callHook("close", peer, {}); }, drain: (ws) => { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("bun:drain", peer); }, ping(ws, data) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("bun:ping", peer, ws, data); }, pong(ws, data) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("bun:pong", peer, ws, data); }, }, @@ -73,11 +82,14 @@ export default defineWebSocketAdapter( }, ); -function getPeer(ws: ServerWebSocket) { +function getPeer( + ws: ServerWebSocket, + peers: Set, +): BunPeer { if (ws.data?._peer) { return ws.data._peer; } - const peer = new BunPeer({ bun: { ws } }); + const peer = new BunPeer({ peers, bun: { ws } }); ws.data = { ...ws.data, _peer: peer, @@ -86,6 +98,7 @@ function getPeer(ws: ServerWebSocket) { } class BunPeer extends Peer<{ + peers: Set; bun: { ws: ServerWebSocket }; }> { get addr() { diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index f5f5cf3..7143e2f 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -2,7 +2,11 @@ import type * as CF from "@cloudflare/workers-types"; import type { DurableObject } from "cloudflare:workers"; -import { AdapterOptions, defineWebSocketAdapter } from "../types"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types"; import { Peer } from "../peer"; import { Message } from "../message"; import { AdapterHookable } from "../hooks"; @@ -22,7 +26,7 @@ type CrosswsState = { topics?: Set; }; -export interface CloudflareDurableAdapter { +export interface CloudflareDurableAdapter extends AdapterInstance { handleUpgrade( req: Request | CF.Request, env: unknown, @@ -59,7 +63,9 @@ export default defineWebSocketAdapter< CloudflareOptions >((opts) => { const hooks = new AdapterHookable(opts); + const peers = new Set(); return { + peers, handleUpgrade: async (req, env, _context) => { const bindingName = opts?.bindingName ?? "$DurableObject"; const instanceName = opts?.instanceName ?? "crossws"; @@ -81,6 +87,7 @@ export default defineWebSocketAdapter< server as unknown as CF.WebSocket, request, ); + peers.add(peer); (obj as DurableObjectPub).ctx.acceptWebSocket(server); hooks.callAdapterHook("cloudflare:accept", peer); hooks.callHook("open", peer); @@ -98,10 +105,10 @@ export default defineWebSocketAdapter< }, handleDurableClose: async (obj, ws, code, reason, wasClean) => { const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); + peers.delete(peer); const details = { code, reason, wasClean }; hooks.callAdapterHook("cloudflare:close", peer, details); hooks.callHook("close", peer, details); - ws.close(code, reason); }, }; }); @@ -116,6 +123,7 @@ function peerFromDurableEvent( return peer; } peer = ws._crosswsPeer = new CloudflareDurablePeer({ + peers: undefined as any, // Intentionally undefined to avoid wrongly using it cloudflare: { ws: ws as CF.WebSocket, request, @@ -127,6 +135,7 @@ function peerFromDurableEvent( } class CloudflareDurablePeer extends Peer<{ + peers: Set; // Won't be used cloudflare: { ws: AugmentedWebSocket; request?: Request | CF.Request; @@ -166,6 +175,12 @@ class CloudflareDurablePeer extends Peer<{ this._internal.cloudflare.ws.serializeAttachment(state); } + get peers() { + const clients = + this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]; + return new Set(clients.map((c) => c._crosswsPeer!)); + } + publish(topic: string, message: any): void { const clients = ( this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[] diff --git a/src/adapters/cloudflare.ts b/src/adapters/cloudflare.ts index 9840d6c..cccfce6 100644 --- a/src/adapters/cloudflare.ts +++ b/src/adapters/cloudflare.ts @@ -3,7 +3,11 @@ import type * as _cf from "@cloudflare/workers-types"; import { Peer } from "../peer"; -import { AdapterOptions, defineWebSocketAdapter } from "../types.js"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types.js"; import { Message } from "../message"; import { WSError } from "../error"; import { AdapterHookable } from "../hooks.js"; @@ -12,7 +16,7 @@ import { toBufferLike } from "../_utils"; declare const WebSocketPair: typeof _cf.WebSocketPair; declare const Response: typeof _cf.Response; -export interface CloudflareAdapter { +export interface CloudflareAdapter extends AdapterInstance { handleUpgrade( req: _cf.Request, env: unknown, @@ -25,7 +29,9 @@ export interface CloudflareOptions extends AdapterOptions {} export default defineWebSocketAdapter( (options = {}) => { const hooks = new AdapterHookable(options); + const peers = new Set(); return { + peers, handleUpgrade: async (request, env, context) => { const res = await hooks.callHook( "upgrade", @@ -40,6 +46,7 @@ export default defineWebSocketAdapter( const peer = new CloudflarePeer({ cloudflare: { client, server, request, env, context }, }); + peers.add(peer); server.accept(); hooks.callAdapterHook("cloudflare:accept", peer); hooks.callHook("open", peer); @@ -48,10 +55,12 @@ export default defineWebSocketAdapter( hooks.callHook("message", peer, new Message(event.data)); }); server.addEventListener("error", (event) => { + peers.delete(peer); hooks.callAdapterHook("cloudflare:error", peer, event); hooks.callHook("error", peer, new WSError(event.error)); }); server.addEventListener("close", (event) => { + peers.delete(peer); hooks.callAdapterHook("cloudflare:close", peer, event); hooks.callHook("close", peer, event); }); @@ -67,6 +76,7 @@ export default defineWebSocketAdapter( ); class CloudflarePeer extends Peer<{ + peers?: never; cloudflare: { client: _cf.WebSocket; server: _cf.WebSocket; @@ -96,6 +106,16 @@ class CloudflarePeer extends Peer<{ return 0; } + publish(_topic: string, _message: any): void { + // Not supported + // Throws: A hanging Promise was canceled + // for (const peer of this._internal.peers) { + // if (peer !== this && peer._topics.has(_topic)) { + // peer.publish(_topic, _message); + // } + // } + } + close(code?: number, reason?: string) { this._internal.cloudflare.client.close(code, reason); } diff --git a/src/adapters/deno.ts b/src/adapters/deno.ts index cdfaef6..139683d 100644 --- a/src/adapters/deno.ts +++ b/src/adapters/deno.ts @@ -5,11 +5,15 @@ import { Message } from "../message.ts"; import { WSError } from "../error.ts"; import { Peer } from "../peer.ts"; -import { AdapterOptions, defineWebSocketAdapter } from "../types.ts"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types.ts"; import { AdapterHookable } from "../hooks.ts"; import { toBufferLike } from "../_utils.ts"; -export interface DenoAdapter { +export interface DenoAdapter extends AdapterInstance { handleUpgrade(req: Request, info: ServeHandlerInfo): Promise; } @@ -22,15 +26,12 @@ declare global { type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade; type ServeHandlerInfo = unknown; // TODO -type DenoWSSharedState = { - peers: Set; -}; - export default defineWebSocketAdapter( (options = {}) => { const hooks = new AdapterHookable(options); - const sharedState: DenoWSSharedState = { peers: new Set() }; + const peers = new Set(); return { + peers, handleUpgrade: async (request, info) => { const res = await hooks.callHook("upgrade", request); if (res instanceof Response) { @@ -41,9 +42,10 @@ export default defineWebSocketAdapter( headers: res?.headers, }); const peer = new DenoPeer({ - deno: { ws: upgrade.socket, request, info, sharedState }, + peers, + deno: { ws: upgrade.socket, request, info }, }); - sharedState.peers.add(peer); + peers.add(peer); upgrade.socket.addEventListener("open", () => { hooks.callAdapterHook("deno:open", peer); hooks.callHook("open", peer); @@ -53,12 +55,12 @@ export default defineWebSocketAdapter( hooks.callHook("message", peer, new Message(event.data)); }); upgrade.socket.addEventListener("close", () => { - sharedState.peers.delete(peer); + peers.delete(peer); hooks.callAdapterHook("deno:close", peer); hooks.callHook("close", peer, {}); }); upgrade.socket.addEventListener("error", (error) => { - sharedState.peers.delete(peer); + peers.delete(peer); hooks.callAdapterHook("deno:error", peer, error); hooks.callHook("error", peer, new WSError(error)); }); @@ -69,11 +71,11 @@ export default defineWebSocketAdapter( ); class DenoPeer extends Peer<{ + peers: Set; deno: { ws: WebSocketUpgrade["socket"]; request: Request; info: ServeHandlerInfo; - sharedState: DenoWSSharedState; }; }> { get addr() { @@ -98,11 +100,11 @@ class DenoPeer extends Peer<{ return 0; } - publish(topic: string, message: any): void { + publish(topic: string, message: any) { const data = toBufferLike(message); - for (const peer of this._internal.deno.sharedState.peers) { + for (const peer of this._internal.peers) { if (peer !== this && peer._topics.has(topic)) { - peer.send(data); + peer._internal.deno.ws.send(data); } } } diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 22d00dc..3820c73 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -13,13 +13,17 @@ import type { import { Peer } from "../peer"; import { Message } from "../message"; import { WSError } from "../error"; -import { AdapterOptions, defineWebSocketAdapter } from "../types"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types"; import { AdapterHookable } from "../hooks"; import { toBufferLike } from "../_utils"; type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit }; -export interface NodeAdapter { +export interface NodeAdapter extends AdapterInstance { handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer): void; closeAll: (code?: number, data?: string | Buffer) => void; } @@ -32,6 +36,7 @@ export interface NodeOptions extends AdapterOptions { export default defineWebSocketAdapter( (options = {}) => { const hooks = new AdapterHookable(options); + const peers = new Set(); const wss: WebSocketServer = options.wss || @@ -41,7 +46,8 @@ export default defineWebSocketAdapter( }) as WebSocketServer); wss.on("connection", (ws, req) => { - const peer = new NodePeer({ node: { ws, req, server: wss } }); + const peer = new NodePeer({ peers, node: { ws, req, server: wss } }); + peers.add(peer); hooks.callHook("open", peer); // Managed socket-level events @@ -53,10 +59,12 @@ export default defineWebSocketAdapter( hooks.callHook("message", peer, new Message(data, isBinary)); }); ws.on("error", (error: Error) => { + peers.delete(peer); hooks.callAdapterHook("node:error", peer, error); hooks.callHook("error", peer, new WSError(error)); }); ws.on("close", (code: number, reason: Buffer) => { + peers.delete(peer); hooks.callAdapterHook("node:close", peer, code, reason); hooks.callHook("close", peer, { code, @@ -95,6 +103,9 @@ export default defineWebSocketAdapter( }); return { + get peers() { + return peers; + }, handleUpgrade: async (req, socket, head) => { const res = await hooks.callHook("upgrade", new NodeReqProxy(req)); if (res instanceof Response) { @@ -163,6 +174,7 @@ async function sendResponse(socket: Duplex, res: Response) { } class NodePeer extends Peer<{ + peers: Set; node: { server: WebSocketServer; req: IncomingMessage; @@ -221,9 +233,8 @@ class NodePeer extends Peer<{ binary: isBinary, ...options, }; - for (const client of this._internal.node.server.clients) { - const peer = (client as WebSocketT & { _peer?: NodePeer })._peer; - if (peer && peer !== this && peer._topics.has(topic)) { + for (const peer of this._internal.peers) { + if (peer !== this && peer._topics.has(topic)) { peer._internal.node.ws.send(data, sendOptions); } } diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index ff07e2d..dcdb7a3 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -10,7 +10,11 @@ import type { } from "uWebSockets.js"; import { Peer } from "../peer"; import { Message } from "../message"; -import { AdapterOptions, defineWebSocketAdapter } from "../types"; +import { + AdapterOptions, + AdapterInstance, + defineWebSocketAdapter, +} from "../types"; import { AdapterHookable } from "../hooks"; import { toBufferLike } from "../_utils"; @@ -23,7 +27,7 @@ type UserData = { type WebSocketHandler = WebSocketBehavior; -export interface UWSAdapter { +export interface UWSAdapter extends AdapterInstance { websocket: WebSocketHandler; } @@ -44,11 +48,14 @@ export interface UWSOptions extends AdapterOptions { export default defineWebSocketAdapter( (options = {}) => { const hooks = new AdapterHookable(options); + const peers = new Set(); return { + peers, websocket: { ...options.uws, close(ws, code, message) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); + peers.delete(peer); hooks.callAdapterHook("uws:close", peer, ws, code, message); hooks.callHook("close", peer, { code, @@ -56,30 +63,31 @@ export default defineWebSocketAdapter( }); }, drain(ws) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("uws:drain", peer, ws); }, message(ws, message, isBinary) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("uws:message", peer, ws, message, isBinary); const msg = new Message(message, isBinary); hooks.callHook("message", peer, msg); }, open(ws) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); + peers.add(peer); hooks.callAdapterHook("uws:open", peer, ws); hooks.callHook("open", peer); }, ping(ws, message) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("uws:ping", peer, ws, message); }, pong(ws, message) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook("uws:pong", peer, ws, message); }, subscription(ws, topic, newCount, oldCount) { - const peer = getPeer(ws); + const peer = getPeer(ws, peers); hooks.callAdapterHook( "uws:subscription", peer, @@ -146,14 +154,14 @@ class UWSReqProxy { private _rawHeaders: [string, string][] = []; url: string; - constructor(private _req: HttpRequest) { + constructor(_req: HttpRequest) { // We need to precompute values since uws doesn't provide them after handler. // Headers let host = "localhost"; let proto = "http"; // eslint-disable-next-line unicorn/no-array-for-each - this._req.forEach((key, value) => { + _req.forEach((key, value) => { if (key === "host") { host = value; } else if (key === "x-forwarded-proto" && value === "https") { @@ -176,17 +184,18 @@ class UWSReqProxy { } } -function getPeer(ws: WebSocket) { +function getPeer(ws: WebSocket, peers: Set): UWSPeer { const userData = ws.getUserData(); if (userData._peer) { - return userData._peer as Peer; + return userData._peer as UWSPeer; } - const peer = new UWSPeer({ uws: { ws, userData } }); + const peer = new UWSPeer({ peers, uws: { ws, userData } }); userData._peer = peer; return peer; } class UWSPeer extends Peer<{ + peers: Set; uws: { ws: WebSocket; userData: UserData; diff --git a/src/peer.ts b/src/peer.ts index 5b7858f..db73442 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -8,17 +8,21 @@ const ReadyStateMap = { 3: "closed", } as const; -export abstract class Peer { - protected _internal: AdapterContext; +export interface AdapterInternal { + peers?: Set; +} + +export abstract class Peer { + protected _internal: Internal; protected _topics: Set; private static _idCounter = 0; private _id: string; - constructor(_internalCtx: AdapterContext) { + constructor(internal: Internal) { this._id = ++Peer._idCounter + ""; this._topics = new Set(); - this._internal = _internalCtx; + this._internal = internal; } get id(): string { @@ -41,11 +45,17 @@ export abstract class Peer { return -1; } + get peers(): Set { + return this._internal.peers || new Set(); + } + abstract send(message: any, options?: { compress?: boolean }): number; - publish(topic: string, message: any, options?: { compress?: boolean }) { - // noop - } + abstract publish( + topic: string, + message: any, + options?: { compress?: boolean }, + ): void; subscribe(topic: string) { this._topics.add(topic); diff --git a/src/types.ts b/src/types.ts index e4f2f2a..7a7d8d9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,7 +4,9 @@ import type { Peer } from "./peer.ts"; // --- Adapter --- -export interface CrossWSAdapter {} +export interface AdapterInstance { + readonly peers: Set; +} export interface AdapterOptions { resolve?: ResolveHooks; @@ -13,12 +15,12 @@ export interface AdapterOptions { } export type Adapter< - AdapterT extends CrossWSAdapter = CrossWSAdapter, + AdapterT extends AdapterInstance = AdapterInstance, Options extends AdapterOptions = AdapterOptions, > = (options?: Options) => AdapterT; export function defineWebSocketAdapter< - AdapterT extends CrossWSAdapter = CrossWSAdapter, + AdapterT extends AdapterInstance = AdapterInstance, Options extends AdapterOptions = AdapterOptions, >(factory: Adapter) { return factory; diff --git a/test/_utils.ts b/test/_utils.ts index 42cf554..18a094c 100644 --- a/test/_utils.ts +++ b/test/_utils.ts @@ -8,15 +8,24 @@ import { wsTests } from "./tests"; const fixtureDir = fileURLToPath(new URL("fixture", import.meta.url)); +const websockets = new Set(); +afterEach(() => { + for (const ws of websockets) { + ws.close(); + } + websockets.clear(); +}); + export function wsConnect( url: string, opts?: { skip?: number; headers?: OutgoingHttpHeaders }, ) { const ws = new WebSocket(url, { headers: opts?.headers }); + websockets.add(ws); const upgradeHeaders: Record = Object.create(null); - const send = async (data: any) => { + const send = async (data: any): Promise => { ws.send( typeof data === "string" ? data : JSON.stringify({ message: data }), ); @@ -55,11 +64,6 @@ export function wsConnect( } }); - afterEach(() => { - ws.removeAllListeners(); - ws.close(); - }); - const res = { ws, send, diff --git a/test/adapters/node.test.ts b/test/adapters/node.test.ts index 165177e..4f197bf 100644 --- a/test/adapters/node.test.ts +++ b/test/adapters/node.test.ts @@ -12,7 +12,14 @@ describe("node", () => { beforeAll(async () => { ws = createDemo(nodeAdapter); - server = createServer((_req, res) => { + server = createServer((req, res) => { + if (req.url === "/peers") { + return res.end( + JSON.stringify({ + peers: [...ws.peers].map((p) => p.id), + }), + ); + } res.end("ok"); }); server.on("upgrade", ws.handleUpgrade); diff --git a/test/adapters/uws.test.ts b/test/adapters/uws.test.ts index a2990e6..7b99413 100644 --- a/test/adapters/uws.test.ts +++ b/test/adapters/uws.test.ts @@ -19,14 +19,19 @@ describe("uws", () => { res.onAborted(() => { aborted = true; }); - const html = "OK"; + + let resBody = "OK"; + const url = req.getUrl(); + if (url === "/peers") { + resBody = JSON.stringify({ peers: [...ws.peers].map((p) => p.id) }); + } + if (aborted) { return; } res.cork(() => { res.writeStatus("200 OK"); - res.writeHeader("Content-Type", "text/html"); - res.end(html); + res.end(resBody); }); }); diff --git a/test/fixture/_shared.ts b/test/fixture/_shared.ts index 6ba6175..11654a3 100644 --- a/test/fixture/_shared.ts +++ b/test/fixture/_shared.ts @@ -1,4 +1,4 @@ -import { ResolveHooks, Adapter, defineHooks } from "../../src/index.ts"; +import { Adapter, AdapterInstance, defineHooks } from "../../src/index.ts"; export const getIndexHTML = () => import("./_index.html.ts").then((r) => r.default); @@ -33,6 +33,12 @@ export function createDemo>( }); break; } + case "peers": { + peer.send({ + peers: [...peer.peers].map((p) => p.id), + }); + break; + } default: { peer.send(msgText); peer.publish("chat", msgText); @@ -61,3 +67,17 @@ export function createDemo>( hooks, }); } + +export function handleDemoRoutes( + ws: AdapterInstance, + request: Request, +): Response | undefined { + const url = new URL(request.url); + if (url.pathname === "/peers") { + return new Response( + JSON.stringify({ + peers: [...ws.peers].map((p) => p.id), + }), + ); + } +} diff --git a/test/fixture/bun.ts b/test/fixture/bun.ts index a6e9035..d1ea54b 100644 --- a/test/fixture/bun.ts +++ b/test/fixture/bun.ts @@ -1,7 +1,7 @@ // You can run this demo using `bun --bun ./bun.ts` or `npm run play:bun` in repo import bunAdapter from "../../src/adapters/bun"; -import { createDemo, getIndexHTML } from "./_shared"; +import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared"; const ws = createDemo(bunAdapter); @@ -10,6 +10,10 @@ Bun.serve({ hostname: "localhost", websocket: ws.websocket, async fetch(request, server) { + const response = handleDemoRoutes(ws, request); + if (response) { + return response; + } if (request.headers.get("upgrade") === "websocket") { return ws.handleUpgrade(request, server); } diff --git a/test/fixture/cloudflare-durable.ts b/test/fixture/cloudflare-durable.ts index 347a56d..42c07f1 100644 --- a/test/fixture/cloudflare-durable.ts +++ b/test/fixture/cloudflare-durable.ts @@ -1,7 +1,7 @@ // You can run this demo using `npm run play:cf-durable` in repo import { DurableObject } from "cloudflare:workers"; import cloudflareAdapter from "../../src/adapters/cloudflare-durable.ts"; -import { createDemo, getIndexHTML } from "./_shared.ts"; +import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts"; const ws = createDemo(cloudflareAdapter); @@ -11,6 +11,11 @@ export default { env: Record, context: ExecutionContext, ): Promise { + const response = handleDemoRoutes(ws, request); + if (response) { + return response; + } + if (request.headers.get("upgrade") === "websocket") { return ws.handleUpgrade(request, env, context); } diff --git a/test/fixture/cloudflare.ts b/test/fixture/cloudflare.ts index 1dd9a22..83d6000 100644 --- a/test/fixture/cloudflare.ts +++ b/test/fixture/cloudflare.ts @@ -1,7 +1,7 @@ // You can run this demo using `npm run play:cf` in repo -import type { Request, ExecutionContext } from "@cloudflare/workers-types"; +import type { ExecutionContext } from "@cloudflare/workers-types"; import cloudflareAdapter from "../../src/adapters/cloudflare"; -import { createDemo, getIndexHTML } from "./_shared.ts"; +import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts"; const ws = createDemo(cloudflareAdapter); @@ -11,8 +11,13 @@ export default { env: Record, context: ExecutionContext, ) { + const response = handleDemoRoutes(ws, request); + if (response) { + return response; + } + if (request.headers.get("upgrade") === "websocket") { - return ws.handleUpgrade(request, env, context); + return ws.handleUpgrade(request as any, env, context); } return new Response(await getIndexHTML(), { diff --git a/test/fixture/deno.ts b/test/fixture/deno.ts index d9ad246..59ad329 100644 --- a/test/fixture/deno.ts +++ b/test/fixture/deno.ts @@ -5,15 +5,20 @@ import denoAdapter from "../../src/adapters/deno.ts"; // @ts-ignore import type * as _Deno from "../types/lib.deno.d.ts"; -import { createDemo, getIndexHTML } from "./_shared.ts"; +import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts"; const ws = createDemo(denoAdapter); const port = Number.parseInt(Deno.env.get("PORT") || "") || 3001; -Deno.serve({ hostname: "localhost", port }, async (req, info) => { - if (req.headers.get("upgrade") === "websocket") { - return ws.handleUpgrade(req, info); +Deno.serve({ hostname: "localhost", port }, async (request, info) => { + const response = handleDemoRoutes(ws, request); + if (response) { + return response; + } + + if (request.headers.get("upgrade") === "websocket") { + return ws.handleUpgrade(request, info); } return new Response(await getIndexHTML(), { headers: { "Content-Type": "text/html" }, diff --git a/test/tests.ts b/test/tests.ts index 11c955b..cb8beb2 100644 --- a/test/tests.ts +++ b/test/tests.ts @@ -88,4 +88,26 @@ export function wsTests( }, }); }); + + test("get peers from adapter", async () => { + await wsConnect(getURL()); + await wsConnect(getURL()); + const response = await fetch(getURL().replace("ws", "http") + "peers"); + const { peers } = (await response.json()) as any; + expect(peers.length).toBe(2); + }); + + test("get peers from peer", async () => { + const ws1 = await wsConnect(getURL(), { skip: 1 }); + const ws2 = await wsConnect(getURL(), { skip: 1 }); + if (opts.pubsub !== false) { + ws1.skip(); // join message for ws2 + } + await ws1.send("peers"); + await ws2.send("peers"); + const { peers: peers1 } = await ws1.next(); + const { peers: peers2 } = await ws2.next(); + expect(peers1.length).toBe(2); + expect(peers1).toMatchObject(peers2); + }); } From bdc9f115a04f5f8348e95a164946c6d1ff2019be Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 6 Aug 2024 14:26:09 +0200 Subject: [PATCH 2/4] fix cf stuff --- src/adapters/cloudflare-durable.ts | 3 +-- src/adapters/cloudflare.ts | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index 7143e2f..963f0a6 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -123,7 +123,6 @@ function peerFromDurableEvent( return peer; } peer = ws._crosswsPeer = new CloudflareDurablePeer({ - peers: undefined as any, // Intentionally undefined to avoid wrongly using it cloudflare: { ws: ws as CF.WebSocket, request, @@ -135,7 +134,7 @@ function peerFromDurableEvent( } class CloudflareDurablePeer extends Peer<{ - peers: Set; // Won't be used + peers?: never; cloudflare: { ws: AugmentedWebSocket; request?: Request | CF.Request; diff --git a/src/adapters/cloudflare.ts b/src/adapters/cloudflare.ts index cccfce6..fd8b4e5 100644 --- a/src/adapters/cloudflare.ts +++ b/src/adapters/cloudflare.ts @@ -44,6 +44,7 @@ export default defineWebSocketAdapter( const client = pair[0]; const server = pair[1]; const peer = new CloudflarePeer({ + peers, cloudflare: { client, server, request, env, context }, }); peers.add(peer); @@ -76,7 +77,7 @@ export default defineWebSocketAdapter( ); class CloudflarePeer extends Peer<{ - peers?: never; + peers: Set; cloudflare: { client: _cf.WebSocket; server: _cf.WebSocket; From 57a59c59f4030566d73cae07cb0f6f5f2f361572 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 6 Aug 2024 14:27:52 +0200 Subject: [PATCH 3/4] remove extra getter for node --- src/adapters/node.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 3820c73..b498127 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -103,9 +103,7 @@ export default defineWebSocketAdapter( }); return { - get peers() { - return peers; - }, + peers, handleUpgrade: async (req, socket, head) => { const res = await hooks.callHook("upgrade", new NodeReqProxy(req)); if (res instanceof Response) { From a665d1ee77d7a311ee9b35a3460dcaa007d468e7 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 6 Aug 2024 14:33:51 +0200 Subject: [PATCH 4/4] try to restore durable peers --- src/adapters/cloudflare-durable.ts | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index 963f0a6..6b26a71 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -177,7 +177,22 @@ class CloudflareDurablePeer extends Peer<{ get peers() { const clients = this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]; - return new Set(clients.map((c) => c._crosswsPeer!)); + return new Set( + clients.map((client) => { + let peer = client._crosswsPeer; + if (!peer) { + peer = client._crosswsPeer = new CloudflareDurablePeer({ + cloudflare: { + ws: client, + request: undefined, + env: this._internal.cloudflare.env, + context: this._internal.cloudflare.context, + }, + }); + } + return peer; + }), + ); } publish(topic: string, message: any): void {