Skip to content

Commit a39c809

Browse files
committed
fix: check if job is still alive
this fixes a bug where jobs got just stuck in the running queue and never got released
1 parent eb08c17 commit a39c809

File tree

3 files changed

+118
-29
lines changed

3 files changed

+118
-29
lines changed

src/Job.ts

+12
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ export class Job<DATA = any | void> {
180180
return this.agenda.cancel({ _id: this.attrs._id });
181181
}
182182

183+
isDead(): boolean {
184+
const definition = this.agenda.definitions[this.attrs.name];
185+
const lockDeadline = new Date(Date.now() - definition.lockLifetime);
186+
187+
// This means a job has "expired", as in it has not been "touched" within the lockoutTime
188+
// Remove from local lock
189+
if (this.attrs.lockedAt && this.attrs.lockedAt < lockDeadline) {
190+
return true;
191+
}
192+
return false;
193+
}
194+
183195
async touch(progress?: number): Promise<void> {
184196
this.attrs.lockedAt = new Date();
185197
this.attrs.progress = progress;

src/JobProcessor.ts

+30-8
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ export class JobProcessor {
256256
definition: IJobDefinition
257257
): Promise<Job | undefined> {
258258
const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
259-
log.extend('findAndLockNextJob')('findAndLockNextJob(%s, [Function])', jobName);
259+
log.extend('findAndLockNextJob')(
260+
`looking for lockable jobs for ${jobName} (lock dead line = ${lockDeadline})`
261+
);
260262

261263
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
262264
const result = await this.agenda.db.getNextJobToRun(jobName, this.nextScanAt, lockDeadline);
@@ -325,6 +327,8 @@ export class JobProcessor {
325327
this.enqueueJob(job);
326328
await this.jobQueueFilling(name);
327329
// this.jobProcessing();
330+
} else {
331+
log.extend('jobQueueFilling')('Cannot lock job [%s]', name);
328332
}
329333
} catch (error) {
330334
log.extend('jobQueueFilling')('[%s] job lock failed while filling queue', name, error);
@@ -400,13 +404,9 @@ export class JobProcessor {
400404
(!jobDefinition.concurrency || !status || status.running < jobDefinition.concurrency) &&
401405
this.runningJobs.length < this.maxConcurrency
402406
) {
403-
// Get the deadline of when the job is not supposed to go past for locking
404-
const lockDeadline = new Date(Date.now() - jobDefinition.lockLifetime);
405-
406-
// This means a job has "expired", as in it has not been "touched" within the lockoutTime
407-
// Remove from local lock
408407
// NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
409-
if (job.attrs.lockedAt && job.attrs.lockedAt < lockDeadline) {
408+
// -> not needed, as the timeout has been reached, and it will be picked up anyways again
409+
if (job.isDead()) {
410410
log.extend('runOrRetry')(
411411
'[%s:%s] job lock has expired, freeing it up',
412412
job.attrs.name,
@@ -435,8 +435,30 @@ export class JobProcessor {
435435

436436
try {
437437
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);
438+
439+
// check if the job is still alive
440+
const checkIfJobIsDead = () => {
441+
// check every processInterval
442+
return new Promise((resolve, reject) =>
443+
setTimeout(() => {
444+
if (job.isDead()) {
445+
reject(
446+
new Error(
447+
`execution of '${job.attrs.name}' canceled, execution took more than ${
448+
this.agenda.definitions[job.attrs.name].lockLifetime
449+
}ms. Call touch() for long running jobs to keep them alive.`
450+
)
451+
);
452+
return;
453+
}
454+
resolve(checkIfJobIsDead());
455+
}, this.processEvery)
456+
);
457+
};
458+
438459
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
439-
await job.run();
460+
await Promise.race([job.run(), checkIfJobIsDead()]);
461+
440462
log.extend('runOrRetry')(
441463
'[%s:%s] processing job successfull',
442464
job.attrs.name,

test/jobprocessor.test.ts

+76-21
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import * as expect from 'expect.js';
1+
import { expect } from 'chai';
22

33
import { Db } from 'mongodb';
44
import { Agenda } from '../src';
@@ -15,7 +15,7 @@ const clearJobs = async () => {
1515
}
1616
};
1717

18-
describe('Agenda', function () {
18+
describe('JobProcessor', function () {
1919
// this.timeout(1000000);
2020

2121
beforeEach(async () => {
@@ -48,35 +48,90 @@ describe('Agenda', function () {
4848
await clearJobs();
4949
});
5050

51-
describe('configuration methods', () => {
52-
it('ensure new jobs are always filling up running queue', async () => {
53-
let shortOneFinished = false;
51+
it('ensure new jobs are always filling up running queue', async () => {
52+
let shortOneFinished = false;
5453

55-
agenda.define('test long', async () => {
54+
agenda.define('test long', async () => {
55+
await new Promise(resolve => setTimeout(resolve, 1000));
56+
});
57+
agenda.define('test short', async () => {
58+
shortOneFinished = true;
59+
await new Promise(resolve => setTimeout(resolve, 5));
60+
});
61+
62+
await agenda.start();
63+
64+
// queue up long ones
65+
for (let i = 0; i < 100; i++) {
66+
agenda.now('test long');
67+
}
68+
69+
await new Promise(resolve => setTimeout(resolve, 1000));
70+
71+
// queue more short ones (they should complete first!)
72+
for (let j = 0; j < 100; j++) {
73+
agenda.now('test short');
74+
}
75+
76+
await new Promise(resolve => setTimeout(resolve, 1000));
77+
78+
expect(shortOneFinished).to.be.true;
79+
});
80+
81+
it('ensure slow jobs time out', async () => {
82+
agenda.define(
83+
'test long',
84+
async () => {
5685
await new Promise(resolve => setTimeout(resolve, 1000));
86+
},
87+
{ lockLifetime: 500 }
88+
);
89+
90+
await agenda.start();
91+
92+
// queue up long ones
93+
agenda.now('test long');
94+
95+
const promiseResult = await new Promise(resolve => {
96+
agenda.on('error', err => {
97+
resolve(err);
5798
});
58-
agenda.define('test short', async () => {
59-
shortOneFinished = true;
60-
await new Promise(resolve => setTimeout(resolve, 5));
99+
100+
agenda.on('success', () => {
101+
resolve();
61102
});
103+
});
62104

63-
await agenda.start();
105+
expect(promiseResult).to.be.an('error');
106+
});
64107

65-
// queue up long ones
66-
for (let i = 0; i < 100; i++) {
67-
agenda.now('test long');
68-
}
108+
it('ensure slow jobs do not time out when calling touch', async () => {
109+
agenda.define(
110+
'test long',
111+
async job => {
112+
for (let i = 0; i < 10; i++) {
113+
await new Promise(resolve => setTimeout(resolve, 100));
114+
await job.touch();
115+
}
116+
},
117+
{ lockLifetime: 500 }
118+
);
69119

70-
await new Promise(resolve => setTimeout(resolve, 1000));
120+
await agenda.start();
71121

72-
// queue more short ones (they should complete first!)
73-
for (let j = 0; j < 100; j++) {
74-
agenda.now('test short');
75-
}
122+
// queue up long ones
123+
agenda.now('test long');
76124

77-
await new Promise(resolve => setTimeout(resolve, 1000));
125+
const promiseResult = await new Promise(resolve => {
126+
agenda.on('error', err => {
127+
resolve(err);
128+
});
78129

79-
expect(shortOneFinished).to.be(true);
130+
agenda.on('success', () => {
131+
resolve();
132+
});
80133
});
134+
135+
expect(promiseResult).to.not.be.an('error');
81136
});
82137
});

0 commit comments

Comments
 (0)