Skip to content

Commit 0e82025

Browse files
committed
fix: not running jobs even though concurrency is not reached
1 parent a70f500 commit 0e82025

File tree

2 files changed

+85
-7
lines changed

2 files changed

+85
-7
lines changed

src/JobProcessor.ts

+3-7
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ export class JobProcessor {
360360
job.attrs.name,
361361
job.attrs._id
362362
);
363-
this.runOrRetry();
363+
this.runOrRetry(job);
364364
} else {
365365
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
366366
log.extend('jobProcessing')(
@@ -380,7 +380,7 @@ export class JobProcessor {
380380
* Internal method that tries to run a job and if it fails, retries again!
381381
* @returns {undefined}
382382
*/
383-
private async runOrRetry() {
383+
private async runOrRetry(job: Job) {
384384
if (!this.isRunning) {
385385
// const a = new Error();
386386
// console.log('STACK', a.stack);
@@ -391,11 +391,7 @@ export class JobProcessor {
391391
return;
392392
}
393393

394-
const job = this.jobQueue.pop();
395-
if (!job) {
396-
console.info('empty queue');
397-
return;
398-
}
394+
this.jobQueue.remove(job);
399395

400396
const jobDefinition = this.agenda.definitions[job.attrs.name];
401397
const status = this.jobStatus[job.attrs.name];

test/jobprocessor.test.ts

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import * as expect from 'expect.js';
2+
3+
import { Db } from 'mongodb';
4+
import { Agenda } from '../src';
5+
import { mockMongo } from './helpers/mock-mongodb';
6+
7+
// Create agenda instances
8+
let agenda: Agenda;
9+
// mongo db connection db instance
10+
let mongoDb: Db;
11+
12+
const clearJobs = async () => {
13+
if (mongoDb) {
14+
await mongoDb.collection('agendaJobs').deleteMany({});
15+
}
16+
};
17+
18+
describe('Agenda', function () {
19+
// this.timeout(1000000);
20+
21+
beforeEach(async () => {
22+
if (!mongoDb) {
23+
const mockedMongo = await mockMongo();
24+
// mongoCfg = mockedMongo.uri;
25+
mongoDb = mockedMongo.mongo.db();
26+
}
27+
28+
return new Promise(resolve => {
29+
agenda = new Agenda(
30+
{
31+
mongo: mongoDb,
32+
maxConcurrency: 4,
33+
defaultConcurrency: 1,
34+
lockLimit: 15,
35+
defaultLockLimit: 6,
36+
processEvery: '1 second'
37+
},
38+
async () => {
39+
await clearJobs();
40+
return resolve();
41+
}
42+
);
43+
});
44+
});
45+
46+
afterEach(async () => {
47+
await agenda.stop();
48+
await clearJobs();
49+
});
50+
51+
describe('configuration methods', () => {
52+
it('ensure new jobs are always filling up running queue', async () => {
53+
let shortOneFinished = false;
54+
55+
agenda.define('test long', async () => {
56+
await new Promise(resolve => setTimeout(resolve, 1000));
57+
});
58+
agenda.define('test short', async () => {
59+
shortOneFinished = true;
60+
await new Promise(resolve => setTimeout(resolve, 5));
61+
});
62+
63+
await agenda.start();
64+
65+
// queue up long ones
66+
for (let i = 0; i < 100; i++) {
67+
agenda.now('test long');
68+
}
69+
70+
await new Promise(resolve => setTimeout(resolve, 1000));
71+
72+
// queue more short ones (they should complete first!)
73+
for (let j = 0; j < 100; j++) {
74+
agenda.now('test short');
75+
}
76+
77+
await new Promise(resolve => setTimeout(resolve, 1000));
78+
79+
expect(shortOneFinished).to.be(true);
80+
});
81+
});
82+
});

0 commit comments

Comments
 (0)