Skip to content

Commit d2a69e6

Browse files
refactor implementation & write test
1 parent fa18b1e commit d2a69e6

File tree

5 files changed

+118
-76
lines changed

5 files changed

+118
-76
lines changed

packages/core/src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ export { ConnectionManager } from "./lib/connection_manager.js";
2727

2828
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
2929
export { StreamManager } from "./lib/stream_manager.js";
30+
31+
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
+61-29
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
2-
import type {
3-
IMetadata,
4-
Libp2p,
5-
MetadataQueryParams,
6-
ShardInfo
7-
} from "@waku/interfaces";
2+
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
3+
import type { IMetadata, Libp2p, ShardInfo } from "@waku/interfaces";
4+
import { proto_metadata } from "@waku/proto";
85
import { Logger } from "@waku/utils";
96
import all from "it-all";
107
import * as lp from "it-length-prefixed";
@@ -13,31 +10,32 @@ import { Uint8ArrayList } from "uint8arraylist";
1310

1411
import { BaseProtocol } from "../base_protocol.js";
1512

16-
import { MetadataRpc } from "./rpc.js";
17-
1813
const log = new Logger("metadata");
1914

2015
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
2116

2217
class Metadata extends BaseProtocol {
23-
constructor(libp2p: Libp2p) {
18+
private readonly shardInfo: ShardInfo;
19+
constructor(shardInfo: ShardInfo, libp2p: Libp2p) {
2420
super(MetadataCodec, libp2p.components);
21+
this.shardInfo = shardInfo;
22+
void libp2p.handle(MetadataCodec, (streamData) => {
23+
void this.onRequest(streamData);
24+
});
2525
}
2626

2727
/**
28-
* Make a metadata query to a peer
28+
* Handle an incoming metadata request
2929
*/
30-
async query(params: MetadataQueryParams, peerId: PeerId): Promise<ShardInfo> {
31-
const rpcQuery = MetadataRpc.createRequest(params.clusterId, params.shards);
32-
33-
const peer = await this.getPeer(peerId);
34-
35-
const stream = await this.getStream(peer);
30+
private async onRequest(streamData: IncomingStreamData): Promise<void> {
31+
const encodedRpcQuery = proto_metadata.WakuMetadataRequest.encode(
32+
this.shardInfo
33+
);
3634

3735
const res = await pipe(
38-
[rpcQuery.encode()],
36+
[encodedRpcQuery],
3937
lp.encode,
40-
stream,
38+
streamData.stream,
4139
lp.decode,
4240
async (source) => await all(source)
4341
);
@@ -48,23 +46,57 @@ class Metadata extends BaseProtocol {
4846
bytes.append(chunk);
4947
});
5048

51-
const { response } = MetadataRpc.decode(bytes);
52-
if (!response) {
53-
throw new Error("No response in query");
49+
const shardInfoRes = proto_metadata.WakuMetadataResponse.decode(bytes);
50+
if (!shardInfoRes) {
51+
throw new Error("WakuMetadata response is undefined");
52+
}
53+
if (!shardInfoRes.clusterId) {
54+
throw new Error("WakuMetadata response clusterId is undefined");
5455
}
55-
56-
const { shards, clusterId } = response;
57-
return {
58-
cluster: clusterId,
59-
indexList: shards
60-
} as ShardInfo;
6156
} catch (e) {
6257
log.error(`Error decoding response: ${e}`);
6358
throw e;
6459
}
6560
}
61+
62+
/**
63+
* Make a metadata query to a peer
64+
*/
65+
async query(peerId: PeerId): Promise<ShardInfo> {
66+
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
67+
68+
try {
69+
const peer = await this.getPeer(peerId);
70+
71+
const stream = await this.getStream(peer);
72+
73+
const res = await pipe(
74+
[request],
75+
lp.encode,
76+
stream,
77+
lp.decode,
78+
async (source) => await all(source)
79+
);
80+
81+
const bytes = new Uint8ArrayList();
82+
res.forEach((chunk) => {
83+
bytes.append(chunk);
84+
});
85+
const response = proto_metadata.WakuMetadataResponse.decode(bytes);
86+
if (!response) {
87+
throw new Error("Error decoding metadata response");
88+
}
89+
90+
return response as ShardInfo;
91+
} catch (err) {
92+
log.error("Error decoding metadata response", err);
93+
throw err;
94+
}
95+
}
6696
}
6797

68-
export function wakuMetadata(): (libp2p: Libp2p) => IMetadata {
69-
return (libp2p: Libp2p) => new Metadata(libp2p);
98+
export function wakuMetadata(
99+
shardInfo: ShardInfo
100+
): (libp2p: Libp2p) => IMetadata {
101+
return (libp2p: Libp2p) => new Metadata(shardInfo, libp2p);
70102
}

packages/core/src/lib/metadata/rpc.ts

-38
This file was deleted.

packages/interfaces/src/metadata.ts

+1-9
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,6 @@ import type { PeerId } from "@libp2p/interface/peer-id";
33
import type { ShardInfo } from "./enr.js";
44
import type { IBaseProtocol } from "./protocols.js";
55

6-
export interface MetadataQueryParams {
7-
clusterId?: number;
8-
shards: number[];
9-
}
10-
116
export interface IMetadata extends IBaseProtocol {
12-
query(
13-
params: MetadataQueryParams,
14-
peerId: PeerId
15-
): Promise<ShardInfo | undefined>;
7+
query(peerId: PeerId): Promise<ShardInfo | undefined>;
168
}

packages/tests/tests/metadata.spec.ts

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { MetadataCodec, wakuMetadata } from "@waku/core";
2+
import type { LightNode, ShardInfo } from "@waku/interfaces";
3+
import { createLightNode } from "@waku/sdk";
4+
import { shardInfoToPubsubTopics } from "@waku/utils";
5+
import { expect } from "chai";
6+
7+
import { tearDownNodes } from "../src/index.js";
8+
import { makeLogFileName } from "../src/log_file.js";
9+
import { NimGoNode } from "../src/node/node.js";
10+
11+
describe("Metadata Protocol", () => {
12+
describe("Locally Run Nodes", () => {
13+
let waku: LightNode;
14+
let nwaku1: NimGoNode;
15+
const shardInfo: ShardInfo = {
16+
clusterId: 1,
17+
shards: [1]
18+
};
19+
20+
beforeEach(function () {
21+
nwaku1 = new NimGoNode(makeLogFileName(this) + "1");
22+
});
23+
24+
afterEach(async function () {
25+
this.timeout(15000);
26+
await tearDownNodes([nwaku1], waku);
27+
});
28+
29+
it("interop", async function () {
30+
this.timeout(55_000);
31+
32+
await nwaku1.start({
33+
relay: true,
34+
discv5Discovery: true,
35+
peerExchange: true,
36+
clusterId: shardInfo.clusterId,
37+
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
38+
});
39+
40+
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
41+
const nwaku1PeerId = await nwaku1.getPeerId();
42+
43+
waku = await createLightNode();
44+
await waku.start();
45+
const metadata = wakuMetadata(shardInfo)(waku.libp2p);
46+
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
47+
48+
const shardInfoRes = await metadata.query(nwaku1PeerId);
49+
expect(shardInfoRes).to.not.be.undefined;
50+
expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId);
51+
expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards);
52+
});
53+
});
54+
});

0 commit comments

Comments
 (0)