Skip to content

Commit

Permalink
refactor!: move peer.ctx to peer._internal (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 authored Aug 6, 2024
1 parent 51fca9e commit 5e92bd7
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 52 deletions.
20 changes: 10 additions & 10 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,50 +91,50 @@ class BunPeer extends Peer<{
bun: { ws: ServerWebSocket<ContextData> };
}> {
get addr() {
let addr = this.ctx.bun.ws.remoteAddress;
let addr = this._internal.bun.ws.remoteAddress;
if (addr.includes(":")) {
addr = `[${addr}]`;
}
return addr;
}

get readyState() {
return this.ctx.bun.ws.readyState as any;
return this._internal.bun.ws.readyState as any;
}

get url() {
return this.ctx.bun.ws.data.requestUrl || "/";
return this._internal.bun.ws.data.requestUrl || "/";
}

get headers() {
return this.ctx.bun.ws.data.request?.headers;
return this._internal.bun.ws.data.request?.headers;
}

send(message: any, options?: { compress?: boolean }) {
return this.ctx.bun.ws.send(toBufferLike(message), options?.compress);
return this._internal.bun.ws.send(toBufferLike(message), options?.compress);
}

publish(topic: string, message: any, options?: { compress?: boolean }) {
return this.ctx.bun.ws.publish(
return this._internal.bun.ws.publish(
topic,
toBufferLike(message),
options?.compress,
);
}

subscribe(topic: string): void {
this.ctx.bun.ws.subscribe(topic);
this._internal.bun.ws.subscribe(topic);
}

unsubscribe(topic: string): void {
this.ctx.bun.ws.unsubscribe(topic);
this._internal.bun.ws.unsubscribe(topic);
}

close(code?: number, reason?: string) {
this.ctx.bun.ws.close(code, reason);
this._internal.bun.ws.close(code, reason);
}

terminate() {
this.ctx.bun.ws.terminate();
this._internal.bun.ws.terminate();
}
}
24 changes: 14 additions & 10 deletions src/adapters/cloudflare-durable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,37 +135,41 @@ class CloudflareDurablePeer extends Peer<{
};
}> {
get url() {
return this.ctx.cloudflare.request?.url || this.ctx.cloudflare.ws.url || "";
return (
this._internal.cloudflare.request?.url ||
this._internal.cloudflare.ws.url ||
""
);
}

get headers() {
return this.ctx.cloudflare.request?.headers as Headers;
return this._internal.cloudflare.request?.headers as Headers;
}

get readyState() {
return this.ctx.cloudflare.ws.readyState as -1 | 0 | 1 | 2 | 3;
return this._internal.cloudflare.ws.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: any) {
this.ctx.cloudflare.ws.send(toBufferLike(message));
this._internal.cloudflare.ws.send(toBufferLike(message));
return 0;
}

subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
// Max limit: 2,048 bytes
...(this.ctx.cloudflare.ws.deserializeAttachment() as CrosswsState),
...(this._internal.cloudflare.ws.deserializeAttachment() as CrosswsState),
topics: this._topics,
};
this.ctx.cloudflare.ws._crosswsState = state;
this.ctx.cloudflare.ws.serializeAttachment(state);
this._internal.cloudflare.ws._crosswsState = state;
this._internal.cloudflare.ws.serializeAttachment(state);
}

publish(topic: string, message: any): void {
const clients = (
this.ctx.cloudflare.context.getWebSockets() as unknown as (typeof this.ctx.cloudflare.ws)[]
).filter((c) => c !== this.ctx.cloudflare.ws);
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]
).filter((c) => c !== this._internal.cloudflare.ws);
if (clients.length === 0) {
return;
}
Expand All @@ -183,7 +187,7 @@ class CloudflareDurablePeer extends Peer<{
}

close(code?: number, reason?: string) {
this.ctx.cloudflare.ws.close(code, reason);
this._internal.cloudflare.ws.close(code, reason);
}

terminate(): void {
Expand Down
10 changes: 5 additions & 5 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,24 @@ class CloudflarePeer extends Peer<{
}

get url() {
return this.ctx.cloudflare.request.url;
return this._internal.cloudflare.request.url;
}

get headers() {
return this.ctx.cloudflare.request.headers as unknown as Headers;
return this._internal.cloudflare.request.headers as unknown as Headers;
}

get readyState() {
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
return this._internal.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: any) {
this.ctx.cloudflare.server.send(toBufferLike(message));
this._internal.cloudflare.server.send(toBufferLike(message));
return 0;
}

close(code?: number, reason?: string) {
this.ctx.cloudflare.client.close(code, reason);
this._internal.cloudflare.client.close(code, reason);
}

terminate(): void {
Expand Down
16 changes: 8 additions & 8 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,41 +78,41 @@ class DenoPeer extends Peer<{
}> {
get addr() {
// @ts-expect-error types missing
return this.ctx.deno.ws.remoteAddress;
return this._internal.deno.ws.remoteAddress;
}

get readyState() {
return this.ctx.deno.ws.readyState as -1 | 0 | 1 | 2 | 3;
return this._internal.deno.ws.readyState as -1 | 0 | 1 | 2 | 3;
}

get url() {
return this.ctx.deno.request.url;
return this._internal.deno.request.url;
}

get headers() {
return this.ctx.deno.request.headers || new Headers();
return this._internal.deno.request.headers || new Headers();
}

send(message: any) {
this.ctx.deno.ws.send(toBufferLike(message));
this._internal.deno.ws.send(toBufferLike(message));
return 0;
}

publish(topic: string, message: any): void {
const data = toBufferLike(message);
for (const peer of this.ctx.deno.sharedState.peers) {
for (const peer of this._internal.deno.sharedState.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer.send(data);
}
}
}

close(code?: number, reason?: string) {
this.ctx.deno.ws.close(code, reason);
this._internal.deno.ws.close(code, reason);
}

terminate(): void {
// @ts-ignore (terminate is Deno-only api)
this.ctx.deno.ws.terminate();
this._internal.deno.ws.terminate();
}
}
18 changes: 9 additions & 9 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,18 @@ class NodePeer extends Peer<{
};
}> {
_req: NodeReqProxy;
constructor(ctx: NodePeer["ctx"]) {
constructor(ctx: NodePeer["_internal"]) {
super(ctx);
this._req = new NodeReqProxy(ctx.node.req);
ctx.node.ws._peer = this;
}

get addr() {
const socket = this.ctx.node.req.socket;
const socket = this._internal.node.req.socket;
if (!socket) {
return undefined;
}
const headers = this.ctx.node.req.headers;
const headers = this._internal.node.req.headers;
let addr = headers["x-forwarded-for"] || socket.remoteAddress || "??";
if (addr.includes(":")) {
addr = `[${addr}]`;
Expand All @@ -199,13 +199,13 @@ class NodePeer extends Peer<{
}

get readyState() {
return this.ctx.node.ws.readyState;
return this._internal.node.ws.readyState;
}

send(message: any, options?: { compress?: boolean }) {
const data = toBufferLike(message);
const isBinary = typeof data !== "string";
this.ctx.node.ws.send(data, {
this._internal.node.ws.send(data, {
compress: options?.compress,
binary: isBinary,
...options,
Expand All @@ -221,19 +221,19 @@ class NodePeer extends Peer<{
binary: isBinary,
...options,
};
for (const client of this.ctx.node.server.clients) {
for (const client of this._internal.node.server.clients) {
const peer = (client as WebSocketT & { _peer?: NodePeer })._peer;
if (peer && peer !== this && peer._topics.has(topic)) {
peer.ctx.node.ws.send(data, sendOptions);
peer._internal.node.ws.send(data, sendOptions);
}
}
}

close(code?: number, data?: string | Buffer) {
this.ctx.node.ws.close(code, data);
this._internal.node.ws.close(code, data);
}

terminate() {
this.ctx.node.ws.terminate();
this._internal.node.ws.terminate();
}
}
14 changes: 7 additions & 7 deletions src/adapters/uws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ class UWSPeer extends Peer<{
_decoder = new TextDecoder();
_req: UWSReqProxy;

constructor(ctx: UWSPeer["ctx"]) {
constructor(ctx: UWSPeer["_internal"]) {
super(ctx);
this._req = new UWSReqProxy(ctx.uws.userData.req);
}

get addr() {
try {
const addr = this._decoder.decode(
this.ctx.uws.ws?.getRemoteAddressAsText(),
this._internal.uws.ws?.getRemoteAddressAsText(),
);
return addr.replace(/(0000:)+/, "");
} catch {
Expand All @@ -222,25 +222,25 @@ class UWSPeer extends Peer<{
send(message: any, options?: { compress?: boolean }) {
const data = toBufferLike(message);
const isBinary = typeof data !== "string";
return this.ctx.uws.ws.send(data, isBinary, options?.compress);
return this._internal.uws.ws.send(data, isBinary, options?.compress);
}

subscribe(topic: string): void {
this.ctx.uws.ws.subscribe(topic);
this._internal.uws.ws.subscribe(topic);
}

publish(topic: string, message: string, options?: { compress?: boolean }) {
const data = toBufferLike(message);
const isBinary = typeof data !== "string";
this.ctx.uws.ws.publish(topic, data, isBinary, options?.compress);
this._internal.uws.ws.publish(topic, data, isBinary, options?.compress);
return 0;
}

close(code?: number, reason?: RecognizedString) {
this.ctx.uws.ws.end(code, reason);
this._internal.uws.ws.end(code, reason);
}

terminate(): void {
this.ctx.uws.ws.close();
this._internal.uws.ws.close();
}
}
9 changes: 6 additions & 3 deletions src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ const ReadyStateMap = {
} as const;

export abstract class Peer<AdapterContext = any> {
_topics: Set<string> = new Set();
protected _internal: AdapterContext;
protected _topics: Set<string>;

static _idCounter = 0;
private static _idCounter = 0;
private _id: string;

constructor(public ctx: AdapterContext) {
constructor(_internalCtx: AdapterContext) {
this._id = ++Peer._idCounter + "";
this._topics = new Set();
this._internal = _internalCtx;
}

get id(): string {
Expand Down

0 comments on commit 5e92bd7

Please sign in to comment.