Skip to content

Commit d0c9683

Browse files
save remote peer's shard info after successful connection
1 parent cb43460 commit d0c9683

File tree

2 files changed

+175
-106
lines changed

2 files changed

+175
-106
lines changed

packages/core/src/lib/metadata/index.ts

+36-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
22
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
3+
import { encodeRelayShard } from "@waku/enr";
34
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
45
import { proto_metadata } from "@waku/proto";
56
import { Logger } from "@waku/utils";
@@ -16,8 +17,10 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0";
1617

1718
class Metadata extends BaseProtocol {
1819
private readonly shardInfo: ShardInfo;
20+
private libp2pComponents: Libp2pComponents;
1921
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
2022
super(MetadataCodec, libp2p.components);
23+
this.libp2pComponents = libp2p;
2124
this.shardInfo = shardInfo;
2225
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
2326
void this.onRequest(streamData);
@@ -29,11 +32,28 @@ class Metadata extends BaseProtocol {
2932
*/
3033
private async onRequest(streamData: IncomingStreamData): Promise<void> {
3134
try {
32-
const encodedResponse = proto_metadata.WakuMetadataResponse.encode(
35+
const { stream, connection } = streamData;
36+
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
3337
this.shardInfo
3438
);
3539

36-
await pipe([encodedResponse], lp.encode, streamData.stream);
40+
const encodedResponse = await pipe(
41+
[encodedShardInfo],
42+
lp.encode,
43+
stream,
44+
lp.decode,
45+
async (source) => await all(source)
46+
);
47+
48+
const remoteShardInfoResponse =
49+
this.decodeMetadataResponse(encodedResponse);
50+
51+
// add or update the shardInfo to peer store
52+
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
53+
metadata: {
54+
shardInfo: encodeRelayShard(remoteShardInfoResponse)
55+
}
56+
});
3757
} catch (error) {
3858
log.error("Error handling metadata request", error);
3959
}
@@ -49,22 +69,32 @@ class Metadata extends BaseProtocol {
4969

5070
const stream = await this.getStream(peer);
5171

52-
const res = await pipe(
72+
const encodedResponse = await pipe(
5373
[request],
5474
lp.encode,
5575
stream,
5676
lp.decode,
5777
async (source) => await all(source)
5878
);
5979

80+
const decodedResponse = this.decodeMetadataResponse(encodedResponse);
81+
82+
return decodedResponse;
83+
}
84+
85+
private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
6086
const bytes = new Uint8ArrayList();
61-
res.forEach((chunk) => {
87+
88+
encodedResponse.forEach((chunk) => {
6289
bytes.append(chunk);
6390
});
64-
const response = proto_metadata.WakuMetadataResponse.decode(bytes);
91+
const response = proto_metadata.WakuMetadataResponse.decode(
92+
bytes
93+
) as ShardInfo;
94+
6595
if (!response) log.error("Error decoding metadata response");
6696

67-
return response as ShardInfo;
97+
return response;
6898
}
6999
}
70100

packages/tests/tests/metadata.spec.ts

+139-100
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { MetadataCodec } from "@waku/core";
2+
import { decodeRelayShard } from "@waku/enr";
23
import type { LightNode, ShardInfo } from "@waku/interfaces";
34
import { createLightNode } from "@waku/sdk";
45
import { shardInfoToPubsubTopics } from "@waku/utils";
@@ -11,7 +12,7 @@ import { NimGoNode } from "../src/node/node.js";
1112

1213
chai.use(chaiAsPromised);
1314

14-
describe.only("Metadata Protocol", () => {
15+
describe("Metadata Protocol", () => {
1516
let waku: LightNode;
1617
let nwaku1: NimGoNode;
1718

@@ -24,142 +25,180 @@ describe.only("Metadata Protocol", () => {
2425
await tearDownNodes([nwaku1], waku);
2526
});
2627

27-
it("same cluster, same shard: nodes connect", async function () {
28-
this.timeout(55_000);
29-
30-
const shardInfo: ShardInfo = {
31-
clusterId: 1,
32-
shards: [1]
33-
};
28+
describe("connections", function () {
29+
it("same cluster, same shard: nodes connect", async function () {
30+
this.timeout(55_000);
31+
32+
const shardInfo: ShardInfo = {
33+
clusterId: 1,
34+
shards: [1]
35+
};
36+
37+
await nwaku1.start({
38+
relay: true,
39+
discv5Discovery: true,
40+
peerExchange: true,
41+
clusterId: shardInfo.clusterId,
42+
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
43+
});
44+
45+
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
46+
const nwaku1PeerId = await nwaku1.getPeerId();
47+
48+
waku = await createLightNode({ shardInfo });
49+
await waku.start();
50+
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
51+
52+
const shardInfoRes =
53+
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
54+
expect(shardInfoRes).to.not.be.undefined;
55+
expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId);
56+
expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards);
57+
58+
const activeConnections = waku.libp2p.getConnections();
59+
expect(activeConnections.length).to.equal(1);
60+
});
3461

35-
await nwaku1.start({
36-
relay: true,
37-
discv5Discovery: true,
38-
peerExchange: true,
39-
clusterId: shardInfo.clusterId,
40-
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
62+
it("same cluster, different shard: nodes connect", async function () {
63+
this.timeout(55_000);
64+
65+
const shardInfo1: ShardInfo = {
66+
clusterId: 1,
67+
shards: [1]
68+
};
69+
70+
const shardInfo2: ShardInfo = {
71+
clusterId: 1,
72+
shards: [2]
73+
};
74+
75+
await nwaku1.start({
76+
relay: true,
77+
discv5Discovery: true,
78+
peerExchange: true,
79+
clusterId: shardInfo1.clusterId,
80+
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
81+
});
82+
83+
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
84+
const nwaku1PeerId = await nwaku1.getPeerId();
85+
86+
waku = await createLightNode({ shardInfo: shardInfo2 });
87+
await waku.start();
88+
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
89+
90+
const shardInfoRes =
91+
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
92+
expect(shardInfoRes).to.not.be.undefined;
93+
expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId);
94+
expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards);
95+
96+
const activeConnections = waku.libp2p.getConnections();
97+
expect(activeConnections.length).to.equal(1);
4198
});
4299

43-
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
44-
const nwaku1PeerId = await nwaku1.getPeerId();
100+
it("different cluster, same shard: nodes don't connect", async function () {
101+
this.timeout(55_000);
45102

46-
waku = await createLightNode({ shardInfo });
47-
await waku.start();
48-
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
103+
const shardInfo1: ShardInfo = {
104+
clusterId: 1,
105+
shards: [1]
106+
};
49107

50-
const shardInfoRes =
51-
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
52-
expect(shardInfoRes).to.not.be.undefined;
53-
expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId);
54-
expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards);
108+
const shardInfo2: ShardInfo = {
109+
clusterId: 2,
110+
shards: [1]
111+
};
55112

56-
const activeConnections = waku.libp2p.getConnections();
57-
expect(activeConnections.length).to.equal(1);
58-
});
113+
await nwaku1.start({
114+
relay: true,
115+
discv5Discovery: true,
116+
peerExchange: true,
117+
clusterId: shardInfo1.clusterId,
118+
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
119+
});
59120

60-
it("same cluster, different shard: nodes connect", async function () {
61-
this.timeout(55_000);
121+
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
62122

63-
const shardInfo1: ShardInfo = {
64-
clusterId: 1,
65-
shards: [1]
66-
};
123+
waku = await createLightNode({ shardInfo: shardInfo2 });
124+
await waku.start();
125+
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
67126

68-
const shardInfo2: ShardInfo = {
69-
clusterId: 1,
70-
shards: [2]
71-
};
127+
// add a delay to make sure the connection is closed from the other side
128+
await delay(100);
72129

73-
await nwaku1.start({
74-
relay: true,
75-
discv5Discovery: true,
76-
peerExchange: true,
77-
clusterId: shardInfo1.clusterId,
78-
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
130+
const activeConnections = waku.libp2p.getConnections();
131+
expect(activeConnections.length).to.equal(0);
79132
});
80133

81-
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
82-
const nwaku1PeerId = await nwaku1.getPeerId();
134+
it("different cluster, different shard: nodes don't connect", async function () {
135+
this.timeout(55_000);
83136

84-
waku = await createLightNode({ shardInfo: shardInfo2 });
85-
await waku.start();
86-
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
137+
const shardInfo1: ShardInfo = {
138+
clusterId: 1,
139+
shards: [1]
140+
};
87141

88-
const shardInfoRes =
89-
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
90-
expect(shardInfoRes).to.not.be.undefined;
91-
expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId);
92-
expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards);
142+
const shardInfo2: ShardInfo = {
143+
clusterId: 2,
144+
shards: [2]
145+
};
93146

94-
const activeConnections = waku.libp2p.getConnections();
95-
expect(activeConnections.length).to.equal(1);
96-
});
147+
await nwaku1.start({
148+
relay: true,
149+
discv5Discovery: true,
150+
peerExchange: true,
151+
clusterId: shardInfo1.clusterId,
152+
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
153+
});
97154

98-
it("different cluster, same shard: nodes don't connect", async function () {
99-
this.timeout(55_000);
155+
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
100156

101-
const shardInfo1: ShardInfo = {
102-
clusterId: 1,
103-
shards: [1]
104-
};
157+
waku = await createLightNode({ shardInfo: shardInfo2 });
158+
await waku.start();
159+
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
105160

106-
const shardInfo2: ShardInfo = {
107-
clusterId: 2,
108-
shards: [1]
109-
};
161+
// add a delay to make sure the connection is closed from the other side
162+
await delay(100);
110163

111-
await nwaku1.start({
112-
relay: true,
113-
discv5Discovery: true,
114-
peerExchange: true,
115-
clusterId: shardInfo1.clusterId,
116-
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
164+
const activeConnections = waku.libp2p.getConnections();
165+
expect(activeConnections.length).to.equal(0);
117166
});
118-
119-
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
120-
121-
waku = await createLightNode({ shardInfo: shardInfo2 });
122-
await waku.start();
123-
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
124-
125-
// add a delay to make sure the connection is closed from the other side
126-
await delay(100);
127-
128-
const activeConnections = waku.libp2p.getConnections();
129-
expect(activeConnections.length).to.equal(0);
130167
});
131168

132-
it("different cluster, different shard: nodes don't connect", async function () {
133-
this.timeout(55_000);
134-
135-
const shardInfo1: ShardInfo = {
169+
it("PeerStore has remote peer's shard info after successful connection", async function () {
170+
const shardInfo: ShardInfo = {
136171
clusterId: 1,
137172
shards: [1]
138173
};
139174

140-
const shardInfo2: ShardInfo = {
141-
clusterId: 2,
142-
shards: [2]
143-
};
144-
145175
await nwaku1.start({
146176
relay: true,
147177
discv5Discovery: true,
148178
peerExchange: true,
149-
clusterId: shardInfo1.clusterId,
150-
pubsubTopic: shardInfoToPubsubTopics(shardInfo1)
179+
clusterId: shardInfo.clusterId,
180+
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
151181
});
152182

153183
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
184+
const nwaku1PeerId = await nwaku1.getPeerId();
154185

155-
waku = await createLightNode({ shardInfo: shardInfo2 });
186+
waku = await createLightNode({ shardInfo });
156187
await waku.start();
157188
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
158189

159-
// add a delay to make sure the connection is closed from the other side
160-
await delay(100);
190+
// delay to ensure the connection is estabilished and shardInfo is updated
191+
await delay(500);
192+
193+
const encodedShardInfo = (
194+
await waku.libp2p.peerStore.get(nwaku1PeerId)
195+
).metadata.get("shardInfo");
196+
expect(encodedShardInfo).to.not.be.undefined;
197+
198+
const metadataShardInfo = decodeRelayShard(encodedShardInfo!);
199+
expect(metadataShardInfo).not.be.undefined;
161200

162-
const activeConnections = waku.libp2p.getConnections();
163-
expect(activeConnections.length).to.equal(0);
201+
expect(metadataShardInfo!.clusterId).to.eq(shardInfo.clusterId);
202+
expect(metadataShardInfo.shards).to.deep.eq(shardInfo.shards);
164203
});
165204
});

0 commit comments

Comments
 (0)