Skip to content

Commit 413f673

Browse files
committed
fix: job processor localQueueProcessing flag
1 parent d0ff5b3 commit 413f673

File tree

3 files changed

+64
-61
lines changed

3 files changed

+64
-61
lines changed

src/JobDbRepository.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ export class JobDbRepository {
251251
*/
252252
async saveJob<DATA = unknown | void>(job: Job<DATA>): Promise<Job<DATA>> {
253253
try {
254-
log('attempting to save a job into Agenda instance');
254+
log('attempting to save a job');
255255

256256
// Grab information needed to save job but that we don't want to persist in MongoDB
257257
const id = job.attrs._id;

src/JobProcessingQueue.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ export class JobProcessingQueue {
3636
* @param {Job} job job to add to queue
3737
* @returns {undefined}
3838
*/
39+
/*
3940
push(job: Job): void {
4041
this._queue.push(job);
41-
}
42+
} */
4243

4344
remove(job: Job): void {
4445
let removeJobIndex = this._queue.indexOf(job);

src/JobProcessor.ts

+61-59
Original file line numberDiff line numberDiff line change
@@ -363,86 +363,88 @@ export class JobProcessor {
363363
* handledJobs keeps list of already processed jobs
364364
* @returns {undefined}
365365
*/
366-
private async jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
366+
private jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
367367
// Ensure we have jobs
368368
if (this.jobQueue.length === 0) {
369369
return;
370370
}
371371

372372
this.localQueueProcessing += 1;
373373

374-
const now = new Date();
375-
376-
// Check if there is any job that is not blocked by concurrency
377-
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);
374+
try {
375+
const now = new Date();
378376

379-
if (!job) {
380-
log.extend('jobProcessing')('[%s:%s] there is no job to process');
381-
return;
382-
}
377+
// Check if there is any job that is not blocked by concurrency
378+
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);
383379

384-
log.extend('jobProcessing')(
385-
'[%s:%s] there is a job to process (priority = %d)',
386-
job.attrs.name,
387-
job.attrs._id,
388-
job.attrs.priority
389-
);
390-
391-
this.jobQueue.remove(job);
380+
if (!job) {
381+
log.extend('jobProcessing')('[%s:%s] there is no job to process');
382+
return;
383+
}
392384

393-
// If the 'nextRunAt' time is older than the current time, run the job
394-
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
395-
if (job.attrs.nextRunAt <= now) {
396385
log.extend('jobProcessing')(
397-
'[%s:%s] nextRunAt is in the past, run the job immediately',
386+
'[%s:%s] there is a job to process (priority = %d)',
398387
job.attrs.name,
399-
job.attrs._id
388+
job.attrs._id,
389+
job.attrs.priority
400390
);
401-
this.runOrRetry(job);
402-
} else {
403-
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
404-
if (runIn > this.processEvery) {
405-
// this job is not in the near future, remove it (it will be picked up later)
406-
log.extend('runOrRetry')(
407-
'[%s:%s] job is too far away, freeing it up',
391+
392+
this.jobQueue.remove(job);
393+
394+
// If the 'nextRunAt' time is older than the current time, run the job
395+
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
396+
if (job.attrs.nextRunAt <= now) {
397+
log.extend('jobProcessing')(
398+
'[%s:%s] nextRunAt is in the past, run the job immediately',
408399
job.attrs.name,
409400
job.attrs._id
410401
);
411-
let lockedJobIndex = this.lockedJobs.indexOf(job);
412-
if (lockedJobIndex === -1) {
413-
// lookup by id
414-
lockedJobIndex = this.lockedJobs.findIndex(
415-
j => j.attrs._id?.toString() === job.attrs._id?.toString()
402+
this.runOrRetry(job);
403+
} else {
404+
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
405+
if (runIn > this.processEvery) {
406+
// this job is not in the near future, remove it (it will be picked up later)
407+
log.extend('runOrRetry')(
408+
'[%s:%s] job is too far away, freeing it up',
409+
job.attrs.name,
410+
job.attrs._id
416411
);
417-
}
418-
if (lockedJobIndex === -1) {
419-
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
420-
}
412+
let lockedJobIndex = this.lockedJobs.indexOf(job);
413+
if (lockedJobIndex === -1) {
414+
// lookup by id
415+
lockedJobIndex = this.lockedJobs.findIndex(
416+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
417+
);
418+
}
419+
if (lockedJobIndex === -1) {
420+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
421+
}
421422

422-
this.lockedJobs.splice(lockedJobIndex, 1);
423-
this.updateStatus(job.attrs.name, 'locked', -1);
424-
} else {
425-
log.extend('jobProcessing')(
426-
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
427-
job.attrs.name,
428-
job.attrs._id,
429-
runIn
430-
);
431-
// re add to queue (puts it at the ned of the queue)
432-
this.jobQueue.push(job);
433-
setTimeout(() => {
434-
this.jobProcessing();
435-
}, runIn);
423+
this.lockedJobs.splice(lockedJobIndex, 1);
424+
this.updateStatus(job.attrs.name, 'locked', -1);
425+
} else {
426+
log.extend('jobProcessing')(
427+
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
428+
job.attrs.name,
429+
job.attrs._id,
430+
runIn
431+
);
432+
// re add to queue (puts it at the right position in the queue)
433+
this.jobQueue.insert(job);
434+
setTimeout(() => {
435+
this.jobProcessing();
436+
}, runIn);
437+
}
436438
}
437-
}
438-
439-
handledJobs.push(job.attrs._id);
440439

441-
this.localQueueProcessing -= 1;
440+
handledJobs.push(job.attrs._id);
442441

443-
if (job && this.localQueueProcessing < this.maxConcurrency) {
444-
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
445-
setImmediate(() => this.jobProcessing(handledJobs));
442+
if (job && this.localQueueProcessing < this.maxConcurrency) {
443+
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
444+
setImmediate(() => this.jobProcessing(handledJobs));
445+
}
446+
} finally {
447+
this.localQueueProcessing -= 1;
446448
}
447449
}
448450

0 commit comments

Comments
 (0)