Skip to content

Commit 59bd5b0

Browse files
temp
1 parent cfe03b6 commit 59bd5b0

File tree

7 files changed

+104
-28
lines changed

7 files changed

+104
-28
lines changed

packages/core/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ export { ConnectionManager } from "./lib/connection_manager.js";
2727

2828
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
2929
export { StreamManager } from "./lib/stream_manager.js";
30+
export { wakuMetadata, MetadataCodec } from "./lib/metadata/index.js";
+72-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
2+
import type { IncomingStreamData } from "@libp2p/interface-internal/registrar";
23
import type {
34
IMetadata,
4-
Libp2p,
5+
Libp2pComponents,
56
MetadataQueryParams,
67
ShardInfo
78
} from "@waku/interfaces";
9+
import { proto_metadata as proto } from "@waku/proto";
810
import { Logger } from "@waku/utils";
911
import all from "it-all";
1012
import * as lp from "it-length-prefixed";
@@ -13,29 +15,77 @@ import { Uint8ArrayList } from "uint8arraylist";
1315

1416
import { BaseProtocol } from "../base_protocol.js";
1517

16-
import { MetadataRpc } from "./rpc.js";
17-
1818
const log = new Logger("metadata");
1919

2020
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
2121

2222
class Metadata extends BaseProtocol {
23-
constructor(libp2p: Libp2p) {
24-
super(MetadataCodec, libp2p.components);
23+
constructor(
24+
libp2pComponents: Libp2pComponents,
25+
private shardInfo: ShardInfo
26+
) {
27+
super(MetadataCodec, libp2pComponents);
28+
29+
libp2pComponents.registrar
30+
.handle(MetadataCodec, (streamData) => {
31+
return void this.handle(streamData);
32+
})
33+
.then(() => "yes")
34+
.catch(() => "no");
35+
}
36+
37+
private async handle(streamData: IncomingStreamData): Promise<ShardInfo> {
38+
const encodedRpcQuery = proto.WakuMetadataRequest.encode(this.shardInfo);
39+
40+
const res = await pipe(
41+
[encodedRpcQuery],
42+
lp.encode,
43+
streamData.stream,
44+
lp.decode,
45+
async (source) => await all(source)
46+
);
47+
48+
try {
49+
const bytes = new Uint8ArrayList();
50+
res.forEach((chunk) => {
51+
bytes.append(chunk);
52+
});
53+
54+
const response = proto.WakuMetadataResponse.decode(bytes);
55+
if (!response) {
56+
throw new Error("WakuMetadata response is undefined");
57+
}
58+
59+
const { shards, clusterId } = response;
60+
61+
if (!clusterId) {
62+
throw new Error("WakuMetadata response clusterId is undefined");
63+
}
64+
65+
return {
66+
clusterId,
67+
shards
68+
};
69+
} catch (e) {
70+
log.error(`Error decoding response: ${e}`);
71+
throw e;
72+
}
2573
}
2674

2775
/**
2876
* Make a metadata query to a peer
2977
*/
3078
async query(params: MetadataQueryParams, peerId: PeerId): Promise<ShardInfo> {
31-
const rpcQuery = MetadataRpc.createRequest(params.clusterId, params.shards);
79+
const encodedRpcQuery = proto.WakuMetadataRequest.encode({
80+
clusterId: params.clusterId,
81+
shards: params.shards
82+
});
3283

3384
const peer = await this.getPeer(peerId);
34-
3585
const stream = await this.getStream(peer);
3686

3787
const res = await pipe(
38-
[rpcQuery.encode()],
88+
[encodedRpcQuery],
3989
lp.encode,
4090
stream,
4191
lp.decode,
@@ -48,23 +98,30 @@ class Metadata extends BaseProtocol {
4898
bytes.append(chunk);
4999
});
50100

51-
const { response } = MetadataRpc.decode(bytes);
101+
const response = proto.WakuMetadataResponse.decode(bytes);
52102
if (!response) {
53-
throw new Error("No response in query");
103+
throw new Error("WakuMetadata response is undefined");
54104
}
55105

56106
const { shards, clusterId } = response;
107+
if (!clusterId) {
108+
throw new Error("WakuMetadata response clusterId is undefined");
109+
}
110+
57111
return {
58-
cluster: clusterId,
59-
indexList: shards
60-
} as ShardInfo;
112+
clusterId,
113+
shards
114+
};
61115
} catch (e) {
62116
log.error(`Error decoding response: ${e}`);
63117
throw e;
64118
}
65119
}
66120
}
67121

68-
export function wakuMetadata(): (libp2p: Libp2p) => IMetadata {
69-
return (libp2p: Libp2p) => new Metadata(libp2p);
122+
export function wakuMetadata(
123+
shardInfo: ShardInfo
124+
): (components: Libp2pComponents) => IMetadata {
125+
return (libp2pComponents: Libp2pComponents) =>
126+
new Metadata(libp2pComponents, shardInfo);
70127
}

packages/enr/src/relay_shard_codec.ts

+13-13
Original file line numberDiff line numberDiff line change
@@ -8,51 +8,51 @@ export const decodeRelayShard = (bytes: Uint8Array): ShardInfo => {
88
if (bytes.length < 3) throw new Error("Insufficient data");
99

1010
const view = new DataView(bytes.buffer);
11-
const cluster = view.getUint16(0);
11+
const clusterId = view.getUint16(0);
1212

13-
const indexList = [];
13+
const shards = [];
1414

1515
if (bytes.length === 130) {
1616
// rsv format (Bit Vector)
1717
for (let i = 0; i < 1024; i++) {
1818
const byteIndex = Math.floor(i / 8) + 2; // Adjusted for the 2-byte cluster field
1919
const bitIndex = 7 - (i % 8);
2020
if (view.getUint8(byteIndex) & (1 << bitIndex)) {
21-
indexList.push(i);
21+
shards.push(i);
2222
}
2323
}
2424
} else {
2525
// rs format (Index List)
2626
const numIndices = view.getUint8(2);
2727
for (let i = 0, offset = 3; i < numIndices; i++, offset += 2) {
2828
if (offset + 1 >= bytes.length) throw new Error("Unexpected end of data");
29-
indexList.push(view.getUint16(offset));
29+
shards.push(view.getUint16(offset));
3030
}
3131
}
3232

33-
return { cluster, indexList };
33+
return { clusterId, shards };
3434
};
3535

3636
export const encodeRelayShard = (shardInfo: ShardInfo): Uint8Array => {
37-
const { cluster, indexList } = shardInfo;
38-
const totalLength = indexList.length >= 64 ? 130 : 3 + 2 * indexList.length;
37+
const { clusterId, shards } = shardInfo;
38+
const totalLength = shards.length >= 64 ? 130 : 3 + 2 * shards.length;
3939
const buffer = new ArrayBuffer(totalLength);
4040
const view = new DataView(buffer);
4141

42-
view.setUint16(0, cluster);
42+
view.setUint16(0, clusterId);
4343

44-
if (indexList.length >= 64) {
44+
if (shards.length >= 64) {
4545
// rsv format (Bit Vector)
46-
for (const index of indexList) {
46+
for (const index of shards) {
4747
const byteIndex = Math.floor(index / 8) + 2; // Adjusted for the 2-byte cluster field
4848
const bitIndex = 7 - (index % 8);
4949
view.setUint8(byteIndex, view.getUint8(byteIndex) | (1 << bitIndex));
5050
}
5151
} else {
5252
// rs format (Index List)
53-
view.setUint8(2, indexList.length);
54-
for (let i = 0, offset = 3; i < indexList.length; i++, offset += 2) {
55-
view.setUint16(offset, indexList[i]);
53+
view.setUint8(2, shards.length);
54+
for (let i = 0, offset = 3; i < shards.length; i++, offset += 2) {
55+
view.setUint16(offset, shards[i]);
5656
}
5757
}
5858

packages/interfaces/src/libp2p.ts

+3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import type { Libp2pInit } from "libp2p";
44
import type { identifyService } from "libp2p/identify";
55
import type { PingService } from "libp2p/ping";
66

7+
import type { IMetadata } from "./metadata";
8+
79
export type Libp2pServices = {
810
ping: PingService;
911
pubsub?: GossipSub;
1012
identify: ReturnType<ReturnType<typeof identifyService>>;
13+
metadata: IMetadata;
1114
};
1215

1316
// TODO: Get libp2p to export this.

packages/sdk/src/create.ts

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
DefaultUserAgent,
1010
wakuFilter,
1111
wakuLightPush,
12+
wakuMetadata,
1213
WakuNode,
1314
WakuOptions,
1415
wakuStore
@@ -203,6 +204,7 @@ export async function defaultLibp2p(
203204
agentVersion: userAgent ?? DefaultUserAgent
204205
}),
205206
ping: pingService(),
207+
metadata: wakuMetadata(),
206208
...pubsubService,
207209
...options?.services
208210
}

packages/tests/src/node/interfaces.ts

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface Args {
2424
discv5UdpPort?: number;
2525
// `legacyFilter` is required to enable filter v1 with go-waku
2626
legacyFilter?: boolean;
27+
clusterId?: number;
2728
}
2829

2930
export enum LogLevel {

packages/tests/tests/peer_exchange.node.spec.ts

+12
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ describe("Peer Exchange", () => {
5252

5353
waku = await createLightNode();
5454
await waku.start();
55+
56+
waku.libp2p.addEventListener("connection:open", (event) => {
57+
console.log("conn open");
58+
console.log(event.detail.streams);
59+
});
60+
61+
waku.libp2p.addEventListener("connection:close", (event) => {
62+
console.log("conn close");
63+
console.log(event.detail);
64+
console.log("curr conns", waku.libp2p.getConnections().length);
65+
});
66+
5567
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
5668

5769
const components = waku.libp2p.components as unknown as Libp2pComponents;

0 commit comments

Comments
 (0)