From 90e5b776c8628df43125d73277605ce5ef8e3e02 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:40:14 +0530 Subject: [PATCH 01/29] `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]` --- packages/interfaces/src/protocols.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index c354b1b740..126ec6255f 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -22,18 +22,22 @@ export interface IBaseProtocol { export type ProtocolCreateOptions = { /** - * The PubSub Topic to use. Defaults to {@link @waku/core.DefaultPubSubTopic }. + * Waku supports usage of multiple pubsub topics, but this is still in early stages. + * Waku implements sharding to achieve scalability + * The format of the sharded topic is `/waku/2/rs//` + * To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/). * - * One and only one pubsub topic is used by Waku. This is used by: + * If no pubsub topic is specified, the default pubsub topic is used. + * The set of pubsub topics that are used to initialise the Waku node, will need to be used by the protocols as well + * You cannot currently add or remove pubsub topics after initialisation. + * This is used by: * - WakuRelay to receive, route and send messages, * - WakuLightPush to send messages, * - WakuStore to retrieve messages. - * - * The usage of the default pubsub topic is recommended. * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - pubSubTopic?: string; + pubSubTopics?: string[]; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core.WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) @@ -63,7 +67,8 @@ export enum SendError { ENCODE_FAILED = "Failed to encode", DECODE_FAILED = "Failed to decode", SIZE_TOO_BIG = "Size is too big", - NO_RPC_RESPONSE = "No RPC response" + NO_RPC_RESPONSE = "No RPC response", + TOPIC_NOT_SUBSCRIBED = "Topic not subscribed" } export interface SendResult { From 72174978d0af6985f228719a04e9e20162cbf366 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:45:47 +0530 Subject: [PATCH 02/29] chore: update encoder & decoder to support `PubSubTopic` --- packages/core/src/lib/message/version_0.ts | 21 +++++++++++++----- packages/interfaces/src/message.ts | 7 +++++- packages/message-encryption/src/ecies.ts | 14 ++++++++---- packages/message-encryption/src/symmetric.ts | 23 +++++++++++++++----- packages/relay/src/topic_only_message.ts | 2 ++ 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 1e57e9ec13..0e289490ae 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -6,11 +6,14 @@ import type { IMessage, IMetaSetter, IProtoMessage, - IRateLimitProof + IRateLimitProof, + PubSubTopic } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; import debug from "debug"; +import { DefaultPubSubTopic } from "../constants.js"; + const log = debug("waku:message:version-0"); const OneMillion = BigInt(1_000_000); @@ -71,6 +74,7 @@ export class DecodedMessage implements IDecodedMessage { export class Encoder implements IEncoder { constructor( + public pubSubTopic: PubSubTopic, public contentTopic: string, public ephemeral: boolean = false, public metaSetter?: IMetaSetter @@ -116,15 +120,19 @@ export class Encoder implements IEncoder { * messages. */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, ephemeral, metaSetter }: EncoderOptions): Encoder { - return new Encoder(contentTopic, ephemeral, metaSetter); + return new Encoder(pubSubTopic, contentTopic, ephemeral, metaSetter); } export class Decoder implements IDecoder { - constructor(public contentTopic: string) { + constructor( + public pubSubTopic: PubSubTopic, + public contentTopic: string + ) { if (!contentTopic || contentTopic === "") { throw new Error("Content topic must be specified"); } @@ -175,6 +183,9 @@ export class Decoder implements IDecoder { * * @param contentTopic The resulting decoder will only decode messages with this content topic. */ -export function createDecoder(contentTopic: string): Decoder { - return new Decoder(contentTopic); +export function createDecoder( + contentTopic: string, + pubsubTopic: PubSubTopic = DefaultPubSubTopic +): Decoder { + return new Decoder(pubsubTopic, contentTopic); } diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index eb71a867a1..c9383ffb2b 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -1,3 +1,5 @@ +import type { PubSubTopic } from "./misc.js"; + export interface IRateLimitProof { proof: Uint8Array; merkleRoot: Uint8Array; @@ -36,6 +38,7 @@ export interface IMetaSetter { } export interface EncoderOptions { + pubSubTopic?: PubSubTopic; /** The content topic to set on outgoing messages. */ contentTopic: string; /** @@ -52,6 +55,7 @@ export interface EncoderOptions { } export interface IEncoder { + pubSubTopic: PubSubTopic; contentTopic: string; ephemeral: boolean; toWire: (message: IMessage) => Promise; @@ -61,7 +65,7 @@ export interface IEncoder { export interface IDecodedMessage { payload: Uint8Array; contentTopic: string; - pubSubTopic: string; + pubSubTopic: PubSubTopic; timestamp: Date | undefined; rateLimitProof: IRateLimitProof | undefined; ephemeral: boolean | undefined; @@ -69,6 +73,7 @@ export interface IDecodedMessage { } export interface IDecoder { + pubSubTopic: PubSubTopic; contentTopic: string; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 802f201de3..6225da2664 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,5 +1,6 @@ +import { DefaultPubSubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import { IMetaSetter } from "@waku/interfaces"; +import { IMetaSetter, PubSubTopic } from "@waku/interfaces"; import type { EncoderOptions as BaseEncoderOptions, IDecoder, @@ -32,6 +33,7 @@ const log = debug("waku:message-encryption:ecies"); class Encoder implements IEncoder { constructor( + public pubSubTopic: PubSubTopic, public contentTopic: string, private publicKey: Uint8Array, private sigPrivKey?: Uint8Array, @@ -95,6 +97,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, publicKey, sigPrivKey, @@ -102,6 +105,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( + pubSubTopic, contentTopic, publicKey, sigPrivKey, @@ -112,10 +116,11 @@ export function createEncoder({ class Decoder extends DecoderV0 implements IDecoder { constructor( + pubSubTopic: PubSubTopic, contentTopic: string, private privateKey: Uint8Array ) { - super(contentTopic); + super(pubSubTopic, contentTopic); } async fromProtoObj( @@ -184,7 +189,8 @@ class Decoder extends DecoderV0 implements IDecoder { */ export function createDecoder( contentTopic: string, - privateKey: Uint8Array + privateKey: Uint8Array, + pubSubTopic: PubSubTopic = DefaultPubSubTopic ): Decoder { - return new Decoder(contentTopic, privateKey); + return new Decoder(pubSubTopic, contentTopic, privateKey); } diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 4eb165fcf7..feffb52689 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,3 +1,4 @@ +import { DefaultPubSubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -5,7 +6,8 @@ import type { IEncoder, IMessage, IMetaSetter, - IProtoMessage + IProtoMessage, + PubSubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import debug from "debug"; @@ -27,6 +29,7 @@ const log = debug("waku:message-encryption:symmetric"); class Encoder implements IEncoder { constructor( + public pubSubTopic: PubSubTopic, public contentTopic: string, private symKey: Uint8Array, private sigPrivKey?: Uint8Array, @@ -91,21 +94,30 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, symKey, sigPrivKey, ephemeral = false, metaSetter }: EncoderOptions): Encoder { - return new Encoder(contentTopic, symKey, sigPrivKey, ephemeral, metaSetter); + return new Encoder( + pubSubTopic, + contentTopic, + symKey, + sigPrivKey, + ephemeral, + metaSetter + ); } class Decoder extends DecoderV0 implements IDecoder { constructor( + pubSubTopic: PubSubTopic, contentTopic: string, private symKey: Uint8Array ) { - super(contentTopic); + super(pubSubTopic, contentTopic); } async fromProtoObj( @@ -174,7 +186,8 @@ class Decoder extends DecoderV0 implements IDecoder { */ export function createDecoder( contentTopic: string, - symKey: Uint8Array + symKey: Uint8Array, + pubSubTopic: PubSubTopic = DefaultPubSubTopic ): Decoder { - return new Decoder(contentTopic, symKey); + return new Decoder(pubSubTopic, contentTopic, symKey); } diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index 0a81f7a6cd..845280b860 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -1,3 +1,4 @@ +import { DefaultPubSubTopic } from "@waku/core"; import type { IDecodedMessage, IDecoder, @@ -26,6 +27,7 @@ export class TopicOnlyMessage implements IDecodedMessage { } export class TopicOnlyDecoder implements IDecoder { + pubSubTopic = DefaultPubSubTopic; public contentTopic = ""; fromWireToProtoObj(bytes: Uint8Array): Promise { From c232e9f6e8d8875734e0282fcdcc740fc95607f3 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:46:25 +0530 Subject: [PATCH 03/29] feat(protocols): allow multiple `PubSubTopic[]` --- packages/core/src/lib/base_protocol.ts | 17 ++++- packages/core/src/lib/filter/index.ts | 20 +++--- packages/core/src/lib/light_push/index.ts | 10 ++- packages/core/src/lib/store/index.ts | 83 ++++++++++++----------- 4 files changed, 78 insertions(+), 52 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 1546fda480..e0dad78392 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -2,7 +2,11 @@ import type { Libp2p } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; -import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; +import type { + IBaseProtocol, + Libp2pComponents, + PubSubTopic +} from "@waku/interfaces"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; import { filterPeers } from "./filterPeers.js"; @@ -89,4 +93,15 @@ export class BaseProtocol implements IBaseProtocol { // Filter the peers based on the specified criteria return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } + + protected ensurePubsubTopicIsValid( + pubsubTopic: PubSubTopic, + allowedTopics: PubSubTopic[] + ): void { + if (!allowedTopics.includes(pubsubTopic)) { + throw new Error( + `Pubsub topic ${pubsubTopic} is not supported by this protocol. Allowed topics are: ${allowedTopics}` + ); + } + } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index bd102e3787..d8db596e22 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -230,7 +230,7 @@ class Subscription { } class Filter extends BaseProtocol implements IReceiver { - private readonly options: ProtocolCreateOptions; + private readonly pubsubTopics: PubSubTopic[] = []; private activeSubscriptions = new Map(); private readonly NUM_PEERS_PROTOCOL = 1; @@ -253,19 +253,21 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); + this.pubsubTopics = options?.pubSubTopics || [DefaultPubSubTopic]; + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log("Failed to register ", FilterCodecs.PUSH, e); }); this.activeSubscriptions = new Map(); - - this.options = options ?? {}; } - async createSubscription(pubSubTopic?: string): Promise { - const _pubSubTopic = - pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic; + async createSubscription( + pubSubTopic: string = DefaultPubSubTopic + ): Promise { + this.ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + //TODO: get a relevant peer for the topic/shard const peer = ( await this.getPeers({ maxBootstrapPeers: 1, @@ -274,11 +276,11 @@ class Filter extends BaseProtocol implements IReceiver { )[0]; const subscription = - this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ?? + this.getActiveSubscription(pubSubTopic, peer.id.toString()) ?? this.setActiveSubscription( - _pubSubTopic, + pubSubTopic, peer.id.toString(), - new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer)) + new Subscription(pubSubTopic, peer, this.getStream.bind(this, peer)) ); return subscription; diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index d9379c1690..301853cdd9 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -5,6 +5,7 @@ import { IMessage, Libp2p, ProtocolCreateOptions, + PubSubTopic, SendError, SendResult } from "@waku/interfaces"; @@ -40,12 +41,12 @@ type PreparePushMessageResult = * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ class LightPush extends BaseProtocol implements ILightPush { - options: ProtocolCreateOptions; + private readonly pubsubTopics: PubSubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.options = options || {}; + this.pubsubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; } private async preparePushMessage( @@ -81,7 +82,9 @@ class LightPush extends BaseProtocol implements ILightPush { } async send(encoder: IEncoder, message: IMessage): Promise { - const { pubSubTopic = DefaultPubSubTopic } = this.options; + const { pubSubTopic } = encoder; + this.ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + const recipients: PeerId[] = []; const { query, error: preparationError } = await this.preparePushMessage( @@ -97,6 +100,7 @@ class LightPush extends BaseProtocol implements ILightPush { }; } + //TODO: get a relevant peer for the topic/shard const peers = await this.getPeers({ maxBootstrapPeers: 1, numPeers: this.NUM_PEERS_PROTOCOL diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 67bd856e5d..63a0e127f2 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -6,7 +6,8 @@ import { IDecoder, IStore, Libp2p, - ProtocolCreateOptions + ProtocolCreateOptions, + PubSubTopic } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; import { isDefined } from "@waku/utils"; @@ -74,12 +75,12 @@ export interface QueryOptions { * The Waku Store protocol can be used to retrieved historical messages. */ class Store extends BaseProtocol implements IStore { - options: ProtocolCreateOptions; + private readonly pubSubTopics: PubSubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); - this.options = options ?? {}; + this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; } /** @@ -211,8 +212,6 @@ class Store extends BaseProtocol implements IStore { decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { - const { pubSubTopic = DefaultPubSubTopic } = this.options; - let startTime, endTime; if (options?.timeFilter) { @@ -220,44 +219,50 @@ class Store extends BaseProtocol implements IStore { endTime = options.timeFilter.endTime; } - const decodersAsMap = new Map(); - decoders.forEach((dec) => { - if (decodersAsMap.has(dec.contentTopic)) { - throw new Error( - "API does not support different decoder per content topic" - ); - } - decodersAsMap.set(dec.contentTopic, dec); - }); + const _pubSubTopics = decoders.map((decoder) => decoder.pubSubTopic); - const contentTopics = decoders.map((dec) => dec.contentTopic); + for (const topic of _pubSubTopics) { + this.ensurePubsubTopicIsValid(topic, this.pubSubTopics); - const queryOpts = Object.assign( - { - pubSubTopic: pubSubTopic, - pageDirection: PageDirection.BACKWARD, - pageSize: DefaultPageSize - }, - options, - { contentTopics, startTime, endTime } - ); + const decodersAsMap = new Map(); + decoders.forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + throw new Error( + "API does not support different decoder per content topic" + ); + } + decodersAsMap.set(dec.contentTopic, dec); + }); - log("Querying history with the following options", options); + const contentTopics = decoders.map((dec) => dec.contentTopic); - const peer = ( - await this.getPeers({ - numPeers: this.NUM_PEERS_PROTOCOL, - maxBootstrapPeers: 1 - }) - )[0]; - - for await (const messages of paginate( - this.getStream.bind(this, peer), - queryOpts, - decodersAsMap, - options?.cursor - )) { - yield messages; + const queryOpts = Object.assign( + { + pubSubTopic: topic, + pageDirection: PageDirection.BACKWARD, + pageSize: DefaultPageSize + }, + options, + { contentTopics, startTime, endTime } + ); + + log("Querying history with the following options", options); + + const peer = ( + await this.getPeers({ + numPeers: this.NUM_PEERS_PROTOCOL, + maxBootstrapPeers: 1 + }) + )[0]; + + for await (const messages of paginate( + this.getStream.bind(this, peer), + queryOpts, + decodersAsMap, + options?.cursor + )) { + yield messages; + } } } } From 0e1e1a1d45cb754b6538abdd3a5cff70b4776998 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:47:19 +0530 Subject: [PATCH 04/29] feat(relay): allow multiple `PubSubTopic[]` --- packages/core/src/lib/keep_alive_manager.ts | 56 +++-- packages/core/src/lib/wait_for_remote_peer.ts | 18 +- packages/interfaces/src/relay.ts | 2 +- packages/relay/src/index.ts | 205 +++++++++++++----- 4 files changed, 202 insertions(+), 79 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 6227c08257..4878bfd029 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,6 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerStore } from "@libp2p/interface/peer-store"; -import type { IRelay } from "@waku/interfaces"; +import type { IRelay, PeerIdStr, PubSubTopic } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; @@ -13,7 +13,7 @@ const log = debug("waku:keep-alive"); export class KeepAliveManager { private pingKeepAliveTimers: Map>; - private relayKeepAliveTimers: Map>; + private relayKeepAliveTimers: Map[]>; private options: KeepAliveOptions; private relay?: IRelay; @@ -66,16 +66,11 @@ export class KeepAliveManager { const relay = this.relay; if (relay && relayPeriodSecs !== 0) { - const encoder = createEncoder({ - contentTopic: RelayPingContentTopic, - ephemeral: true - }); - const interval = setInterval(() => { - log("Sending Waku Relay ping message"); - relay - .send(encoder, { payload: new Uint8Array([1]) }) - .catch((e) => log("Failed to send relay ping", e)); - }, relayPeriodSecs * 1000); + const interval = this.sendRelayPings( + relay, + relayPeriodSecs, + peerId.toString() + ); this.relayKeepAliveTimers.set(peerId, interval); } } @@ -89,7 +84,7 @@ export class KeepAliveManager { } if (this.relayKeepAliveTimers.has(peerId)) { - clearInterval(this.relayKeepAliveTimers.get(peerId)); + this.relayKeepAliveTimers.get(peerId)?.map(clearInterval); this.relayKeepAliveTimers.delete(peerId); } } @@ -105,4 +100,39 @@ export class KeepAliveManager { this.pingKeepAliveTimers.clear(); this.relayKeepAliveTimers.clear(); } + + private sendRelayPings( + relay: IRelay, + relayPeriodSecs: number, + peerIdStr: PeerIdStr + ): NodeJS.Timeout[] { + const peersMap = relay.getMeshPeers(); + + // find the PubSubTopics the peer is part of + const pubSubTopics: PubSubTopic[] = []; + peersMap.forEach((peers, topic) => { + if (peers.includes(peerIdStr)) { + pubSubTopics.push(topic); + } + }); + + // send a ping message to each PubSubTopic the peer is part of + const intervals: NodeJS.Timeout[] = []; + for (const topic of pubSubTopics) { + const encoder = createEncoder({ + pubSubTopic: topic, + contentTopic: RelayPingContentTopic, + ephemeral: true + }); + const interval = setInterval(() => { + log("Sending Waku Relay ping message"); + relay + .send(encoder, { payload: new Uint8Array([1]) }) + .catch((e) => log("Failed to send relay ping", e)); + }, relayPeriodSecs * 1000); + intervals.push(interval); + } + + return intervals; + } } diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index 6776e734a2..7c829b34e2 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -96,15 +96,23 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise { } /** - * Wait for a peer with the given protocol to be connected and in the gossipsub - * mesh. + * Wait for at least one peer with the given protocol to be connected and in the gossipsub + * mesh for all pubSubTopics. */ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise { - let peers = waku.getMeshPeers(); + let allTopicsHavePeers = false; - while (peers.length == 0) { + while (!allTopicsHavePeers) { await pEvent(waku.gossipSub, "gossipsub:heartbeat"); - peers = waku.getMeshPeers(); + const meshPeersMap = waku.getMeshPeers(); + + allTopicsHavePeers = true; // Assume all topics have peers initially + for (const peers of meshPeersMap.values()) { + if (peers.length === 0) { + allTopicsHavePeers = false; // If any topic doesn't have peers, set to false + break; + } + } } } diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 522ab19576..3269579284 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -14,7 +14,7 @@ import type { ISender } from "./sender.js"; export interface IRelayAPI { readonly gossipSub: GossipSub; start: () => Promise; - getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; + getMeshPeers: () => Map; } export type IRelay = IRelayAPI & ISender & IReceiver; diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 20dc7d2f50..046148018b 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -21,6 +21,7 @@ import { IRelay, Libp2p, ProtocolCreateOptions, + PubSubTopic, SendError, SendResult } from "@waku/interfaces"; @@ -34,6 +35,7 @@ import { TopicOnlyDecoder } from "./topic_only_message.js"; const log = debug("waku:relay"); export type Observer = { + pubsubTopic: PubSubTopic; decoder: IDecoder; callback: Callback; }; @@ -46,7 +48,7 @@ export type ContentTopic = string; * Throws if libp2p.pubsub does not support Waku Relay */ class Relay implements IRelay { - private readonly pubSubTopic: string; + readonly pubSubTopics: Set; private defaultDecoder: IDecoder; public static multicodec: string = RelayCodecs[0]; @@ -56,7 +58,7 @@ class Relay implements IRelay { * observers called when receiving new message. * Observers under key `""` are always called. */ - private observers: Map>; + private observers: Map>>; constructor(libp2p: Libp2p, options?: Partial) { if (!this.isRelayPubSub(libp2p.services.pubsub)) { @@ -66,21 +68,22 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; + this.pubSubTopics = new Set(options?.pubSubTopics ?? [DefaultPubSubTopic]); if (this.gossipSub.isStarted()) { - this.gossipSubSubscribe(this.pubSubTopic); + this.subscribeToAllTopics(); } this.observers = new Map(); + // Default PubSubTopic decoder // TODO: User might want to decide what decoder should be used (e.g. for RLN) this.defaultDecoder = new TopicOnlyDecoder(); } /** * Mounts the gossipsub protocol onto the libp2p node - * and subscribes to the default topic. + * and subscribes to all the topics. * * @override * @returns {void} @@ -91,7 +94,7 @@ class Relay implements IRelay { } await this.gossipSub.start(); - this.gossipSubSubscribe(this.pubSubTopic); + this.subscribeToAllTopics(); } /** @@ -99,6 +102,16 @@ class Relay implements IRelay { */ public async send(encoder: IEncoder, message: IMessage): Promise { const recipients: PeerId[] = []; + + const { pubSubTopic } = encoder; + if (!this.pubSubTopics.has(pubSubTopic)) { + log("Failed to send waku relay: topic not subscribed"); + return { + recipients, + errors: [SendError.TOPIC_NOT_SUBSCRIBED] + }; + } + if (!isSizeValid(message.payload)) { log("Failed to send waku relay: message is bigger that 1MB"); return { @@ -116,45 +129,72 @@ class Relay implements IRelay { }; } - return this.gossipSub.publish(this.pubSubTopic, msg); + return await this.gossipSub.publish(pubSubTopic, msg); } - /** - * Add an observer and associated Decoder to process incoming messages on a given content topic. - * - * @returns Function to delete the observer - */ public subscribe( decoders: IDecoder | IDecoder[], callback: Callback ): () => void { - const contentTopicToObservers = Array.isArray(decoders) - ? toObservers(decoders, callback) - : toObservers([decoders], callback); - - for (const contentTopic of contentTopicToObservers.keys()) { - const currObservers = this.observers.get(contentTopic) || new Set(); - const newObservers = - contentTopicToObservers.get(contentTopic) || new Set(); + const pubSubTopicToContentObservers = Array.isArray(decoders) + ? toObservers(Array.from(this.pubSubTopics), decoders, callback) + : toObservers(Array.from(this.pubSubTopics), [decoders], callback); + + for (const [ + pubSubTopic, + contentTopicToObservers + ] of pubSubTopicToContentObservers.entries()) { + const existingContentObservers = + this.observers.get(pubSubTopic) || + new Map>>(); + + for (const [ + contentTopic, + newObservers + ] of contentTopicToObservers.entries()) { + const currObservers = + existingContentObservers.get(contentTopic) || new Set>(); + existingContentObservers.set( + contentTopic, + union(currObservers, newObservers) + ); + } - this.observers.set(contentTopic, union(currObservers, newObservers)); + this.observers.set(pubSubTopic, existingContentObservers); } return () => { - for (const contentTopic of contentTopicToObservers.keys()) { - const currentObservers = this.observers.get(contentTopic) || new Set(); - const observersToRemove = - contentTopicToObservers.get(contentTopic) || new Set(); - - const nextObservers = leftMinusJoin( - currentObservers, - observersToRemove - ); + for (const [ + pubSubTopic, + contentTopicToObservers + ] of pubSubTopicToContentObservers.entries()) { + const existingContentObservers = this.observers.get(pubSubTopic); + + if (existingContentObservers) { + for (const [ + contentTopic, + observersToRemove + ] of contentTopicToObservers.entries()) { + const currentObservers = + existingContentObservers.get(contentTopic) || + new Set>(); + const nextObservers = leftMinusJoin( + currentObservers, + observersToRemove + ); + + if (nextObservers.size) { + existingContentObservers.set(contentTopic, nextObservers); + } else { + existingContentObservers.delete(contentTopic); + } + } - if (nextObservers.size) { - this.observers.set(contentTopic, nextObservers); - } else { - this.observers.delete(contentTopic); + if (existingContentObservers.size) { + this.observers.set(pubSubTopic, existingContentObservers); + } else { + this.observers.delete(pubSubTopic); + } } } }; @@ -168,12 +208,28 @@ class Relay implements IRelay { public getActiveSubscriptions(): ActiveSubscriptions { const map = new Map(); - map.set(this.pubSubTopic, this.observers.keys()); + for (const pubSubTopic of this.pubSubTopics) { + map.set(pubSubTopic, Array.from(this.observers.keys())); + } + return map; + } + + /** + * Returns mesh peers for all topics. + * @returns Map of topic to peer ids + */ + public getMeshPeers(): Map { + const map = new Map(); + for (const topic of this.pubSubTopics) { + map.set(topic, this.gossipSub.mesh.get(topic)); + } return map; } - public getMeshPeers(topic?: TopicStr): PeerIdStr[] { - return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic); + private subscribeToAllTopics(): void { + for (const pubSubTopic of this.pubSubTopics) { + this.gossipSubSubscribe(pubSubTopic); + } } private async processIncomingMessage( @@ -186,12 +242,20 @@ class Relay implements IRelay { return; } - const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set< + // Retrieve the map of content topics for the given pubSubTopic + const contentTopicMap = this.observers.get(pubSubTopic); + if (!contentTopicMap) { + return; + } + + // Retrieve the set of observers for the given contentTopic + const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set< Observer >; if (!observers) { return; } + await Promise.all( Array.from(observers).map(({ decoder, callback }) => { return (async () => { @@ -269,30 +333,51 @@ export function wakuGossipSub( } function toObservers( + allPubSubTopics: PubSubTopic[], decoders: IDecoder[], callback: Callback -): Map>> { - const contentTopicToDecoders = Array.from( - groupByContentTopic(decoders).entries() - ); - - const contentTopicToObserversEntries = contentTopicToDecoders.map( - ([contentTopic, decoders]) => - [ - contentTopic, - new Set( - decoders.map( - (decoder) => - ({ - decoder, - callback - }) as Observer - ) - ) - ] as [ContentTopic, Set>] - ); - - return new Map(contentTopicToObserversEntries); +): Map>>> { + for (const decoder of decoders) { + if (!allPubSubTopics.includes(decoder.pubSubTopic)) { + throw new Error( + `Pubsub topic ${decoder.pubSubTopic} is not supported by this protocol. Allowed topics are: ${allPubSubTopics}` + ); + } + } + + const finalMap = new Map>>>(); + + for (const pubSubTopic of allPubSubTopics) { + // Filter decoders that match the current pubSubTopic + const filteredDecoders = decoders.filter( + (decoder) => decoder.pubSubTopic === pubSubTopic + ); + + // Group these decoders by ContentTopic + const contentTopicToDecoders = Array.from( + groupByContentTopic(filteredDecoders).entries() + ); + + const contentTopicToObserversMap = new Map< + ContentTopic, + Set> + >(); + + for (const [contentTopic, topicDecoders] of contentTopicToDecoders) { + const observersSet = new Set>( + topicDecoders.map((decoder) => ({ + decoder, + callback + })) as Observer[] + ); + contentTopicToObserversMap.set(contentTopic, observersSet); + } + + // Set the result in the final map + finalMap.set(pubSubTopic, contentTopicToObserversMap); + } + + return finalMap; } function union(left: Set, right: Set): Set { From 74c88000767dbd6c36a4a46402b548459e46cb8b Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:47:36 +0530 Subject: [PATCH 05/29] chore(tests): update for new API --- .../light-push/custom_pubsub.node.spec.ts | 15 +++++---- packages/tests/tests/light-push/utils.ts | 2 +- packages/tests/tests/relay.node.spec.ts | 33 +++++++++++-------- packages/tests/tests/store.node.spec.ts | 11 +++++-- .../tests/wait_for_remote_peer.node.spec.ts | 8 ++--- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts index fa36bad14d..95499e37c7 100644 --- a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts @@ -1,15 +1,11 @@ +import { createEncoder } from "@waku/core"; import { LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; -import { - messageText, - runNodes, - TestContentTopic, - TestEncoder -} from "./utils.js"; +import { messageText, runNodes, TestContentTopic } from "./utils.js"; describe("Waku Light Push [node only] - custom pubsub topic", function () { this.timeout(15000); @@ -34,7 +30,12 @@ describe("Waku Light Push [node only] - custom pubsub topic", function () { it("Push message", async function () { const nimPeerId = await nwaku.getPeerId(); - const pushResponse = await waku.lightPush.send(TestEncoder, { + const testEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubSubTopic: customPubSubTopic + }); + + const pushResponse = await waku.lightPush.send(testEncoder, { payload: utf8ToBytes(messageText) }); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index cbe4cc32cb..9e2d4b03b7 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -30,7 +30,7 @@ export async function runNodes( let waku: LightNode | undefined; try { waku = await createLightNode({ - pubSubTopic, + pubSubTopics: pubSubTopic ? [pubSubTopic] : undefined, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index 9c046f9de6..1e4aa26ab9 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -259,6 +259,15 @@ describe("Waku Relay [node only]", () => { let waku1: RelayNode; let waku2: RelayNode; let waku3: RelayNode; + + const pubSubTopic = "/some/pubsub/topic"; + + const CustomTopicEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubSubTopic: pubSubTopic + }); + const CustomTopicDecoder = createDecoder(TestContentTopic, pubSubTopic); + afterEach(async function () { !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -271,17 +280,15 @@ describe("Waku Relay [node only]", () => { it("Publish", async function () { this.timeout(10000); - const pubSubTopic = "/some/pubsub/topic"; - // 1 and 2 uses a custom pubsub // 3 uses the default pubsub [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -310,7 +317,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([TestDecoder], resolve); + void waku2.relay.subscribe([CustomTopicDecoder], resolve); } ); @@ -323,7 +330,7 @@ describe("Waku Relay [node only]", () => { } ); - await waku1.relay.send(TestEncoder, { + await waku1.relay.send(CustomTopicEncoder, { payload: utf8ToBytes(messageText) }); @@ -338,16 +345,14 @@ describe("Waku Relay [node only]", () => { this.timeout(10000); const MB = 1024 ** 2; - const pubSubTopic = "/some/pubsub/topic"; - // 1 and 2 uses a custom pubsub [waku1, waku2] = await Promise.all([ createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) @@ -365,7 +370,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([TestDecoder], () => + void waku2.relay.subscribe([CustomTopicDecoder], () => resolve({ payload: new Uint8Array([]) } as DecodedMessage) @@ -373,18 +378,18 @@ describe("Waku Relay [node only]", () => { } ); - let sendResult = await waku1.relay.send(TestEncoder, { + let sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(1 * MB) }); expect(sendResult.recipients.length).to.eq(1); - sendResult = await waku1.relay.send(TestEncoder, { + sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(1 * MB + 65536) }); expect(sendResult.recipients.length).to.eq(0); expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); - sendResult = await waku1.relay.send(TestEncoder, { + sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(2 * MB) }); expect(sendResult.recipients.length).to.eq(0); diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 53736af4bd..09f0ddd324 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -566,6 +566,11 @@ describe("Waku Store, custom pubsub topic", () => { let waku: LightNode; let nwaku: NimGoNode; + const CustomPubSubTestDecoder = createDecoder( + TestContentTopic, + customPubSubTopic + ); + beforeEach(async function () { this.timeout(15_000); nwaku = new NimGoNode(makeLogFileName(this)); @@ -600,7 +605,7 @@ describe("Waku Store, custom pubsub topic", () => { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - pubSubTopic: customPubSubTopic + pubSubTopics: [customPubSubTopic] }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); @@ -608,7 +613,9 @@ describe("Waku Store, custom pubsub topic", () => { const messages: IMessage[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + for await (const msgPromises of waku.store.queryGenerator([ + CustomPubSubTestDecoder + ])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 575ada689d..dd9f1dcf56 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -7,7 +7,7 @@ import { expect } from "chai"; import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; import { NimGoNode } from "../src/node/node.js"; -describe("Wait for remote peer", function () { +describe.only("Wait for remote peer", function () { let waku1: RelayNode; let waku2: LightNode; let nwaku: NimGoNode | undefined; @@ -39,7 +39,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await delay(1000); await waitForRemotePeer(waku1, [Protocols.Relay]); - const peers = waku1.relay.getMeshPeers(); + const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -67,7 +67,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitPromise; - const peers = waku1.relay.getMeshPeers(); + const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -262,7 +262,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitForRemotePeer(waku1); - const peers = waku1.relay.getMeshPeers(); + const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); const nimPeerId = multiAddrWithId.getPeerId(); From bad5d5eaed37efe3e635088c033d04560b62ab75 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 18:59:22 +0530 Subject: [PATCH 06/29] chore: minor fixes --- .cspell.json | 1 + package-lock.json | 20 +------------------ packages/interfaces/src/protocols.ts | 4 ++-- .../tests/wait_for_remote_peer.node.spec.ts | 2 +- packages/utils/package.json | 1 - 5 files changed, 5 insertions(+), 23 deletions(-) diff --git a/.cspell.json b/.cspell.json index 5eb847a036..fea16763c9 100644 --- a/.cspell.json +++ b/.cspell.json @@ -39,6 +39,7 @@ "exponentiate", "extip", "fanout", + "sharded", "floodsub", "fontsource", "globby", diff --git a/package-lock.json b/package-lock.json index 6e433aaeda..520e59339c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27626,7 +27626,6 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2", @@ -27635,15 +27634,6 @@ "engines": { "node": ">=16" } - }, - "packages/utils/node_modules/@waku/interfaces": { - "version": "0.0.17", - "resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.17.tgz", - "integrity": "sha512-/3HrMhpjMwpArm73L05Q7xLPKFeGjvpDlrFq4lqrXjKa8J2VUmiZh6fD9LdYP1ZvhPdzrCRAL+o7+Lb9kt4fvQ==", - "dev": true, - "engines": { - "node": ">=16" - } } }, "dependencies": { @@ -31576,21 +31566,13 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", + "@waku/interfaces": "0.0.18", "cspell": "^7.3.2", "debug": "^4.3.4", "npm-run-all": "^4.1.5", "rollup": "^3.29.2", "typescript": "^5.0.4", "uint8arrays": "^4.0.4" - }, - "dependencies": { - "@waku/interfaces": { - "version": "0.0.17", - "resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.17.tgz", - "integrity": "sha512-/3HrMhpjMwpArm73L05Q7xLPKFeGjvpDlrFq4lqrXjKa8J2VUmiZh6fD9LdYP1ZvhPdzrCRAL+o7+Lb9kt4fvQ==", - "dev": true - } } }, "@webassemblyjs/ast": { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 126ec6255f..d99195486c 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -28,8 +28,8 @@ export type ProtocolCreateOptions = { * To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/). * * If no pubsub topic is specified, the default pubsub topic is used. - * The set of pubsub topics that are used to initialise the Waku node, will need to be used by the protocols as well - * You cannot currently add or remove pubsub topics after initialisation. + * The set of pubsub topics that are used to initialize the Waku node, will need to be used by the protocols as well + * You cannot currently add or remove pubsub topics after initialization. * This is used by: * - WakuRelay to receive, route and send messages, * - WakuLightPush to send messages, diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index dd9f1dcf56..d118ce1aff 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -7,7 +7,7 @@ import { expect } from "chai"; import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; import { NimGoNode } from "../src/node/node.js"; -describe.only("Wait for remote peer", function () { +describe("Wait for remote peer", function () { let waku1: RelayNode; let waku2: LightNode; let nwaku: NimGoNode | undefined; diff --git a/packages/utils/package.json b/packages/utils/package.json index 100af1624a..83f6729668 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -74,7 +74,6 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2", From 24c9ad083bb34af00e082ac3d9a84c8bd25497fe Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 19:20:38 +0530 Subject: [PATCH 07/29] chore: make store more robust --- packages/core/src/lib/store/index.ts | 32 ++++++++++++++++++---------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 63a0e127f2..6c79839e59 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -219,22 +219,32 @@ class Store extends BaseProtocol implements IStore { endTime = options.timeFilter.endTime; } - const _pubSubTopics = decoders.map((decoder) => decoder.pubSubTopic); + const _pubSubTopics = new Set( + decoders.map((decoder) => decoder.pubSubTopic) + ); for (const topic of _pubSubTopics) { this.ensurePubsubTopicIsValid(topic, this.pubSubTopics); const decodersAsMap = new Map(); - decoders.forEach((dec) => { - if (decodersAsMap.has(dec.contentTopic)) { - throw new Error( - "API does not support different decoder per content topic" - ); - } - decodersAsMap.set(dec.contentTopic, dec); - }); - - const contentTopics = decoders.map((dec) => dec.contentTopic); + decoders + .filter((decoder) => decoder.pubSubTopic === topic) + .forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + throw new Error( + "API does not support different decoder per content topic" + ); + } + decodersAsMap.set(dec.contentTopic, dec); + }); + + const contentTopics = decoders + .filter((decoder) => decoder.pubSubTopic === topic) + .map((dec) => dec.contentTopic); + + if (contentTopics.length === 0) { + throw new Error("No decoders found for topic " + topic); + } const queryOpts = Object.assign( { From 1bac3309d284384497fb79faaf9105b618f1e014 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 20 Sep 2023 19:28:41 +0530 Subject: [PATCH 08/29] fix(relay): correctly set types --- packages/relay/src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 046148018b..b899ce06fb 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -219,9 +219,9 @@ class Relay implements IRelay { * @returns Map of topic to peer ids */ public getMeshPeers(): Map { - const map = new Map(); + const map = new Map(); for (const topic of this.pubSubTopics) { - map.set(topic, this.gossipSub.mesh.get(topic)); + map.set(topic, Array.from(this.gossipSub.mesh.get(topic) ?? [])); } return map; } From cbc514a948df8762428cb21477f92daed838d840 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 13:12:40 +0530 Subject: [PATCH 09/29] chore(address comments): update terminology around configured pubsub topics --- packages/core/src/lib/base_protocol.ts | 6 +++--- packages/interfaces/src/protocols.ts | 2 +- packages/relay/src/index.ts | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index e0dad78392..f5267792e3 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -96,11 +96,11 @@ export class BaseProtocol implements IBaseProtocol { protected ensurePubsubTopicIsValid( pubsubTopic: PubSubTopic, - allowedTopics: PubSubTopic[] + configuredTopics: PubSubTopic[] ): void { - if (!allowedTopics.includes(pubsubTopic)) { + if (!configuredTopics.includes(pubsubTopic)) { throw new Error( - `Pubsub topic ${pubsubTopic} is not supported by this protocol. Allowed topics are: ${allowedTopics}` + `PubSub topic ${pubsubTopic} is not supported by this protocol. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` ); } } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index d99195486c..b6e097cba7 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -68,7 +68,7 @@ export enum SendError { DECODE_FAILED = "Failed to decode", SIZE_TOO_BIG = "Size is too big", NO_RPC_RESPONSE = "No RPC response", - TOPIC_NOT_SUBSCRIBED = "Topic not subscribed" + TOPIC_NOT_CONFIGURED = "Topic not configured" } export interface SendResult { diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index b899ce06fb..7df4b5aec7 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -105,10 +105,10 @@ class Relay implements IRelay { const { pubSubTopic } = encoder; if (!this.pubSubTopics.has(pubSubTopic)) { - log("Failed to send waku relay: topic not subscribed"); + log("Failed to send waku relay: topic not configured"); return { recipients, - errors: [SendError.TOPIC_NOT_SUBSCRIBED] + errors: [SendError.TOPIC_NOT_CONFIGURED] }; } @@ -340,7 +340,7 @@ function toObservers( for (const decoder of decoders) { if (!allPubSubTopics.includes(decoder.pubSubTopic)) { throw new Error( - `Pubsub topic ${decoder.pubSubTopic} is not supported by this protocol. Allowed topics are: ${allPubSubTopics}` + `PubSub topic ${decoder.pubSubTopic} is not supported by this protocol. Configured topics are: ${allPubSubTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` ); } } From 697072ce63f120cd86f61fa81ec4b790617cb93b Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 13:17:29 +0530 Subject: [PATCH 10/29] chore(address comments): minor refactoring --- packages/core/src/lib/message/version_0.ts | 4 ++-- packages/interfaces/src/protocols.ts | 3 ++- packages/relay/src/index.ts | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 0e289490ae..270057e8ac 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -74,9 +74,9 @@ export class DecodedMessage implements IDecodedMessage { export class Encoder implements IEncoder { constructor( - public pubSubTopic: PubSubTopic, public contentTopic: string, public ephemeral: boolean = false, + public pubSubTopic: PubSubTopic, public metaSetter?: IMetaSetter ) { if (!contentTopic || contentTopic === "") { @@ -125,7 +125,7 @@ export function createEncoder({ ephemeral, metaSetter }: EncoderOptions): Encoder { - return new Encoder(pubSubTopic, contentTopic, ephemeral, metaSetter); + return new Encoder(contentTopic, ephemeral, pubSubTopic, metaSetter); } export class Decoder implements IDecoder { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index b6e097cba7..e27aa411ab 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -4,6 +4,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { Libp2pOptions } from "libp2p"; import type { IDecodedMessage } from "./message.js"; +import type { PubSubTopic } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -37,7 +38,7 @@ export type ProtocolCreateOptions = { * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - pubSubTopics?: string[]; + pubSubTopics?: PubSubTopic[]; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core.WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 7df4b5aec7..0ec44398f8 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -153,7 +153,7 @@ class Relay implements IRelay { newObservers ] of contentTopicToObservers.entries()) { const currObservers = - existingContentObservers.get(contentTopic) || new Set>(); + existingContentObservers.get(contentTopic) ?? new Set>(); existingContentObservers.set( contentTopic, union(currObservers, newObservers) From 3285849eb077dc84b34ef300b60f13aec04baed8 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 13:22:52 +0530 Subject: [PATCH 11/29] chore(relay): split `subscribe` into smaller functions for readability & modularity --- packages/relay/src/index.ts | 88 ++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 0ec44398f8..5818942cd4 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -136,10 +136,33 @@ class Relay implements IRelay { decoders: IDecoder | IDecoder[], callback: Callback ): () => void { - const pubSubTopicToContentObservers = Array.isArray(decoders) + const pubSubTopicToContentObservers = this.createObservers( + decoders, + callback + ); + + this.addObservers(pubSubTopicToContentObservers); + + return () => { + this.removeObservers(pubSubTopicToContentObservers); + }; + } + + private createObservers( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Map>>> { + return Array.isArray(decoders) ? toObservers(Array.from(this.pubSubTopics), decoders, callback) : toObservers(Array.from(this.pubSubTopics), [decoders], callback); + } + private addObservers( + pubSubTopicToContentObservers: Map< + PubSubTopic, + Map>> + > + ): void { for (const [ pubSubTopic, contentTopicToObservers @@ -162,42 +185,47 @@ class Relay implements IRelay { this.observers.set(pubSubTopic, existingContentObservers); } + } - return () => { - for (const [ - pubSubTopic, - contentTopicToObservers - ] of pubSubTopicToContentObservers.entries()) { - const existingContentObservers = this.observers.get(pubSubTopic); - - if (existingContentObservers) { - for (const [ - contentTopic, + private removeObservers( + pubSubTopicToContentObservers: Map< + PubSubTopic, + Map>> + > + ): void { + for (const [ + pubSubTopic, + contentTopicToObservers + ] of pubSubTopicToContentObservers.entries()) { + const existingContentObservers = this.observers.get(pubSubTopic); + + if (existingContentObservers) { + for (const [ + contentTopic, + observersToRemove + ] of contentTopicToObservers.entries()) { + const currentObservers = + existingContentObservers.get(contentTopic) || + new Set>(); + const nextObservers = leftMinusJoin( + currentObservers, observersToRemove - ] of contentTopicToObservers.entries()) { - const currentObservers = - existingContentObservers.get(contentTopic) || - new Set>(); - const nextObservers = leftMinusJoin( - currentObservers, - observersToRemove - ); - - if (nextObservers.size) { - existingContentObservers.set(contentTopic, nextObservers); - } else { - existingContentObservers.delete(contentTopic); - } - } + ); - if (existingContentObservers.size) { - this.observers.set(pubSubTopic, existingContentObservers); + if (nextObservers.size) { + existingContentObservers.set(contentTopic, nextObservers); } else { - this.observers.delete(pubSubTopic); + existingContentObservers.delete(contentTopic); } } + + if (existingContentObservers.size) { + this.observers.set(pubSubTopic, existingContentObservers); + } else { + this.observers.delete(pubSubTopic); + } } - }; + } } public toSubscriptionIterator( From 5392b319a067a9826479a70ad4e77ac0b20ad3af Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 13:30:24 +0530 Subject: [PATCH 12/29] chore(address comments): refactor `waitForGossipSubPeerInMesh` --- packages/core/src/lib/wait_for_remote_peer.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index 7c829b34e2..8cd9cd24df 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -100,16 +100,16 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise { * mesh for all pubSubTopics. */ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise { - let allTopicsHavePeers = false; + let topicHasNoPeer = true; - while (!allTopicsHavePeers) { + while (topicHasNoPeer) { await pEvent(waku.gossipSub, "gossipsub:heartbeat"); const meshPeersMap = waku.getMeshPeers(); - allTopicsHavePeers = true; // Assume all topics have peers initially + topicHasNoPeer = false; // Assume no topic is without peers initially for (const peers of meshPeersMap.values()) { if (peers.length === 0) { - allTopicsHavePeers = false; // If any topic doesn't have peers, set to false + topicHasNoPeer = true; // If any topic doesn't have peers, set to true break; } } From 7f517ad0d704aea88e495815f7164aa8350d8945 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 13:38:50 +0530 Subject: [PATCH 13/29] chore(store): only allow to query one `pubSubTopic` --- packages/core/src/lib/store/index.ts | 112 +++++++++++++++------------ 1 file changed, 64 insertions(+), 48 deletions(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 6c79839e59..1cc82eaa0c 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -207,11 +207,21 @@ class Store extends BaseProtocol implements IStore { * @throws If not able to reach a Waku Store peer to query, * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. + * + * This API only supports querying a single pubsub topic at a time. + * If multiple decoders are provided, they must all have the same pubsub topic. + * @throws If multiple decoders with different pubsub topics are provided. + * @throws If no decoders are provided. + * @throws If no decoders are found for the provided pubsub topic. */ async *queryGenerator( decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { + if (decoders.length === 0) { + throw new Error("No decoders provided"); + } + let startTime, endTime; if (options?.timeFilter) { @@ -219,60 +229,66 @@ class Store extends BaseProtocol implements IStore { endTime = options.timeFilter.endTime; } - const _pubSubTopics = new Set( + const uniquePubSubTopicsInQuery = new Set( decoders.map((decoder) => decoder.pubSubTopic) ); - for (const topic of _pubSubTopics) { - this.ensurePubsubTopicIsValid(topic, this.pubSubTopics); - - const decodersAsMap = new Map(); - decoders - .filter((decoder) => decoder.pubSubTopic === topic) - .forEach((dec) => { - if (decodersAsMap.has(dec.contentTopic)) { - throw new Error( - "API does not support different decoder per content topic" - ); - } - decodersAsMap.set(dec.contentTopic, dec); - }); - - const contentTopics = decoders - .filter((decoder) => decoder.pubSubTopic === topic) - .map((dec) => dec.contentTopic); - - if (contentTopics.length === 0) { - throw new Error("No decoders found for topic " + topic); - } - - const queryOpts = Object.assign( - { - pubSubTopic: topic, - pageDirection: PageDirection.BACKWARD, - pageSize: DefaultPageSize - }, - options, - { contentTopics, startTime, endTime } + if (uniquePubSubTopicsInQuery.size > 1) { + throw new Error( + "API does not support querying multiple pubsub topics at once" ); + } + + const pubSubTopicForQuery = uniquePubSubTopicsInQuery + .values() + .next() as unknown as PubSubTopic; - log("Querying history with the following options", options); - - const peer = ( - await this.getPeers({ - numPeers: this.NUM_PEERS_PROTOCOL, - maxBootstrapPeers: 1 - }) - )[0]; - - for await (const messages of paginate( - this.getStream.bind(this, peer), - queryOpts, - decodersAsMap, - options?.cursor - )) { - yield messages; + this.ensurePubsubTopicIsValid(pubSubTopicForQuery, this.pubSubTopics); + + const decodersAsMap = new Map(); + decoders.forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + throw new Error( + "API does not support different decoder per content topic" + ); } + decodersAsMap.set(dec.contentTopic, dec); + }); + + const contentTopics = decoders + .filter((decoder) => decoder.pubSubTopic === pubSubTopicForQuery) + .map((dec) => dec.contentTopic); + + if (contentTopics.length === 0) { + throw new Error("No decoders found for topic " + pubSubTopicForQuery); + } + + const queryOpts = Object.assign( + { + pubSubTopic: pubSubTopicForQuery, + pageDirection: PageDirection.BACKWARD, + pageSize: DefaultPageSize + }, + options, + { contentTopics, startTime, endTime } + ); + + log("Querying history with the following options", options); + + const peer = ( + await this.getPeers({ + numPeers: this.NUM_PEERS_PROTOCOL, + maxBootstrapPeers: 1 + }) + )[0]; + + for await (const messages of paginate( + this.getStream.bind(this, peer), + queryOpts, + decodersAsMap, + options?.cursor + )) { + yield messages; } } } From 76208e2a71bc609528ffa0cbaf43c02ac3a0b199 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 14:48:31 +0530 Subject: [PATCH 14/29] fix: `store` bug --- packages/core/src/lib/store/index.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 1cc82eaa0c..c771bc8a43 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -229,19 +229,20 @@ class Store extends BaseProtocol implements IStore { endTime = options.timeFilter.endTime; } - const uniquePubSubTopicsInQuery = new Set( - decoders.map((decoder) => decoder.pubSubTopic) + // convert array to set to remove duplicates + const uniquePubSubTopicsInQuery = Array.from( + new Set(decoders.map((decoder) => decoder.pubSubTopic)) ); - if (uniquePubSubTopicsInQuery.size > 1) { + // If multiple pubsub topics are provided, throw an error + if (uniquePubSubTopicsInQuery.length > 1) { throw new Error( "API does not support querying multiple pubsub topics at once" ); } - const pubSubTopicForQuery = uniquePubSubTopicsInQuery - .values() - .next() as unknown as PubSubTopic; + // we can be certain that there is only one pubsub topic in the query + const pubSubTopicForQuery = uniquePubSubTopicsInQuery[0]; this.ensurePubsubTopicIsValid(pubSubTopicForQuery, this.pubSubTopics); From 356548e5611a14f70b1ad43d7231594cda2deec1 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 21 Sep 2023 17:08:34 +0530 Subject: [PATCH 15/29] feat(tests): add some basic tests --- package-lock.json | 5 -- packages/tests/tests/sharding.spec.ts | 79 +++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 packages/tests/tests/sharding.spec.ts diff --git a/package-lock.json b/package-lock.json index d05f82a654..08f0655394 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32140,11 +32140,6 @@ "npm-run-all": "^4.1.5", "rollup": "^3.29.2", "uint8arrays": "^4.0.4" - }, - "dependencies": { - "@waku/interfaces": { - "version": "0.0.17" - } } }, "@webassemblyjs/ast": { diff --git a/packages/tests/tests/sharding.spec.ts b/packages/tests/tests/sharding.spec.ts new file mode 100644 index 0000000000..307897a82e --- /dev/null +++ b/packages/tests/tests/sharding.spec.ts @@ -0,0 +1,79 @@ +import { createLightNode, LightNode, utf8ToBytes } from "@waku/sdk"; +import { createEncoder } from "@waku/sdk"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; + +import { makeLogFileName } from "../src/log_file.js"; +import { NimGoNode } from "../src/node/node.js"; + +const PubSubTopic1 = "/waku/1/test1"; +const PubSubTopic2 = "/waku/1/test2"; + +const ContentTopic = "/waku/1/content/test"; + +chai.use(chaiAsPromised); + +describe("Static Sharding", () => { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ store: true, lightpush: true, relay: true }); + }); + + afterEach(async function () { + !!nwaku && + nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("configure the node with multiple pubsub topics", async function () { + this.timeout(15_000); + waku = await createLightNode({ + pubSubTopics: [PubSubTopic1, PubSubTopic2] + }); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic1 + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic2 + }); + + const request1 = waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + + const request2 = waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + + await expect(request1).to.be.fulfilled; + await expect(request2).to.be.fulfilled; + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + this.timeout(15_000); + waku = await createLightNode({ + pubSubTopics: [PubSubTopic1] + }); + + // use a pubsub topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic2 + }); + + // the following request should throw an error + const request = waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + await expect(request).to.be.rejectedWith(Error); + }); +}); From fd85ba0d11b04fb26c011d7b1cfddf6f0643a8b2 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 09:02:39 +0530 Subject: [PATCH 16/29] sharding utils --- packages/core/src/lib/base_protocol.ts | 17 +---------------- packages/core/src/lib/filter/index.ts | 8 ++++++-- packages/core/src/lib/light_push/index.ts | 4 ++-- packages/core/src/lib/store/index.ts | 4 ++-- packages/utils/src/common/index.ts | 1 + packages/utils/src/common/sharding.ts | 12 ++++++++++++ 6 files changed, 24 insertions(+), 22 deletions(-) create mode 100644 packages/utils/src/common/sharding.ts diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index f5267792e3..1546fda480 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -2,11 +2,7 @@ import type { Libp2p } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; -import type { - IBaseProtocol, - Libp2pComponents, - PubSubTopic -} from "@waku/interfaces"; +import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; import { filterPeers } from "./filterPeers.js"; @@ -93,15 +89,4 @@ export class BaseProtocol implements IBaseProtocol { // Filter the peers based on the specified criteria return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } - - protected ensurePubsubTopicIsValid( - pubsubTopic: PubSubTopic, - configuredTopics: PubSubTopic[] - ): void { - if (!configuredTopics.includes(pubsubTopic)) { - throw new Error( - `PubSub topic ${pubsubTopic} is not supported by this protocol. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` - ); - } - } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index d8db596e22..8b33f599b2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -17,7 +17,11 @@ import type { Unsubscribe } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { groupByContentTopic, toAsyncIterator } from "@waku/utils"; +import { + ensurePubsubTopicIsValid, + groupByContentTopic, + toAsyncIterator +} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -265,7 +269,7 @@ class Filter extends BaseProtocol implements IReceiver { async createSubscription( pubSubTopic: string = DefaultPubSubTopic ): Promise { - this.ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); //TODO: get a relevant peer for the topic/shard const peer = ( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 3bab41d496..a5bda99b68 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -11,7 +11,7 @@ import { SendResult } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { isSizeValid } from "@waku/utils"; +import { ensurePubsubTopicIsValid, isSizeValid } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -84,7 +84,7 @@ class LightPush extends BaseProtocol implements ILightPush { async send(encoder: IEncoder, message: IMessage): Promise { const { pubSubTopic } = encoder; - this.ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); const recipients: PeerId[] = []; diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index c771bc8a43..38a7608a58 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -10,7 +10,7 @@ import { PubSubTopic } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { isDefined } from "@waku/utils"; +import { ensurePubsubTopicIsValid, isDefined } from "@waku/utils"; import { concat, utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; import all from "it-all"; @@ -244,7 +244,7 @@ class Store extends BaseProtocol implements IStore { // we can be certain that there is only one pubsub topic in the query const pubSubTopicForQuery = uniquePubSubTopicsInQuery[0]; - this.ensurePubsubTopicIsValid(pubSubTopicForQuery, this.pubSubTopics); + ensurePubsubTopicIsValid(pubSubTopicForQuery, this.pubSubTopics); const decodersAsMap = new Map(); decoders.forEach((dec) => { diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index f73240fbeb..5d080b2319 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -3,6 +3,7 @@ export * from "./random_subset.js"; export * from "./group_by.js"; export * from "./to_async_iterator.js"; export * from "./is_size_valid.js"; +export * from "./sharding.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts new file mode 100644 index 0000000000..99b55ff813 --- /dev/null +++ b/packages/utils/src/common/sharding.ts @@ -0,0 +1,12 @@ +import type { PubSubTopic } from "@waku/interfaces"; + +export function ensurePubsubTopicIsValid( + pubsubTopic: PubSubTopic, + configuredTopics: PubSubTopic[] +): void { + if (!configuredTopics.includes(pubsubTopic)) { + throw new Error( + `PubSub topic ${pubsubTopic} is not supported by this protocol. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` + ); + } +} From 8e8da03b7e4c32c0d1f688398c3087b1f2e950d3 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 09:09:32 +0530 Subject: [PATCH 17/29] address comments --- package-lock.json | 2 +- packages/core/src/lib/filter/index.ts | 1 + packages/core/src/lib/keep_alive_manager.ts | 8 ++++---- packages/relay/src/index.ts | 2 +- packages/tests/tests/sharding.spec.ts | 6 +++--- packages/utils/package.json | 4 ++-- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/package-lock.json b/package-lock.json index 08f0655394..3699c6eda8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27855,7 +27855,6 @@ "version": "0.0.11", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/interfaces": "0.0.18", "debug": "^4.3.4", "uint8arrays": "^4.0.4" }, @@ -27864,6 +27863,7 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", + "@waku/interfaces": "0.0.18", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2" diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 8b33f599b2..12ff834cf8 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -272,6 +272,7 @@ class Filter extends BaseProtocol implements IReceiver { ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); //TODO: get a relevant peer for the topic/shard + // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 const peer = ( await this.getPeers({ maxBootstrapPeers: 1, diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 4878bfd029..ce2cc6ad9b 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -38,7 +38,7 @@ export class KeepAliveManager { const peerIdStr = peerId.toString(); if (pingPeriodSecs !== 0) { - const interval = setInterval(() => { + const intervals = setInterval(() => { void (async () => { try { // ping the peer for keep alive @@ -61,12 +61,12 @@ export class KeepAliveManager { })(); }, pingPeriodSecs * 1000); - this.pingKeepAliveTimers.set(peerIdStr, interval); + this.pingKeepAliveTimers.set(peerIdStr, intervals); } const relay = this.relay; if (relay && relayPeriodSecs !== 0) { - const interval = this.sendRelayPings( + const interval = this.scheduleRelayPings( relay, relayPeriodSecs, peerId.toString() @@ -101,7 +101,7 @@ export class KeepAliveManager { this.relayKeepAliveTimers.clear(); } - private sendRelayPings( + private scheduleRelayPings( relay: IRelay, relayPeriodSecs: number, peerIdStr: PeerIdStr diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 5818942cd4..7812054f22 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -129,7 +129,7 @@ class Relay implements IRelay { }; } - return await this.gossipSub.publish(pubSubTopic, msg); + return this.gossipSub.publish(pubSubTopic, msg); } public subscribe( diff --git a/packages/tests/tests/sharding.spec.ts b/packages/tests/tests/sharding.spec.ts index 307897a82e..3e830e3e7f 100644 --- a/packages/tests/tests/sharding.spec.ts +++ b/packages/tests/tests/sharding.spec.ts @@ -6,10 +6,10 @@ import chaiAsPromised from "chai-as-promised"; import { makeLogFileName } from "../src/log_file.js"; import { NimGoNode } from "../src/node/node.js"; -const PubSubTopic1 = "/waku/1/test1"; -const PubSubTopic2 = "/waku/1/test2"; +const PubSubTopic1 = "/waku/2/rs/0/2"; +const PubSubTopic2 = "/waku/2/rs/0/3"; -const ContentTopic = "/waku/1/content/test"; +const ContentTopic = "/waku/2/content/test"; chai.use(chaiAsPromised); diff --git a/packages/utils/package.json b/packages/utils/package.json index 7d2b1f8464..870d7e1059 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -66,10 +66,10 @@ }, "dependencies": { "debug": "^4.3.4", - "uint8arrays": "^4.0.4", - "@waku/interfaces": "0.0.18" + "uint8arrays": "^4.0.4" }, "devDependencies": { + "@waku/interfaces": "0.0.18", "@rollup/plugin-commonjs": "^25.0.4", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", From a84726b4e152cb917ae986bfbf0321a7b4d1ec85 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 13:49:59 +0530 Subject: [PATCH 18/29] feat(relay): re-add API for `getMeshPeers` --- packages/interfaces/src/relay.ts | 3 ++- packages/relay/src/index.ts | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 3269579284..ca6bb513ea 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -14,7 +14,8 @@ import type { ISender } from "./sender.js"; export interface IRelayAPI { readonly gossipSub: GossipSub; start: () => Promise; - getMeshPeers: () => Map; + getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; + getAllMeshPeers: () => Map; } export type IRelay = IRelayAPI & ISender & IReceiver; diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 7812054f22..3b54e74d8f 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -242,11 +242,15 @@ class Relay implements IRelay { return map; } + public getMeshPeers(topic: TopicStr = DefaultPubSubTopic): PeerIdStr[] { + return this.gossipSub.getMeshPeers(topic); + } + /** * Returns mesh peers for all topics. * @returns Map of topic to peer ids */ - public getMeshPeers(): Map { + public getAllMeshPeers(): Map { const map = new Map(); for (const topic of this.pubSubTopics) { map.set(topic, Array.from(this.gossipSub.mesh.get(topic) ?? [])); From a626ceb81cdc6c55a36294dfc19161d5e9134096 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 26 Sep 2023 20:23:35 +0530 Subject: [PATCH 19/29] update error message Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> --- packages/utils/src/common/sharding.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 99b55ff813..a4439e86f3 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -6,7 +6,7 @@ export function ensurePubsubTopicIsValid( ): void { if (!configuredTopics.includes(pubsubTopic)) { throw new Error( - `PubSub topic ${pubsubTopic} is not supported by this protocol. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` + `PubSub topic ${pubsubTopic} has not been configured on this instance. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` ); } } From 8054eb4d2614aac90465a2f0d4a83632478e4528 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 20:44:36 +0530 Subject: [PATCH 20/29] refactor for new API --- packages/core/src/lib/filter/index.ts | 4 ++-- packages/core/src/lib/keep_alive_manager.ts | 2 +- packages/core/src/lib/light_push/index.ts | 4 ++-- packages/core/src/lib/store/index.ts | 4 ++-- packages/utils/src/common/sharding.ts | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 12ff834cf8..e2fcd165d2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -18,7 +18,7 @@ import type { } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { - ensurePubsubTopicIsValid, + ensurePubsubTopicIsConfigured, groupByContentTopic, toAsyncIterator } from "@waku/utils"; @@ -269,7 +269,7 @@ class Filter extends BaseProtocol implements IReceiver { async createSubscription( pubSubTopic: string = DefaultPubSubTopic ): Promise { - ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubsubTopics); //TODO: get a relevant peer for the topic/shard // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index ce2cc6ad9b..5256a6e83f 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -106,7 +106,7 @@ export class KeepAliveManager { relayPeriodSecs: number, peerIdStr: PeerIdStr ): NodeJS.Timeout[] { - const peersMap = relay.getMeshPeers(); + const peersMap = relay.getAllMeshPeers(); // find the PubSubTopics the peer is part of const pubSubTopics: PubSubTopic[] = []; diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index a5bda99b68..8739d2ae54 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -11,7 +11,7 @@ import { SendResult } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { ensurePubsubTopicIsValid, isSizeValid } from "@waku/utils"; +import { ensurePubsubTopicIsConfigured, isSizeValid } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -84,7 +84,7 @@ class LightPush extends BaseProtocol implements ILightPush { async send(encoder: IEncoder, message: IMessage): Promise { const { pubSubTopic } = encoder; - ensurePubsubTopicIsValid(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubsubTopics); const recipients: PeerId[] = []; diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 38a7608a58..9ed938ff3c 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -10,7 +10,7 @@ import { PubSubTopic } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { ensurePubsubTopicIsValid, isDefined } from "@waku/utils"; +import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils"; import { concat, utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; import all from "it-all"; @@ -244,7 +244,7 @@ class Store extends BaseProtocol implements IStore { // we can be certain that there is only one pubsub topic in the query const pubSubTopicForQuery = uniquePubSubTopicsInQuery[0]; - ensurePubsubTopicIsValid(pubSubTopicForQuery, this.pubSubTopics); + ensurePubsubTopicIsConfigured(pubSubTopicForQuery, this.pubSubTopics); const decodersAsMap = new Map(); decoders.forEach((dec) => { diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index a4439e86f3..f14ba20d69 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,6 +1,6 @@ import type { PubSubTopic } from "@waku/interfaces"; -export function ensurePubsubTopicIsValid( +export function ensurePubsubTopicIsConfigured( pubsubTopic: PubSubTopic, configuredTopics: PubSubTopic[] ): void { From 8b9f5583edf8168474d1c3686a5ee1bd1912c6c5 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Wed, 27 Sep 2023 01:29:40 +1000 Subject: [PATCH 21/29] feat: simplify handling of observers (#1614) * refactor: simplify handling of observers * refactor: Remove redundant PubSubTopic from Observer --- packages/relay/src/index.ts | 175 ++++++------------------------------ 1 file changed, 28 insertions(+), 147 deletions(-) diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 3b54e74d8f..67c1e536b1 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -25,7 +25,7 @@ import { SendError, SendResult } from "@waku/interfaces"; -import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils"; +import { isSizeValid, toAsyncIterator } from "@waku/utils"; import debug from "debug"; import { RelayCodecs } from "./constants.js"; @@ -35,7 +35,6 @@ import { TopicOnlyDecoder } from "./topic_only_message.js"; const log = debug("waku:relay"); export type Observer = { - pubsubTopic: PubSubTopic; decoder: IDecoder; callback: Callback; }; @@ -136,95 +135,41 @@ class Relay implements IRelay { decoders: IDecoder | IDecoder[], callback: Callback ): () => void { - const pubSubTopicToContentObservers = this.createObservers( - decoders, - callback - ); - - this.addObservers(pubSubTopicToContentObservers); + const observers: Array<[PubSubTopic, Observer]> = []; + + for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { + const pubSubTopic = decoder.pubSubTopic; + const ctObs: Map>> = this.observers.get( + pubSubTopic + ) ?? new Map(); + const _obs = ctObs.get(decoder.contentTopic) ?? new Set(); + const observer = { pubSubTopic, decoder, callback }; + _obs.add(observer); + ctObs.set(decoder.contentTopic, _obs); + this.observers.set(pubSubTopic, ctObs); + + observers.push([pubSubTopic, observer]); + } return () => { - this.removeObservers(pubSubTopicToContentObservers); + this.removeObservers(observers); }; } - private createObservers( - decoders: IDecoder | IDecoder[], - callback: Callback - ): Map>>> { - return Array.isArray(decoders) - ? toObservers(Array.from(this.pubSubTopics), decoders, callback) - : toObservers(Array.from(this.pubSubTopics), [decoders], callback); - } - - private addObservers( - pubSubTopicToContentObservers: Map< - PubSubTopic, - Map>> - > - ): void { - for (const [ - pubSubTopic, - contentTopicToObservers - ] of pubSubTopicToContentObservers.entries()) { - const existingContentObservers = - this.observers.get(pubSubTopic) || - new Map>>(); - - for (const [ - contentTopic, - newObservers - ] of contentTopicToObservers.entries()) { - const currObservers = - existingContentObservers.get(contentTopic) ?? new Set>(); - existingContentObservers.set( - contentTopic, - union(currObservers, newObservers) - ); - } - - this.observers.set(pubSubTopic, existingContentObservers); - } - } - private removeObservers( - pubSubTopicToContentObservers: Map< - PubSubTopic, - Map>> - > + observers: Array<[PubSubTopic, Observer]> ): void { - for (const [ - pubSubTopic, - contentTopicToObservers - ] of pubSubTopicToContentObservers.entries()) { - const existingContentObservers = this.observers.get(pubSubTopic); - - if (existingContentObservers) { - for (const [ - contentTopic, - observersToRemove - ] of contentTopicToObservers.entries()) { - const currentObservers = - existingContentObservers.get(contentTopic) || - new Set>(); - const nextObservers = leftMinusJoin( - currentObservers, - observersToRemove - ); - - if (nextObservers.size) { - existingContentObservers.set(contentTopic, nextObservers); - } else { - existingContentObservers.delete(contentTopic); - } - } + for (const [pubSubTopic, observer] of observers) { + const ctObs = this.observers.get(pubSubTopic); + if (!ctObs) continue; - if (existingContentObservers.size) { - this.observers.set(pubSubTopic, existingContentObservers); - } else { - this.observers.delete(pubSubTopic); - } - } + const contentTopic = observer.decoder.contentTopic; + const _obs = ctObs.get(contentTopic); + if (!_obs) continue; + + _obs.delete(observer); + ctObs.set(contentTopic, _obs); + this.observers.set(pubSubTopic, ctObs); } } @@ -363,67 +308,3 @@ export function wakuGossipSub( return pubsub; }; } - -function toObservers( - allPubSubTopics: PubSubTopic[], - decoders: IDecoder[], - callback: Callback -): Map>>> { - for (const decoder of decoders) { - if (!allPubSubTopics.includes(decoder.pubSubTopic)) { - throw new Error( - `PubSub topic ${decoder.pubSubTopic} is not supported by this protocol. Configured topics are: ${allPubSubTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` - ); - } - } - - const finalMap = new Map>>>(); - - for (const pubSubTopic of allPubSubTopics) { - // Filter decoders that match the current pubSubTopic - const filteredDecoders = decoders.filter( - (decoder) => decoder.pubSubTopic === pubSubTopic - ); - - // Group these decoders by ContentTopic - const contentTopicToDecoders = Array.from( - groupByContentTopic(filteredDecoders).entries() - ); - - const contentTopicToObserversMap = new Map< - ContentTopic, - Set> - >(); - - for (const [contentTopic, topicDecoders] of contentTopicToDecoders) { - const observersSet = new Set>( - topicDecoders.map((decoder) => ({ - decoder, - callback - })) as Observer[] - ); - contentTopicToObserversMap.set(contentTopic, observersSet); - } - - // Set the result in the final map - finalMap.set(pubSubTopic, contentTopicToObserversMap); - } - - return finalMap; -} - -function union(left: Set, right: Set): Set { - for (const val of right.values()) { - left.add(val); - } - return left; -} - -function leftMinusJoin(left: Set, right: Set): Set { - for (const val of right.values()) { - if (left.has(val)) { - left.delete(val); - } - } - return left; -} From 8c01b7478ffe27fa72f31cb67c8175049904f1ac Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 21:04:22 +0530 Subject: [PATCH 22/29] use `??` instead of `||` --- packages/relay/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 67c1e536b1..3cc56f324b 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -282,7 +282,7 @@ class Relay implements IRelay { } private isRelayPubSub(pubsub: PubSub | undefined): boolean { - return pubsub?.multicodecs?.includes(Relay.multicodec) || false; + return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false; } } From 08b05ea4c37455b9368c7960f393e04b833e218c Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 21:16:13 +0530 Subject: [PATCH 23/29] update `pubsubTopic` to `pubSubTopic` --- packages/core/src/lib/filter/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index e2fcd165d2..d0b6d68c58 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -234,7 +234,7 @@ class Subscription { } class Filter extends BaseProtocol implements IReceiver { - private readonly pubsubTopics: PubSubTopic[] = []; + private readonly pubSubTopics: PubSubTopic[] = []; private activeSubscriptions = new Map(); private readonly NUM_PEERS_PROTOCOL = 1; @@ -257,7 +257,7 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); - this.pubsubTopics = options?.pubSubTopics || [DefaultPubSubTopic]; + this.pubSubTopics = options?.pubSubTopics || [DefaultPubSubTopic]; libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log("Failed to register ", FilterCodecs.PUSH, e); @@ -269,7 +269,7 @@ class Filter extends BaseProtocol implements IReceiver { async createSubscription( pubSubTopic: string = DefaultPubSubTopic ): Promise { - ensurePubsubTopicIsConfigured(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics); //TODO: get a relevant peer for the topic/shard // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 From d49a298823d32e566b26bf0a1f5bfd12a609ab53 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 21:17:23 +0530 Subject: [PATCH 24/29] update `interval` typo --- packages/core/src/lib/keep_alive_manager.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 5256a6e83f..c7aca76b00 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -38,7 +38,7 @@ export class KeepAliveManager { const peerIdStr = peerId.toString(); if (pingPeriodSecs !== 0) { - const intervals = setInterval(() => { + const interval = setInterval(() => { void (async () => { try { // ping the peer for keep alive @@ -61,17 +61,17 @@ export class KeepAliveManager { })(); }, pingPeriodSecs * 1000); - this.pingKeepAliveTimers.set(peerIdStr, intervals); + this.pingKeepAliveTimers.set(peerIdStr, interval); } const relay = this.relay; if (relay && relayPeriodSecs !== 0) { - const interval = this.scheduleRelayPings( + const intervals = this.scheduleRelayPings( relay, relayPeriodSecs, peerId.toString() ); - this.relayKeepAliveTimers.set(peerId, interval); + this.relayKeepAliveTimers.set(peerId, intervals); } } From 11e8094640a8ab24d9263bcdcfa417855155ea76 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 26 Sep 2023 21:31:49 +0530 Subject: [PATCH 25/29] change occurence of `pubsubTopic` to `pubSubTopic` --- packages/core/src/lib/light_push/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 8739d2ae54..547d1372ff 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -42,12 +42,12 @@ type PreparePushMessageResult = * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ class LightPush extends BaseProtocol implements ILightPush { - private readonly pubsubTopics: PubSubTopic[]; + private readonly pubSubTopics: PubSubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.pubsubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; + this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; } private async preparePushMessage( @@ -84,7 +84,7 @@ class LightPush extends BaseProtocol implements ILightPush { async send(encoder: IEncoder, message: IMessage): Promise { const { pubSubTopic } = encoder; - ensurePubsubTopicIsConfigured(pubSubTopic, this.pubsubTopics); + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics); const recipients: PeerId[] = []; From 1977bd7de639ff1af6e3dcfa3e5a6ace0c572df4 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 27 Sep 2023 11:50:56 +0530 Subject: [PATCH 26/29] relay: rm `getAllMeshPeers` and make `pubSubTopics` public --- packages/core/src/lib/keep_alive_manager.ts | 17 +++++------------ packages/interfaces/src/relay.ts | 3 ++- packages/relay/src/index.ts | 14 +------------- .../tests/wait_for_remote_peer.node.spec.ts | 4 ++-- 4 files changed, 10 insertions(+), 28 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index c7aca76b00..26f7c296d9 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,6 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerStore } from "@libp2p/interface/peer-store"; -import type { IRelay, PeerIdStr, PubSubTopic } from "@waku/interfaces"; +import type { IRelay, PeerIdStr } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; @@ -106,19 +106,12 @@ export class KeepAliveManager { relayPeriodSecs: number, peerIdStr: PeerIdStr ): NodeJS.Timeout[] { - const peersMap = relay.getAllMeshPeers(); - - // find the PubSubTopics the peer is part of - const pubSubTopics: PubSubTopic[] = []; - peersMap.forEach((peers, topic) => { - if (peers.includes(peerIdStr)) { - pubSubTopics.push(topic); - } - }); - // send a ping message to each PubSubTopic the peer is part of const intervals: NodeJS.Timeout[] = []; - for (const topic of pubSubTopics) { + for (const topic of relay.pubSubTopics) { + const meshPeers = relay.getMeshPeers(topic); + if (!meshPeers.includes(peerIdStr)) continue; + const encoder = createEncoder({ pubSubTopic: topic, contentTopic: RelayPingContentTopic, diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index ca6bb513ea..f98f219ba4 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -1,6 +1,7 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; +import { PubSubTopic } from "./misc.js"; import { IReceiver } from "./receiver.js"; import type { ISender } from "./sender.js"; @@ -12,10 +13,10 @@ import type { ISender } from "./sender.js"; * @property getMeshPeers - Function to retrieve the mesh peers for a given topic or all topics if none is specified. Returns an array of peer IDs as strings. */ export interface IRelayAPI { + readonly pubSubTopics: Set; readonly gossipSub: GossipSub; start: () => Promise; getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; - getAllMeshPeers: () => Map; } export type IRelay = IRelayAPI & ISender & IReceiver; diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 3cc56f324b..8a7ad930f4 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -47,7 +47,7 @@ export type ContentTopic = string; * Throws if libp2p.pubsub does not support Waku Relay */ class Relay implements IRelay { - readonly pubSubTopics: Set; + public readonly pubSubTopics: Set; private defaultDecoder: IDecoder; public static multicodec: string = RelayCodecs[0]; @@ -191,18 +191,6 @@ class Relay implements IRelay { return this.gossipSub.getMeshPeers(topic); } - /** - * Returns mesh peers for all topics. - * @returns Map of topic to peer ids - */ - public getAllMeshPeers(): Map { - const map = new Map(); - for (const topic of this.pubSubTopics) { - map.set(topic, Array.from(this.gossipSub.mesh.get(topic) ?? [])); - } - return map; - } - private subscribeToAllTopics(): void { for (const pubSubTopic of this.pubSubTopics) { this.gossipSubSubscribe(pubSubTopic); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index d118ce1aff..d190c56ead 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,4 +1,4 @@ -import { waitForRemotePeer } from "@waku/core"; +import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode, createRelayNode } from "@waku/sdk"; @@ -262,7 +262,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitForRemotePeer(waku1); - const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); + const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); const nimPeerId = multiAddrWithId.getPeerId(); From 3fd68e539d423bd818a389c16777184e1bbdde18 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 27 Sep 2023 12:02:49 +0530 Subject: [PATCH 27/29] relay: use `push_or_init_map` and move to `utils` --- package-lock.json | 7 ++++++- packages/relay/src/index.ts | 9 ++++----- packages/utils/package.json | 3 ++- packages/utils/src/common/index.ts | 1 + .../lib => utils/src/common}/push_or_init_map.spec.ts | 0 .../src/lib => utils/src/common}/push_or_init_map.ts | 0 6 files changed, 13 insertions(+), 7 deletions(-) rename packages/{core/src/lib => utils/src/common}/push_or_init_map.spec.ts (100%) rename packages/{core/src/lib => utils/src/common}/push_or_init_map.ts (100%) diff --git a/package-lock.json b/package-lock.json index 1d357585df..f12d4f53dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6506,7 +6506,8 @@ }, "node_modules/chai": { "version": "4.3.8", - "license": "MIT", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz", + "integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==", "dependencies": { "assertion-error": "^1.1.0", "check-error": "^1.0.2", @@ -26180,6 +26181,7 @@ "version": "0.0.11", "license": "MIT OR Apache-2.0", "dependencies": { + "chai": "^4.3.8", "debug": "^4.3.4", "uint8arrays": "^4.0.4" }, @@ -29393,6 +29395,7 @@ "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", "@waku/interfaces": "0.0.18", + "chai": "^4.3.8", "cspell": "^7.3.2", "debug": "^4.3.4", "npm-run-all": "^4.1.5", @@ -30506,6 +30509,8 @@ }, "chai": { "version": "4.3.8", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz", + "integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==", "requires": { "assertion-error": "^1.1.0", "check-error": "^1.0.2", diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 8a7ad930f4..bf258c0f6f 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -26,6 +26,7 @@ import { SendResult } from "@waku/interfaces"; import { isSizeValid, toAsyncIterator } from "@waku/utils"; +import { pushOrInitMapSet } from "@waku/utils"; import debug from "debug"; import { RelayCodecs } from "./constants.js"; @@ -138,16 +139,14 @@ class Relay implements IRelay { const observers: Array<[PubSubTopic, Observer]> = []; for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { - const pubSubTopic = decoder.pubSubTopic; + const { pubSubTopic } = decoder; const ctObs: Map>> = this.observers.get( pubSubTopic ) ?? new Map(); - const _obs = ctObs.get(decoder.contentTopic) ?? new Set(); const observer = { pubSubTopic, decoder, callback }; - _obs.add(observer); - ctObs.set(decoder.contentTopic, _obs); - this.observers.set(pubSubTopic, ctObs); + pushOrInitMapSet(ctObs, decoder.contentTopic, observer); + this.observers.set(pubSubTopic, ctObs); observers.push([pubSubTopic, observer]); } diff --git a/packages/utils/package.json b/packages/utils/package.json index f369680280..d4b3f25a42 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -65,15 +65,16 @@ "node": ">=18" }, "dependencies": { + "chai": "^4.3.8", "debug": "^4.3.4", "uint8arrays": "^4.0.4" }, "devDependencies": { - "@waku/interfaces": "0.0.18", "@rollup/plugin-commonjs": "^25.0.4", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", + "@waku/interfaces": "0.0.18", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2" diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 5d080b2319..f834bc22c4 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -4,6 +4,7 @@ export * from "./group_by.js"; export * from "./to_async_iterator.js"; export * from "./is_size_valid.js"; export * from "./sharding.js"; +export * from "./push_or_init_map.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); diff --git a/packages/core/src/lib/push_or_init_map.spec.ts b/packages/utils/src/common/push_or_init_map.spec.ts similarity index 100% rename from packages/core/src/lib/push_or_init_map.spec.ts rename to packages/utils/src/common/push_or_init_map.spec.ts diff --git a/packages/core/src/lib/push_or_init_map.ts b/packages/utils/src/common/push_or_init_map.ts similarity index 100% rename from packages/core/src/lib/push_or_init_map.ts rename to packages/utils/src/common/push_or_init_map.ts From 03ae72aee35434233708d5eef99196aba1645f61 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 27 Sep 2023 12:14:37 +0530 Subject: [PATCH 28/29] fix: update API for tests --- packages/tests/tests/wait_for_remote_peer.node.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index d190c56ead..fad988d7a0 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -39,7 +39,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await delay(1000); await waitForRemotePeer(waku1, [Protocols.Relay]); - const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); + const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -67,7 +67,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitPromise; - const peers = Array.from(waku1.relay.getMeshPeers().values()).flat(); + const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; From 6f0cbb7cae60bb01451de0dbb844880366fc57e8 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 27 Sep 2023 12:43:57 +0530 Subject: [PATCH 29/29] fix: relay waitForRemotePeer --- packages/core/src/lib/wait_for_remote_peer.ts | 17 ++++++----------- .../tests/wait_for_remote_peer.node.spec.ts | 2 +- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index fb414aa88f..84901e801f 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -100,18 +100,13 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise { * mesh for all pubSubTopics. */ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise { - let topicHasNoPeer = true; + let peers = waku.getMeshPeers(); + const pubSubTopics = waku.pubSubTopics; - while (topicHasNoPeer) { - await pEvent(waku.gossipSub, "gossipsub:heartbeat"); - const meshPeersMap = waku.getMeshPeers(); - - topicHasNoPeer = false; // Assume no topic is without peers initially - for (const peers of meshPeersMap.values()) { - if (peers.length === 0) { - topicHasNoPeer = true; // If any topic doesn't have peers, set to true - break; - } + for (const topic of pubSubTopics) { + while (peers.length == 0) { + await pEvent(waku.gossipSub, "gossipsub:heartbeat"); + peers = waku.getMeshPeers(topic); } } } diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index fad988d7a0..3034fdbcb1 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -67,7 +67,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitPromise; - const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); + const peers = waku1.relay.getMeshPeers(); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined;