Skip to content

Commit 291f16e

Browse files
committed
fix: only unlock jobs which have a nextRunAt jobs on shutdown
1 parent fff42d5 commit 291f16e

File tree

2 files changed

+25
-16
lines changed

2 files changed

+25
-16
lines changed

src/JobDbRepository.ts

+11-13
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ export class JobDbRepository {
7373
}
7474

7575
async unlockJob(job: Job): Promise<void> {
76-
await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } });
76+
// only unlock jobs which are not currently procsesed (nextRunAT is not null)
77+
await this.collection.updateOne(
78+
{ _id: job.attrs._id, nextRunAt: { $ne: null } },
79+
{ $unset: { lockedAt: true } }
80+
);
7781
}
7882

7983
/**
@@ -119,21 +123,15 @@ export class JobDbRepository {
119123
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
120124
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
121125
> = {
122-
$and: [
126+
name: jobName,
127+
disabled: { $ne: true },
128+
$or: [
123129
{
124-
name: jobName,
125-
disabled: { $ne: true }
130+
lockedAt: { $eq: null },
131+
nextRunAt: { $lte: nextScanAt }
126132
},
127133
{
128-
$or: [
129-
{
130-
lockedAt: { $eq: null },
131-
nextRunAt: { $lte: nextScanAt }
132-
},
133-
{
134-
lockedAt: { $lte: lockDeadline }
135-
}
136-
]
134+
lockedAt: { $lte: lockDeadline }
137135
}
138136
]
139137
};

test/agenda.test.ts

+14-3
Original file line numberDiff line numberDiff line change
@@ -670,31 +670,42 @@ describe('Agenda', () => {
670670
const rejectionsHandler = error => unhandledRejections.push(error);
671671
process.on('unhandledRejection', rejectionsHandler);
672672

673+
/*
674+
let j0processes = 0;
675+
globalAgenda.define('j0', (_job, done) => {
676+
j0processes += 1;
677+
done();
678+
}); */
679+
673680
let j1processes = 0;
674-
globalAgenda.define('j1', (job, done) => {
681+
globalAgenda.define('j1', (_job, done) => {
675682
j1processes += 1;
676683
done();
677684
});
678685

679686
let j2processes = 0;
680-
globalAgenda.define('j2', (job, done) => {
687+
globalAgenda.define('j2', (_job, done) => {
681688
j2processes += 1;
682689
done();
683690
});
684691

685692
let j3processes = 0;
686-
globalAgenda.define('j3', async job => {
693+
globalAgenda.define('j3', async _job => {
687694
j3processes += 1;
688695
});
689696

690697
await globalAgenda.start();
698+
699+
// await globalAgenda.every('1 seconds', 'j0');
691700
await globalAgenda.every('5 seconds', 'j1');
692701
await globalAgenda.every('10 seconds', 'j2');
693702
await globalAgenda.every('15 seconds', 'j3');
694703

695704
await delay(6000);
705+
696706
process.removeListener('unhandledRejection', rejectionsHandler);
697707

708+
// expect(j0processes).to.equal(5);
698709
expect(j1processes).to.equal(2);
699710
expect(j2processes).to.equal(1);
700711
expect(j3processes).to.equal(1);

0 commit comments

Comments
 (0)