Skip to content

Commit 8d6e137

Browse files
UzlopakAras Abbasi
and
Aras Abbasi
authored
fix: more typings (#5)
* add more typings * refactor nextRunAt and processEvery * reduce complexity of calculateProcessEvery * minor changes in interfaces Co-authored-by: Aras Abbasi <a.abbasi@cognigy.com>
1 parent 9eca3de commit 8d6e137

12 files changed

+64
-62
lines changed

src/Job.ts

+5-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import * as date from 'date.js';
22
import * as debug from 'debug';
3-
import { JobPriority, parsePriority } from './utils/priority';
43
import type { Agenda } from './index';
5-
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
6-
import { IJobParameters } from './types/JobParameters';
4+
import type { IJobParameters } from './types/JobParameters';
75
import type { DefinitionProcessor } from './types/JobDefinition';
6+
import { JobPriority, parsePriority } from './utils/priority';
7+
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
88

99
const log = debug('agenda:job');
1010

@@ -56,7 +56,7 @@ export class Job<DATA = unknown | void> {
5656
};
5757
}
5858

59-
toJson(): Partial<IJobParameters> {
59+
toJson(): IJobParameters {
6060
const attrs = this.attrs || {};
6161
const result = {};
6262

@@ -74,8 +74,7 @@ export class Job<DATA = unknown | void> {
7474
}
7575
});
7676

77-
// console.log('toJson', this.attrs, result);
78-
return result;
77+
return result as IJobParameters;
7978
}
8079

8180
repeatEvery(

src/JobDbRepository.ts

+29-21
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import {
77
MongoClientOptions,
88
UpdateQuery,
99
ObjectId,
10-
SortOptionObject
10+
SortOptionObject,
11+
FindOneAndUpdateOption
1112
} from 'mongodb';
1213
import type { Job } from './Job';
13-
import { hasMongoProtocol } from './utils/hasMongoProtocol';
1414
import type { Agenda } from './index';
15-
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
16-
import { IJobParameters } from './types/JobParameters';
15+
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
16+
import type { IJobParameters } from './types/JobParameters';
17+
import { hasMongoProtocol } from './utils/hasMongoProtocol';
1718

1819
const log = debug('agenda:db');
1920

@@ -81,7 +82,7 @@ export class JobDbRepository {
8182

8283
async lockJob(job: Job): Promise<IJobParameters | undefined> {
8384
// Query to run against collection to see if we need to lock it
84-
const criteria = {
85+
const criteria: FilterQuery<Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }> = {
8586
_id: job.attrs._id,
8687
name: job.attrs.name,
8788
lockedAt: null,
@@ -90,11 +91,16 @@ export class JobDbRepository {
9091
};
9192

9293
// Update / options for the MongoDB query
93-
const update = { $set: { lockedAt: new Date() } };
94-
const options = { returnOriginal: false };
94+
const update: UpdateQuery<IJobParameters> = { $set: { lockedAt: new Date() } };
95+
const options: FindOneAndUpdateOption<IJobParameters> = { returnOriginal: false };
9596

9697
// Lock the job in MongoDB!
97-
const resp = await this.collection.findOneAndUpdate(criteria, update, options);
98+
const resp = await this.collection.findOneAndUpdate(
99+
criteria as FilterQuery<IJobParameters>,
100+
update,
101+
options
102+
);
103+
98104
return resp?.value;
99105
}
100106

@@ -104,11 +110,13 @@ export class JobDbRepository {
104110
lockDeadline: Date,
105111
now: Date = new Date()
106112
): Promise<IJobParameters | undefined> {
107-
// /**
108-
// * Query used to find job to run
109-
// * @type {{$and: [*]}}
110-
// */
111-
const JOB_PROCESS_WHERE_QUERY = {
113+
/**
114+
* Query used to find job to run
115+
* @type {{$and: [*]}}
116+
*/
117+
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
118+
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
119+
> = {
112120
$and: [
113121
{
114122
name: jobName,
@@ -132,13 +140,16 @@ export class JobDbRepository {
132140
* Query used to set a job as locked
133141
* @type {{$set: {lockedAt: Date}}}
134142
*/
135-
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
143+
const JOB_PROCESS_SET_QUERY: UpdateQuery<IJobParameters> = { $set: { lockedAt: now } };
136144

137145
/**
138146
* Query used to affect what gets returned
139147
* @type {{returnOriginal: boolean, sort: object}}
140148
*/
141-
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this.connectOptions.sort };
149+
const JOB_RETURN_QUERY: FindOneAndUpdateOption<IJobParameters> = {
150+
returnOriginal: false,
151+
sort: this.connectOptions.sort
152+
};
142153

143154
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
144155
const result = await this.collection.findOneAndUpdate(
@@ -208,7 +219,7 @@ export class JobDbRepository {
208219

209220
private processDbResult<DATA = unknown | void>(
210221
job: Job<DATA>,
211-
res: IJobParameters<DATA>
222+
res?: IJobParameters<DATA>
212223
): Job<DATA> {
213224
log(
214225
'processDbResult() called with success, checking whether to process job immediately or not'
@@ -243,7 +254,6 @@ export class JobDbRepository {
243254

244255
// Grab information needed to save job but that we don't want to persist in MongoDB
245256
const id = job.attrs._id;
246-
// const { unique, uniqueOpts } = job.attrs;
247257

248258
// Store job as JSON and remove props we don't want to store from object
249259
// _id, unique, uniqueOpts
@@ -284,7 +294,7 @@ export class JobDbRepository {
284294
if (props.nextRunAt && props.nextRunAt <= now) {
285295
log('job has a scheduled nextRunAt time, protecting that field from upsert');
286296
protect.nextRunAt = props.nextRunAt;
287-
delete props.nextRunAt;
297+
delete (props as Partial<IJobParameters>).nextRunAt;
288298
}
289299

290300
// If we have things to protect, set them in MongoDB using $setOnInsert
@@ -309,7 +319,7 @@ export class JobDbRepository {
309319
update,
310320
{
311321
upsert: true,
312-
returnOriginal: false // same as new: true -> returns the final document
322+
returnOriginal: false
313323
}
314324
);
315325
log(
@@ -330,8 +340,6 @@ export class JobDbRepository {
330340
update = { $setOnInsert: props };
331341
}
332342

333-
// console.log('update', query, update, uniqueOpts);
334-
335343
// Use the 'unique' query object to find an existing job or create a new one
336344
log('calling findOneAndUpdate() with unique object as query: \n%O', query);
337345
const result = await this.collection.findOneAndUpdate(query, update, {

src/JobProcessingQueue.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// eslint-disable-next-line prettier/prettier
22
import type { Job } from './Job';
3-
import { IJobParameters } from './types/JobParameters';
3+
import type { IJobParameters } from './types/JobParameters';
44
import type { Agenda } from './index';
55
/**
66
* @class

src/JobProcessor.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import * as debug from 'debug';
2-
import { Job } from './Job';
3-
import { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
4-
import { IJobDefinition } from './types/JobDefinition';
5-
import { JobProcessingQueue } from './JobProcessingQueue';
2+
import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
3+
import type { IJobDefinition } from './types/JobDefinition';
64
import type { Agenda } from './index';
75
import type { IJobParameters } from './types/JobParameters';
6+
import { Job } from './Job';
7+
import { JobProcessingQueue } from './JobProcessingQueue';
88

99
const log = debug('agenda:jobProcessor');
1010

src/index.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import { EventEmitter } from 'events';
22
import * as debug from 'debug';
33

4-
import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
5-
import { Job } from './Job';
6-
import { JobProcessor } from './JobProcessor';
4+
import type { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
75
import type { IJobDefinition } from './types/JobDefinition';
8-
import { IAgendaConfig } from './types/AgendaConfig';
6+
import type { IAgendaConfig } from './types/AgendaConfig';
7+
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
8+
import type { IAgendaStatus } from './types/AgendaStatus';
9+
import type { IJobParameters } from './types/JobParameters';
10+
import { Job } from './Job';
911
import { JobDbRepository } from './JobDbRepository';
10-
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
11-
import { filterUndefined } from './utils/filterUndefined';
1212
import { JobPriority, parsePriority } from './utils/priority';
13-
import { IAgendaStatus } from './types/AgendaStatus';
14-
import { IJobParameters } from './types/JobParameters';
13+
import { JobProcessor } from './JobProcessor';
1514
import { calculateProcessEvery } from './utils/processEvery';
15+
import { filterUndefined } from './utils/filterUndefined';
1616

1717
const log = debug('agenda');
1818

src/types/AgendaStatus.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Job } from '../Job';
1+
import type { Job } from '../Job';
22

33
export interface IAgendaJobStatus {
44
[name: string]: { running: number; locked: number };

src/types/DbOptions.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
2-
import { IJobParameters } from './JobParameters';
1+
import type { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
2+
import type { IJobParameters } from './JobParameters';
33

44
export interface IDatabaseOptions {
55
db: {

src/types/JobDefinition.ts

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Job } from '../Job';
1+
import type { Job } from '../Job';
22

33
export interface IJobDefinition<DATA = unknown> {
44
/** max number of locked jobs of this kind */
@@ -7,13 +7,10 @@ export interface IJobDefinition<DATA = unknown> {
77
lockLifetime: number;
88
/** Higher priority jobs will run first. */
99
priority?: number;
10-
/** how many jobs of this kind can run in parallel/simultanously */
10+
/** how many jobs of this kind can run in parallel/simultanously per Agenda instance */
1111
concurrency?: number;
1212

13-
// running: number;
14-
// locked: number;
15-
16-
fn: DefinitionProcessor<DATA, void | ((err?) => void)>;
13+
fn: DefinitionProcessor<DATA, void | ((err?: Error) => void)>;
1714
}
1815

1916
export type DefinitionProcessor<DATA, CB> = (

src/types/JobParameters.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ export interface IJobParameters<DATA = unknown | void> {
88
priority: number;
99
nextRunAt: Date | null;
1010
/**
11-
* // once: the job is just queued in the database --> this does not really exists, it's just fallback
1211
* normal: job is queued and will be processed (regular case when the user adds a new job)
1312
* single: job with this name is only queued once, if there is an exisitn gentry in the database, the job is just updated, but not newly inserted (this is used for .every())
1413
*/
15-
type: /* 'once' | */ 'normal' | 'single';
14+
type: 'normal' | 'single';
1615

1716
lockedAt?: Date;
1817
lastFinishedAt?: Date;

src/utils/priority.ts

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,3 @@
1-
/**
2-
* Internal method to turn priority into a number
3-
* @name Job#priority
4-
* @function
5-
* @param {String|Number} priority string to parse into number
6-
* @returns {Number} priority that was parsed
7-
*/
8-
91
export type JobPriority = number | keyof typeof priorityMap;
102

113
const priorityMap = {
@@ -16,6 +8,13 @@ const priorityMap = {
168
highest: 20
179
};
1810

11+
/**
12+
* Internal method to turn priority into a number
13+
* @name Job#priority
14+
* @function
15+
* @param {String|Number} priority string to parse into number
16+
* @returns {Number} priority that was parsed
17+
*/
1918
export function parsePriority(priority?: JobPriority): number {
2019
if (typeof priority === 'number') {
2120
return priority;

test/job.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,7 @@ describe('Job', () => {
11501150
]);
11511151
expect(results).to.eql([10, 0, -10]);
11521152
} catch (err) {
1153-
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
1153+
// console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
11541154
}
11551155
});
11561156

test/retry.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ const clearJobs = async (): Promise<void> => {
1616
};
1717

1818
const jobType = 'do work';
19-
const jobProcessor = () => {};
19+
const jobProcessor = () => { };
2020

2121
describe('Retry', () => {
2222
beforeEach(async () => {
@@ -79,5 +79,5 @@ describe('Retry', () => {
7979

8080
await agenda.start();
8181
await successPromise;
82-
});
82+
}).timeout(100000);
8383
});

0 commit comments

Comments
 (0)