Skip to content

Commit ef87af6

Browse files
authored
chore: add iterator timeout and unblock main thread (#1357)
* add iterator timeout * add idle state to generator * add js-doc and iterator option
1 parent bc2615e commit ef87af6

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

packages/tests/tests/utils.spec.ts

+18-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ describe("Util: toAsyncIterator", () => {
4848
const messageText = "hey, what's up?";
4949
const sent = { payload: utf8ToBytes(messageText) };
5050

51-
const { iterator } = await toAsyncIterator(waku.filter, TestDecoder);
51+
const { iterator } = await toAsyncIterator(
52+
waku.filter,
53+
TestDecoder,
54+
{},
55+
{ timeoutMs: 1000 }
56+
);
5257

5358
await waku.lightPush.send(TestEncoder, sent);
5459
const { value } = await iterator.next();
@@ -60,7 +65,12 @@ describe("Util: toAsyncIterator", () => {
6065

6166
it("handles multiple messages", async function () {
6267
this.timeout(10000);
63-
const { iterator } = await toAsyncIterator(waku.filter, TestDecoder);
68+
const { iterator } = await toAsyncIterator(
69+
waku.filter,
70+
TestDecoder,
71+
{},
72+
{ timeoutMs: 1000 }
73+
);
6474

6575
await waku.lightPush.send(TestEncoder, {
6676
payload: utf8ToBytes("Filtering works!"),
@@ -78,7 +88,12 @@ describe("Util: toAsyncIterator", () => {
7888

7989
it("unsubscribes", async function () {
8090
this.timeout(10000);
81-
const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder);
91+
const { iterator, stop } = await toAsyncIterator(
92+
waku.filter,
93+
TestDecoder,
94+
{},
95+
{ timeoutMs: 1000 }
96+
);
8297

8398
await waku.lightPush.send(TestEncoder, {
8499
payload: utf8ToBytes("This should be received"),

packages/utils/src/common/to_async_iterator.ts

+35-1
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,29 @@ import type {
77
Unsubscribe,
88
} from "@waku/interfaces";
99

10+
type IteratorOptions = {
11+
timeoutMs?: number;
12+
iteratorDelay?: number;
13+
};
14+
15+
const FRAME_RATE = 60;
16+
17+
/**
18+
* Function that transforms IReceiver subscription to iterable stream of data.
19+
* @param receiver - object that allows to be subscribed to;
20+
* @param decoder - parameter to be passed to receiver for subscription;
21+
* @param options - options for receiver for subscription;
22+
* @param iteratorOptions - optional configuration for iterator;
23+
* @returns iterator and stop function to terminate it.
24+
*/
1025
export async function toAsyncIterator<T extends IDecodedMessage>(
1126
receiver: IReceiver,
1227
decoder: IDecoder<T> | IDecoder<T>[],
13-
options?: ProtocolOptions
28+
options?: ProtocolOptions,
29+
iteratorOptions?: IteratorOptions
1430
): Promise<IAsyncIterator<T>> {
31+
const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE;
32+
1533
const messages: T[] = [];
1634

1735
let unsubscribe: undefined | Unsubscribe;
@@ -23,8 +41,18 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
2341
options
2442
);
2543

44+
const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
45+
const timeoutMs = iteratorOptions?.timeoutMs ?? 0;
46+
const startTime = Date.now();
47+
2648
async function* iterator(): AsyncIterator<T> {
2749
while (true) {
50+
if (isWithTimeout && Date.now() - startTime >= timeoutMs) {
51+
return;
52+
}
53+
54+
await wait(iteratorDelay);
55+
2856
const message = messages.shift() as T;
2957

3058
if (!unsubscribe && messages.length === 0) {
@@ -49,3 +77,9 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
4977
},
5078
};
5179
}
80+
81+
function wait(ms: number): Promise<void> {
82+
return new Promise((resolve) => {
83+
setTimeout(resolve, ms);
84+
});
85+
}

0 commit comments

Comments
 (0)