Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add getObservers method #1249

Merged
merged 8 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 8 additions & 27 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import { CustomEvent } from "@libp2p/interfaces/events";
import type {
Callback,
IDecoder,
IEncoder,
IMessage,
IRelay,
Observers,
ProtocolCreateOptions,
SendResult,
} from "@waku/interfaces";
Expand All @@ -28,18 +28,9 @@ import { messageValidator } from "./message_validator.js";

const log = debug("waku:relay");

export type Observer<T extends IDecodedMessage> = {
decoder: IDecoder<T>;
callback: Callback<T>;
};

export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;

type BasicEventPayload = {
contentTopic: string;
};

/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Must be passed as a `pubsub` module to a `Libp2p` instance.
Expand All @@ -55,7 +46,7 @@ class Relay extends GossipSub implements IRelay {
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<ContentTopic, Set<unknown>>;
private observers: Map<ContentTopic, Set<Observer<unknown>>>;

constructor(
components: GossipSubComponents,
Expand Down Expand Up @@ -120,26 +111,10 @@ class Relay extends GossipSub implements IRelay {

pushOrInitMapSet(this.observers, contentTopic, observer);

this.dispatchEvent(
new CustomEvent<BasicEventPayload>("observer:added", {
detail: {
contentTopic,
},
})
);

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);

this.dispatchEvent(
new CustomEvent<BasicEventPayload>("observer:removed", {
detail: {
contentTopic,
},
})
);
}
};
}
Expand Down Expand Up @@ -208,6 +183,12 @@ class Relay extends GossipSub implements IRelay {
getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
}

public getObservers<T extends IDecodedMessage>(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is any better than just exposing observers...

@fryorcraken

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By dump method, I would mean a method that returns a string rendered version of the observers for logging purposes.

Not sure what are common practice in JavaScript. For example, in Rust, one would implement the fmt::Debug trait.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an idea. Will write once back on the laptop.

Copy link
Collaborator

@fryorcraken fryorcraken Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to inspire ourselves from the new filter protocol https://github.com/vacp2p/rfc/pull/562/files and see if we can provide an interface on active subscription that could then be added to IReceiver.

Look at the filter request, it has 1 optional pubsub and an array of content topics.
There is a also a new "ping" request to check that the remote has active subscriptions for the local node.

Hence, we could have an interface such as:

type PubSubTopic = string;
type ContentTopic = string;

type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>

interface IReceiver {
    // ... subscribe
    getActiveSubscriptions: () => Promise<ActiveSubscriptions | undefined>
}
  • In the case of relay, the Promise would resolve instantly.
  • In the case of the new filter protocol, we can do a ping to the remote server and return undefined (or empty map?) if remote server returns an error or returned locally saved subscriptions if 200 OK is returned.

edit: fixed the interface

cc @jm-clius

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks really neat. 👍 I will update this PR to close original issue and update IReceiver PR to correlate with this.

One thing I what to change is Promise<ActiveSubscriptions | undefined> - I believe if operation is sync it shouldn't be overcomplicated with Promise.

Copy link
Collaborator

@fryorcraken fryorcraken Mar 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair to not use a Promise and stick to ActiveSubscriptions return type for now.

When we implement the new filter protocol and implement this interface, I think it would make sense to do a "ping" to confirm the subscriptions are active. At this point in time, we may need to change the return type to Promise<ActiveSubscriptions | undefined> | ActiveSubscriptions | undefined or something like that.
But this can be done when it becomes necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, let's handle it when time comes

contentTopic: string
): Set<Observer<T>> {
return this.observers.get(contentTopic) as Set<Observer<T>>;
}
}

Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1];
Expand Down
20 changes: 9 additions & 11 deletions packages/interfaces/src/relay.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { GossipSub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub";
import type { EventEmitter } from "@libp2p/interfaces/events";
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";

import type {
IDecodedMessage,
Expand All @@ -9,20 +8,19 @@ import type {
} from "./message.js";
import type { Callback, SendResult } from "./protocols.js";

export interface RelayEvents {
"observer:added": CustomEvent;
"observer:removed": CustomEvent;
}

type IRelayEmitter = EventEmitter<RelayEvents & GossipsubEvents>;
export type Observer<T extends IDecodedMessage> = {
decoder: IDecoder<T>;
callback: Callback<T>;
};

interface IRelayAPI extends GossipSub {
export interface IRelay extends GossipSub {
send: (encoder: IEncoder, message: IMessage) => Promise<SendResult>;
addObserver: <T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getObservers: <T extends IDecodedMessage>(
contentTopic: string
) => Set<Observer<T>>;
}

export type IRelay = IRelayAPI & IRelayEmitter;