Skip to content

Commit 9854007

Browse files
committed
fix(jobprocessor): prevent overloading of job queue processing
1 parent 2919083 commit 9854007

File tree

2 files changed

+155
-129
lines changed

2 files changed

+155
-129
lines changed

src/JobProcessor.ts

+153-127
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ export class JobProcessor {
2323
| undefined;
2424
} = {};
2525

26+
private localQueueProcessing = 0;
27+
2628
async getStatus(fullDetails = false) {
2729
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
2830
const { version } = require('../package.json');
@@ -339,63 +341,82 @@ export class JobProcessor {
339341
* Internal method that processes any jobs in the local queue (array)
340342
* @returns {undefined}
341343
*/
342-
private jobProcessing() {
344+
private async jobProcessing() {
343345
// Ensure we have jobs
344346
if (this.jobQueue.length === 0) {
345347
return;
346348
}
347349

348-
const now = new Date();
350+
this.localQueueProcessing++;
349351

350-
// Check if there is any job that is not blocked by concurrency
351-
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus);
352+
let jobEnqueued = false;
353+
try {
354+
const now = new Date();
352355

353-
if (!job) {
354-
log.extend('jobProcessing')('[%s:%s] there is no job to process');
355-
return;
356-
}
356+
// Check if there is any job that is not blocked by concurrency
357+
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus);
357358

358-
log.extend('jobProcessing')('[%s:%s] there is a job to process', job.attrs.name, job.attrs._id);
359+
if (!job) {
360+
log.extend('jobProcessing')('[%s:%s] there is no job to process');
361+
return;
362+
}
359363

360-
// If the 'nextRunAt' time is older than the current time, run the job
361-
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
362-
if (job.attrs.nextRunAt <= now) {
363364
log.extend('jobProcessing')(
364-
'[%s:%s] nextRunAt is in the past, run the job immediately',
365+
'[%s:%s] there is a job to process',
365366
job.attrs.name,
366367
job.attrs._id
367368
);
368-
this.runOrRetry(job);
369-
} else {
370-
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
371-
log.extend('jobProcessing')(
372-
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
373-
job.attrs.name,
374-
job.attrs._id,
375-
runIn
376-
);
377-
setTimeout(() => {
378-
this.jobProcessing();
379-
}, runIn);
380-
}
381369

382-
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
383-
setImmediate(() => this.jobProcessing());
370+
// If the 'nextRunAt' time is older than the current time, run the job
371+
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
372+
if (job.attrs.nextRunAt <= now) {
373+
log.extend('jobProcessing')(
374+
'[%s:%s] nextRunAt is in the past, run the job immediately',
375+
job.attrs.name,
376+
job.attrs._id
377+
);
378+
jobEnqueued = await this.runOrRetry(job);
379+
} else {
380+
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
381+
log.extend('jobProcessing')(
382+
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
383+
job.attrs.name,
384+
job.attrs._id,
385+
runIn
386+
);
387+
setTimeout(() => {
388+
this.jobProcessing();
389+
}, runIn);
390+
}
391+
// console.log('this.localQueueProcessing', this.localQueueProcessing);
392+
if (this.localQueueProcessing < this.maxConcurrency && jobEnqueued) {
393+
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
394+
setImmediate(() => this.jobProcessing());
395+
} /* else {
396+
console.log(
397+
'NOT CALLING JOB PROCESSING AGAIN DUE TO',
398+
this.localQueueProcessing,
399+
this.maxConcurrency
400+
);
401+
} */
402+
} finally {
403+
this.localQueueProcessing--;
404+
}
384405
}
385406

386407
/**
387408
* Internal method that tries to run a job and if it fails, retries again!
388-
* @returns {undefined}
409+
* @returns {boolean} processed a job or not
389410
*/
390-
private async runOrRetry(job: Job) {
411+
private async runOrRetry(job: Job): Promise<boolean> {
391412
if (!this.isRunning) {
392413
// const a = new Error();
393414
// console.log('STACK', a.stack);
394415
log.extend('runOrRetry')(
395416
'JobProcessor got stopped already while calling runOrRetry, returning!',
396417
this
397418
);
398-
return;
419+
return false;
399420
}
400421

401422
this.jobQueue.remove(job);
@@ -429,114 +450,119 @@ export class JobProcessor {
429450
this.lockedJobs.splice(lockedJobIndex, 1);
430451
this.updateStatus(job.attrs.name, 'locked', -1);
431452
this.jobProcessing();
432-
return;
453+
return false;
433454
}
434455

435-
// Add to local "running" queue
436-
this.runningJobs.push(job);
437-
this.updateStatus(job.attrs.name, 'running', 1);
438-
439-
try {
440-
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);
441-
442-
// check if the job is still alive
443-
const checkIfJobIsStillAlive = () => {
444-
// check every "this.agenda.definitions[job.attrs.name].lockLifetime / 2"" (or at mininum every processEvery)
445-
return new Promise((resolve, reject) =>
446-
setTimeout(() => {
447-
// when job is not running anymore, just finish
448-
if (!job.isRunning()) {
449-
resolve();
450-
return;
451-
}
452-
453-
if (job.isDead()) {
454-
reject(
455-
new Error(
456-
`execution of '${job.attrs.name}' canceled, execution took more than ${
457-
this.agenda.definitions[job.attrs.name].lockLifetime
458-
}ms. Call touch() for long running jobs to keep them alive.`
459-
)
460-
);
461-
return;
462-
}
463-
464-
resolve(checkIfJobIsStillAlive());
465-
}, Math.max(this.processEvery, this.agenda.definitions[job.attrs.name].lockLifetime / 2))
466-
);
467-
};
468-
469-
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
470-
await Promise.race([job.run(), checkIfJobIsStillAlive()]);
456+
const runJob = async () => {
457+
// Add to local "running" queue
458+
this.runningJobs.push(job);
459+
this.updateStatus(job.attrs.name, 'running', 1);
460+
461+
try {
462+
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);
463+
464+
// check if the job is still alive
465+
const checkIfJobIsStillAlive = () => {
466+
// check every "this.agenda.definitions[job.attrs.name].lockLifetime / 2"" (or at mininum every processEvery)
467+
return new Promise((resolve, reject) =>
468+
setTimeout(() => {
469+
// when job is not running anymore, just finish
470+
if (!job.isRunning()) {
471+
resolve();
472+
return;
473+
}
474+
475+
if (job.isDead()) {
476+
reject(
477+
new Error(
478+
`execution of '${job.attrs.name}' canceled, execution took more than ${
479+
this.agenda.definitions[job.attrs.name].lockLifetime
480+
}ms. Call touch() for long running jobs to keep them alive.`
481+
)
482+
);
483+
return;
484+
}
485+
486+
resolve(checkIfJobIsStillAlive());
487+
}, Math.max(this.processEvery, this.agenda.definitions[job.attrs.name].lockLifetime / 2))
488+
);
489+
};
471490

472-
log.extend('runOrRetry')(
473-
'[%s:%s] processing job successfull',
474-
job.attrs.name,
475-
job.attrs._id
476-
);
491+
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
492+
await Promise.race([job.run(), checkIfJobIsStillAlive()]);
477493

478-
// Job isn't in running jobs so throw an error
479-
if (!this.runningJobs.includes(job)) {
480494
log.extend('runOrRetry')(
481-
'[%s] callback was called, job must have been marked as complete already',
495+
'[%s:%s] processing job successfull',
496+
job.attrs.name,
482497
job.attrs._id
483498
);
484-
throw new Error(
485-
`callback already called - job ${job.attrs.name} already marked complete`
486-
);
487-
}
488-
} catch (err) {
489-
job.canceled = err;
490-
log.extend('runOrRetry')(
491-
'[%s:%s] processing job failed',
492-
job.attrs.name,
493-
job.attrs._id,
494-
err
495-
);
496-
job.agenda.emit('error', err);
497-
} finally {
498-
// Remove the job from the running queue
499-
let runningJobIndex = this.runningJobs.indexOf(job);
500-
if (runningJobIndex === -1) {
501-
// lookup by id
502-
runningJobIndex = this.runningJobs.findIndex(
503-
j => j.attrs._id?.toString() === job.attrs._id?.toString()
504-
);
505-
}
506-
if (runningJobIndex === -1) {
507-
// eslint-disable-next-line no-unsafe-finally
508-
throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
509-
}
510-
this.runningJobs.splice(runningJobIndex, 1);
511-
this.updateStatus(job.attrs.name, 'running', -1);
512499

513-
// Remove the job from the locked queue
514-
let lockedJobIndex = this.lockedJobs.indexOf(job);
515-
if (lockedJobIndex === -1) {
516-
// lookup by id
517-
lockedJobIndex = this.lockedJobs.findIndex(
518-
j => j.attrs._id?.toString() === job.attrs._id?.toString()
500+
// Job isn't in running jobs so throw an error
501+
if (!this.runningJobs.includes(job)) {
502+
log.extend('runOrRetry')(
503+
'[%s] callback was called, job must have been marked as complete already',
504+
job.attrs._id
505+
);
506+
throw new Error(
507+
`callback already called - job ${job.attrs.name} already marked complete`
508+
);
509+
}
510+
} catch (err) {
511+
job.canceled = err;
512+
log.extend('runOrRetry')(
513+
'[%s:%s] processing job failed',
514+
job.attrs.name,
515+
job.attrs._id,
516+
err
519517
);
518+
job.agenda.emit('error', err);
519+
} finally {
520+
// Remove the job from the running queue
521+
let runningJobIndex = this.runningJobs.indexOf(job);
522+
if (runningJobIndex === -1) {
523+
// lookup by id
524+
runningJobIndex = this.runningJobs.findIndex(
525+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
526+
);
527+
}
528+
if (runningJobIndex === -1) {
529+
// eslint-disable-next-line no-unsafe-finally
530+
throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
531+
}
532+
this.runningJobs.splice(runningJobIndex, 1);
533+
this.updateStatus(job.attrs.name, 'running', -1);
534+
535+
// Remove the job from the locked queue
536+
let lockedJobIndex = this.lockedJobs.indexOf(job);
537+
if (lockedJobIndex === -1) {
538+
// lookup by id
539+
lockedJobIndex = this.lockedJobs.findIndex(
540+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
541+
);
542+
}
543+
if (lockedJobIndex === -1) {
544+
// eslint-disable-next-line no-unsafe-finally
545+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
546+
}
547+
this.lockedJobs.splice(lockedJobIndex, 1);
548+
this.updateStatus(job.attrs.name, 'locked', -1);
520549
}
521-
if (lockedJobIndex === -1) {
522-
// eslint-disable-next-line no-unsafe-finally
523-
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
524-
}
525-
this.lockedJobs.splice(lockedJobIndex, 1);
526-
this.updateStatus(job.attrs.name, 'locked', -1);
527-
}
528550

529-
// Re-process jobs now that one has finished
530-
setImmediate(() => this.jobProcessing());
531-
} else {
532-
// Run the job immediately by putting it on the top of the queue
533-
log.extend('runOrRetry')(
534-
'[%s:%s] concurrency preventing immediate run, pushing job to top of queue',
535-
job.attrs.name,
536-
job.attrs._id
537-
);
538-
this.enqueueJob(job);
551+
// Re-process jobs now that one has finished
552+
setImmediate(() => this.jobProcessing());
553+
};
554+
runJob();
555+
return true;
539556
}
557+
558+
// Run the job later
559+
log.extend('runOrRetry')(
560+
'[%s:%s] concurrency preventing immediate run, pushing job to top of queue',
561+
job.attrs.name,
562+
job.attrs._id
563+
);
564+
this.enqueueJob(job);
565+
return false;
540566
}
541567

542568
private updateStatus(name: string, key: 'locked' | 'running', number: -1 | 1) {

test/job.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1233,7 +1233,7 @@ describe('Job', () => {
12331233
done();
12341234
return n.send('exit');
12351235
}
1236-
} else {
1236+
} else if (!doneCalled) {
12371237
return done(new Error('Jobs did not run!'));
12381238
}
12391239
};
@@ -1353,7 +1353,7 @@ describe('Job', () => {
13531353
done();
13541354
return n.send('exit');
13551355
}
1356-
} else {
1356+
} else if (!doneCalled) {
13571357
return done(new Error('Jobs did not run!'));
13581358
}
13591359
};

0 commit comments

Comments
 (0)