Skip to content

Commit 116a887

Browse files
authored
feat: add maxSize to queues (#2742)
Also restore accidentally deleted tests
1 parent 35b4802 commit 116a887

File tree

5 files changed

+217
-0
lines changed

5 files changed

+217
-0
lines changed

packages/utils/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@
171171
"uint8arrays": "^5.1.0"
172172
},
173173
"devDependencies": {
174+
"@libp2p/peer-id": "^5.0.4",
174175
"@types/netmask": "^2.0.5",
175176
"aegir": "^44.0.1",
176177
"benchmark": "^2.1.4",

packages/utils/src/errors.ts

+9
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,12 @@ export class RateLimitError extends Error {
1818
this.isFirstInDuration = props.isFirstInDuration
1919
}
2020
}
21+
22+
export class QueueFullError extends Error {
23+
static name = 'QueueFullError'
24+
25+
constructor (message: string = 'The queue was full') {
26+
super(message)
27+
this.name = 'QueueFullError'
28+
}
29+
}

packages/utils/src/queue/index.ts

+15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { AbortError, TypedEventEmitter } from '@libp2p/interface'
22
import { pushable } from 'it-pushable'
33
import { raceEvent } from 'race-event'
4+
import { QueueFullError } from '../errors.js'
45
import { Job } from './job.js'
56
import type { AbortOptions, Metrics } from '@libp2p/interface'
67

@@ -21,6 +22,14 @@ export interface QueueInit<JobReturnType, JobOptions extends AbortOptions = Abor
2122
*/
2223
concurrency?: number
2324

25+
/**
26+
* If the queue size grows to larger than this number the promise returned
27+
* from the add function will reject
28+
*
29+
* @default Infinity
30+
*/
31+
maxSize?: number
32+
2433
/**
2534
* The name of the metric for the queue length
2635
*/
@@ -114,6 +123,7 @@ export interface QueueEvents<JobReturnType, JobOptions extends AbortOptions = Ab
114123
*/
115124
export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = AbortOptions> extends TypedEventEmitter<QueueEvents<JobReturnType, JobOptions>> {
116125
public concurrency: number
126+
public maxSize: number
117127
public queue: Array<Job<JobOptions, JobReturnType>>
118128
private pending: number
119129
private readonly sort?: Comparator<Job<JobOptions, JobReturnType>>
@@ -122,6 +132,7 @@ export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = Ab
122132
super()
123133

124134
this.concurrency = init.concurrency ?? Number.POSITIVE_INFINITY
135+
this.maxSize = init.maxSize ?? Number.POSITIVE_INFINITY
125136
this.pending = 0
126137

127138
if (init.metricName != null) {
@@ -212,6 +223,10 @@ export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = Ab
212223
async add (fn: RunFunction<JobOptions, JobReturnType>, options?: JobOptions): Promise<JobReturnType> {
213224
options?.signal?.throwIfAborted()
214225

226+
if (this.size === this.maxSize) {
227+
throw new QueueFullError()
228+
}
229+
215230
const job = new Job<JobOptions, JobReturnType>(fn, options)
216231
this.enqueue(job)
217232
this.safeDispatchEvent('add')
+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/* eslint-env mocha */
2+
3+
import { generateKeyPair } from '@libp2p/crypto/keys'
4+
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
5+
import { expect } from 'aegir/chai'
6+
import delay from 'delay'
7+
import pDefer from 'p-defer'
8+
import { raceEvent } from 'race-event'
9+
import { PeerQueue, type PeerQueueJobOptions } from '../src/peer-queue.js'
10+
import type { QueueJobFailure, QueueJobSuccess } from '../src/queue/index.js'
11+
12+
describe('peer queue', () => {
13+
it('should have jobs', async () => {
14+
const deferred = pDefer()
15+
16+
const privateKeyA = await generateKeyPair('Ed25519')
17+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
18+
const privateKeyB = await generateKeyPair('Ed25519')
19+
const peerIdB = peerIdFromPrivateKey(privateKeyB)
20+
const queue = new PeerQueue({
21+
concurrency: 1
22+
})
23+
24+
expect(queue.has(peerIdA)).to.be.false()
25+
26+
void queue.add(async () => {
27+
await deferred.promise
28+
}, {
29+
peerId: peerIdB
30+
})
31+
32+
void queue.add(async () => {
33+
await deferred.promise
34+
}, {
35+
peerId: peerIdA
36+
})
37+
38+
expect(queue.has(peerIdA)).to.be.true()
39+
40+
deferred.resolve()
41+
42+
await queue.onIdle()
43+
44+
expect(queue.has(peerIdA)).to.be.false()
45+
})
46+
47+
it('can join existing jobs', async () => {
48+
const value = 'hello world'
49+
const deferred = pDefer<string>()
50+
51+
const privateKeyA = await generateKeyPair('Ed25519')
52+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
53+
const queue = new PeerQueue<string>({
54+
concurrency: 1
55+
})
56+
57+
expect(queue.has(peerIdA)).to.be.false()
58+
expect(queue.find(peerIdA)).to.be.undefined()
59+
60+
void queue.add(async () => {
61+
return deferred.promise
62+
}, {
63+
peerId: peerIdA
64+
})
65+
66+
const job = queue.find(peerIdA)
67+
const join = job?.join()
68+
69+
deferred.resolve(value)
70+
71+
await expect(join).to.eventually.equal(value)
72+
73+
expect(queue.has(peerIdA)).to.be.false()
74+
expect(queue.find(peerIdA)).to.be.undefined()
75+
})
76+
77+
it('can join an existing job that fails', async () => {
78+
const error = new Error('nope!')
79+
const deferred = pDefer<string>()
80+
81+
const privateKeyA = await generateKeyPair('Ed25519')
82+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
83+
const queue = new PeerQueue<string>({
84+
concurrency: 1
85+
})
86+
87+
void queue.add(async () => {
88+
return deferred.promise
89+
}, {
90+
peerId: peerIdA
91+
})
92+
.catch(() => {})
93+
94+
const job = queue.find(peerIdA)
95+
const joinedJob = job?.join()
96+
97+
deferred.reject(error)
98+
99+
await expect(joinedJob).to.eventually.rejected
100+
.with.property('message', error.message)
101+
})
102+
103+
it('cannot join jobs after clear', async () => {
104+
const value = 'hello world'
105+
const deferred = pDefer<string>()
106+
107+
const privateKeyA = await generateKeyPair('Ed25519')
108+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
109+
const queue = new PeerQueue<string>({
110+
concurrency: 1
111+
})
112+
113+
expect(queue.has(peerIdA)).to.be.false()
114+
expect(queue.find(peerIdA)).to.be.undefined()
115+
116+
void queue.add(async () => {
117+
return deferred.promise
118+
}, {
119+
peerId: peerIdA
120+
}).catch(() => {})
121+
122+
queue.clear()
123+
124+
expect(queue.find(peerIdA)).to.be.undefined()
125+
126+
deferred.resolve(value)
127+
})
128+
129+
it('emits success event', async () => {
130+
const value = 'hello world'
131+
132+
const privateKeyA = await generateKeyPair('Ed25519')
133+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
134+
const queue = new PeerQueue<string>({
135+
concurrency: 1
136+
})
137+
138+
void queue.add(async () => {
139+
await delay(100)
140+
return value
141+
}, {
142+
peerId: peerIdA
143+
}).catch(() => {})
144+
145+
const event = await raceEvent<CustomEvent<QueueJobSuccess<string, PeerQueueJobOptions>>>(queue, 'success')
146+
147+
expect(event.detail.job.options.peerId).to.equal(peerIdA)
148+
expect(event.detail.result).to.equal(value)
149+
})
150+
151+
it('emits failure event', async () => {
152+
const err = new Error('Oh no!')
153+
154+
const privateKeyA = await generateKeyPair('Ed25519')
155+
const peerIdA = peerIdFromPrivateKey(privateKeyA)
156+
const queue = new PeerQueue<string>({
157+
concurrency: 1
158+
})
159+
160+
void queue.add(async () => {
161+
await delay(100)
162+
throw err
163+
}, {
164+
peerId: peerIdA
165+
}).catch(() => {})
166+
167+
const event = await raceEvent<CustomEvent<QueueJobFailure<string, PeerQueueJobOptions>>>(queue, 'failure')
168+
169+
expect(event.detail.job.options.peerId).to.equal(peerIdA)
170+
expect(event.detail.error).to.equal(err)
171+
})
172+
})

packages/utils/test/queue.spec.ts

+20
Original file line numberDiff line numberDiff line change
@@ -800,4 +800,24 @@ describe('queue', () => {
800800
// job not in queue any more
801801
expect(queue.queue.find(job => !job.options.slow)).to.be.undefined()
802802
})
803+
804+
it('rejects job when the queue is full', async () => {
805+
const queue = new Queue<string>({
806+
concurrency: 1,
807+
maxSize: 1
808+
})
809+
810+
const job = async (): Promise<string> => {
811+
await delay(100)
812+
return 'hello'
813+
}
814+
815+
const p = queue.add(job)
816+
817+
await expect(queue.add(job)).to.eventually.be.rejected
818+
.with.property('name', 'QueueFullError')
819+
820+
await expect(p).to.eventually.equal('hello')
821+
await expect(queue.add(job)).to.eventually.equal('hello')
822+
})
803823
})

0 commit comments

Comments
 (0)