Skip to content

Commit 1a8bb31

Browse files
committed
fix: ensure jobs are filled up till concurrency reached
1 parent 39950f3 commit 1a8bb31

File tree

2 files changed

+51
-9
lines changed

2 files changed

+51
-9
lines changed

src/JobProcessor.ts

+12-9
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,8 @@ export class JobProcessor {
154154
'job [%s] lock status: shouldLock = %s',
155155
name,
156156
shouldLock,
157-
status?.locked,
158-
this.jobQueue.length,
159-
this.lockedJobs.length,
160-
this.totalLockLimit
157+
`${status?.locked} >= ${jobDefinition?.lockLimit}`,
158+
`${this.lockedJobs.length} >= ${this.totalLockLimit}`
161159
);
162160
return shouldLock;
163161
}
@@ -240,6 +238,8 @@ export class JobProcessor {
240238
this.lockedJobs.push(jobToEnqueue);
241239
this.enqueueJob(jobToEnqueue);
242240
this.jobProcessing();
241+
} else {
242+
log.extend('lockOnTheFly')('cannot lock job [%s] on the fly', job.attrs.name);
243243
}
244244
}
245245
} finally {
@@ -378,6 +378,9 @@ export class JobProcessor {
378378
this.jobProcessing();
379379
}, runIn);
380380
}
381+
382+
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
383+
setImmediate(() => this.jobProcessing());
381384
}
382385

383386
/**
@@ -437,8 +440,8 @@ export class JobProcessor {
437440
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);
438441

439442
// check if the job is still alive
440-
const checkIfJobIsDead = () => {
441-
// check every processInterval
443+
const checkIfJobIsStillAlive = () => {
444+
// check every "this.agenda.definitions[job.attrs.name].lockLifetime"" (or at mininum every processEvery)
442445
return new Promise((resolve, reject) =>
443446
setTimeout(() => {
444447
if (job.isDead()) {
@@ -456,13 +459,13 @@ export class JobProcessor {
456459
resolve();
457460
return;
458461
}
459-
resolve(checkIfJobIsDead());
460-
}, this.processEvery)
462+
resolve(checkIfJobIsStillAlive());
463+
}, Math.max(this.processEvery, this.agenda.definitions[job.attrs.name].lockLifetime))
461464
);
462465
};
463466

464467
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
465-
await Promise.race([job.run(), checkIfJobIsDead()]);
468+
await Promise.race([job.run(), checkIfJobIsStillAlive()]);
466469

467470
log.extend('runOrRetry')(
468471
'[%s:%s] processing job successfull',

test/jobprocessor.test.ts

+39
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,43 @@ describe('JobProcessor', function () {
134134

135135
expect(promiseResult).to.not.be.an('error');
136136
});
137+
138+
it('ensure concurrency is filled up', async () => {
139+
agenda.maxConcurrency(300);
140+
agenda.lockLimit(150);
141+
agenda.defaultLockLimit(20);
142+
agenda.defaultConcurrency(10);
143+
144+
for (let jobI = 0; jobI < 10; jobI++) {
145+
agenda.define(
146+
`test job ${jobI}`,
147+
async job => {
148+
await new Promise(resolve => setTimeout(resolve, 5000));
149+
},
150+
{ lockLifetime: 10000 }
151+
);
152+
}
153+
154+
// queue up jobs
155+
for (let jobI = 0; jobI < 10; jobI++) {
156+
for (let jobJ = 0; jobJ < 25; jobJ++) {
157+
agenda.now(`test job ${jobI}`);
158+
}
159+
}
160+
161+
await agenda.start();
162+
163+
const allJobsStarted = new Promise(async resolve => {
164+
let runningJobs = 0;
165+
do {
166+
runningJobs = (await agenda.getRunningStats()).runningJobs as number;
167+
await new Promise(wait => setTimeout(wait, 50));
168+
} while (runningJobs < 100);
169+
resolve('all started');
170+
});
171+
172+
expect(
173+
await Promise.race([allJobsStarted, new Promise(resolve => setTimeout(resolve, 1500))])
174+
).to.equal('all started');
175+
});
137176
});

0 commit comments

Comments
 (0)