Skip to content

Commit 8b5f328

Browse files
Merge branch 'feat/metadata-protocol' of github.com:waku-org/js-waku into feat/metadata-protocol
2 parents c30991e + 59bd5b0 commit 8b5f328

File tree

6 files changed

+91
-15
lines changed

6 files changed

+91
-15
lines changed

packages/core/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ export { ConnectionManager } from "./lib/connection_manager.js";
2727

2828
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
2929
export { StreamManager } from "./lib/stream_manager.js";
30+
export { wakuMetadata, MetadataCodec } from "./lib/metadata/index.js";
+72-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
2+
import type { IncomingStreamData } from "@libp2p/interface-internal/registrar";
23
import type {
34
IMetadata,
4-
Libp2p,
5+
Libp2pComponents,
56
MetadataQueryParams,
67
ShardInfo
78
} from "@waku/interfaces";
9+
import { proto_metadata as proto } from "@waku/proto";
810
import { Logger } from "@waku/utils";
911
import all from "it-all";
1012
import * as lp from "it-length-prefixed";
@@ -13,29 +15,77 @@ import { Uint8ArrayList } from "uint8arraylist";
1315

1416
import { BaseProtocol } from "../base_protocol.js";
1517

16-
import { MetadataRpc } from "./rpc.js";
17-
1818
const log = new Logger("metadata");
1919

2020
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
2121

2222
class Metadata extends BaseProtocol {
23-
constructor(libp2p: Libp2p) {
24-
super(MetadataCodec, libp2p.components);
23+
constructor(
24+
libp2pComponents: Libp2pComponents,
25+
private shardInfo: ShardInfo
26+
) {
27+
super(MetadataCodec, libp2pComponents);
28+
29+
libp2pComponents.registrar
30+
.handle(MetadataCodec, (streamData) => {
31+
return void this.handle(streamData);
32+
})
33+
.then(() => "yes")
34+
.catch(() => "no");
35+
}
36+
37+
private async handle(streamData: IncomingStreamData): Promise<ShardInfo> {
38+
const encodedRpcQuery = proto.WakuMetadataRequest.encode(this.shardInfo);
39+
40+
const res = await pipe(
41+
[encodedRpcQuery],
42+
lp.encode,
43+
streamData.stream,
44+
lp.decode,
45+
async (source) => await all(source)
46+
);
47+
48+
try {
49+
const bytes = new Uint8ArrayList();
50+
res.forEach((chunk) => {
51+
bytes.append(chunk);
52+
});
53+
54+
const response = proto.WakuMetadataResponse.decode(bytes);
55+
if (!response) {
56+
throw new Error("WakuMetadata response is undefined");
57+
}
58+
59+
const { shards, clusterId } = response;
60+
61+
if (!clusterId) {
62+
throw new Error("WakuMetadata response clusterId is undefined");
63+
}
64+
65+
return {
66+
clusterId,
67+
shards
68+
};
69+
} catch (e) {
70+
log.error(`Error decoding response: ${e}`);
71+
throw e;
72+
}
2573
}
2674

2775
/**
2876
* Make a metadata query to a peer
2977
*/
3078
async query(params: MetadataQueryParams, peerId: PeerId): Promise<ShardInfo> {
31-
const rpcQuery = MetadataRpc.createRequest(params.clusterId, params.shards);
79+
const encodedRpcQuery = proto.WakuMetadataRequest.encode({
80+
clusterId: params.clusterId,
81+
shards: params.shards
82+
});
3283

3384
const peer = await this.getPeer(peerId);
34-
3585
const stream = await this.getStream(peer);
3686

3787
const res = await pipe(
38-
[rpcQuery.encode()],
88+
[encodedRpcQuery],
3989
lp.encode,
4090
stream,
4191
lp.decode,
@@ -48,23 +98,30 @@ class Metadata extends BaseProtocol {
4898
bytes.append(chunk);
4999
});
50100

51-
const { response } = MetadataRpc.decode(bytes);
101+
const response = proto.WakuMetadataResponse.decode(bytes);
52102
if (!response) {
53-
throw new Error("No response in query");
103+
throw new Error("WakuMetadata response is undefined");
54104
}
55105

56106
const { shards, clusterId } = response;
107+
if (!clusterId) {
108+
throw new Error("WakuMetadata response clusterId is undefined");
109+
}
110+
57111
return {
58-
cluster: clusterId,
59-
indexList: shards
60-
} as ShardInfo;
112+
clusterId,
113+
shards
114+
};
61115
} catch (e) {
62116
log.error(`Error decoding response: ${e}`);
63117
throw e;
64118
}
65119
}
66120
}
67121

68-
export function wakuMetadata(): (libp2p: Libp2p) => IMetadata {
69-
return (libp2p: Libp2p) => new Metadata(libp2p);
122+
export function wakuMetadata(
123+
shardInfo: ShardInfo
124+
): (components: Libp2pComponents) => IMetadata {
125+
return (libp2pComponents: Libp2pComponents) =>
126+
new Metadata(libp2pComponents, shardInfo);
70127
}

packages/interfaces/src/libp2p.ts

+3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import type { Libp2pInit } from "libp2p";
44
import type { identifyService } from "libp2p/identify";
55
import type { PingService } from "libp2p/ping";
66

7+
import type { IMetadata } from "./metadata";
8+
79
export type Libp2pServices = {
810
ping: PingService;
911
pubsub?: GossipSub;
1012
identify: ReturnType<ReturnType<typeof identifyService>>;
13+
metadata: IMetadata;
1114
};
1215

1316
// TODO: Get libp2p to export this.

packages/sdk/src/create.ts

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
DefaultUserAgent,
1010
wakuFilter,
1111
wakuLightPush,
12+
wakuMetadata,
1213
WakuNode,
1314
WakuOptions,
1415
wakuStore
@@ -203,6 +204,7 @@ export async function defaultLibp2p(
203204
agentVersion: userAgent ?? DefaultUserAgent
204205
}),
205206
ping: pingService(),
207+
metadata: wakuMetadata(),
206208
...pubsubService,
207209
...options?.services
208210
}

packages/tests/src/node/interfaces.ts

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface Args {
2424
discv5UdpPort?: number;
2525
// `legacyFilter` is required to enable filter v1 with go-waku
2626
legacyFilter?: boolean;
27+
clusterId?: number;
2728
}
2829

2930
export enum LogLevel {

packages/tests/tests/peer_exchange.node.spec.ts

+12
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ describe("Peer Exchange", () => {
5252

5353
waku = await createLightNode();
5454
await waku.start();
55+
56+
waku.libp2p.addEventListener("connection:open", (event) => {
57+
console.log("conn open");
58+
console.log(event.detail.streams);
59+
});
60+
61+
waku.libp2p.addEventListener("connection:close", (event) => {
62+
console.log("conn close");
63+
console.log(event.detail);
64+
console.log("curr conns", waku.libp2p.getConnections().length);
65+
});
66+
5567
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
5668

5769
const components = waku.libp2p.components as unknown as Libp2pComponents;

0 commit comments

Comments
 (0)