Skip to content

Commit c020142

Browse files
committed
feat: add throttled promise class
@W-11940230@
1 parent 4e1df36 commit c020142

File tree

3 files changed

+224
-0
lines changed

3 files changed

+224
-0
lines changed

src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ export * from './errors';
1212
export * from './json';
1313
export * from './nodash';
1414
export * from './collections';
15+
export * from './throttledPromiseAll';

src/throttledPromiseAll.ts

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) 2020, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
import { ensureArray } from './collections';
8+
import { Duration } from './duration';
9+
10+
export type PromiseOptions = {
11+
concurrency: number;
12+
stopOnError?: boolean;
13+
timeout?: Duration;
14+
};
15+
16+
export type PromiseItem<T, O = T | undefined> = {
17+
source: T;
18+
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>;
19+
};
20+
21+
/**
22+
* A promise that throttles the number of promises running at a time.
23+
*
24+
* The constructor takes {@link PromiseOptions} to initialize the constraints of the promise.
25+
*
26+
* ```typescript
27+
* // Create a ThrottledPromiseAll that will take numbers and return numbers
28+
* const throttledPromise = new ThrottledPromiseAll<number, number>({ concurrency: 1, timeout: Duration.milliseconds(100) });
29+
*
30+
* // Define a producer function that will take a number and return a promise that resolves to a number
31+
* const numberProducer = (source: number, throttledPromiseAll: ThrottledPromiseAll<number, number | undefined>): Promise<number> => Promise.resolve(source + 1);
32+
* throttledPromiseAll.add([1, 2, 3, 4, 5], numberProducer);
33+
*
34+
* const numberResults = await throttledPromiseAll.all();
35+
* ```
36+
*/
37+
export class ThrottledPromiseAll<T, O = T> {
38+
private readonly queue: Array<PromiseItem<T, O | undefined>>;
39+
private readonly concurrency: number;
40+
private results: Array<O | undefined> = [];
41+
private wait: Duration;
42+
private timeout: NodeJS.Timeout | undefined;
43+
44+
/**
45+
* Construct a new ThrottledPromiseAll.
46+
*
47+
* @param options {@link PromiseOptions}
48+
*/
49+
public constructor(options: PromiseOptions = { concurrency: 1 }) {
50+
this.queue = [];
51+
this.concurrency = options.concurrency;
52+
this.wait = options.timeout ?? Duration.milliseconds(0);
53+
}
54+
55+
/**
56+
* Add source items to the queue of promises to be resolved.
57+
* Adding an item to the queue requires a producer function that will take the source item and return a promise.
58+
* Each item in the can have a different producer function, as long as the producer function conforms the
59+
* types of the ThrottledPromiseAll when constructed.
60+
*
61+
* @param source
62+
* @param producer the producer function that will take the source item and return a promise. The producer function signature
63+
* must conform to the types of the ThrottledPromiseAll when constructed.
64+
*/
65+
public add(
66+
source: T | T[],
67+
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>
68+
): void {
69+
ensureArray(source).forEach((s) => this.queue.push({ source: s, producer }));
70+
}
71+
72+
/**
73+
* Returns a promise that resolves the items present in the queue using the associated producer.
74+
*
75+
* This function will throw an error if the timeout is reached before all items in the queue are resolved (see {@link PromiseOptions.timeout}).
76+
*
77+
* @returns A promise that resolves to an array of results.
78+
*/
79+
public async all(): Promise<Array<O | undefined>> {
80+
let timeoutPromise: Promise<void> | undefined;
81+
if (this.wait.milliseconds > 0) {
82+
if (!this.timeout) {
83+
timeoutPromise = new Promise((resolve, reject) => {
84+
this.timeout = setTimeout(() => {
85+
try {
86+
clearTimeout(this.timeout);
87+
this.stop();
88+
reject(new Error(`PromiseQueue timed out after ${this.wait.milliseconds} milliseconds`));
89+
} catch (e) {
90+
reject(e);
91+
}
92+
}, this.wait.milliseconds);
93+
});
94+
}
95+
}
96+
try {
97+
if (timeoutPromise) {
98+
await Promise.race([this.dequeue(), timeoutPromise]);
99+
} else {
100+
await this.dequeue();
101+
}
102+
this.stop();
103+
return this.results;
104+
} catch (e) {
105+
this.stop();
106+
throw e;
107+
}
108+
}
109+
110+
/**
111+
* Returns the results of the promises that have been resolved.
112+
*/
113+
public getResults(): Array<O | undefined> {
114+
return this.results;
115+
}
116+
117+
private stop(): void {
118+
clearTimeout(this.timeout);
119+
this.queue.splice(0, this.queue.length);
120+
}
121+
122+
private async dequeue(): Promise<void> {
123+
while (this.queue.length > 0) {
124+
const next = this.queue.slice(0, this.concurrency);
125+
this.queue.splice(0, this.concurrency);
126+
// eslint-disable-next-line no-await-in-loop
127+
const results = await Promise.all(
128+
next.map((item) => item.producer(item.source, this).catch((e) => Promise.reject(e)))
129+
);
130+
this.results.push(...results);
131+
}
132+
}
133+
}

test/promiseQueue.test.ts

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) 2020, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
import { expect } from 'chai';
8+
import { ThrottledPromiseAll } from '../src/throttledPromiseAll';
9+
import { Duration } from '../src/duration';
10+
11+
describe('throttledPromiseAll', () => {
12+
const numberProducer = (
13+
source: number,
14+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
15+
throttledPromise: ThrottledPromiseAll<number, number | undefined>
16+
): Promise<number> => Promise.resolve(source + 1);
17+
18+
it('should execute promises in order', async () => {
19+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 1 });
20+
for (const i of [1, 2, 3, 4, 5]) {
21+
// eslint-disable-next-line no-await-in-loop
22+
throttledPromiseAll.add(i, numberProducer);
23+
}
24+
await throttledPromiseAll.all();
25+
const results = throttledPromiseAll.getResults() as number[];
26+
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
27+
});
28+
it('should execute promises in groups of 2 - auto start', async () => {
29+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 2 });
30+
for (const i of [1, 2, 3, 4, 5]) {
31+
// eslint-disable-next-line no-await-in-loop
32+
throttledPromiseAll.add(i, numberProducer);
33+
}
34+
await throttledPromiseAll.all();
35+
const results = throttledPromiseAll.getResults() as number[];
36+
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
37+
});
38+
it('should execute promises in groups of 10 - auto start', async () => {
39+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 10 });
40+
for (const i of [1, 2, 3, 4, 5]) {
41+
// eslint-disable-next-line no-await-in-loop
42+
throttledPromiseAll.add(i, numberProducer);
43+
}
44+
await throttledPromiseAll.all();
45+
const results = throttledPromiseAll.getResults() as number[];
46+
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
47+
});
48+
it('should should reject', async () => {
49+
try {
50+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({
51+
concurrency: 1,
52+
timeout: Duration.milliseconds(5000),
53+
});
54+
throttledPromiseAll.add([1], (source) => Promise.reject(new Error(`Promise ${source} rejected`)));
55+
await throttledPromiseAll.all();
56+
} catch (e) {
57+
expect((e as Error).message).to.equal('Promise 1 rejected');
58+
}
59+
});
60+
it('should should timeout', async () => {
61+
try {
62+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({
63+
concurrency: 1,
64+
timeout: Duration.milliseconds(100),
65+
});
66+
throttledPromiseAll.add(
67+
[1, 2, 3, 4, 5],
68+
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 10000))
69+
);
70+
await throttledPromiseAll.all();
71+
} catch (e) {
72+
expect((e as Error).message).to.equal('PromiseQueue timed out after 100 milliseconds');
73+
}
74+
});
75+
it('should add more to the queue', async () => {
76+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 1 });
77+
throttledPromiseAll.add(
78+
1,
79+
(source: number, throttledPromise: ThrottledPromiseAll<number, number | undefined>): Promise<number> => {
80+
if (source === 1) {
81+
throttledPromise.add([2, 3, 4, 5], numberProducer);
82+
}
83+
return Promise.resolve(source + 1);
84+
}
85+
);
86+
await throttledPromiseAll.all();
87+
const results = throttledPromiseAll.getResults() as number[];
88+
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
89+
});
90+
});

0 commit comments

Comments
 (0)