Skip to content

Commit

Permalink
feat: allow access to peer url and headers
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Feb 24, 2024
1 parent 7e36eba commit b67bef0
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 24 deletions.
6 changes: 3 additions & 3 deletions docs/2.adapters/bun.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ icon: simple-icons:bun

> Integrate CrossWS with Bun.
To integrate CrossWS with your Bun server, you need to check for `server.upgrade` and also pass the `websocket` object returned from the adapter to server options. CrossWS leverages native Bun WebSocket API.
To integrate CrossWS with your Bun server, you need to handle upgrade with `handleUpgrade` util and also pass the `websocket` object returned from the adapter to server options. CrossWS leverages native Bun WebSocket API.

```ts
import wsAdapter from "./dist/adapters/bun";

const { websocket } = wsAdapter({ message: console.log });
const { websocket, handleUpgrade } = wsAdapter({ message: console.log });

Bun.serve({
port: 3000,
websocket,
fetch(req, server) {
if (server.upgrade(req)) {
if (handleUpgrade(req, server)) {
return;
}
return new Response(
Expand Down
12 changes: 11 additions & 1 deletion playground/_common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ export function createDemo<T extends WebSocketAdapter>(
): ReturnType<T> {
const hooks = createWebSocketDebugHooks({
open(peer) {
peer.send("Hello!");
peer.send(`Hello!`);
peer.send(
JSON.stringify(
{
url: peer.url,
headers: peer.headers && Object.fromEntries(peer.headers),
},
undefined,
2,
),
);
},
message(peer, message) {
if (message.text() === "ping") {
Expand Down
2 changes: 1 addition & 1 deletion playground/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Bun.serve({
port: 3001,
websocket: adapter.websocket,
async fetch(req, server) {
if (server.upgrade(req)) {
if (server.upgrade(req, { data: { req, server } })) {
return;
}
return new Response(await getIndexHTML({ name: "bun" }), {
Expand Down
21 changes: 19 additions & 2 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// https://bun.sh/docs/api/websockets

import type { WebSocketHandler, ServerWebSocket } from "bun";
import type { WebSocketHandler, ServerWebSocket, Server } from "bun";

import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
Expand All @@ -10,7 +10,11 @@ import { CrossWSOptions, createCrossWS } from "../crossws";

export interface AdapterOptions extends CrossWSOptions {}

type ContextData = { _peer?: WebSocketPeer };
type ContextData = {
_peer?: WebSocketPeer;
req?: Request;
server?: Server;
};

export interface Adapter {
websocket: WebSocketHandler<ContextData>;
Expand All @@ -31,6 +35,11 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
};

return {
handleUpgrade(req: Request, server: Server) {
return server.upgrade(req, {
data: { req, server },
});
},
websocket: {
message: (ws, message) => {
const peer = getPeer(ws);
Expand Down Expand Up @@ -85,6 +94,14 @@ class WebSocketPeer extends WebSocketPeerBase<{
return this.ctx.bun.ws.readyState as any;
}

get url() {
return this.ctx.bun.ws.data.req?.url || "/";
}

get headers() {
return this.ctx.bun.ws.data.req?.headers || new Headers();
}

send(message: string | ArrayBuffer) {
this.ctx.bun.ws.send(message);
return 0;
Expand Down
14 changes: 11 additions & 3 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
const crossws = createCrossWS(hooks, options);

const handleUpgrade = (
request: _cf.Request,
req: _cf.Request,
env: Env,
context: _cf.ExecutionContext,
) => {
Expand All @@ -37,7 +37,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
const server = pair[1];

const peer = new CloudflareWebSocketPeer({
cloudflare: { client, server, request, env, context },
cloudflare: { client, server, req, env, context },
});

server.accept();
Expand Down Expand Up @@ -77,7 +77,7 @@ class CloudflareWebSocketPeer extends WebSocketPeerBase<{
cloudflare: {
client: _cf.WebSocket;
server: _cf.WebSocket;
request: _cf.Request;
req: _cf.Request;
env: Env;
context: _cf.ExecutionContext;
};
Expand All @@ -86,6 +86,14 @@ class CloudflareWebSocketPeer extends WebSocketPeerBase<{
return undefined;
}

get url() {
return this.ctx.cloudflare.req.url;
}

get headers() {
return this.ctx.cloudflare.req.headers as Headers;
}

get readyState() {
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}
Expand Down
16 changes: 12 additions & 4 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);

const handleUpgrade = (request: Request) => {
const upgrade = Deno.upgradeWebSocket(request);
const handleUpgrade = (req: Request) => {
const upgrade = Deno.upgradeWebSocket(req);
const peer = new DenoWebSocketPeer({
deno: { ws: upgrade.socket, request },
deno: { ws: upgrade.socket, req },
});
upgrade.socket.addEventListener("open", () => {
crossws.$("deno:open", peer);
Expand Down Expand Up @@ -53,7 +53,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
);

class DenoWebSocketPeer extends WebSocketPeerBase<{
deno: { ws: any; request: Request };
deno: { ws: any; req: Request };
}> {
get id() {
return this.ctx.deno.ws.remoteAddress;
Expand All @@ -63,6 +63,14 @@ class DenoWebSocketPeer extends WebSocketPeerBase<{
return this.ctx.deno.ws.readyState as -1 | 0 | 1 | 2 | 3;
}

get url() {
return this.ctx.deno.req.url;
}

get headers() {
return this.ctx.deno.req.headers || new Headers();
}

send(message: string | ArrayBuffer) {
this.ctx.deno.ws.send(message);
return 0;
Expand Down
50 changes: 46 additions & 4 deletions src/adapters/node-uws.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
// https://github.com/websockets/ws
// https://github.com/websockets/ws/blob/master/doc/ws.md

import { WebSocketBehavior, WebSocket } from "uWebSockets.js";
import type {
WebSocketBehavior,
WebSocket,
HttpRequest,
HttpResponse,
} from "uWebSockets.js";
import { WebSocketPeerBase } from "../peer";
import { WebSocketMessage } from "../message";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";

type UserData = { _peer?: any };
type UserData = {
_peer?: any;
req: HttpRequest;
res: HttpResponse;
context: any;
};

type WebSocketHandler = WebSocketBehavior<UserData>;

Expand Down Expand Up @@ -38,7 +48,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
if (userData._peer) {
return userData._peer as WebSocketPeer;
}
const peer = new WebSocketPeer({ uws: { ws } });
const peer = new WebSocketPeer({ uws: { ws, userData } });
userData._peer = peer;
return peer;
};
Expand Down Expand Up @@ -78,7 +88,20 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
crossws.$("uws:subscription", peer, ws, topic, newCount, oldCount);
},
// error ? TODO
// upgrade(res, req, context) {}
upgrade(res, req, context) {
/* This immediately calls open handler, you must not use res after this call */
res.upgrade(
{
req,
res,
context,
},
req.getHeader("sec-websocket-key"),
req.getHeader("sec-websocket-protocol"),
req.getHeader("sec-websocket-extensions"),
context,
);
},
};

return {
Expand All @@ -90,8 +113,11 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
class WebSocketPeer extends WebSocketPeerBase<{
uws: {
ws: WebSocket<UserData>;
userData: UserData;
};
}> {
_headers: Headers | undefined;

get id() {
try {
const addr = this.ctx.uws.ws?.getRemoteAddressAsText();
Expand All @@ -104,6 +130,22 @@ class WebSocketPeer extends WebSocketPeerBase<{
// TODO
// get readyState() {}

get url() {
return this.ctx.uws.userData.req.getUrl();
}

get headers() {
if (!this._headers) {
const headers = new Headers();
// eslint-disable-next-line unicorn/no-array-for-each
this.ctx.uws.userData.req.forEach((key, value) => {
headers.set(key, value);
});
this._headers = headers;
}
return this._headers;
}

send(message: string, compress?: boolean) {
this.ctx.uws.ws.send(message, false, compress);
return 0;
Expand Down
22 changes: 22 additions & 0 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class NodeWebSocketPeer extends WebSocketPeerBase<{
ws: WebSocketT;
};
}> {
_headers: Headers | undefined;

get id() {
const socket = this.ctx.node.req.socket;
if (!socket) {
Expand All @@ -124,6 +126,26 @@ class NodeWebSocketPeer extends WebSocketPeerBase<{
return `${addr}:${socket.remotePort}`;
}

get url() {
return this.ctx.node.req.url || "/";
}

get headers() {
if (!this._headers) {
this._headers = new Headers();
for (const [key, value] of Object.entries(this.ctx.node.req.headers)) {
if (typeof value === "string") {
this._headers.append(key, value);
} else if (Array.isArray(value)) {
for (const v of value) {
this._headers.append(key, v);
}
} // else value is undefined
}
}
return this._headers;
}

get readyState() {
return this.ctx.node.ws.readyState;
}
Expand Down
16 changes: 10 additions & 6 deletions src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ const ReadyStateMap = {
3: "closed",
} as const;

export interface WebSocketContext {}

export abstract class WebSocketPeerBase<
T extends WebSocketContext = WebSocketContext,
> {
constructor(public ctx: T) {}
export abstract class WebSocketPeerBase<AdapterContext = any> {
constructor(public ctx: AdapterContext) {}

get id(): string | undefined {
return undefined;
}

get url(): string {
return "/";
}

get headers(): Headers {
return new Headers();
}

get readyState(): ReadyState | -1 {
return -1;
}
Expand Down

0 comments on commit b67bef0

Please sign in to comment.