Skip to content

Commit 54bc53c

Browse files
committed
fix: job processor handling for recurring jobs could fill up queue and block processing
1 parent 1aaaa61 commit 54bc53c

7 files changed

+79
-48
lines changed

src/JobProcessingQueue.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ export class JobProcessingQueue {
1919
return this._queue.length;
2020
}
2121

22+
getQueue(): Job[] {
23+
return this._queue;
24+
}
25+
2226
/**
2327
* Pops and returns last queue element (next job to be processed) without checking concurrency.
2428
* @returns {Job} Next Job to be processed
@@ -111,10 +115,13 @@ export class JobProcessingQueue {
111115
// and if concurrency limit is not reached yet (actual running jobs is lower than max concurrency)
112116
if (
113117
def &&
114-
this._queue[i].attrs.nextRunAt &&
115118
!handledJobs.includes(this._queue[i].attrs._id) &&
116119
(!status || !def.concurrency || status.running < def.concurrency)
117120
) {
121+
if (!this._queue[i].attrs.nextRunAt) {
122+
console.log('this._queue[i]', this._queue[i].attrs);
123+
throw new Error('no nextRunAt date');
124+
}
118125
return true;
119126
}
120127
return false;

src/JobProcessor.ts

+53-40
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,21 @@ export class JobProcessor {
4747
version,
4848
queueName: this.agenda.attrs.name,
4949
totalQueueSizeDB: await this.agenda.db.getQueueSize(),
50+
internal: {
51+
localQueueProcessing: this.localQueueProcessing
52+
},
5053
config: {
5154
totalLockLimit: this.totalLockLimit,
5255
maxConcurrency: this.maxConcurrency,
5356
processEvery: this.processEvery
5457
},
5558
jobStatus,
56-
queuedJobs: this.jobQueue.length,
59+
queuedJobs: !fullDetails
60+
? this.jobQueue.length
61+
: this.jobQueue.getQueue().map(job => ({
62+
...job.toJson(),
63+
canceled: job.canceled?.message || job.canceled
64+
})),
5765
runningJobs: !fullDetails
5866
? this.runningJobs.length
5967
: this.runningJobs.map(job => ({
@@ -363,7 +371,7 @@ export class JobProcessor {
363371
* handledJobs keeps list of already processed jobs
364372
* @returns {undefined}
365373
*/
366-
private jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
374+
private async jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
367375
// Ensure we have jobs
368376
if (this.jobQueue.length === 0) {
369377
return;
@@ -381,7 +389,14 @@ export class JobProcessor {
381389
return;
382390
}
383391

384-
log.extend('jobProcessing')('[%s:%s] there is a job to process', job.attrs.name, job.attrs._id);
392+
log.extend('jobProcessing')(
393+
'[%s:%s] there is a job to process (priority = %d)',
394+
job.attrs.name,
395+
job.attrs._id,
396+
job.attrs.priority
397+
);
398+
399+
this.jobQueue.remove(job);
385400

386401
// If the 'nextRunAt' time is older than the current time, run the job
387402
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
@@ -394,15 +409,39 @@ export class JobProcessor {
394409
this.runOrRetry(job);
395410
} else {
396411
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
397-
log.extend('jobProcessing')(
398-
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
399-
job.attrs.name,
400-
job.attrs._id,
401-
runIn
402-
);
403-
setTimeout(() => {
404-
this.jobProcessing();
405-
}, runIn);
412+
if (runIn > this.processEvery) {
413+
// this job is not in the near future, remove it (it will be picked up later)
414+
log.extend('runOrRetry')(
415+
'[%s:%s] job is too far away, freeing it up',
416+
job.attrs.name,
417+
job.attrs._id
418+
);
419+
let lockedJobIndex = this.lockedJobs.indexOf(job);
420+
if (lockedJobIndex === -1) {
421+
// lookup by id
422+
lockedJobIndex = this.lockedJobs.findIndex(
423+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
424+
);
425+
}
426+
if (lockedJobIndex === -1) {
427+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
428+
}
429+
430+
this.lockedJobs.splice(lockedJobIndex, 1);
431+
this.updateStatus(job.attrs.name, 'locked', -1);
432+
} else {
433+
log.extend('jobProcessing')(
434+
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
435+
job.attrs.name,
436+
job.attrs._id,
437+
runIn
438+
);
439+
// re add to queue (puts it at the ned of the queue)
440+
this.jobQueue.push(job);
441+
setTimeout(() => {
442+
this.jobProcessing();
443+
}, runIn);
444+
}
406445
}
407446

408447
handledJobs.push(job.attrs._id);
@@ -430,39 +469,13 @@ export class JobProcessor {
430469
return;
431470
}
432471

433-
this.jobQueue.remove(job);
434-
435472
const jobDefinition = this.agenda.definitions[job.attrs.name];
436473
const status = this.jobStatus[job.attrs.name];
437474

438475
if (
439476
(!jobDefinition.concurrency || !status || status.running < jobDefinition.concurrency) &&
440477
this.runningJobs.length < this.maxConcurrency
441478
) {
442-
if (job.isDead()) {
443-
// not needed to update lockedAt in databsase, as the timeout has been reached,
444-
// and it will be picked up anyways again
445-
log.extend('runOrRetry')(
446-
'[%s:%s] job lock has expired, freeing it up',
447-
job.attrs.name,
448-
job.attrs._id
449-
);
450-
let lockedJobIndex = this.lockedJobs.indexOf(job);
451-
if (lockedJobIndex === -1) {
452-
// lookup by id
453-
lockedJobIndex = this.lockedJobs.findIndex(
454-
j => j.attrs._id?.toString() === job.attrs._id?.toString()
455-
);
456-
}
457-
if (lockedJobIndex === -1) {
458-
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
459-
}
460-
461-
this.lockedJobs.splice(lockedJobIndex, 1);
462-
this.updateStatus(job.attrs.name, 'locked', -1);
463-
return;
464-
}
465-
466479
// Add to local "running" queue
467480
this.runningJobs.push(job);
468481
this.updateStatus(job.attrs.name, 'running', 1);
@@ -474,9 +487,9 @@ export class JobProcessor {
474487
const checkIfJobIsStillAlive = () => {
475488
// check every "this.agenda.definitions[job.attrs.name].lockLifetime / 2"" (or at mininum every processEvery)
476489
return new Promise((resolve, reject) =>
477-
setTimeout(() => {
490+
setTimeout(async () => {
478491
// when job is not running anymore, just finish
479-
if (!job.isRunning()) {
492+
if (!(await job.isRunning())) {
480493
resolve();
481494
return;
482495
}

src/types/AgendaStatus.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ export interface IAgendaStatus {
1313
maxConcurrency: number;
1414
processEvery: string | number;
1515
};
16+
internal: {
17+
localQueueProcessing: number;
18+
};
1619
jobStatus?: IAgendaJobStatus;
17-
queuedJobs: number;
20+
queuedJobs: number | IJobParameters[];
1821
runningJobs: number | IJobParameters[];
1922
lockedJobs: number | IJobParameters[];
2023
jobsToLock: number | IJobParameters[];

test/agenda.test.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
import * as delay from 'delay';
23
import { Db } from 'mongodb';
34
import { expect } from 'chai';
@@ -463,7 +464,7 @@ describe('Agenda', () => {
463464

464465
it('runs the job immediately', async () => {
465466
globalAgenda.define('immediateJob', async job => {
466-
expect(job.isRunning()).to.be.equal(true);
467+
expect(await job.isRunning()).to.be.equal(true);
467468
await globalAgenda.stop();
468469
});
469470
await globalAgenda.now('immediateJob');

test/job.test.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
import * as path from 'path';
23
import * as cp from 'child_process';
34
import { expect } from 'chai';
@@ -1027,7 +1028,8 @@ describe('Job', () => {
10271028
]);
10281029
expect(results).not.to.contain(2);
10291030
} catch (err) {
1030-
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1031+
console.log('stats', err, JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1032+
throw err;
10311033
}
10321034
});
10331035

@@ -1064,14 +1066,15 @@ describe('Job', () => {
10641066
]);
10651067
expect(results.join('')).to.eql(results.sort().join(''));
10661068
} catch (err) {
1067-
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1069+
console.log('stats', err, JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1070+
throw err;
10681071
}
10691072
});
10701073

10711074
it('should run jobs as first in first out (FIFO) with respect to priority', async () => {
10721075
const now = Date.now();
10731076

1074-
agenda.define('fifo-priority', (job, cb) => cb(), { concurrency: 1 });
1077+
agenda.define('fifo-priority', (job, cb) => setTimeout(cb, 100), { concurrency: 1 });
10751078

10761079
const checkResultsPromise = new Promise(resolve => {
10771080
const times: number[] = [];
@@ -1112,7 +1115,8 @@ describe('Job', () => {
11121115
expect(times.join('')).to.eql(times.sort().join(''));
11131116
expect(priorities).to.eql([10, 10, -10]);
11141117
} catch (err) {
1115-
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1118+
console.log('stats', err, JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1119+
throw err;
11161120
}
11171121
});
11181122

@@ -1150,7 +1154,8 @@ describe('Job', () => {
11501154
]);
11511155
expect(results).to.eql([10, 0, -10]);
11521156
} catch (err) {
1153-
// console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1157+
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1158+
throw err;
11541159
}
11551160
});
11561161

test/jobprocessor.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
import { expect } from 'chai';
23

34
import { Db } from 'mongodb';

test/retry.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
import { Db } from 'mongodb';
23
import * as delay from 'delay';
34
import { mockMongo } from './helpers/mock-mongodb';

0 commit comments

Comments
 (0)