Skip to content

Commit caccaea

Browse files
committed
add single decoder support
1 parent d560787 commit caccaea

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

packages/core/src/lib/filter/index.ts

+9-4
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter {
5959
}
6060

6161
/**
62-
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
62+
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
6363
* @param callback A function that will be called on each message returned by the filter.
6464
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
6565
* @returns Unsubscribe function that can be used to end the subscription.
6666
*/
6767
async subscribe<T extends IDecodedMessage>(
68-
decoders: IDecoder<T>[],
68+
decoders: IDecoder<T> | IDecoder<T>[],
6969
callback: Callback<T>,
7070
opts?: ProtocolOptions
7171
): Promise<UnsubscribeFunction> {
72+
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
7273
const { pubSubTopic = DefaultPubSubTopic } = this.options;
7374

74-
const contentTopics = Array.from(groupByContentTopic(decoders).keys());
75+
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());
7576

7677
const contentFilters = contentTopics.map((contentTopic) => ({
7778
contentTopic,
@@ -110,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter {
110111
throw e;
111112
}
112113

113-
const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
114+
const subscription: Subscription<T> = {
115+
callback,
116+
decoders: decodersArray,
117+
pubSubTopic,
118+
};
114119
this.subscriptions.set(requestId, subscription);
115120

116121
return async () => {

packages/core/src/lib/relay/index.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,12 @@ class Relay implements IRelay {
107107
* @returns Function to delete the observer
108108
*/
109109
public subscribe<T extends IDecodedMessage>(
110-
decoders: IDecoder<T>[],
110+
decoders: IDecoder<T> | IDecoder<T>[],
111111
callback: Callback<T>
112112
): () => void {
113-
const contentTopicToObservers = toObservers(decoders, callback);
113+
const contentTopicToObservers = Array.isArray(decoders)
114+
? toObservers(decoders, callback)
115+
: toObservers([decoders], callback);
114116

115117
for (const contentTopic of contentTopicToObservers.keys()) {
116118
const currObservers = this.observers.get(contentTopic) || new Set();

packages/interfaces/src/receiver.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
99

1010
export interface IReceiver {
1111
subscribe: <T extends IDecodedMessage>(
12-
decoders: IDecoder<T>[],
12+
decoders: IDecoder<T> | IDecoder<T>[],
1313
callback: Callback<T>,
1414
opts?: ProtocolOptions
1515
) => Unsubscribe | Promise<Unsubscribe>;

0 commit comments

Comments
 (0)