Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add and implement IReceiver #1219

Merged
merged 21 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -58,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter {
}

/**
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;

const contentTopics = Array.from(groupByContentTopic(decoders).keys());
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());

const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
Expand Down Expand Up @@ -109,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter {
throw e;
}

const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);

return async () => {
Expand All @@ -118,6 +124,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;

for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}

return map;
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
Expand Down
120 changes: 88 additions & 32 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import type {
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";

import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
Expand All @@ -42,10 +42,12 @@ export type ContentTopic = string;
*
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
class Relay extends GossipSub implements IRelay {
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
Expand All @@ -63,8 +65,8 @@ class Relay extends GossipSub implements IRelay {
fallbackToFloodsub: false,
});

super(components, options);
this.multicodecs = constants.RelayCodecs;
this.gossipSub = new GossipSub(components, options);
this.gossipSub.multicodecs = constants.RelayCodecs;

this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;

Expand All @@ -82,8 +84,8 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}

/**
Expand All @@ -96,30 +98,46 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}

return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}

/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = Array.isArray(decoders)
? toObservers(decoders, callback)
: toObservers([decoders], callback);

pushOrInitMapSet(this.observers, contentTopic, observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();

this.observers.set(contentTopic, union(currObservers, newObservers));
}

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();

const nextObservers = leftMinusJoin(
currentObservers,
observersToRemove
);

if (nextObservers.size) {
this.observers.set(contentTopic, nextObservers);
} else {
this.observers.delete(contentTopic);
}
}
};
}
Expand All @@ -130,6 +148,10 @@ class Relay extends GossipSub implements IRelay {
return map;
}

public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
Expand Down Expand Up @@ -168,8 +190,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
Expand All @@ -182,17 +204,8 @@ class Relay extends GossipSub implements IRelay {
}
);

this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
}

unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}

getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}
}

Expand All @@ -203,3 +216,46 @@ export function wakuRelay(
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
}

function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);

const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);

return new Map(contentTopicToObserversEntries);
}

function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}

function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();

while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}
Expand Down
17 changes: 10 additions & 7 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type {
IFilter,
Expand Down Expand Up @@ -71,8 +70,10 @@ export class WakuNode implements Waku {
this.lightPush = lightPush(libp2p);
}

if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
// since wakuRelay function will make it IRelay and not PubSub
const maybeRelay = libp2p.pubsub as unknown as IRelay;
if (isRelay(maybeRelay)) {
this.relay = maybeRelay;
}

const pingKeepAlive =
Expand Down Expand Up @@ -120,7 +121,9 @@ export class WakuNode implements Waku {
const codecs: string[] = [];
if (_protocols.includes(Protocols.Relay)) {
if (this.relay) {
this.relay.multicodecs.forEach((codec) => codecs.push(codec));
this.relay.gossipSub.multicodecs.forEach((codec: string) =>
codecs.push(codec)
);
} else {
log(
"Relay codec not included in dial codec: protocol not mounted locally"
Expand Down Expand Up @@ -189,10 +192,10 @@ export class WakuNode implements Waku {
}
}

function isRelay(pubsub: PubSub): pubsub is IRelay {
if (pubsub) {
function isRelay(maybeRelay: IRelay): boolean {
if (maybeRelay) {
try {
return pubsub.multicodecs.includes(
return maybeRelay.gossipSub.multicodecs.includes(
relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1]
);
// Exception is expected if `libp2p` was not instantiated with pubsub
Expand Down
16 changes: 3 additions & 13 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
Callback,
PointToPointProtocol,
ProtocolOptions,
} from "./protocols.js";
import type { PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";

export interface IFilter extends PointToPointProtocol {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Promise<() => Promise<void>>;
}
export type IFilter = IReceiver & PointToPointProtocol;
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./store.js";
export * from "./waku.js";
export * from "./connection_manager.js";
export * from "./sender.js";
export * from "./receiver.js";
17 changes: 17 additions & 0 deletions packages/interfaces/src/receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback, ProtocolOptions } from "./protocols.js";

type Unsubscribe = () => void | Promise<void>;
type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

export interface IReceiver {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Unsubscribe | Promise<Unsubscribe>;
getActiveSubscriptions: () => ActiveSubscriptions;
}
20 changes: 6 additions & 14 deletions packages/interfaces/src/relay.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import { IReceiver } from "./receiver.js";
import type { ISender } from "./sender.js";

type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

interface IRelayAPI {
addObserver: <T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getActiveSubscriptions: () => ActiveSubscriptions | undefined;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}

export type IRelay = IRelayAPI & GossipSub & ISender;
export type IRelay = IRelayAPI & ISender & IReceiver;
2 changes: 1 addition & 1 deletion packages/tests/tests/filter.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe("Waku Filter", () => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe([TestDecoder], callback);
await waku.filter.subscribe(TestDecoder, callback);

await delay(200);
await waku.lightPush.send(TestEncoder, {
Expand Down
Loading