Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add and implement IReceiver #1219

Merged
merged 21 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";

export * as waku_relay from "./lib/relay/index.js";
export { wakuRelay, RelayCreateOptions } from "./lib/relay/index.js";
export {
wakuRelay,
RelayCreateOptions,
wakuPubSub,
} from "./lib/relay/index.js";

export * as waku_store from "./lib/store/index.js";
export {
Expand Down
30 changes: 26 additions & 4 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -58,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter {
}

/**
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;

const contentTopics = Array.from(groupByContentTopic(decoders).keys());
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());

const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
Expand Down Expand Up @@ -109,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter {
throw e;
}

const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);

return async () => {
Expand All @@ -118,6 +124,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;

for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}

return map;
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
Expand Down
140 changes: 101 additions & 39 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type {
ActiveSubscriptions,
Callback,
Expand All @@ -20,8 +21,8 @@ import type {
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";

import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
Expand All @@ -42,30 +43,27 @@ export type ContentTopic = string;
*
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
class Relay extends GossipSub implements IRelay {
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<ContentTopic, Set<unknown>>;

constructor(
components: GossipSubComponents,
options?: Partial<RelayCreateOptions>
) {
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
options = Object.assign(options ?? {}, {
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
});

super(components, options);
this.multicodecs = constants.RelayCodecs;

this.gossipSub = libp2p.pubsub as GossipSub;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;

this.observers = new Map();
Expand All @@ -82,8 +80,8 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}

/**
Expand All @@ -96,30 +94,46 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}

return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}

/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = Array.isArray(decoders)
? toObservers(decoders, callback)
: toObservers([decoders], callback);

pushOrInitMapSet(this.observers, contentTopic, observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();

this.observers.set(contentTopic, union(currObservers, newObservers));
}

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();

const nextObservers = leftMinusJoin(
currentObservers,
observersToRemove
);

if (nextObservers.size) {
this.observers.set(contentTopic, nextObservers);
} else {
this.observers.delete(contentTopic);
}
}
};
}
Expand All @@ -130,6 +144,10 @@ class Relay extends GossipSub implements IRelay {
return map;
}

public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
Expand Down Expand Up @@ -168,8 +186,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
Expand All @@ -182,24 +200,68 @@ class Relay extends GossipSub implements IRelay {
}
);

this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
}

unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}

getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}
}

Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1];

export function wakuRelay(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, init);
}

export function wakuPubSub(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = constants.RelayCodecs;
return pubsub;
};
}

function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);

const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);

return new Map(contentTopicToObserversEntries);
}

function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}

function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();

while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}
Expand Down
26 changes: 7 additions & 19 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type {
IFilter,
Expand All @@ -14,7 +13,6 @@ import { Protocols } from "@waku/interfaces";
import debug from "debug";

import { ConnectionManager } from "./connection_manager.js";
import * as relayConstants from "./relay/constants.js";

export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -57,7 +55,8 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter
filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay
) {
this.libp2p = libp2p;

Expand All @@ -71,8 +70,8 @@ export class WakuNode implements Waku {
this.lightPush = lightPush(libp2p);
}

if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
if (relay) {
this.relay = relay(libp2p);
}

const pingKeepAlive =
Expand Down Expand Up @@ -120,7 +119,9 @@ export class WakuNode implements Waku {
const codecs: string[] = [];
if (_protocols.includes(Protocols.Relay)) {
if (this.relay) {
this.relay.multicodecs.forEach((codec) => codecs.push(codec));
this.relay.gossipSub.multicodecs.forEach((codec: string) =>
codecs.push(codec)
);
} else {
log(
"Relay codec not included in dial codec: protocol not mounted locally"
Expand Down Expand Up @@ -188,16 +189,3 @@ export class WakuNode implements Waku {
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
}
}

function isRelay(pubsub: PubSub): pubsub is IRelay {
if (pubsub) {
try {
return pubsub.multicodecs.includes(
relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1]
);
// Exception is expected if `libp2p` was not instantiated with pubsub
// eslint-disable-next-line no-empty
} catch (e) {}
}
return false;
}
Loading