Skip to content

Commit 3160f0d

Browse files
committed
fix: improve locking and ensure locks are released
1 parent a2e74a9 commit 3160f0d

File tree

2 files changed

+80
-41
lines changed

2 files changed

+80
-41
lines changed

src/JobDbRepository.ts

-6
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,7 @@ export class JobDbRepository {
182182
url = `mongodb://${url}`;
183183
}
184184

185-
const reconnectOptions = {
186-
autoReconnect: true,
187-
reconnectTries: Number.MAX_SAFE_INTEGER
188-
};
189-
190185
const client = await MongoClient.connect(url, {
191-
...reconnectOptions,
192186
...options,
193187
useNewUrlParser: true,
194188
useUnifiedTopology: true

src/JobProcessor.ts

+80-35
Original file line numberDiff line numberDiff line change
@@ -164,43 +164,55 @@ export class JobProcessor {
164164
// Don't have any jobs to run? Return
165165
if (this.jobsToLock.length === 0) {
166166
log('no jobs to current lock on the fly, returning');
167-
this.isLockingOnTheFly = false;
168167
return;
169168
}
170169

171170
// Set that we are running this
172-
this.isLockingOnTheFly = true;
173-
174-
// Grab a job that needs to be locked
175-
const job = this.jobsToLock.pop();
171+
try {
172+
this.isLockingOnTheFly = true;
176173

177-
if (job) {
178-
// If locking limits have been hit, stop locking on the fly.
179-
// Jobs that were waiting to be locked will be picked up during a
180-
// future locking interval.
181-
if (!this.shouldLock(job.attrs.name)) {
182-
log('lock limit hit for: [%s]', job.attrs.name);
183-
this.jobsToLock = [];
184-
this.isLockingOnTheFly = false;
185-
return;
186-
}
174+
// Grab a job that needs to be locked
175+
const job = this.jobsToLock.pop();
187176

188-
// Lock the job in MongoDB!
189-
const resp = await this.agenda.db.lockJob(job);
177+
if (job) {
178+
// If locking limits have been hit, stop locking on the fly.
179+
// Jobs that were waiting to be locked will be picked up during a
180+
// future locking interval.
181+
if (!this.shouldLock(job.attrs.name)) {
182+
log('lock limit hit for: [%s]', job.attrs.name);
183+
this.jobsToLock = [];
184+
return;
185+
}
190186

191-
if (resp) {
192-
const jobToEnqueue = new Job(this.agenda, resp);
193-
log('found job [%s] that can be locked on the fly', jobToEnqueue.attrs.name);
194-
this.lockedJobs.push(jobToEnqueue);
195-
this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
196-
this.enqueueJob(jobToEnqueue);
197-
this.jobProcessing();
187+
// Lock the job in MongoDB!
188+
const resp = await this.agenda.db.lockJob(job);
189+
190+
if (resp) {
191+
// Before en-queing job make sure we haven't exceed our lock limits
192+
if (!this.shouldLock(resp.name)) {
193+
log(
194+
'lock limit reached while job was locked in database. Releasing lock on [%s]',
195+
resp.name
196+
);
197+
job.attrs.lockedAt = undefined;
198+
await job.save();
199+
200+
this.jobsToLock = [];
201+
return;
202+
}
203+
const jobToEnqueue = new Job(this.agenda, resp);
204+
log('found job [%s] that can be locked on the fly', jobToEnqueue.attrs.name);
205+
this.lockedJobs.push(jobToEnqueue);
206+
this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
207+
this.enqueueJob(jobToEnqueue);
208+
this.jobProcessing();
209+
}
198210
}
211+
} finally {
212+
// Mark lock on fly is done for now
213+
this.isLockingOnTheFly = false;
199214
}
200215

201-
// Mark lock on fly is done for now
202-
this.isLockingOnTheFly = false;
203-
204216
// Re-run in case anything is in the queue
205217
await this.lockOnTheFly();
206218
}
@@ -318,8 +330,8 @@ export class JobProcessor {
318330
*/
319331
private async runOrRetry() {
320332
if (!this.isRunning) {
321-
const a = new Error();
322-
console.log('STACK', a.stack);
333+
// const a = new Error();
334+
// console.log('STACK', a.stack);
323335
log('JobProcessor got stopped already while calling runOrRetry, returning!', this);
324336
return;
325337
}
@@ -345,7 +357,18 @@ export class JobProcessor {
345357
// NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
346358
if (job.attrs.lockedAt && job.attrs.lockedAt < lockDeadline) {
347359
log('[%s:%s] job lock has expired, freeing it up', job.attrs.name, job.attrs._id);
348-
this.lockedJobs.splice(this.lockedJobs.indexOf(job), 1);
360+
let lockedJobIndex = this.lockedJobs.indexOf(job);
361+
if (lockedJobIndex === -1) {
362+
// lookup by id
363+
lockedJobIndex = this.lockedJobs.findIndex(
364+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
365+
);
366+
}
367+
if (lockedJobIndex === -1) {
368+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
369+
}
370+
371+
this.lockedJobs.splice(lockedJobIndex, 1);
349372
this.updateStatus(job.attrs.name, 'locked', -1);
350373
this.jobProcessing();
351374
return;
@@ -370,16 +393,38 @@ export class JobProcessor {
370393
`callback already called - job ${job.attrs.name} already marked complete`
371394
);
372395
}
373-
396+
} catch (err) {
397+
job.agenda.emit('error', err);
398+
} finally {
374399
// Remove the job from the running queue
375-
this.runningJobs.splice(this.runningJobs.indexOf(job), 1);
400+
let runningJobIndex = this.runningJobs.indexOf(job);
401+
if (runningJobIndex === -1) {
402+
// lookup by id
403+
runningJobIndex = this.runningJobs.findIndex(
404+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
405+
);
406+
}
407+
if (runningJobIndex === -1) {
408+
// eslint-disable-next-line no-unsafe-finally
409+
throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
410+
}
411+
this.runningJobs.splice(runningJobIndex, 1);
376412
this.updateStatus(job.attrs.name, 'running', -1);
377413

378414
// Remove the job from the locked queue
379-
this.lockedJobs.splice(this.lockedJobs.indexOf(job), 1);
415+
let lockedJobIndex = this.lockedJobs.indexOf(job);
416+
if (lockedJobIndex === -1) {
417+
// lookup by id
418+
lockedJobIndex = this.lockedJobs.findIndex(
419+
j => j.attrs._id?.toString() === job.attrs._id?.toString()
420+
);
421+
}
422+
if (lockedJobIndex === -1) {
423+
// eslint-disable-next-line no-unsafe-finally
424+
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
425+
}
426+
this.lockedJobs.splice(lockedJobIndex, 1);
380427
this.updateStatus(job.attrs.name, 'locked', -1);
381-
} catch (err) {
382-
job.agenda.emit('error', err);
383428
}
384429

385430
// Re-process jobs now that one has finished

0 commit comments

Comments
 (0)