Skip to content

Commit d2f3b20

Browse files
committed
fix: try to solve the locking issue
1 parent 68f13aa commit d2f3b20

File tree

1 file changed

+37
-16
lines changed

1 file changed

+37
-16
lines changed

src/JobProcessor.ts

+37-16
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,16 @@ export class JobProcessor {
2222
} = {};
2323

2424
async getStatus(fullDetails = false) {
25+
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
26+
const { version } = require('../package.json');
27+
2528
return {
29+
version,
2630
queueSize: await this.agenda.db.getQueueSize(),
27-
jobStatus: this.jobStatus,
31+
jobStatus: Object.keys(this.jobStatus).map(job => ({
32+
...this.jobStatus[job],
33+
config: this.agenda.definitions[job]
34+
})),
2835
runningJobs: !fullDetails ? this.runningJobs.length : this.runningJobs,
2936
lockedJobs: !fullDetails ? this.lockedJobs.length : this.lockedJobs,
3037
jobsToLock: !fullDetails ? this.jobsToLock.length : this.jobsToLock,
@@ -117,13 +124,13 @@ export class JobProcessor {
117124
const jobDefinition = this.agenda.definitions[name];
118125
let shouldLock = true;
119126
// global lock limit
120-
if (this.totalLockLimit && this.totalLockLimit <= this.lockedJobs.length) {
127+
if (this.totalLockLimit && this.lockedJobs.length >= this.totalLockLimit) {
121128
shouldLock = false;
122129
}
123130

124131
// job specific lock limit
125132
const status = this.jobStatus[name];
126-
if (jobDefinition.lockLimit && status && jobDefinition.lockLimit <= status.locked) {
133+
if (jobDefinition.lockLimit && status && status.locked >= jobDefinition.lockLimit) {
127134
shouldLock = false;
128135
}
129136

@@ -188,22 +195,30 @@ export class JobProcessor {
188195
const resp = await this.agenda.db.lockJob(job);
189196

190197
if (resp) {
198+
if (job.attrs.name !== resp.name) {
199+
throw new Error(
200+
`got different job name: ${resp.name} (actual) !== ${job.attrs.name} (expected)`
201+
);
202+
}
203+
204+
const jobToEnqueue = new Job(this.agenda, resp);
205+
191206
// Before en-queing job make sure we haven't exceed our lock limits
192-
if (!this.shouldLock(resp.name)) {
207+
if (!this.shouldLock(jobToEnqueue.attrs.name)) {
193208
log(
194209
'lock limit reached while job was locked in database. Releasing lock on [%s]',
195-
resp.name
210+
jobToEnqueue.attrs.name
196211
);
197-
job.attrs.lockedAt = undefined;
198-
await job.save();
212+
jobToEnqueue.attrs.lockedAt = undefined;
213+
await jobToEnqueue.save();
199214

200215
this.jobsToLock = [];
201216
return;
202217
}
203-
const jobToEnqueue = new Job(this.agenda, resp);
218+
204219
log('found job [%s] that can be locked on the fly', jobToEnqueue.attrs.name);
205-
this.lockedJobs.push(jobToEnqueue);
206220
this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
221+
this.lockedJobs.push(jobToEnqueue);
207222
this.enqueueJob(jobToEnqueue);
208223
this.jobProcessing();
209224
}
@@ -227,13 +242,12 @@ export class JobProcessor {
227242
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
228243
const result = await this.agenda.db.getNextJobToRun(jobName, this.nextScanAt, lockDeadline);
229244

230-
let job;
231245
if (result) {
232246
log('found a job available to lock, creating a new job on Agenda with id [%s]', result._id);
233-
job = new Job(this.agenda, result);
247+
return new Job(this.agenda, result);
234248
}
235249

236-
return job;
250+
return undefined;
237251
}
238252

239253
/**
@@ -255,12 +269,19 @@ export class JobProcessor {
255269
// For this job name, find the next job to run and lock it!
256270
try {
257271
const job = await this.findAndLockNextJob(name, this.agenda.definitions[name]);
272+
258273
// Still have the job?
259274
// 1. Add it to lock list
260275
// 2. Add count of locked jobs
261276
// 3. Queue the job to actually be run now that it is locked
262277
// 4. Recursively run this same method we are in to check for more available jobs of same type!
263278
if (job) {
279+
if (job.attrs.name !== name) {
280+
throw new Error(
281+
`got different job name: ${job.attrs.name} (acutal) !== ${name} (expected)`
282+
);
283+
}
284+
264285
// Before en-queing job make sure we haven't exceed our lock limits
265286
if (!this.shouldLock(name)) {
266287
log('lock limit reached before job was returned. Releasing lock on [%s]', name);
@@ -270,8 +291,8 @@ export class JobProcessor {
270291
}
271292

272293
log('[%s:%s] job locked while filling queue', name, job.attrs._id);
294+
this.updateStatus(name, 'locked', +1);
273295
this.lockedJobs.push(job);
274-
this.updateStatus(job.attrs.name, 'locked', +1);
275296

276297
this.enqueueJob(job);
277298
await this.jobQueueFilling(name);
@@ -447,8 +468,8 @@ export class JobProcessor {
447468
running: 0
448469
};
449470
}
450-
if ((this.jobStatus[name]![key] > 0 && number === -1) || number === 1) {
451-
this.jobStatus[name]![key] += number;
452-
}
471+
// if ((this.jobStatus[name]![key] > 0 && number === -1) || number === 1) {
472+
this.jobStatus[name]![key] += number;
473+
// }
453474
}
454475
}

0 commit comments

Comments
 (0)