Skip to content

Commit 9ac2a3f

Browse files
feat: metadata protocol (#1732)
* add proto * add rpc and interfaces * add protocol implementation * update faulty proto def * add rpc and interfaces * refactor implementation & write test * setup the metadata protocol as a service * fix cases where metadata service needs to be undefined * remove redundant catch block * remove addressed TODO * update import path * log errors * remove redundant code from handling incoming metadata request * update tests * add test to check for active connections * change expects * save remote peer's shard info after successful connection
1 parent 12a5534 commit 9ac2a3f

File tree

10 files changed

+501
-1
lines changed

10 files changed

+501
-1
lines changed

packages/core/src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ export { ConnectionManager } from "./lib/connection_manager.js";
2727

2828
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
2929
export { StreamManager } from "./lib/stream_manager.js";
30+
31+
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import type { PeerId } from "@libp2p/interface/peer-id";
2+
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
3+
import { encodeRelayShard } from "@waku/enr";
4+
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
5+
import { proto_metadata } from "@waku/proto";
6+
import { Logger } from "@waku/utils";
7+
import all from "it-all";
8+
import * as lp from "it-length-prefixed";
9+
import { pipe } from "it-pipe";
10+
import { Uint8ArrayList } from "uint8arraylist";
11+
12+
import { BaseProtocol } from "../base_protocol.js";
13+
14+
const log = new Logger("metadata");
15+
16+
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
17+
18+
class Metadata extends BaseProtocol {
19+
private readonly shardInfo: ShardInfo;
20+
private libp2pComponents: Libp2pComponents;
21+
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
22+
super(MetadataCodec, libp2p.components);
23+
this.libp2pComponents = libp2p;
24+
this.shardInfo = shardInfo;
25+
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
26+
void this.onRequest(streamData);
27+
});
28+
}
29+
30+
/**
31+
* Handle an incoming metadata request
32+
*/
33+
private async onRequest(streamData: IncomingStreamData): Promise<void> {
34+
try {
35+
const { stream, connection } = streamData;
36+
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
37+
this.shardInfo
38+
);
39+
40+
const encodedResponse = await pipe(
41+
[encodedShardInfo],
42+
lp.encode,
43+
stream,
44+
lp.decode,
45+
async (source) => await all(source)
46+
);
47+
48+
const remoteShardInfoResponse =
49+
this.decodeMetadataResponse(encodedResponse);
50+
51+
// add or update the shardInfo to peer store
52+
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
53+
metadata: {
54+
shardInfo: encodeRelayShard(remoteShardInfoResponse)
55+
}
56+
});
57+
} catch (error) {
58+
log.error("Error handling metadata request", error);
59+
}
60+
}
61+
62+
/**
63+
* Make a metadata query to a peer
64+
*/
65+
async query(peerId: PeerId): Promise<ShardInfo> {
66+
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
67+
68+
const peer = await this.getPeer(peerId);
69+
70+
const stream = await this.getStream(peer);
71+
72+
const encodedResponse = await pipe(
73+
[request],
74+
lp.encode,
75+
stream,
76+
lp.decode,
77+
async (source) => await all(source)
78+
);
79+
80+
const decodedResponse = this.decodeMetadataResponse(encodedResponse);
81+
82+
return decodedResponse;
83+
}
84+
85+
private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
86+
const bytes = new Uint8ArrayList();
87+
88+
encodedResponse.forEach((chunk) => {
89+
bytes.append(chunk);
90+
});
91+
const response = proto_metadata.WakuMetadataResponse.decode(
92+
bytes
93+
) as ShardInfo;
94+
95+
if (!response) log.error("Error decoding metadata response");
96+
97+
return response;
98+
}
99+
}
100+
101+
export function wakuMetadata(
102+
shardInfo: ShardInfo
103+
): (components: Libp2pComponents) => IMetadata {
104+
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
105+
}

packages/interfaces/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ export * from "./misc.js";
1414
export * from "./libp2p.js";
1515
export * from "./keep_alive_manager.js";
1616
export * from "./dns_discovery.js";
17+
export * from "./metadata.js";

packages/interfaces/src/libp2p.ts

+3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import type { Libp2pInit, Libp2pOptions } from "libp2p";
44
import type { identifyService } from "libp2p/identify";
55
import type { PingService } from "libp2p/ping";
66

7+
import { IMetadata } from "./metadata";
8+
79
export type Libp2pServices = {
810
ping: PingService;
11+
metadata?: IMetadata;
912
pubsub?: GossipSub;
1013
identify: ReturnType<ReturnType<typeof identifyService>>;
1114
};

packages/interfaces/src/metadata.ts

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import type { PeerId } from "@libp2p/interface/peer-id";
2+
3+
import type { ShardInfo } from "./enr.js";
4+
import type { IBaseProtocol } from "./protocols.js";
5+
6+
export interface IMetadata extends IBaseProtocol {
7+
query(peerId: PeerId): Promise<ShardInfo | undefined>;
8+
}

packages/proto/src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ export { PushResponse } from "./lib/light_push.js";
1313
export * as proto_store from "./lib/store.js";
1414

1515
export * as proto_peer_exchange from "./lib/peer_exchange.js";
16+
17+
export * as proto_metadata from './lib/metadata.js'

packages/proto/src/lib/metadata.proto

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
syntax = "proto3";
2+
3+
4+
message WakuMetadataRequest {
5+
optional uint32 cluster_id = 1;
6+
repeated uint32 shards = 2;
7+
}
8+
9+
message WakuMetadataResponse {
10+
optional uint32 cluster_id = 1;
11+
repeated uint32 shards = 2;
12+
}

packages/proto/src/lib/metadata.ts

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/* eslint-disable import/export */
2+
/* eslint-disable complexity */
3+
/* eslint-disable @typescript-eslint/no-namespace */
4+
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
5+
/* eslint-disable @typescript-eslint/no-empty-interface */
6+
7+
import { encodeMessage, decodeMessage, message } from 'protons-runtime'
8+
import type { Codec } from 'protons-runtime'
9+
import type { Uint8ArrayList } from 'uint8arraylist'
10+
11+
export interface WakuMetadataRequest {
12+
clusterId?: number
13+
shards: number[]
14+
}
15+
16+
export namespace WakuMetadataRequest {
17+
let _codec: Codec<WakuMetadataRequest>
18+
19+
export const codec = (): Codec<WakuMetadataRequest> => {
20+
if (_codec == null) {
21+
_codec = message<WakuMetadataRequest>((obj, w, opts = {}) => {
22+
if (opts.lengthDelimited !== false) {
23+
w.fork()
24+
}
25+
26+
if (obj.clusterId != null) {
27+
w.uint32(8)
28+
w.uint32(obj.clusterId)
29+
}
30+
31+
if (obj.shards != null) {
32+
for (const value of obj.shards) {
33+
w.uint32(16)
34+
w.uint32(value)
35+
}
36+
}
37+
38+
if (opts.lengthDelimited !== false) {
39+
w.ldelim()
40+
}
41+
}, (reader, length) => {
42+
const obj: any = {
43+
shards: []
44+
}
45+
46+
const end = length == null ? reader.len : reader.pos + length
47+
48+
while (reader.pos < end) {
49+
const tag = reader.uint32()
50+
51+
switch (tag >>> 3) {
52+
case 1:
53+
obj.clusterId = reader.uint32()
54+
break
55+
case 2:
56+
obj.shards.push(reader.uint32())
57+
break
58+
default:
59+
reader.skipType(tag & 7)
60+
break
61+
}
62+
}
63+
64+
return obj
65+
})
66+
}
67+
68+
return _codec
69+
}
70+
71+
export const encode = (obj: Partial<WakuMetadataRequest>): Uint8Array => {
72+
return encodeMessage(obj, WakuMetadataRequest.codec())
73+
}
74+
75+
export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataRequest => {
76+
return decodeMessage(buf, WakuMetadataRequest.codec())
77+
}
78+
}
79+
80+
export interface WakuMetadataResponse {
81+
clusterId?: number
82+
shards: number[]
83+
}
84+
85+
export namespace WakuMetadataResponse {
86+
let _codec: Codec<WakuMetadataResponse>
87+
88+
export const codec = (): Codec<WakuMetadataResponse> => {
89+
if (_codec == null) {
90+
_codec = message<WakuMetadataResponse>((obj, w, opts = {}) => {
91+
if (opts.lengthDelimited !== false) {
92+
w.fork()
93+
}
94+
95+
if (obj.clusterId != null) {
96+
w.uint32(8)
97+
w.uint32(obj.clusterId)
98+
}
99+
100+
if (obj.shards != null) {
101+
for (const value of obj.shards) {
102+
w.uint32(16)
103+
w.uint32(value)
104+
}
105+
}
106+
107+
if (opts.lengthDelimited !== false) {
108+
w.ldelim()
109+
}
110+
}, (reader, length) => {
111+
const obj: any = {
112+
shards: []
113+
}
114+
115+
const end = length == null ? reader.len : reader.pos + length
116+
117+
while (reader.pos < end) {
118+
const tag = reader.uint32()
119+
120+
switch (tag >>> 3) {
121+
case 1:
122+
obj.clusterId = reader.uint32()
123+
break
124+
case 2:
125+
obj.shards.push(reader.uint32())
126+
break
127+
default:
128+
reader.skipType(tag & 7)
129+
break
130+
}
131+
}
132+
133+
return obj
134+
})
135+
}
136+
137+
return _codec
138+
}
139+
140+
export const encode = (obj: Partial<WakuMetadataResponse>): Uint8Array => {
141+
return encodeMessage(obj, WakuMetadataResponse.codec())
142+
}
143+
144+
export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataResponse => {
145+
return decodeMessage(buf, WakuMetadataResponse.codec())
146+
}
147+
}

packages/sdk/src/create.ts

+17-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
DefaultUserAgent,
99
wakuFilter,
1010
wakuLightPush,
11+
wakuMetadata,
1112
WakuNode,
1213
WakuOptions,
1314
wakuStore
@@ -16,11 +17,13 @@ import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
1617
import type {
1718
CreateLibp2pOptions,
1819
FullNode,
20+
IMetadata,
1921
Libp2p,
2022
Libp2pComponents,
2123
LightNode,
2224
ProtocolCreateOptions,
23-
RelayNode
25+
RelayNode,
26+
ShardInfo
2427
} from "@waku/interfaces";
2528
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
2629
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
@@ -54,6 +57,7 @@ export async function createLightNode(
5457
}
5558

5659
const libp2p = await defaultLibp2p(
60+
options.shardInfo,
5761
undefined,
5862
libp2pOptions,
5963
options?.userAgent
@@ -90,6 +94,7 @@ export async function createRelayNode(
9094
}
9195

9296
const libp2p = await defaultLibp2p(
97+
options.shardInfo,
9398
wakuGossipSub(options),
9499
libp2pOptions,
95100
options?.userAgent
@@ -134,6 +139,7 @@ export async function createFullNode(
134139
}
135140

136141
const libp2p = await defaultLibp2p(
142+
options.shardInfo,
137143
wakuGossipSub(options),
138144
libp2pOptions,
139145
options?.userAgent
@@ -169,7 +175,12 @@ type PubsubService = {
169175
pubsub?: (components: Libp2pComponents) => GossipSub;
170176
};
171177

178+
type MetadataService = {
179+
metadata?: (components: Libp2pComponents) => IMetadata;
180+
};
181+
172182
export async function defaultLibp2p(
183+
shardInfo?: ShardInfo,
173184
wakuGossipSub?: PubsubService["pubsub"],
174185
options?: Partial<CreateLibp2pOptions>,
175186
userAgent?: string
@@ -191,6 +202,10 @@ export async function defaultLibp2p(
191202
? { pubsub: wakuGossipSub }
192203
: {};
193204

205+
const metadataService: MetadataService = shardInfo
206+
? { metadata: wakuMetadata(shardInfo) }
207+
: {};
208+
194209
return createLibp2p({
195210
connectionManager: {
196211
minConnections: 1
@@ -204,6 +219,7 @@ export async function defaultLibp2p(
204219
agentVersion: userAgent ?? DefaultUserAgent
205220
}),
206221
ping: pingService(),
222+
...metadataService,
207223
...pubsubService,
208224
...options?.services
209225
}

0 commit comments

Comments
 (0)