Skip to content

Commit e138b4f

Browse files
committed
feat: decouple sharding params out of core
1 parent 601f78a commit e138b4f

20 files changed

+150
-134
lines changed

packages/core/src/lib/base_protocol.ts

+2-20
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,7 @@ import type {
66
ProtocolCreateOptions,
77
PubsubTopic
88
} from "@waku/interfaces";
9-
import { DefaultPubsubTopic } from "@waku/interfaces";
10-
import {
11-
ensureShardingConfigured,
12-
Logger,
13-
shardInfoToPubsubTopics
14-
} from "@waku/utils";
9+
import { ensureShardingConfigured, Logger } from "@waku/utils";
1510
import {
1611
getConnectedPeersForProtocolAndShard,
1712
getPeersForProtocol,
@@ -32,16 +27,14 @@ export class BaseProtocol implements IBaseProtocol {
3227
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
3328
readonly numPeersToUse: number;
3429
protected streamManager: StreamManager;
35-
protected pubsubTopics: PubsubTopic[];
3630

3731
constructor(
3832
public multicodec: string,
3933
private components: Libp2pComponents,
4034
private log: Logger,
35+
protected pubsubTopics: PubsubTopic[],
4136
private options?: ProtocolCreateOptions
4237
) {
43-
this.pubsubTopics = this.initializePubsubTopic(options);
44-
4538
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
4639

4740
this.addLibp2pEventListener = components.events.addEventListener.bind(
@@ -143,15 +136,4 @@ export class BaseProtocol implements IBaseProtocol {
143136

144137
return sortedFilteredPeers;
145138
}
146-
147-
private initializePubsubTopic(
148-
options?: ProtocolCreateOptions
149-
): PubsubTopic[] {
150-
return (
151-
options?.pubsubTopics ??
152-
(options?.shardInfo
153-
? shardInfoToPubsubTopics(options.shardInfo)
154-
: [DefaultPubsubTopic])
155-
);
156-
}
157139
}

packages/core/src/lib/filter/index.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,13 @@ class Filter extends BaseProtocol implements IReceiver {
358358
}
359359

360360
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
361-
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, options);
361+
super(
362+
FilterCodecs.SUBSCRIBE,
363+
libp2p.components,
364+
log,
365+
options!.pubsubTopics!,
366+
options
367+
);
362368

363369
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
364370
log.error("Failed to register ", FilterCodecs.PUSH, e);
@@ -493,7 +499,7 @@ class Filter extends BaseProtocol implements IReceiver {
493499
}
494500

495501
export function wakuFilter(
496-
init: Partial<ProtocolCreateOptions> = {}
502+
init: ProtocolCreateOptions = { pubsubTopics: [] }
497503
): (libp2p: Libp2p) => IFilter {
498504
return (libp2p: Libp2p) => new Filter(libp2p, init);
499505
}

packages/core/src/lib/light_push/index.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@ type PreparePushMessageResult =
4343
*/
4444
class LightPush extends BaseProtocol implements ILightPush {
4545
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
46-
super(LightPushCodec, libp2p.components, log, options);
46+
super(
47+
LightPushCodec,
48+
libp2p.components,
49+
log,
50+
options!.pubsubTopics!,
51+
options
52+
);
4753
}
4854

4955
private async preparePushMessage(

packages/core/src/lib/metadata/index.ts

+9-5
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import type {
44
IMetadata,
55
Libp2pComponents,
66
PeerIdStr,
7-
ShardInfo,
8-
ShardingParams
7+
ShardInfo
98
} from "@waku/interfaces";
109
import { proto_metadata } from "@waku/proto";
11-
import { encodeRelayShard, Logger } from "@waku/utils";
10+
import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
1211
import all from "it-all";
1312
import * as lp from "it-length-prefixed";
1413
import { pipe } from "it-pipe";
@@ -25,10 +24,15 @@ class Metadata extends BaseProtocol implements IMetadata {
2524
handshakesConfirmed: Set<PeerIdStr> = new Set();
2625

2726
constructor(
28-
public shardInfo: ShardingParams,
27+
public shardInfo: ShardInfo,
2928
libp2p: Libp2pComponents
3029
) {
31-
super(MetadataCodec, libp2p.components, log, shardInfo && { shardInfo });
30+
super(
31+
MetadataCodec,
32+
libp2p.components,
33+
log,
34+
shardInfoToPubsubTopics(shardInfo)
35+
);
3236
this.libp2pComponents = libp2p;
3337
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
3438
void this.onRequest(streamData);

packages/core/src/lib/store/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class Store extends BaseProtocol implements IStore {
7676
private readonly NUM_PEERS_PROTOCOL = 1;
7777

7878
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
79-
super(StoreCodec, libp2p.components, log, options);
79+
super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options);
8080
}
8181

8282
/**

packages/core/src/lib/wait_for_remote_peer.ts

+9-8
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,6 @@ export async function waitForRemotePeer(
3131
): Promise<void> {
3232
protocols = protocols ?? getEnabledProtocols(waku);
3333

34-
const isShardingEnabled = waku.shardInfo !== undefined;
35-
const metadataService = isShardingEnabled
36-
? waku.libp2p.services.metadata
37-
: undefined;
38-
3934
if (!waku.isStarted()) return Promise.reject("Waku node is not started");
4035

4136
const promises = [];
@@ -49,19 +44,25 @@ export async function waitForRemotePeer(
4944
if (protocols.includes(Protocols.Store)) {
5045
if (!waku.store)
5146
throw new Error("Cannot wait for Store peer: protocol not mounted");
52-
promises.push(waitForConnectedPeer(waku.store, metadataService));
47+
promises.push(
48+
waitForConnectedPeer(waku.store, waku.libp2p.services.metadata)
49+
);
5350
}
5451

5552
if (protocols.includes(Protocols.LightPush)) {
5653
if (!waku.lightPush)
5754
throw new Error("Cannot wait for LightPush peer: protocol not mounted");
58-
promises.push(waitForConnectedPeer(waku.lightPush, metadataService));
55+
promises.push(
56+
waitForConnectedPeer(waku.lightPush, waku.libp2p.services.metadata)
57+
);
5958
}
6059

6160
if (protocols.includes(Protocols.Filter)) {
6261
if (!waku.filter)
6362
throw new Error("Cannot wait for Filter peer: protocol not mounted");
64-
promises.push(waitForConnectedPeer(waku.filter, metadataService));
63+
promises.push(
64+
waitForConnectedPeer(waku.filter, waku.libp2p.services.metadata)
65+
);
6566
}
6667

6768
if (timeoutMs) {

packages/core/src/lib/waku.ts

+6-14
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import type {
88
IStore,
99
Libp2p,
1010
PubsubTopic,
11-
ShardingParams,
1211
Waku
1312
} from "@waku/interfaces";
14-
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
15-
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
13+
import { Protocols } from "@waku/interfaces";
14+
import { Logger } from "@waku/utils";
1615

1716
import { ConnectionManager } from "./connection_manager.js";
1817

@@ -42,6 +41,7 @@ export interface WakuOptions {
4241
* @default {@link @waku/core.DefaultUserAgent}
4342
*/
4443
userAgent?: string;
44+
pubsubTopics: PubsubTopic[];
4545
}
4646

4747
export class WakuNode implements Waku {
@@ -55,20 +55,16 @@ export class WakuNode implements Waku {
5555

5656
constructor(
5757
options: WakuOptions,
58-
pubsubTopics: PubsubTopic[] = [],
5958
libp2p: Libp2p,
60-
private pubsubShardInfo?: ShardingParams,
6159
store?: (libp2p: Libp2p) => IStore,
6260
lightPush?: (libp2p: Libp2p) => ILightPush,
6361
filter?: (libp2p: Libp2p) => IFilter,
6462
relay?: (libp2p: Libp2p) => IRelay
6563
) {
66-
if (!pubsubShardInfo) {
67-
this.pubsubTopics =
68-
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
69-
} else {
70-
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
64+
if (options.pubsubTopics.length == 0) {
65+
throw new Error("At least one pubsub topic must be provided");
7166
}
67+
this.pubsubTopics = options.pubsubTopics;
7268

7369
this.libp2p = libp2p;
7470

@@ -110,10 +106,6 @@ export class WakuNode implements Waku {
110106
);
111107
}
112108

113-
get shardInfo(): ShardingParams | undefined {
114-
return this.pubsubShardInfo;
115-
}
116-
117109
/**
118110
* Dials to the provided peer.
119111
*

packages/interfaces/src/waku.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
55
import type { IFilter } from "./filter.js";
66
import type { Libp2p } from "./libp2p.js";
77
import type { ILightPush } from "./light_push.js";
8-
import { Protocols, ShardingParams } from "./protocols.js";
8+
import { Protocols } from "./protocols.js";
99
import type { IRelay } from "./relay.js";
1010
import type { IStore } from "./store.js";
1111

@@ -16,8 +16,6 @@ export interface Waku {
1616
filter?: IFilter;
1717
lightPush?: ILightPush;
1818

19-
shardInfo?: ShardingParams;
20-
2119
connectionManager: IConnectionManager;
2220

2321
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;

packages/peer-exchange/src/waku_peer_exchange.ts

+9-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import type {
44
IPeerExchange,
55
Libp2pComponents,
66
PeerExchangeQueryParams,
7-
PeerInfo
7+
PeerInfo,
8+
PubsubTopic
89
} from "@waku/interfaces";
910
import { isDefined } from "@waku/utils";
1011
import { Logger } from "@waku/utils";
@@ -26,8 +27,8 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
2627
/**
2728
* @param components - libp2p components
2829
*/
29-
constructor(components: Libp2pComponents) {
30-
super(PeerExchangeCodec, components, log);
30+
constructor(components: Libp2pComponents, pubsubTopics: PubsubTopic[]) {
31+
super(PeerExchangeCodec, components, log, pubsubTopics);
3132
}
3233

3334
/**
@@ -91,8 +92,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
9192
*
9293
* @returns A function that creates a new peer exchange protocol
9394
*/
94-
export function wakuPeerExchange(): (
95-
components: Libp2pComponents
96-
) => WakuPeerExchange {
97-
return (components: Libp2pComponents) => new WakuPeerExchange(components);
95+
export function wakuPeerExchange(
96+
pubsubTopics: PubsubTopic[]
97+
): (components: Libp2pComponents) => WakuPeerExchange {
98+
return (components: Libp2pComponents) =>
99+
new WakuPeerExchange(components, pubsubTopics);
98100
}

packages/peer-exchange/src/waku_peer_exchange_discovery.ts

+11-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type {
77
PeerId,
88
PeerInfo
99
} from "@libp2p/interface";
10-
import { Libp2pComponents, Tags } from "@waku/interfaces";
10+
import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces";
1111
import { encodeRelayShard, Logger } from "@waku/utils";
1212

1313
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
@@ -77,10 +77,14 @@ export class PeerExchangeDiscovery
7777
);
7878
};
7979

80-
constructor(components: Libp2pComponents, options: Options = {}) {
80+
constructor(
81+
components: Libp2pComponents,
82+
pubsubTopics: PubsubTopic[],
83+
options: Options = {}
84+
) {
8185
super();
8286
this.components = components;
83-
this.peerExchange = new WakuPeerExchange(components);
87+
this.peerExchange = new WakuPeerExchange(components, pubsubTopics);
8488
this.options = options;
8589
this.isStarted = false;
8690
}
@@ -219,9 +223,9 @@ export class PeerExchangeDiscovery
219223
}
220224
}
221225

222-
export function wakuPeerExchangeDiscovery(): (
223-
components: Libp2pComponents
224-
) => PeerExchangeDiscovery {
226+
export function wakuPeerExchangeDiscovery(
227+
pubsubTopics: PubsubTopic[]
228+
): (components: Libp2pComponents) => PeerExchangeDiscovery {
225229
return (components: Libp2pComponents) =>
226-
new PeerExchangeDiscovery(components);
230+
new PeerExchangeDiscovery(components, pubsubTopics);
227231
}

packages/relay/src/index.ts

+5-13
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@ import {
2424
SendError,
2525
SendResult
2626
} from "@waku/interfaces";
27-
import {
28-
isWireSizeUnderCap,
29-
shardInfoToPubsubTopics,
30-
toAsyncIterator
31-
} from "@waku/utils";
27+
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
3228
import { pushOrInitMapSet } from "@waku/utils";
3329
import { Logger } from "@waku/utils";
3430

@@ -63,19 +59,15 @@ class Relay implements IRelay {
6359
*/
6460
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
6561

66-
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
62+
constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) {
6763
if (!this.isRelayPubsub(libp2p.services.pubsub)) {
6864
throw Error(
6965
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
7066
);
7167
}
7268

7369
this.gossipSub = libp2p.services.pubsub as GossipSub;
74-
this.pubsubTopics = new Set(
75-
options?.shardInfo
76-
? shardInfoToPubsubTopics(options.shardInfo)
77-
: options?.pubsubTopics ?? [DefaultPubsubTopic]
78-
);
70+
this.pubsubTopics = new Set(pubsubTopics);
7971

8072
if (this.gossipSub.isStarted()) {
8173
this.subscribeToAllTopics();
@@ -283,9 +275,9 @@ class Relay implements IRelay {
283275
}
284276

285277
export function wakuRelay(
286-
init: Partial<ProtocolCreateOptions> = {}
278+
pubsubTopics: PubsubTopic[]
287279
): (libp2p: Libp2p) => IRelay {
288-
return (libp2p: Libp2p) => new Relay(libp2p, init);
280+
return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics);
289281
}
290282

291283
export function wakuGossipSub(

0 commit comments

Comments
 (0)