Skip to content

Commit e301511

Browse files
committed
fix: check if job has expired before we run it
it can happen that the process is so long in the queue, that it gets expired in the meantime. if it is expired, it is put again in the queue, and then it could happen that a job that should run with a concurrency of e.g. 1 runs in parallel twice.
1 parent 544e948 commit e301511

File tree

3 files changed

+63
-54
lines changed

3 files changed

+63
-54
lines changed

src/Job.ts

+4
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ export class Job<DATA = unknown | void> {
256256
await this.fetchStatus();
257257
}
258258

259+
return this.isExpired();
260+
}
261+
262+
isExpired(): boolean {
259263
const definition = this.agenda.definitions[this.attrs.name];
260264

261265
const lockDeadline = new Date(Date.now() - definition.lockLifetime);

src/JobDbRepository.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export class JobDbRepository {
7373
}
7474

7575
async unlockJob(job: Job): Promise<void> {
76-
// only unlock jobs which are not currently procsesed (nextRunAT is not null)
76+
// only unlock jobs which are not currently processed (nextRunAT is not null)
7777
await this.collection.updateOne(
7878
{ _id: job.attrs._id, nextRunAt: { $ne: null } },
7979
{ $unset: { lockedAt: true } }

src/JobProcessor.ts

+58-53
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ export class JobProcessor {
334334
if (job) {
335335
if (job.attrs.name !== name) {
336336
throw new Error(
337-
`got different job name: ${job.attrs.name} (acutal) !== ${name} (expected)`
337+
`got different job name: ${job.attrs.name} (actual) !== ${name} (expected)`
338338
);
339339
}
340340

@@ -393,66 +393,71 @@ export class JobProcessor {
393393
return;
394394
}
395395

396-
log.extend('jobProcessing')(
397-
'[%s:%s] there is a job to process (priority = %d)',
398-
job.attrs.name,
399-
job.attrs._id,
400-
job.attrs.priority,
401-
job.gotTimerToExecute
402-
);
403-
404396
this.jobQueue.remove(job);
405397

406-
// If the 'nextRunAt' time is older than the current time, run the job
407-
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
408-
if (job.attrs.nextRunAt <= now) {
398+
if (!job.isExpired()) {
399+
// check if job has expired (and therefore probably got picked up again by another queue in the meantime)
400+
// before it even has started to run
401+
409402
log.extend('jobProcessing')(
410-
'[%s:%s] nextRunAt is in the past, run the job immediately',
403+
'[%s:%s] there is a job to process (priority = %d)',
411404
job.attrs.name,
412-
job.attrs._id
405+
job.attrs._id,
406+
job.attrs.priority,
407+
job.gotTimerToExecute
413408
);
414-
this.runOrRetry(job);
415-
} else {
416-
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
417-
if (runIn > this.processEvery) {
418-
// this job is not in the near future, remove it (it will be picked up later)
419-
log.extend('runOrRetry')(
420-
'[%s:%s] job is too far away, freeing it up',
421-
job.attrs.name,
422-
job.attrs._id
423-
);
424-
let lockedJobIndex = this.lockedJobs.indexOf(job);
425-
if (lockedJobIndex === -1) {
426-
// lookup by id
427-
lockedJobIndex = this.lockedJobs.findIndex(
428-
j => j.attrs._id?.toString() === job.attrs._id?.toString()
429-
);
430-
}
431-
if (lockedJobIndex === -1) {
432-
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
433-
}
434409

435-
this.lockedJobs.splice(lockedJobIndex, 1);
436-
this.updateStatus(job.attrs.name, 'locked', -1);
437-
} else {
410+
// If the 'nextRunAt' time is older than the current time, run the job
411+
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
412+
if (job.attrs.nextRunAt <= now) {
438413
log.extend('jobProcessing')(
439-
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
414+
'[%s:%s] nextRunAt is in the past, run the job immediately',
440415
job.attrs.name,
441-
job.attrs._id,
442-
runIn
416+
job.attrs._id
443417
);
444-
// re add to queue (puts it at the right position in the queue)
445-
this.jobQueue.insert(job);
446-
// ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
447-
if (!job.gotTimerToExecute) {
448-
job.gotTimerToExecute = true;
449-
setTimeout(
450-
() => {
451-
this.jobProcessing();
452-
},
453-
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
454-
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
455-
// because setTimeout will run in an overflow otherwise and reprocesses immediately
418+
this.runOrRetry(job);
419+
} else {
420+
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
421+
if (runIn > this.processEvery) {
422+
// this job is not in the near future, remove it (it will be picked up later)
423+
log.extend('runOrRetry')(
424+
'[%s:%s] job is too far away, freeing it up',
425+
job.attrs.name,
426+
job.attrs._id
427+
);
428+
let lockedJobIndex = this.lockedJobs.indexOf(job);
429+
if (lockedJobIndex === -1) {
430+
// lookup by id
431+
lockedJobIndex = this.lockedJobs.findIndex(
432+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
433+
);
434+
}
435+
if (lockedJobIndex === -1) {
436+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
437+
}
438+
439+
this.lockedJobs.splice(lockedJobIndex, 1);
440+
this.updateStatus(job.attrs.name, 'locked', -1);
441+
} else {
442+
log.extend('jobProcessing')(
443+
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
444+
job.attrs.name,
445+
job.attrs._id,
446+
runIn
447+
);
448+
// re add to queue (puts it at the right position in the queue)
449+
this.jobQueue.insert(job);
450+
// ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
451+
if (!job.gotTimerToExecute) {
452+
job.gotTimerToExecute = true;
453+
setTimeout(
454+
() => {
455+
this.jobProcessing();
456+
},
457+
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
458+
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
459+
// because setTimeout will run in an overflow otherwise and reprocesses immediately
460+
}
456461
}
457462
}
458463
}
@@ -508,7 +513,7 @@ export class JobProcessor {
508513
return;
509514
}
510515

511-
if (await job.isDead()) {
516+
if (job.isExpired()) {
512517
reject(
513518
new Error(
514519
`execution of '${job.attrs.name}' canceled, execution took more than ${

0 commit comments

Comments
 (0)