Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metadata protocol #1732

Merged
merged 19 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ export { ConnectionManager } from "./lib/connection_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager.js";

export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
105 changes: 105 additions & 0 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";

import { BaseProtocol } from "../base_protocol.js";

const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardInfo;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
}

/**
* Handle an incoming metadata request
*/
private async onRequest(streamData: IncomingStreamData): Promise<void> {
try {
const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
this.shardInfo
);

const encodedResponse = await pipe(
[encodedShardInfo],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
} catch (error) {
log.error("Error handling metadata request", error);
}
}

/**
* Make a metadata query to a peer
*/
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);

const stream = await this.getStream(peer);

const encodedResponse = await pipe(
[request],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

return decodedResponse;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

encodedResponse.forEach((chunk) => {
bytes.append(chunk);
});
const response = proto_metadata.WakuMetadataResponse.decode(
bytes
) as ShardInfo;

if (!response) log.error("Error decoding metadata response");

return response;
}
}

export function wakuMetadata(
shardInfo: ShardInfo
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
}
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export * from "./misc.js";
export * from "./libp2p.js";
export * from "./keep_alive_manager.js";
export * from "./dns_discovery.js";
export * from "./metadata.js";
3 changes: 3 additions & 0 deletions packages/interfaces/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import type { Libp2pInit, Libp2pOptions } from "libp2p";
import type { identifyService } from "libp2p/identify";
import type { PingService } from "libp2p/ping";

import { IMetadata } from "./metadata";

export type Libp2pServices = {
ping: PingService;
metadata?: IMetadata;
pubsub?: GossipSub;
identify: ReturnType<ReturnType<typeof identifyService>>;
};
Expand Down
8 changes: 8 additions & 0 deletions packages/interfaces/src/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import type { PeerId } from "@libp2p/interface/peer-id";

import type { ShardInfo } from "./enr.js";
import type { IBaseProtocol } from "./protocols.js";

export interface IMetadata extends IBaseProtocol {
query(peerId: PeerId): Promise<ShardInfo | undefined>;
}
2 changes: 2 additions & 0 deletions packages/proto/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ export { PushResponse } from "./lib/light_push.js";
export * as proto_store from "./lib/store.js";

export * as proto_peer_exchange from "./lib/peer_exchange.js";

export * as proto_metadata from './lib/metadata.js'
12 changes: 12 additions & 0 deletions packages/proto/src/lib/metadata.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";


message WakuMetadataRequest {
optional uint32 cluster_id = 1;
repeated uint32 shards = 2;
}

message WakuMetadataResponse {
optional uint32 cluster_id = 1;
repeated uint32 shards = 2;
}
147 changes: 147 additions & 0 deletions packages/proto/src/lib/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface WakuMetadataRequest {
clusterId?: number
shards: number[]
}

export namespace WakuMetadataRequest {
let _codec: Codec<WakuMetadataRequest>

export const codec = (): Codec<WakuMetadataRequest> => {
if (_codec == null) {
_codec = message<WakuMetadataRequest>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if (obj.clusterId != null) {
w.uint32(8)
w.uint32(obj.clusterId)
}

if (obj.shards != null) {
for (const value of obj.shards) {
w.uint32(16)
w.uint32(value)
}
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
shards: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.clusterId = reader.uint32()
break
case 2:
obj.shards.push(reader.uint32())
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<WakuMetadataRequest>): Uint8Array => {
return encodeMessage(obj, WakuMetadataRequest.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataRequest => {
return decodeMessage(buf, WakuMetadataRequest.codec())
}
}

export interface WakuMetadataResponse {
clusterId?: number
shards: number[]
}

export namespace WakuMetadataResponse {
let _codec: Codec<WakuMetadataResponse>

export const codec = (): Codec<WakuMetadataResponse> => {
if (_codec == null) {
_codec = message<WakuMetadataResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if (obj.clusterId != null) {
w.uint32(8)
w.uint32(obj.clusterId)
}

if (obj.shards != null) {
for (const value of obj.shards) {
w.uint32(16)
w.uint32(value)
}
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
shards: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.clusterId = reader.uint32()
break
case 2:
obj.shards.push(reader.uint32())
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<WakuMetadataResponse>): Uint8Array => {
return encodeMessage(obj, WakuMetadataResponse.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataResponse => {
return decodeMessage(buf, WakuMetadataResponse.codec())
}
}
18 changes: 17 additions & 1 deletion packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DefaultUserAgent,
wakuFilter,
wakuLightPush,
wakuMetadata,
WakuNode,
WakuOptions,
wakuStore
Expand All @@ -16,11 +17,13 @@ import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type {
CreateLibp2pOptions,
FullNode,
IMetadata,
Libp2p,
Libp2pComponents,
LightNode,
ProtocolCreateOptions,
RelayNode
RelayNode,
ShardInfo
} from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
Expand Down Expand Up @@ -54,6 +57,7 @@ export async function createLightNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
undefined,
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -90,6 +94,7 @@ export async function createRelayNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -134,6 +139,7 @@ export async function createFullNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -169,7 +175,12 @@ type PubsubService = {
pubsub?: (components: Libp2pComponents) => GossipSub;
};

type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};

export async function defaultLibp2p(
shardInfo?: ShardInfo,
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string
Expand All @@ -191,6 +202,10 @@ export async function defaultLibp2p(
? { pubsub: wakuGossipSub }
: {};

const metadataService: MetadataService = shardInfo
? { metadata: wakuMetadata(shardInfo) }
: {};

return createLibp2p({
connectionManager: {
minConnections: 1
Expand All @@ -204,6 +219,7 @@ export async function defaultLibp2p(
agentVersion: userAgent ?? DefaultUserAgent
}),
ping: pingService(),
...metadataService,
...pubsubService,
...options?.services
}
Expand Down
Loading