Skip to content

Commit 8778166

Browse files
peternhalemshanemc
andauthored
fix: make promise throttle level consistent (#240)
* fix: make promising throttle level consistent @W-13709025@ * test: handling of undefineds * chore: add some really good tests Thanks Shane * test: ut for ensureArray * style: concise test assertions * chore: properly remove entries from concurrency pool * chore: make concurrency pool a map * test: another ensureArray UT * chore: apply review comment --------- Co-authored-by: mshanemc <shane.mclaughlin@salesforce.com>
1 parent 1991aa5 commit 8778166

5 files changed

+108
-24
lines changed

src/collections.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
*/
1414

1515
export const ensureArray = <T>(entryOrArray: T | T[] | undefined): T[] => {
16-
if (entryOrArray) {
16+
if (entryOrArray !== undefined && entryOrArray !== null) {
1717
return Array.isArray(entryOrArray) ? entryOrArray : [entryOrArray];
1818
}
1919
return [];

src/throttledPromiseAll.ts

+49-11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ export type PromiseItem<T, O = T | undefined> = {
1818
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>;
1919
};
2020

21+
type IndexedProducer<T, O = T> = PromiseItem<T, O> & {
22+
index: number;
23+
};
24+
25+
type IndexedResult<O> = {
26+
index: number;
27+
result: O | undefined;
28+
};
29+
2130
/**
2231
* A promise that throttles the number of promises running at a time.
2332
*
@@ -39,7 +48,7 @@ export class ThrottledPromiseAll<T, O = T> {
3948
private readonly concurrency: number;
4049
private wait: Duration;
4150
private timeout: NodeJS.Timeout | undefined;
42-
readonly #results: Array<O | undefined> = [];
51+
readonly #results: Array<IndexedResult<O> | undefined> = [];
4352

4453
/**
4554
* Construct a new ThrottledPromiseAll.
@@ -56,7 +65,7 @@ export class ThrottledPromiseAll<T, O = T> {
5665
* Returns the results of the promises that have been resolved.
5766
*/
5867
public get results(): Array<O | undefined> {
59-
return this.#results;
68+
return this.#results.sort((a, b) => (a?.index ?? 0) - (b?.index ?? 0)).map((r) => r?.result);
6069
}
6170

6271
/**
@@ -73,7 +82,7 @@ export class ThrottledPromiseAll<T, O = T> {
7382
source: T | T[],
7483
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>
7584
): void {
76-
ensureArray(source).forEach((s) => this.queue.push({ source: s, producer }));
85+
ensureArray<T>(source).forEach((s) => this.queue.push({ source: s, producer }));
7786
}
7887

7988
/**
@@ -109,7 +118,7 @@ export class ThrottledPromiseAll<T, O = T> {
109118
await this.dequeue();
110119
}
111120
this.stop();
112-
return this.#results;
121+
return this.results;
113122
} catch (e) {
114123
this.stop();
115124
throw e;
@@ -124,14 +133,43 @@ export class ThrottledPromiseAll<T, O = T> {
124133
}
125134

126135
private async dequeue(): Promise<void> {
127-
while (this.queue.length > 0) {
128-
const next = this.queue.slice(0, this.concurrency);
129-
this.queue.splice(0, this.concurrency);
136+
const generator = function* (
137+
data: Array<PromiseItem<T, O | undefined>>
138+
): Generator<PromiseItem<T, O | undefined> | undefined> {
139+
while (data.length > 0) {
140+
yield data.shift();
141+
}
142+
};
143+
const concurrencyPool: Map<number, Promise<IndexedResult<O> | undefined>> = new Map<
144+
number,
145+
Promise<IndexedResult<O> | undefined>
146+
>();
147+
const get = generator(this.queue);
148+
let index = 0;
149+
while (this.queue.length > 0 || concurrencyPool.size > 0) {
150+
while (concurrencyPool.size < this.concurrency) {
151+
const item = get.next().value as PromiseItem<T, O | undefined>;
152+
if (!item) {
153+
break;
154+
}
155+
156+
const p: IndexedProducer<T, O> = { ...item, index: index++ };
157+
concurrencyPool.set(
158+
p.index,
159+
p
160+
.producer(item.source, this)
161+
.then((result) => ({ index: p.index, result }))
162+
.catch((e) => Promise.reject(e))
163+
);
164+
}
130165
// eslint-disable-next-line no-await-in-loop
131-
const results = await Promise.all(
132-
next.map((item) => item.producer(item.source, this).catch((e) => Promise.reject(e)))
133-
);
134-
this.#results.push(...results);
166+
const r = await Promise.race(concurrencyPool.values());
167+
const rIndex = r?.index ?? -1;
168+
if (!concurrencyPool.has(rIndex)) {
169+
throw new Error(`PromiseQueue: Could not find index ${r?.index} in pool`);
170+
}
171+
concurrencyPool.delete(rIndex);
172+
this.#results.push(r);
135173
}
136174
}
137175
}

test/collections.test.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,16 @@ import { ensureArray } from '../src/collections';
1010
describe('collections', () => {
1111
describe('ensureArray', () => {
1212
it('undefined => empty array', () => {
13-
const input = undefined;
14-
expect(ensureArray(input)).to.deep.equal([]);
13+
expect(ensureArray(undefined)).to.deep.equal([]);
14+
});
15+
it('null => empty array', () => {
16+
expect(ensureArray(null)).to.deep.equal([]);
17+
});
18+
it('zero => array with zero', () => {
19+
expect(ensureArray(0)).to.deep.equal([0]);
20+
});
21+
it('empty array => empty array', () => {
22+
expect(ensureArray([])).to.deep.equal([]);
1523
});
1624
it('an array => the array', () => {
1725
const input = ['a', 'b'];

test/promiseQueue.test.ts

+47-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
66
*/
77
import { expect } from 'chai';
8-
import { ThrottledPromiseAll } from '../src/throttledPromiseAll';
9-
import { Duration } from '../src/duration';
8+
import { ThrottledPromiseAll } from '../src';
9+
import { Duration } from '../src';
1010

1111
describe('throttledPromiseAll', () => {
1212
const numberProducer = (
@@ -19,7 +19,10 @@ describe('throttledPromiseAll', () => {
1919
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 1 });
2020
for (const i of [1, 2, 3, 4, 5]) {
2121
// eslint-disable-next-line no-await-in-loop
22-
throttledPromiseAll.add(i, numberProducer);
22+
throttledPromiseAll.add(
23+
i,
24+
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), (5 - i) * 100))
25+
);
2326
}
2427
await throttledPromiseAll.all();
2528
const results = throttledPromiseAll.results as number[];
@@ -65,7 +68,7 @@ describe('throttledPromiseAll', () => {
6568
});
6669
throttledPromiseAll.add(
6770
[1, 2, 3, 4, 5],
68-
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 10000))
71+
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 200))
6972
);
7073
await throttledPromiseAll.all();
7174
} catch (e) {
@@ -87,4 +90,44 @@ describe('throttledPromiseAll', () => {
8790
const results = throttledPromiseAll.results as number[];
8891
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
8992
});
93+
94+
it('empty array', async () => {
95+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
96+
await throttledPromiseAll.all();
97+
expect(throttledPromiseAll.results).to.deep.equal([]);
98+
});
99+
100+
it('add single arg that returns undefined', async () => {
101+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
102+
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
103+
await throttledPromiseAll.all();
104+
expect(throttledPromiseAll.results).to.deep.equal([undefined]);
105+
});
106+
107+
it('add single arg that returns null', async () => {
108+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
109+
throttledPromiseAll.add(0, () => Promise.resolve(-10));
110+
await throttledPromiseAll.all();
111+
expect(throttledPromiseAll.results).to.deep.equal([-10]);
112+
});
113+
114+
it('add with producer of undefined', async () => {
115+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
116+
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
117+
[1, 2, 3, 4, 5].forEach((i) => throttledPromiseAll.add(i, numberProducer));
118+
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
119+
await throttledPromiseAll.all();
120+
expect(throttledPromiseAll.results).to.deep.equal([undefined, 2, 3, 4, 5, 6, undefined]);
121+
});
122+
123+
it('multiple adds to check order/sort', async () => {
124+
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 2 });
125+
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
126+
[1, 2].forEach((i) => throttledPromiseAll.add(i, numberProducer));
127+
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
128+
[6, 7].forEach((i) => throttledPromiseAll.add(i, numberProducer));
129+
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
130+
await throttledPromiseAll.all();
131+
expect(throttledPromiseAll.results).to.deep.equal([undefined, 2, 3, undefined, 7, 8, undefined]);
132+
});
90133
});

yarn.lock

+1-6
Original file line numberDiff line numberDiff line change
@@ -716,12 +716,7 @@ acorn-walk@^8.1.1:
716716
version "8.1.1"
717717
resolved "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.1.1.tgz"
718718

719-
acorn@^8.4.1:
720-
version "8.8.0"
721-
resolved "https://registry.npmjs.org/acorn/-/acorn-8.8.0.tgz#88c0187620435c7f6015803f5539dae05a9dbea8"
722-
integrity sha512-QOxyigPVrpZ2GXT+PFyZTl6TtOFc5egxHIP9IlQ+RbupQuX4RkT/Bee4/kQuC02Xkzg84JcT7oLYtDIQxp+v7w==
723-
724-
acorn@^8.9.0:
719+
acorn@^8.4.1, acorn@^8.9.0:
725720
version "8.9.0"
726721
resolved "https://registry.npmjs.org/acorn/-/acorn-8.9.0.tgz#78a16e3b2bcc198c10822786fa6679e245db5b59"
727722
integrity sha512-jaVNAFBHNLXspO543WnNNPZFRtavh3skAkITqD0/2aeMkKZTN+254PyhwxFYrk3vQ1xfY+2wbesJMs/JC8/PwQ==

0 commit comments

Comments
 (0)