Skip to content

Commit 124a29e

Browse files
feat(static-sharding): filter peer connections per shards (#1626)
* add interface for `ShardInfo` * enr: add deserialization logic & setup getters * add sharding related utils * utils: add shard<-> bytes conversion helpers * pass `pubSubTopics` to `Waku` * add `rs`/`rsv` details during discovery * connection-manager: discard irrelevant peers * add tests for static sharding - peer exchange * update `ConnectionManager` tests to account for topic validity * add js suffix to import * address some comments * move shardInfo encoding to ENR * test: update for new API * enr: add tests for serialisation & deserialisation * address comment * update test * move getPeershardInfo to ConnectionManager and return ShardInfo instead of bytes * update encoding and decoding relay shards to also factor for shards>64 * relay shard encoding decoding: use DataView and verbose spec tests * improve tests for relay shard encoding decoding * rm: only * improve log message for unconfigured pubsub topic * minor improvement * fix: buffer <> Uint8array problems with shard decoding * fix: test * rm: only
1 parent fe64da1 commit 124a29e

18 files changed

+480
-25
lines changed

package-lock.json

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
},
7474
"dependencies": {
7575
"@noble/hashes": "^1.3.2",
76+
"@waku/enr": "^0.0.17",
7677
"@waku/interfaces": "0.0.18",
7778
"@waku/proto": "0.0.5",
7879
"@waku/utils": "0.0.11",

packages/core/src/lib/connection_manager.ts

+51-1
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
22
import type { PeerInfo } from "@libp2p/interface/peer-info";
33
import type { Peer } from "@libp2p/interface/peer-store";
4+
import type { PeerStore } from "@libp2p/interface/peer-store";
45
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
6+
import { decodeRelayShard } from "@waku/enr";
57
import {
68
ConnectionManagerOptions,
79
EPeersByDiscoveryEvents,
810
IConnectionManager,
911
IPeersByDiscoveryEvents,
1012
IRelay,
1113
KeepAliveOptions,
12-
PeersByDiscoveryResult
14+
PeersByDiscoveryResult,
15+
PubSubTopic,
16+
ShardInfo
1317
} from "@waku/interfaces";
1418
import { Libp2p, Tags } from "@waku/interfaces";
19+
import { shardInfoToPubSubTopics } from "@waku/utils";
1520
import debug from "debug";
1621

1722
import { KeepAliveManager } from "./keep_alive_manager.js";
@@ -40,6 +45,7 @@ export class ConnectionManager
4045
peerId: string,
4146
libp2p: Libp2p,
4247
keepAliveOptions: KeepAliveOptions,
48+
pubSubTopics: PubSubTopic[],
4349
relay?: IRelay,
4450
options?: ConnectionManagerOptions
4551
): ConnectionManager {
@@ -48,6 +54,7 @@ export class ConnectionManager
4854
instance = new ConnectionManager(
4955
libp2p,
5056
keepAliveOptions,
57+
pubSubTopics,
5158
relay,
5259
options
5360
);
@@ -104,11 +111,13 @@ export class ConnectionManager
104111
private constructor(
105112
libp2p: Libp2p,
106113
keepAliveOptions: KeepAliveOptions,
114+
private configuredPubSubTopics: PubSubTopic[],
107115
relay?: IRelay,
108116
options?: Partial<ConnectionManagerOptions>
109117
) {
110118
super();
111119
this.libp2p = libp2p;
120+
this.configuredPubSubTopics = configuredPubSubTopics;
112121
this.options = {
113122
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
114123
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
@@ -314,6 +323,20 @@ export class ConnectionManager
314323
void (async () => {
315324
const { id: peerId } = evt.detail;
316325

326+
if (!(await this.isPeerTopicConfigured(peerId))) {
327+
const shardInfo = await this.getPeerShardInfo(
328+
peerId,
329+
this.libp2p.peerStore
330+
);
331+
log(
332+
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
333+
this.configuredPubSubTopics
334+
}).
335+
Not dialing.`
336+
);
337+
return;
338+
}
339+
317340
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
318341
Tags.BOOTSTRAP
319342
);
@@ -430,4 +453,31 @@ export class ConnectionManager
430453
return [];
431454
}
432455
}
456+
457+
private async isPeerTopicConfigured(peerId: PeerId): Promise<boolean> {
458+
const shardInfo = await this.getPeerShardInfo(
459+
peerId,
460+
this.libp2p.peerStore
461+
);
462+
463+
// If there's no shard information, simply return true
464+
if (!shardInfo) return true;
465+
466+
const pubSubTopics = shardInfoToPubSubTopics(shardInfo);
467+
468+
const isTopicConfigured = pubSubTopics.some((topic) =>
469+
this.configuredPubSubTopics.includes(topic)
470+
);
471+
return isTopicConfigured;
472+
}
473+
474+
private async getPeerShardInfo(
475+
peerId: PeerId,
476+
peerStore: PeerStore
477+
): Promise<ShardInfo | undefined> {
478+
const peer = await peerStore.get(peerId);
479+
const shardInfoBytes = peer.metadata.get("shardInfo");
480+
if (!shardInfoBytes) return undefined;
481+
return decodeRelayShard(shardInfoBytes);
482+
}
433483
}

packages/core/src/lib/waku.ts

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
IRelay,
88
IStore,
99
Libp2p,
10+
PubSubTopic,
1011
Waku
1112
} from "@waku/interfaces";
1213
import { Protocols } from "@waku/interfaces";
@@ -52,6 +53,7 @@ export class WakuNode implements Waku {
5253

5354
constructor(
5455
options: WakuOptions,
56+
public readonly pubSubTopics: PubSubTopic[],
5557
libp2p: Libp2p,
5658
store?: (libp2p: Libp2p) => IStore,
5759
lightPush?: (libp2p: Libp2p) => ILightPush,
@@ -86,6 +88,7 @@ export class WakuNode implements Waku {
8688
peerId,
8789
libp2p,
8890
{ pingKeepAlive, relayKeepAlive },
91+
pubSubTopics,
8992
this.relay
9093
);
9194

packages/dns-discovery/src/dns_discovery.ts

+16-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type {
55
} from "@libp2p/interface/peer-discovery";
66
import { peerDiscovery as symbol } from "@libp2p/interface/peer-discovery";
77
import type { PeerInfo } from "@libp2p/interface/peer-info";
8+
import { encodeRelayShard } from "@waku/enr";
89
import type {
910
DnsDiscOptions,
1011
DnsDiscoveryComponents,
@@ -72,18 +73,16 @@ export class PeerDiscoveryDns
7273
return;
7374
}
7475

75-
const peerInfo = peerEnr.peerInfo;
76+
const { peerInfo, shardInfo } = peerEnr;
7677

7778
if (!peerInfo) {
7879
continue;
7980
}
8081

8182
const tagsToUpdate = {
82-
tags: {
83-
[DEFAULT_BOOTSTRAP_TAG_NAME]: {
84-
value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE,
85-
ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL
86-
}
83+
[DEFAULT_BOOTSTRAP_TAG_NAME]: {
84+
value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE,
85+
ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL
8786
}
8887
};
8988

@@ -96,11 +95,20 @@ export class PeerDiscoveryDns
9695

9796
if (!hasBootstrapTag) {
9897
isPeerChanged = true;
99-
await this._components.peerStore.merge(peerInfo.id, tagsToUpdate);
98+
await this._components.peerStore.merge(peerInfo.id, {
99+
tags: tagsToUpdate
100+
});
100101
}
101102
} else {
102103
isPeerChanged = true;
103-
await this._components.peerStore.save(peerInfo.id, tagsToUpdate);
104+
await this._components.peerStore.save(peerInfo.id, {
105+
tags: tagsToUpdate,
106+
...(shardInfo && {
107+
metadata: {
108+
shardInfo: encodeRelayShard(shardInfo)
109+
}
110+
})
111+
});
104112
}
105113

106114
if (isPeerChanged) {

packages/enr/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
"@waku/interfaces": "0.0.18",
7272
"chai": "^4.3.7",
7373
"cspell": "^7.3.2",
74+
"fast-check": "^3.13.1",
7475
"mocha": "^10.2.0",
7576
"npm-run-all": "^4.1.5",
7677
"process": "^0.11.10",

packages/enr/src/enr.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import type {
66
ENRValue,
77
IEnr,
88
NodeId,
9-
SequenceNumber
9+
SequenceNumber,
10+
ShardInfo
1011
} from "@waku/interfaces";
1112
import debug from "debug";
1213

@@ -64,6 +65,13 @@ export class ENR extends RawEnr implements IEnr {
6465
protocol: TransportProtocol | TransportProtocolPerIpVersion
6566
) => Multiaddr | undefined = locationMultiaddrFromEnrFields.bind({}, this);
6667

68+
get shardInfo(): ShardInfo | undefined {
69+
if (this.rs && this.rsv) {
70+
log("Warning: ENR contains both `rs` and `rsv` fields.");
71+
}
72+
return this.rs || this.rsv;
73+
}
74+
6775
setLocationMultiaddr(multiaddr: Multiaddr): void {
6876
const protoNames = multiaddr.protoNames();
6977
if (

packages/enr/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export * from "./enr.js";
55
export * from "./peer_id.js";
66
export * from "./waku2_codec.js";
77
export * from "./crypto.js";
8+
export * from "./relay_shard_codec.js";

packages/enr/src/raw_enr.ts

+20-1
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,18 @@ import {
33
convertToBytes,
44
convertToString
55
} from "@multiformats/multiaddr/convert";
6-
import type { ENRKey, ENRValue, SequenceNumber, Waku2 } from "@waku/interfaces";
6+
import type {
7+
ENRKey,
8+
ENRValue,
9+
SequenceNumber,
10+
ShardInfo,
11+
Waku2
12+
} from "@waku/interfaces";
713
import { bytesToUtf8 } from "@waku/utils/bytes";
814

915
import { ERR_INVALID_ID } from "./constants.js";
1016
import { decodeMultiaddrs, encodeMultiaddrs } from "./multiaddrs_codec.js";
17+
import { decodeRelayShard } from "./relay_shard_codec.js";
1118
import { decodeWaku2, encodeWaku2 } from "./waku2_codec.js";
1219

1320
export class RawEnr extends Map<ENRKey, ENRValue> {
@@ -45,6 +52,18 @@ export class RawEnr extends Map<ENRKey, ENRValue> {
4552
}
4653
}
4754

55+
get rs(): ShardInfo | undefined {
56+
const rs = this.get("rs");
57+
if (!rs) return undefined;
58+
return decodeRelayShard(rs);
59+
}
60+
61+
get rsv(): ShardInfo | undefined {
62+
const rsv = this.get("rsv");
63+
if (!rsv) return undefined;
64+
return decodeRelayShard(rsv);
65+
}
66+
4867
get ip(): string | undefined {
4968
return getStringValue(this, "ip", "ip4");
5069
}
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { expect } from "chai";
2+
import fc from "fast-check";
3+
4+
import { decodeRelayShard, encodeRelayShard } from "./relay_shard_codec.js";
5+
6+
describe("Relay Shard codec", () => {
7+
// Boundary test case
8+
it("should handle a minimal index list", () => {
9+
const shardInfo = { cluster: 0, indexList: [0] };
10+
const encoded = encodeRelayShard(shardInfo);
11+
const decoded = decodeRelayShard(encoded);
12+
expect(decoded).to.deep.equal(
13+
shardInfo,
14+
"Decoded shard info does not match the original for minimal index list"
15+
);
16+
});
17+
18+
// Property-based test for rs format (Index List)
19+
it("should correctly encode and decode relay shards using rs format (Index List)", () => {
20+
fc.assert(
21+
fc.property(
22+
fc.nat(65535), // cluster
23+
fc
24+
.array(fc.nat(1023), { minLength: 1, maxLength: 63 }) // indexList
25+
.map((arr) => [...new Set(arr)].sort((a, b) => a - b)),
26+
(cluster, indexList) => {
27+
const shardInfo = { cluster, indexList };
28+
const encoded = encodeRelayShard(shardInfo);
29+
const decoded = decodeRelayShard(encoded);
30+
31+
expect(decoded).to.deep.equal(
32+
shardInfo,
33+
"Decoded shard info does not match the original for rs format"
34+
);
35+
}
36+
)
37+
);
38+
});
39+
40+
// Property-based test for rsv format (Bit Vector)
41+
it("should correctly encode and decode relay shards using rsv format (Bit Vector)", () => {
42+
fc.assert(
43+
fc.property(
44+
fc.nat(65535), // cluster
45+
fc
46+
.array(fc.nat(1023), { minLength: 64, maxLength: 1024 }) // indexList
47+
.map((arr) => [...new Set(arr)].sort((a, b) => a - b)),
48+
(cluster, indexList) => {
49+
const shardInfo = { cluster, indexList };
50+
const encoded = encodeRelayShard(shardInfo);
51+
const decoded = decodeRelayShard(encoded);
52+
53+
expect(decoded).to.deep.equal(
54+
shardInfo,
55+
"Decoded shard info does not match the original for rsv format"
56+
);
57+
}
58+
)
59+
);
60+
});
61+
62+
// Error handling test case
63+
it("should throw an error for insufficient data", () => {
64+
expect(() => decodeRelayShard(new Uint8Array([0, 0]))).to.throw(
65+
"Insufficient data"
66+
);
67+
});
68+
});

0 commit comments

Comments
 (0)