Skip to content

Commit 26ad106

Browse files
committed
fix: too greedy locking
brings over agenda/agenda#1086 thanks to @leonardlin
1 parent c00a7a2 commit 26ad106

File tree

3 files changed

+94
-15
lines changed

3 files changed

+94
-15
lines changed

src/JobProcessor.ts

+27-15
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ export class JobProcessor {
8787

8888
private isLockingOnTheFly = false;
8989

90+
private isJobQueueFilling = new Map<string, boolean>();
91+
9092
private isRunning = true;
9193

9294
private processInterval?: ReturnType<typeof setInterval>;
@@ -211,19 +213,24 @@ export class JobProcessor {
211213
return;
212214
}
213215

216+
this.isLockingOnTheFly = true;
217+
214218
// Set that we are running this
215219
try {
216-
this.isLockingOnTheFly = true;
217-
218220
// Grab a job that needs to be locked
219221
const job = this.jobsToLock.pop();
220222

221223
if (job) {
224+
if (this.isJobQueueFilling.has(job.attrs.name)) {
225+
log.extend('lockOnTheFly')('jobQueueFilling already running for: %s', job.attrs.name);
226+
return;
227+
}
228+
222229
// If locking limits have been hit, stop locking on the fly.
223230
// Jobs that were waiting to be locked will be picked up during a
224231
// future locking interval.
225232
if (!this.shouldLock(job.attrs.name)) {
226-
log.extend('lockOnTheFly')('lock limit hit for: [%s]', job.attrs.name);
233+
log.extend('lockOnTheFly')('lock limit hit for: [%s:%S]', job.attrs.name, job.attrs._id);
227234
this.jobsToLock = [];
228235
return;
229236
}
@@ -253,8 +260,9 @@ export class JobProcessor {
253260
}
254261

255262
log.extend('lockOnTheFly')(
256-
'found job [%s] that can be locked on the fly',
257-
jobToEnqueue.attrs.name
263+
'found job [%s:%s] that can be locked on the fly',
264+
jobToEnqueue.attrs.name,
265+
jobToEnqueue.attrs._id
258266
);
259267
this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
260268
this.lockedJobs.push(jobToEnqueue);
@@ -302,18 +310,20 @@ export class JobProcessor {
302310
* @returns {undefined}
303311
*/
304312
private async jobQueueFilling(name: string): Promise<void> {
305-
// Don't lock because of a limit we have set (lockLimit, etc)
306-
if (!this.shouldLock(name)) {
307-
log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name);
308-
return;
309-
}
313+
this.isJobQueueFilling.set(name, true);
310314

311-
// Set the date of the next time we are going to run _processEvery function
312-
const now = new Date();
313-
this.nextScanAt = new Date(now.valueOf() + this.processEvery);
314-
315-
// For this job name, find the next job to run and lock it!
316315
try {
316+
// Don't lock because of a limit we have set (lockLimit, etc)
317+
if (!this.shouldLock(name)) {
318+
log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name);
319+
return;
320+
}
321+
322+
// Set the date of the next time we are going to run _processEvery function
323+
const now = new Date();
324+
this.nextScanAt = new Date(now.valueOf() + this.processEvery);
325+
326+
// For this job name, find the next job to run and lock it!
317327
const job = await this.findAndLockNextJob(name, this.agenda.definitions[name]);
318328

319329
// Still have the job?
@@ -354,6 +364,8 @@ export class JobProcessor {
354364
} catch (error) {
355365
log.extend('jobQueueFilling')('[%s] job lock failed while filling queue', name, error);
356366
this.agenda.emit('error', error);
367+
} finally {
368+
this.isJobQueueFilling.delete(name);
357369
}
358370
}
359371

test/agenda.test.ts

+43
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import * as delay from 'delay';
33
import { Db } from 'mongodb';
44
import { expect } from 'chai';
5+
import { fail } from 'assert';
56
import { mockMongo } from './helpers/mock-mongodb';
67

78
import { Agenda } from '../src';
@@ -660,6 +661,48 @@ describe('Agenda', () => {
660661
});
661662

662663
describe('process jobs', () => {
664+
// eslint-disable-line prefer-arrow-callback
665+
it('do not run failed jobs again', async () => {
666+
const unhandledRejections: any[] = [];
667+
const rejectionsHandler = error => unhandledRejections.push(error);
668+
process.on('unhandledRejection', rejectionsHandler);
669+
670+
let jprocesses = 0;
671+
672+
globalAgenda.define('failing job', async _job => {
673+
console.log('FALING JOB');
674+
jprocesses++;
675+
throw new Error('failed');
676+
});
677+
678+
let failCalled = false;
679+
globalAgenda.on('fail:failing job', err => {
680+
console.log('ERROR FAILING JOB', err);
681+
failCalled = true;
682+
});
683+
684+
let errorCalled = false;
685+
globalAgenda.on('error', err => {
686+
console.log('GLOBAL ERROR', err);
687+
688+
errorCalled = true;
689+
});
690+
691+
globalAgenda.processEvery(100);
692+
await globalAgenda.start();
693+
694+
await globalAgenda.now('failing job');
695+
696+
await delay(500);
697+
698+
process.removeListener('unhandledRejection', rejectionsHandler);
699+
700+
expect(jprocesses).to.be.equal(1);
701+
expect(errorCalled).to.be.false;
702+
expect(failCalled).to.be.true;
703+
expect(unhandledRejections).to.have.length(0);
704+
}).timeout(10000);
705+
663706
// eslint-disable-line prefer-arrow-callback
664707
it('ensure there is no unhandledPromise on job timeouts', async () => {
665708
const unhandledRejections: any[] = [];

test/job.test.ts

+24
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,30 @@ describe('Job', () => {
962962
expect((await agenda.getRunningStats()).lockedJobs).to.equal(1);
963963
});
964964

965+
it('does not on-the-fly lock more mixed jobs than agenda._lockLimit jobs', async () => {
966+
agenda.lockLimit(1);
967+
968+
agenda.define('lock job', (job, cb) => {}); // eslint-disable-line no-unused-vars
969+
agenda.define('lock job2', (job, cb) => {}); // eslint-disable-line no-unused-vars
970+
agenda.define('lock job3', (job, cb) => {}); // eslint-disable-line no-unused-vars
971+
agenda.define('lock job4', (job, cb) => {}); // eslint-disable-line no-unused-vars
972+
agenda.define('lock job5', (job, cb) => {}); // eslint-disable-line no-unused-vars
973+
974+
await agenda.start();
975+
976+
await Promise.all([
977+
agenda.now('lock job', { i: 1 }),
978+
agenda.now('lock job5', { i: 2 }),
979+
agenda.now('lock job4', { i: 3 }),
980+
agenda.now('lock job3', { i: 4 }),
981+
agenda.now('lock job2', { i: 5 })
982+
]);
983+
984+
await delay(500);
985+
expect((await agenda.getRunningStats()).lockedJobs).to.equal(1);
986+
await agenda.stop();
987+
});
988+
965989
it('does not on-the-fly lock more than definition.lockLimit jobs', async () => {
966990
agenda.define('lock job', (job, cb) => {}, { lockLimit: 1 }); // eslint-disable-line no-unused-vars
967991

0 commit comments

Comments
 (0)