Skip to content

Commit 2bc3735

Browse files
danisharora099adklempner
authored andcommitted
feat: add support for autosharded pubsub topics
tests: use a generator for sharded pubsub topics set pubsub topic in encoder/decoder based on sharding type add function for grouping content topics by pubsub topic add autosharding config to create options add autoshard rpc endpoints to nwaku and use in tests set autoshard pubsub topics in all protocols fix rebase with static sharding removes unused function remove console logs remove autosharding from ShardInfo, add to EncoderOptions fix enr and encoder/decoder options test that same application/version hashes to same shard index update comment on shard field fix spelling of autosharding fix content topic protocol in tests add sharding type alias and function to determine topic in encoders/decoders move DefaultPubsubTopic from core to interfaces
1 parent 6dc3882 commit 2bc3735

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1351
-122
lines changed

packages/core/src/index.ts

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
export { DefaultUserAgent } from "./lib/waku.js";
2-
export { DefaultPubsubTopic } from "./lib/constants.js";
32
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
43
export type {
54
Encoder,

packages/core/src/lib/base_protocol.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import type {
66
IBaseProtocol,
77
Libp2pComponents,
88
PubsubTopic,
9-
ShardInfo
9+
ShardingParams
1010
} from "@waku/interfaces";
11+
import { DefaultPubsubTopic } from "@waku/interfaces";
1112
import { shardInfoToPubsubTopics } from "@waku/utils";
1213
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
1314

14-
import { DefaultPubsubTopic } from "./constants.js";
1515
import { filterPeers } from "./filterPeers.js";
1616
import { StreamManager } from "./stream_manager.js";
1717

@@ -97,7 +97,7 @@ export class BaseProtocol implements IBaseProtocol {
9797
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
9898
}
9999

100-
initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
100+
initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] {
101101
return shardInfo
102102
? shardInfoToPubsubTopics(shardInfo)
103103
: [DefaultPubsubTopic];

packages/core/src/lib/filter/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
SingleShardInfo,
1818
Unsubscribe
1919
} from "@waku/interfaces";
20+
import { DefaultPubsubTopic } from "@waku/interfaces";
2021
import { WakuMessage } from "@waku/proto";
2122
import {
2223
ensurePubsubTopicIsConfigured,
@@ -30,7 +31,6 @@ import * as lp from "it-length-prefixed";
3031
import { pipe } from "it-pipe";
3132

3233
import { BaseProtocol } from "../base_protocol.js";
33-
import { DefaultPubsubTopic } from "../constants.js";
3434

3535
import {
3636
FilterPushRpc,

packages/core/src/lib/message/version_0.ts

+3-9
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ import type {
1111
SingleShardInfo
1212
} from "@waku/interfaces";
1313
import { proto_message as proto } from "@waku/proto";
14-
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
15-
16-
import { DefaultPubsubTopic } from "../constants.js";
14+
import { determinePubsubTopic, Logger } from "@waku/utils";
1715

1816
const log = new Logger("message:version-0");
1917
const OneMillion = BigInt(1_000_000);
@@ -128,9 +126,7 @@ export function createEncoder({
128126
return new Encoder(
129127
contentTopic,
130128
ephemeral,
131-
pubsubTopicShardInfo
132-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
133-
: DefaultPubsubTopic,
129+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
134130
metaSetter
135131
);
136132
}
@@ -193,9 +189,7 @@ export function createDecoder(
193189
pubsubTopicShardInfo?: SingleShardInfo
194190
): Decoder {
195191
return new Decoder(
196-
pubsubTopicShardInfo
197-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
198-
: DefaultPubsubTopic,
192+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
199193
contentTopic
200194
);
201195
}

packages/core/src/lib/metadata/index.ts

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
22
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
33
import { encodeRelayShard } from "@waku/enr";
4-
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
4+
import type {
5+
IMetadata,
6+
Libp2pComponents,
7+
ShardInfo,
8+
ShardingParams
9+
} from "@waku/interfaces";
510
import { proto_metadata } from "@waku/proto";
611
import { Logger } from "@waku/utils";
712
import all from "it-all";
@@ -16,9 +21,9 @@ const log = new Logger("metadata");
1621
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
1722

1823
class Metadata extends BaseProtocol {
19-
private readonly shardInfo: ShardInfo;
24+
private readonly shardInfo: ShardingParams;
2025
private libp2pComponents: Libp2pComponents;
21-
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
26+
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
2227
super(MetadataCodec, libp2p.components);
2328
this.libp2pComponents = libp2p;
2429
this.shardInfo = shardInfo;
@@ -99,7 +104,7 @@ class Metadata extends BaseProtocol {
99104
}
100105

101106
export function wakuMetadata(
102-
shardInfo: ShardInfo
107+
shardInfo: ShardingParams
103108
): (components: Libp2pComponents) => IMetadata {
104109
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
105110
}

packages/core/src/lib/waku.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@ import type {
88
IStore,
99
Libp2p,
1010
PubsubTopic,
11-
ShardInfo,
11+
ShardingParams,
1212
Waku
1313
} from "@waku/interfaces";
14-
import { Protocols } from "@waku/interfaces";
14+
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
1515
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
1616

1717
import { ConnectionManager } from "./connection_manager.js";
18-
import { DefaultPubsubTopic } from "./constants.js";
1918

2019
export const DefaultPingKeepAliveValueSecs = 5 * 60;
2120
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@@ -57,7 +56,7 @@ export class WakuNode implements Waku {
5756
constructor(
5857
options: WakuOptions,
5958
libp2p: Libp2p,
60-
pubsubShardInfo?: ShardInfo,
59+
pubsubShardInfo?: ShardingParams,
6160
store?: (libp2p: Libp2p) => IStore,
6261
lightPush?: (libp2p: Libp2p) => ILightPush,
6362
filter?: (libp2p: Libp2p) => IFilter,
File renamed without changes.

packages/interfaces/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ export * from "./libp2p.js";
1515
export * from "./keep_alive_manager.js";
1616
export * from "./dns_discovery.js";
1717
export * from "./metadata.js";
18+
export * from "./constants.js";

packages/interfaces/src/message.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import type { PubsubTopic } from "./misc.js";
22

33
export interface SingleShardInfo {
44
clusterId: number;
5-
shard: number;
5+
/**
6+
* Specifying this field indicates to the encoder/decoder that static sharding must be used.
7+
*/
8+
shard?: number;
69
}
710

811
export interface IRateLimitProof {

packages/interfaces/src/protocols.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ export interface IBaseProtocol {
2121
removeLibp2pEventListener: Libp2p["removeEventListener"];
2222
}
2323

24+
export type ContentTopicInfo = {
25+
clusterId: number;
26+
contentTopics: string[];
27+
};
28+
29+
export type ShardingParams = ShardInfo | ContentTopicInfo;
30+
2431
export type ProtocolCreateOptions = {
2532
/**
2633
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
@@ -39,7 +46,7 @@ export type ProtocolCreateOptions = {
3946
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
4047
*
4148
*/
42-
shardInfo?: ShardInfo;
49+
shardInfo?: ShardingParams;
4350
/**
4451
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
4552
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)

packages/message-encryption/src/ecies.ts

+3-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { DefaultPubsubTopic } from "@waku/core";
21
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
32
import type {
43
EncoderOptions as BaseEncoderOptions,
@@ -11,7 +10,7 @@ import type {
1110
SingleShardInfo
1211
} from "@waku/interfaces";
1312
import { WakuMessage } from "@waku/proto";
14-
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
13+
import { determinePubsubTopic, Logger } from "@waku/utils";
1514

1615
import { generatePrivateKey } from "./crypto/utils.js";
1716
import { DecodedMessage } from "./decoded_message.js";
@@ -107,9 +106,7 @@ export function createEncoder({
107106
metaSetter
108107
}: EncoderOptions): Encoder {
109108
return new Encoder(
110-
pubsubTopicShardInfo
111-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
112-
: DefaultPubsubTopic,
109+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
113110
contentTopic,
114111
publicKey,
115112
sigPrivKey,
@@ -200,9 +197,7 @@ export function createDecoder(
200197
pubsubTopicShardInfo?: SingleShardInfo
201198
): Decoder {
202199
return new Decoder(
203-
pubsubTopicShardInfo
204-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
205-
: DefaultPubsubTopic,
200+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
206201
contentTopic,
207202
privateKey
208203
);

packages/message-encryption/src/symmetric.ts

+3-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { DefaultPubsubTopic } from "@waku/core";
21
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
32
import type {
43
EncoderOptions as BaseEncoderOptions,
@@ -11,7 +10,7 @@ import type {
1110
SingleShardInfo
1211
} from "@waku/interfaces";
1312
import { WakuMessage } from "@waku/proto";
14-
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
13+
import { determinePubsubTopic, Logger } from "@waku/utils";
1514

1615
import { generateSymmetricKey } from "./crypto/utils.js";
1716
import { DecodedMessage } from "./decoded_message.js";
@@ -107,9 +106,7 @@ export function createEncoder({
107106
metaSetter
108107
}: EncoderOptions): Encoder {
109108
return new Encoder(
110-
pubsubTopicShardInfo
111-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
112-
: DefaultPubsubTopic,
109+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
113110
contentTopic,
114111
symKey,
115112
sigPrivKey,
@@ -200,9 +197,7 @@ export function createDecoder(
200197
pubsubTopicShardInfo?: SingleShardInfo
201198
): Decoder {
202199
return new Decoder(
203-
pubsubTopicShardInfo
204-
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
205-
: DefaultPubsubTopic,
200+
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
206201
contentTopic,
207202
symKey
208203
);

packages/relay/src/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
99
import type { PeerId } from "@libp2p/interface/peer-id";
1010
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
1111
import { sha256 } from "@noble/hashes/sha256";
12-
import { DefaultPubsubTopic } from "@waku/core";
12+
import { DefaultPubsubTopic } from "@waku/interfaces";
1313
import {
1414
ActiveSubscriptions,
1515
Callback,

packages/relay/src/topic_only_message.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { DefaultPubsubTopic } from "@waku/core";
1+
import { DefaultPubsubTopic } from "@waku/interfaces";
22
import type {
33
IDecodedMessage,
44
IDecoder,

packages/sdk/src/create.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import type {
2323
LightNode,
2424
ProtocolCreateOptions,
2525
RelayNode,
26-
ShardInfo
26+
ShardingParams
2727
} from "@waku/interfaces";
2828
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
2929
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
@@ -180,7 +180,7 @@ type MetadataService = {
180180
};
181181

182182
export async function defaultLibp2p(
183-
shardInfo?: ShardInfo,
183+
shardInfo?: ShardingParams,
184184
wakuGossipSub?: PubsubService["pubsub"],
185185
options?: Partial<CreateLibp2pOptions>,
186186
userAgent?: string

packages/tests/src/message_collector.ts

+45-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { DecodedMessage, DefaultPubsubTopic } from "@waku/core";
1+
import { DecodedMessage } from "@waku/core";
2+
import { DefaultPubsubTopic } from "@waku/interfaces";
23
import { Logger } from "@waku/utils";
34
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
45
import { AssertionError, expect } from "chai";
@@ -103,6 +104,49 @@ export class MessageCollector {
103104
}
104105
}
105106

107+
async waitForMessagesAutosharding(
108+
numMessages: number,
109+
options?: {
110+
contentTopic: string;
111+
timeoutDuration?: number;
112+
exact?: boolean;
113+
}
114+
): Promise<boolean> {
115+
const startTime = Date.now();
116+
const timeoutDuration = options?.timeoutDuration || 400;
117+
const exact = options?.exact || false;
118+
119+
while (this.count < numMessages) {
120+
if (this.nwaku) {
121+
try {
122+
this.list = await this.nwaku.messagesAutosharding(
123+
options!.contentTopic
124+
);
125+
} catch (error) {
126+
log.error(`Can't retrieve messages because of ${error}`);
127+
await delay(10);
128+
}
129+
}
130+
131+
if (Date.now() - startTime > timeoutDuration * numMessages) {
132+
return false;
133+
}
134+
135+
await delay(10);
136+
}
137+
138+
if (exact) {
139+
if (this.count == numMessages) {
140+
return true;
141+
} else {
142+
log.warn(`Was expecting exactly ${numMessages} messages`);
143+
return false;
144+
}
145+
} else {
146+
return true;
147+
}
148+
}
149+
106150
// Verifies a received message against expected values.
107151
verifyReceivedMessage(
108152
index: number,

0 commit comments

Comments
 (0)