Skip to content

Commit 4cf2ffe

Browse files
feat!: add support for sharded pubsub topics & remove support for named pubsub topics (#1697)
* merge branches * tests: use a generator for sharded pubsub topics * fix namespace edge case * move shardInfo to pubsubTopic logic in waku.ts * simplify encoder/decoder creation logic + update tests * sharding utils: add error handling * remove redundant test util * baseprotocol: create abstraction for initialising pubsub topics * fix: `createDecoder` interface * filter: createSubscription takes shardInfo instead of pubsubTopicStr * fix: sharding utils for error handling * SingleShardInfo: use a new interface instead of omitting and rename namespace * change redundant namespaces
1 parent 7eb3375 commit 4cf2ffe

33 files changed

+538
-312
lines changed

packages/core/src/lib/base_protocol.ts

+14-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ import type { Libp2p } from "@libp2p/interface";
22
import type { Stream } from "@libp2p/interface/connection";
33
import type { PeerId } from "@libp2p/interface/peer-id";
44
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
5-
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
5+
import type {
6+
IBaseProtocol,
7+
Libp2pComponents,
8+
PubsubTopic,
9+
ShardInfo
10+
} from "@waku/interfaces";
11+
import { shardInfoToPubsubTopics } from "@waku/utils";
612
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
713

14+
import { DefaultPubsubTopic } from "./constants.js";
815
import { filterPeers } from "./filterPeers.js";
916
import { StreamManager } from "./stream_manager.js";
1017

@@ -89,4 +96,10 @@ export class BaseProtocol implements IBaseProtocol {
8996
// Filter the peers based on the specified criteria
9097
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
9198
}
99+
100+
initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
101+
return shardInfo
102+
? shardInfoToPubsubTopics(shardInfo)
103+
: [DefaultPubsubTopic];
104+
}
92105
}

packages/core/src/lib/filter/index.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ import type {
1414
PeerIdStr,
1515
ProtocolCreateOptions,
1616
PubsubTopic,
17+
SingleShardInfo,
1718
Unsubscribe
1819
} from "@waku/interfaces";
1920
import { WakuMessage } from "@waku/proto";
2021
import {
2122
ensurePubsubTopicIsConfigured,
2223
groupByContentTopic,
24+
singleShardInfoToPubsubTopic,
2325
toAsyncIterator
2426
} from "@waku/utils";
2527
import { Logger } from "@waku/utils";
@@ -279,7 +281,7 @@ class Filter extends BaseProtocol implements IReceiver {
279281
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
280282
super(FilterCodecs.SUBSCRIBE, libp2p.components);
281283

282-
this.pubsubTopics = options?.pubsubTopics || [DefaultPubsubTopic];
284+
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
283285

284286
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
285287
log.error("Failed to register ", FilterCodecs.PUSH, e);
@@ -289,8 +291,12 @@ class Filter extends BaseProtocol implements IReceiver {
289291
}
290292

291293
async createSubscription(
292-
pubsubTopic: string = DefaultPubsubTopic
294+
pubsubTopicShardInfo?: SingleShardInfo
293295
): Promise<Subscription> {
296+
const pubsubTopic = pubsubTopicShardInfo
297+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
298+
: DefaultPubsubTopic;
299+
294300
ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);
295301

296302
//TODO: get a relevant peer for the topic/shard

packages/core/src/lib/keep_alive_manager.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { PeerId } from "@libp2p/interface/peer-id";
22
import type { PeerStore } from "@libp2p/interface/peer-store";
33
import type { IRelay, PeerIdStr } from "@waku/interfaces";
44
import type { KeepAliveOptions } from "@waku/interfaces";
5-
import { Logger } from "@waku/utils";
5+
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
66
import { utf8ToBytes } from "@waku/utils/bytes";
77
import type { PingService } from "libp2p/ping";
88

@@ -129,7 +129,7 @@ export class KeepAliveManager {
129129
if (!meshPeers.includes(peerIdStr)) continue;
130130

131131
const encoder = createEncoder({
132-
pubsubTopic: topic,
132+
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(topic),
133133
contentTopic: RelayPingContentTopic,
134134
ephemeral: true
135135
});

packages/core/src/lib/light_push/index.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { pipe } from "it-pipe";
2222
import { Uint8ArrayList } from "uint8arraylist";
2323

2424
import { BaseProtocol } from "../base_protocol.js";
25-
import { DefaultPubsubTopic } from "../constants.js";
2625

2726
import { PushRpc } from "./push_rpc.js";
2827

@@ -50,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush {
5049

5150
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
5251
super(LightPushCodec, libp2p.components);
53-
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
52+
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
5453
}
5554

5655
private async preparePushMessage(

packages/core/src/lib/message/version_0.ts

+19-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import type {
77
IMetaSetter,
88
IProtoMessage,
99
IRateLimitProof,
10-
PubsubTopic
10+
PubsubTopic,
11+
SingleShardInfo
1112
} from "@waku/interfaces";
1213
import { proto_message as proto } from "@waku/proto";
13-
import { Logger } from "@waku/utils";
14+
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
1415

1516
import { DefaultPubsubTopic } from "../constants.js";
1617

@@ -119,12 +120,19 @@ export class Encoder implements IEncoder {
119120
* messages.
120121
*/
121122
export function createEncoder({
122-
pubsubTopic = DefaultPubsubTopic,
123+
pubsubTopicShardInfo,
123124
contentTopic,
124125
ephemeral,
125126
metaSetter
126127
}: EncoderOptions): Encoder {
127-
return new Encoder(contentTopic, ephemeral, pubsubTopic, metaSetter);
128+
return new Encoder(
129+
contentTopic,
130+
ephemeral,
131+
pubsubTopicShardInfo?.index
132+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
133+
: DefaultPubsubTopic,
134+
metaSetter
135+
);
128136
}
129137

130138
export class Decoder implements IDecoder<DecodedMessage> {
@@ -182,7 +190,12 @@ export class Decoder implements IDecoder<DecodedMessage> {
182190
*/
183191
export function createDecoder(
184192
contentTopic: string,
185-
pubsubTopic: PubsubTopic = DefaultPubsubTopic
193+
pubsubTopicShardInfo?: SingleShardInfo
186194
): Decoder {
187-
return new Decoder(pubsubTopic, contentTopic);
195+
return new Decoder(
196+
pubsubTopicShardInfo?.index
197+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
198+
: DefaultPubsubTopic,
199+
contentTopic
200+
);
188201
}

packages/core/src/lib/store/index.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { pipe } from "it-pipe";
1919
import { Uint8ArrayList } from "uint8arraylist";
2020

2121
import { BaseProtocol } from "../base_protocol.js";
22-
import { DefaultPubsubTopic } from "../constants.js";
2322
import { toProtoMessage } from "../to_proto_message.js";
2423

2524
import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";
@@ -80,7 +79,7 @@ class Store extends BaseProtocol implements IStore {
8079

8180
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
8281
super(StoreCodec, libp2p.components);
83-
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
82+
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
8483
}
8584

8685
/**

packages/core/src/lib/waku.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import type {
88
IStore,
99
Libp2p,
1010
PubsubTopic,
11+
ShardInfo,
1112
Waku
1213
} from "@waku/interfaces";
1314
import { Protocols } from "@waku/interfaces";
14-
import { Logger } from "@waku/utils";
15+
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
1516

1617
import { ConnectionManager } from "./connection_manager.js";
18+
import { DefaultPubsubTopic } from "./constants.js";
1719

1820
export const DefaultPingKeepAliveValueSecs = 5 * 60;
1921
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@@ -50,16 +52,23 @@ export class WakuNode implements Waku {
5052
public filter?: IFilter;
5153
public lightPush?: ILightPush;
5254
public connectionManager: ConnectionManager;
55+
public readonly pubsubTopics: PubsubTopic[];
5356

5457
constructor(
5558
options: WakuOptions,
56-
public readonly pubsubTopics: PubsubTopic[],
5759
libp2p: Libp2p,
60+
pubsubShardInfo?: ShardInfo,
5861
store?: (libp2p: Libp2p) => IStore,
5962
lightPush?: (libp2p: Libp2p) => ILightPush,
6063
filter?: (libp2p: Libp2p) => IFilter,
6164
relay?: (libp2p: Libp2p) => IRelay
6265
) {
66+
if (!pubsubShardInfo) {
67+
this.pubsubTopics = [DefaultPubsubTopic];
68+
} else {
69+
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
70+
}
71+
6372
this.libp2p = libp2p;
6473

6574
if (store) {
@@ -88,7 +97,7 @@ export class WakuNode implements Waku {
8897
peerId,
8998
libp2p,
9099
{ pingKeepAlive, relayKeepAlive },
91-
pubsubTopics,
100+
this.pubsubTopics,
92101
this.relay
93102
);
94103

packages/interfaces/src/filter.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
22

3-
import type { IDecodedMessage, IDecoder } from "./message.js";
3+
import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
44
import type { ContentTopic } from "./misc.js";
55
import type { Callback, IBaseProtocol } from "./protocols.js";
66
import type { IReceiver } from "./receiver.js";
@@ -25,7 +25,7 @@ export interface IFilterSubscription {
2525
export type IFilter = IReceiver &
2626
IBaseProtocol & {
2727
createSubscription(
28-
pubsubTopic?: string,
28+
pubsubTopicShardInfo?: SingleShardInfo,
2929
peerId?: PeerId
3030
): Promise<IFilterSubscription>;
3131
};

packages/interfaces/src/message.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import type { PubsubTopic } from "./misc.js";
22

3+
export interface SingleShardInfo {
4+
cluster: number;
5+
index: number;
6+
}
7+
38
export interface IRateLimitProof {
49
proof: Uint8Array;
510
merkleRoot: Uint8Array;
@@ -38,7 +43,7 @@ export interface IMetaSetter {
3843
}
3944

4045
export interface EncoderOptions {
41-
pubsubTopic?: PubsubTopic;
46+
pubsubTopicShardInfo?: SingleShardInfo;
4247
/** The content topic to set on outgoing messages. */
4348
contentTopic: string;
4449
/**

packages/interfaces/src/protocols.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import type { Libp2p } from "@libp2p/interface";
22
import type { PeerId } from "@libp2p/interface/peer-id";
33
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
44

5+
import type { ShardInfo } from "./enr.js";
56
import type { CreateLibp2pOptions } from "./libp2p.js";
67
import type { IDecodedMessage } from "./message.js";
7-
import type { PubsubTopic } from "./misc.js";
88

99
export enum Protocols {
1010
Relay = "relay",
@@ -23,9 +23,9 @@ export interface IBaseProtocol {
2323

2424
export type ProtocolCreateOptions = {
2525
/**
26-
* Waku supports usage of multiple pubsub topics, but this is still in early stages.
27-
* Waku implements sharding to achieve scalability
28-
* The format of the sharded topic is `/waku/2/rs/<shard_cluster_index>/<shard_number>`
26+
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
27+
* The format to specify a shard is:
28+
* clusterId: number, shards: number[]
2929
* To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
3030
* The Pubsub Topic to use. Defaults to {@link @waku/core!DefaultPubsubTopic }.
3131
*
@@ -39,7 +39,7 @@ export type ProtocolCreateOptions = {
3939
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
4040
*
4141
*/
42-
pubsubTopics?: PubsubTopic[];
42+
shardInfo?: ShardInfo;
4343
/**
4444
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
4545
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)

packages/message-encryption/src/ecies.ts

+15-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import type {
77
IMessage,
88
IMetaSetter,
99
IProtoMessage,
10-
PubsubTopic
10+
PubsubTopic,
11+
SingleShardInfo
1112
} from "@waku/interfaces";
1213
import { WakuMessage } from "@waku/proto";
13-
import { Logger } from "@waku/utils";
14+
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
1415

1516
import { generatePrivateKey } from "./crypto/utils.js";
1617
import { DecodedMessage } from "./decoded_message.js";
@@ -98,15 +99,17 @@ export interface EncoderOptions extends BaseEncoderOptions {
9899
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
99100
*/
100101
export function createEncoder({
101-
pubsubTopic = DefaultPubsubTopic,
102+
pubsubTopicShardInfo,
102103
contentTopic,
103104
publicKey,
104105
sigPrivKey,
105106
ephemeral = false,
106107
metaSetter
107108
}: EncoderOptions): Encoder {
108109
return new Encoder(
109-
pubsubTopic,
110+
pubsubTopicShardInfo?.index
111+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
112+
: DefaultPubsubTopic,
110113
contentTopic,
111114
publicKey,
112115
sigPrivKey,
@@ -194,7 +197,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
194197
export function createDecoder(
195198
contentTopic: string,
196199
privateKey: Uint8Array,
197-
pubsubTopic: PubsubTopic = DefaultPubsubTopic
200+
pubsubTopicShardInfo?: SingleShardInfo
198201
): Decoder {
199-
return new Decoder(pubsubTopic, contentTopic, privateKey);
202+
return new Decoder(
203+
pubsubTopicShardInfo?.index
204+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
205+
: DefaultPubsubTopic,
206+
contentTopic,
207+
privateKey
208+
);
200209
}

packages/message-encryption/src/symmetric.ts

+15-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import type {
77
IMessage,
88
IMetaSetter,
99
IProtoMessage,
10-
PubsubTopic
10+
PubsubTopic,
11+
SingleShardInfo
1112
} from "@waku/interfaces";
1213
import { WakuMessage } from "@waku/proto";
13-
import { Logger } from "@waku/utils";
14+
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
1415

1516
import { generateSymmetricKey } from "./crypto/utils.js";
1617
import { DecodedMessage } from "./decoded_message.js";
@@ -98,15 +99,17 @@ export interface EncoderOptions extends BaseEncoderOptions {
9899
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
99100
*/
100101
export function createEncoder({
101-
pubsubTopic = DefaultPubsubTopic,
102+
pubsubTopicShardInfo,
102103
contentTopic,
103104
symKey,
104105
sigPrivKey,
105106
ephemeral = false,
106107
metaSetter
107108
}: EncoderOptions): Encoder {
108109
return new Encoder(
109-
pubsubTopic,
110+
pubsubTopicShardInfo?.index
111+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
112+
: DefaultPubsubTopic,
110113
contentTopic,
111114
symKey,
112115
sigPrivKey,
@@ -194,7 +197,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
194197
export function createDecoder(
195198
contentTopic: string,
196199
symKey: Uint8Array,
197-
pubsubTopic: PubsubTopic = DefaultPubsubTopic
200+
pubsubTopicShardInfo?: SingleShardInfo
198201
): Decoder {
199-
return new Decoder(pubsubTopic, contentTopic, symKey);
202+
return new Decoder(
203+
pubsubTopicShardInfo?.index
204+
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
205+
: DefaultPubsubTopic,
206+
contentTopic,
207+
symKey
208+
);
200209
}

0 commit comments

Comments
 (0)