Skip to content

Commit 7daa9d0

Browse files
authored
feat: toSubscriptionIterator impl for IReceiver (#1307)
1 parent 60c9a62 commit 7daa9d0

File tree

11 files changed

+196
-6
lines changed

11 files changed

+196
-6
lines changed

package-lock.json

+3-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
"private": true,
44
"type": "module",
55
"workspaces": [
6+
"packages/interfaces",
67
"packages/utils",
78
"packages/proto",
8-
"packages/interfaces",
99
"packages/enr",
1010
"packages/core",
1111
"packages/message-hash",

packages/core/src/lib/filter/index.ts

+9
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import type { IncomingStreamData } from "@libp2p/interface-registrar";
44
import type {
55
ActiveSubscriptions,
66
Callback,
7+
IAsyncIterator,
78
IDecodedMessage,
89
IDecoder,
910
IFilter,
1011
ProtocolCreateOptions,
1112
ProtocolOptions,
1213
} from "@waku/interfaces";
1314
import { WakuMessage as WakuMessageProto } from "@waku/proto";
15+
import { toAsyncIterator } from "@waku/utils";
1416
import debug from "debug";
1517
import all from "it-all";
1618
import * as lp from "it-length-prefixed";
@@ -124,6 +126,13 @@ class Filter extends BaseProtocol implements IFilter {
124126
};
125127
}
126128

129+
public toSubscriptionIterator<T extends IDecodedMessage>(
130+
decoders: IDecoder<T> | IDecoder<T>[],
131+
opts?: ProtocolOptions | undefined
132+
): Promise<IAsyncIterator<T>> {
133+
return toAsyncIterator(this, decoders, opts);
134+
}
135+
127136
public getActiveSubscriptions(): ActiveSubscriptions {
128137
const map: ActiveSubscriptions = new Map();
129138
const subscriptions = this.subscriptions as Map<

packages/core/src/lib/relay/index.ts

+10
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import { sha256 } from "@noble/hashes/sha256";
1212
import type {
1313
ActiveSubscriptions,
1414
Callback,
15+
IAsyncIterator,
1516
IDecodedMessage,
1617
IDecoder,
1718
IEncoder,
1819
IMessage,
1920
IRelay,
2021
ProtocolCreateOptions,
22+
ProtocolOptions,
2123
SendResult,
2224
} from "@waku/interfaces";
25+
import { toAsyncIterator } from "@waku/utils";
2326
import debug from "debug";
2427

2528
import { DefaultPubSubTopic } from "../constants.js";
@@ -146,6 +149,13 @@ class Relay implements IRelay {
146149
};
147150
}
148151

152+
public toSubscriptionIterator<T extends IDecodedMessage>(
153+
decoders: IDecoder<T> | IDecoder<T>[],
154+
opts?: ProtocolOptions | undefined
155+
): Promise<IAsyncIterator<T>> {
156+
return toAsyncIterator(this, decoders, opts);
157+
}
158+
149159
public getActiveSubscriptions(): ActiveSubscriptions {
150160
const map = new Map();
151161
map.set(this.pubSubTopic, this.observers.keys());

packages/interfaces/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ export * from "./waku.js";
1010
export * from "./connection_manager.js";
1111
export * from "./sender.js";
1212
export * from "./receiver.js";
13+
export * from "./misc.js";

packages/interfaces/src/misc.ts

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import type { IDecodedMessage } from "./message.js";
2+
3+
export interface IAsyncIterator<T extends IDecodedMessage> {
4+
iterator: AsyncIterator<T>;
5+
stop: Unsubscribe;
6+
}
7+
8+
export type Unsubscribe = () => void | Promise<void>;
9+
10+
export type PubSubTopic = string;
11+
export type ContentTopic = string;

packages/interfaces/src/receiver.ts

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import type { IDecodedMessage, IDecoder } from "./message.js";
2+
import type {
3+
ContentTopic,
4+
IAsyncIterator,
5+
PubSubTopic,
6+
Unsubscribe,
7+
} from "./misc.js";
28
import type { Callback, ProtocolOptions } from "./protocols.js";
39

4-
type Unsubscribe = () => void | Promise<void>;
5-
type PubSubTopic = string;
6-
type ContentTopic = string;
7-
810
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
911

1012
export interface IReceiver {
13+
toSubscriptionIterator: <T extends IDecodedMessage>(
14+
decoders: IDecoder<T> | IDecoder<T>[],
15+
opts?: ProtocolOptions
16+
) => Promise<IAsyncIterator<T>>;
1117
subscribe: <T extends IDecodedMessage>(
1218
decoders: IDecoder<T> | IDecoder<T>[],
1319
callback: Callback<T>,

packages/tests/tests/utils.spec.ts

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import {
2+
createDecoder,
3+
createEncoder,
4+
DefaultPubSubTopic,
5+
waitForRemotePeer,
6+
} from "@waku/core";
7+
import { createLightNode } from "@waku/create";
8+
import type { LightNode } from "@waku/interfaces";
9+
import { Protocols } from "@waku/interfaces";
10+
import { toAsyncIterator } from "@waku/utils";
11+
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
12+
import { expect } from "chai";
13+
14+
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../src/index.js";
15+
16+
const TestContentTopic = "/test/1/waku-filter";
17+
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
18+
const TestDecoder = createDecoder(TestContentTopic);
19+
20+
describe("Util: toAsyncIterator", () => {
21+
let waku: LightNode;
22+
let nwaku: Nwaku;
23+
24+
beforeEach(async function () {
25+
this.timeout(15000);
26+
nwaku = new Nwaku(makeLogFileName(this));
27+
await nwaku.start({ filter: true, lightpush: true, relay: true });
28+
waku = await createLightNode({
29+
staticNoiseKey: NOISE_KEY_1,
30+
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
31+
});
32+
await waku.start();
33+
await waku.dial(await nwaku.getMultiaddrWithId());
34+
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
35+
});
36+
37+
afterEach(async () => {
38+
try {
39+
await nwaku.stop();
40+
await waku.stop();
41+
} catch (err) {
42+
console.log("Failed to stop", err);
43+
}
44+
});
45+
46+
it("creates an iterator", async function () {
47+
const messageText = "hey, what's up?";
48+
const sent = { payload: utf8ToBytes(messageText) };
49+
50+
const { iterator } = await toAsyncIterator(waku.filter, TestDecoder);
51+
52+
await waku.lightPush.send(TestEncoder, sent);
53+
const { value } = await iterator.next();
54+
55+
expect(value.contentTopic).to.eq(TestContentTopic);
56+
expect(value.pubSubTopic).to.eq(DefaultPubSubTopic);
57+
expect(bytesToUtf8(value.payload)).to.eq(messageText);
58+
});
59+
60+
it("handles multiple messages", async function () {
61+
const { iterator } = await toAsyncIterator(waku.filter, TestDecoder);
62+
63+
await waku.lightPush.send(TestEncoder, {
64+
payload: utf8ToBytes("Filtering works!"),
65+
});
66+
await waku.lightPush.send(TestEncoder, {
67+
payload: utf8ToBytes("Filtering still works!"),
68+
});
69+
70+
let result = await iterator.next();
71+
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering works!");
72+
73+
result = await iterator.next();
74+
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering still works!");
75+
});
76+
77+
it("unsubscribes", async function () {
78+
const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder);
79+
80+
await waku.lightPush.send(TestEncoder, {
81+
payload: utf8ToBytes("This should be received"),
82+
});
83+
84+
await stop();
85+
86+
await waku.lightPush.send(TestEncoder, {
87+
payload: utf8ToBytes("This should not be received"),
88+
});
89+
90+
let result = await iterator.next();
91+
expect(result.done).to.eq(true);
92+
expect(bytesToUtf8(result.value.payload)).to.eq("This should be received");
93+
94+
result = await iterator.next();
95+
expect(result.value).to.eq(undefined);
96+
expect(result.done).to.eq(true);
97+
});
98+
});

packages/utils/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
"@typescript-eslint/eslint-plugin": "^5.57.0",
7979
"@typescript-eslint/parser": "^5.51.0",
8080
"@waku/build-utils": "*",
81+
"@waku/interfaces": "*",
8182
"cspell": "^6.31.1",
8283
"eslint": "^8.35.0",
8384
"eslint-config-prettier": "^8.6.0",

packages/utils/src/common/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from "./is_defined.js";
22
export * from "./random_subset.js";
3+
export * from "./to_async_iterator.js";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import type {
2+
IAsyncIterator,
3+
IDecodedMessage,
4+
IDecoder,
5+
IReceiver,
6+
ProtocolOptions,
7+
Unsubscribe,
8+
} from "@waku/interfaces";
9+
10+
export async function toAsyncIterator<T extends IDecodedMessage>(
11+
receiver: IReceiver,
12+
decoder: IDecoder<T> | IDecoder<T>[],
13+
options?: ProtocolOptions
14+
): Promise<IAsyncIterator<T>> {
15+
const messages: T[] = [];
16+
17+
let unsubscribe: undefined | Unsubscribe;
18+
unsubscribe = await receiver.subscribe(
19+
decoder,
20+
(message: T) => {
21+
messages.push(message);
22+
},
23+
options
24+
);
25+
26+
async function* iterator(): AsyncIterator<T> {
27+
while (true) {
28+
const message = messages.shift() as T;
29+
30+
if (!unsubscribe && messages.length === 0) {
31+
return message;
32+
}
33+
34+
if (!message && unsubscribe) {
35+
continue;
36+
}
37+
38+
yield message;
39+
}
40+
}
41+
42+
return {
43+
iterator: iterator(),
44+
async stop() {
45+
if (unsubscribe) {
46+
await unsubscribe();
47+
unsubscribe = undefined;
48+
}
49+
},
50+
};
51+
}

0 commit comments

Comments
 (0)