From 4def5c67abf905c14d23e14c0041231613294e3a Mon Sep 17 00:00:00 2001 From: weboko Date: Thu, 16 Mar 2023 21:54:08 +0100 Subject: [PATCH 1/7] add getObservers method --- packages/core/src/lib/relay/index.ts | 35 +++++++--------------------- packages/interfaces/src/relay.ts | 20 +++++++--------- 2 files changed, 17 insertions(+), 38 deletions(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 296282133b..c5636ad377 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -6,13 +6,13 @@ import { } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; -import { CustomEvent } from "@libp2p/interfaces/events"; import type { Callback, IDecoder, IEncoder, IMessage, IRelay, + Observers, ProtocolCreateOptions, SendResult, } from "@waku/interfaces"; @@ -28,18 +28,9 @@ import { messageValidator } from "./message_validator.js"; const log = debug("waku:relay"); -export type Observer = { - decoder: IDecoder; - callback: Callback; -}; - export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; export type ContentTopic = string; -type BasicEventPayload = { - contentTopic: string; -}; - /** * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). * Must be passed as a `pubsub` module to a `Libp2p` instance. @@ -55,7 +46,7 @@ class Relay extends GossipSub implements IRelay { * observers called when receiving new message. * Observers under key `""` are always called. */ - private observers: Map>; + private observers: Map>>; constructor( components: GossipSubComponents, @@ -120,26 +111,10 @@ class Relay extends GossipSub implements IRelay { pushOrInitMapSet(this.observers, contentTopic, observer); - this.dispatchEvent( - new CustomEvent("observer:added", { - detail: { - contentTopic, - }, - }) - ); - return () => { const observers = this.observers.get(contentTopic); if (observers) { observers.delete(observer); - - this.dispatchEvent( - new CustomEvent("observer:removed", { - detail: { - contentTopic, - }, - }) - ); } }; } @@ -208,6 +183,12 @@ class Relay extends GossipSub implements IRelay { getMeshPeers(topic?: TopicStr): PeerIdStr[] { return super.getMeshPeers(topic ?? this.pubSubTopic); } + + public getObservers( + contentTopic: string + ): Set> { + return this.observers.get(contentTopic) as Set>; + } } Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1]; diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index d90dfb3a1f..8dafde897a 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -1,5 +1,4 @@ -import type { GossipSub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub"; -import type { EventEmitter } from "@libp2p/interfaces/events"; +import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { IDecodedMessage, @@ -9,20 +8,19 @@ import type { } from "./message.js"; import type { Callback, SendResult } from "./protocols.js"; -export interface RelayEvents { - "observer:added": CustomEvent; - "observer:removed": CustomEvent; -} - -type IRelayEmitter = EventEmitter; +export type Observer = { + decoder: IDecoder; + callback: Callback; +}; -interface IRelayAPI extends GossipSub { +export interface IRelay extends GossipSub { send: (encoder: IEncoder, message: IMessage) => Promise; addObserver: ( decoder: IDecoder, callback: Callback ) => () => void; getMeshPeers: () => string[]; + getObservers: ( + contentTopic: string + ) => Set>; } - -export type IRelay = IRelayAPI & IRelayEmitter; From 59e9dc7b00fab9726a71ea85d761e711293e9ae8 Mon Sep 17 00:00:00 2001 From: weboko Date: Fri, 17 Mar 2023 20:42:19 +0100 Subject: [PATCH 2/7] fix types --- packages/core/src/lib/relay/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index c5636ad377..d5797d74bc 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -8,15 +8,15 @@ import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { Callback, + IDecodedMessage, IDecoder, IEncoder, IMessage, IRelay, - Observers, + Observer, ProtocolCreateOptions, SendResult, } from "@waku/interfaces"; -import { IDecodedMessage } from "@waku/interfaces"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants.js"; From 2fa7d1c30da2eaae82321932fa7d3d64a6f65e86 Mon Sep 17 00:00:00 2001 From: weboko Date: Fri, 17 Mar 2023 21:11:59 +0100 Subject: [PATCH 3/7] fix type --- packages/core/src/lib/relay/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index d5797d74bc..46c0e47f41 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -46,7 +46,7 @@ class Relay extends GossipSub implements IRelay { * observers called when receiving new message. * Observers under key `""` are always called. */ - private observers: Map>>; + private observers: Map>; constructor( components: GossipSubComponents, From 22db10f7786ef1738ed02035832bb47740bd3308 Mon Sep 17 00:00:00 2001 From: weboko Date: Tue, 21 Mar 2023 02:02:28 +0100 Subject: [PATCH 4/7] add getActiveSubscriptions --- packages/core/src/lib/relay/index.ts | 13 ++++++++++++- packages/interfaces/src/relay.ts | 12 +++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 46c0e47f41..ba745119fd 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -7,13 +7,13 @@ import { import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { + ActiveSubscriptions, Callback, IDecodedMessage, IDecoder, IEncoder, IMessage, IRelay, - Observer, ProtocolCreateOptions, SendResult, } from "@waku/interfaces"; @@ -28,6 +28,11 @@ import { messageValidator } from "./message_validator.js"; const log = debug("waku:relay"); +export type Observer = { + decoder: IDecoder; + callback: Callback; +}; + export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; export type ContentTopic = string; @@ -119,6 +124,12 @@ class Relay extends GossipSub implements IRelay { }; } + public getActiveSubscriptions(): ActiveSubscriptions | undefined { + const map = new Map(); + map.set(this.pubSubTopic, this.observers.keys()); + return map; + } + private async processIncomingMessage( pubSubTopic: string, bytes: Uint8Array diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 8dafde897a..df61ff795c 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -8,10 +8,10 @@ import type { } from "./message.js"; import type { Callback, SendResult } from "./protocols.js"; -export type Observer = { - decoder: IDecoder; - callback: Callback; -}; +type PubSubTopic = string; +type ContentTopic = string; + +export type ActiveSubscriptions = Map; export interface IRelay extends GossipSub { send: (encoder: IEncoder, message: IMessage) => Promise; @@ -20,7 +20,5 @@ export interface IRelay extends GossipSub { callback: Callback ) => () => void; getMeshPeers: () => string[]; - getObservers: ( - contentTopic: string - ) => Set>; + getActiveSubscriptions: () => ActiveSubscriptions | undefined; } From 88813934fec3b8b9aa7b0cbd489e2a6537e543bd Mon Sep 17 00:00:00 2001 From: weboko Date: Tue, 21 Mar 2023 02:03:28 +0100 Subject: [PATCH 5/7] remove getObservers --- packages/core/src/lib/relay/index.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index ba745119fd..3169dd0067 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -194,12 +194,6 @@ class Relay extends GossipSub implements IRelay { getMeshPeers(topic?: TopicStr): PeerIdStr[] { return super.getMeshPeers(topic ?? this.pubSubTopic); } - - public getObservers( - contentTopic: string - ): Set> { - return this.observers.get(contentTopic) as Set>; - } } Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1]; From b62894d7b70cef585c3d304c077af8ae5f25c275 Mon Sep 17 00:00:00 2001 From: weboko Date: Tue, 21 Mar 2023 02:12:39 +0100 Subject: [PATCH 6/7] address comment --- packages/core/src/lib/relay/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 3169dd0067..40365e4f93 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -124,7 +124,7 @@ class Relay extends GossipSub implements IRelay { }; } - public getActiveSubscriptions(): ActiveSubscriptions | undefined { + public getActiveSubscriptions(): ActiveSubscriptions { const map = new Map(); map.set(this.pubSubTopic, this.observers.keys()); return map; From cad198800588b5f8bfad910b274c828a97dd9cfa Mon Sep 17 00:00:00 2001 From: weboko Date: Tue, 21 Mar 2023 02:21:04 +0100 Subject: [PATCH 7/7] remove export --- packages/interfaces/src/relay.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 3d05d7115c..c602322cd1 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -9,7 +9,7 @@ type ContentTopic = string; export type ActiveSubscriptions = Map; -export interface IRelayAPI { +interface IRelayAPI { addObserver: ( decoder: IDecoder, callback: Callback