Skip to content

Commit b13d054

Browse files
UzlopakAras Abbasi
and
Aras Abbasi
authored
fix: more typings, minor functionality changes (#2)
* add more typings and streamline functions, remoev unnused file * use IJobParameters instead of Job, fix unit test Co-authored-by: Aras Abbasi <a.abbasi@cognigy.com>
1 parent 31f9b6e commit b13d054

20 files changed

+158
-210
lines changed

src/Job.ts

+15-24
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as date from 'date.js';
22
import * as debug from 'debug';
3-
import { parsePriority } from './utils/priority';
3+
import { JobPriority, parsePriority } from './utils/priority';
44
import type { Agenda } from './index';
55
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
66
import { IJobParameters } from './types/JobParameters';
@@ -46,18 +46,13 @@ export class Job<DATA = any | void> {
4646
data: any;
4747
}
4848
) {
49-
// Remove special args
50-
51-
// Process args
52-
args.priority = parsePriority(args.priority) || 0;
53-
5449
// Set attrs to args
5550
this.attrs = {
5651
...args,
5752
// Set defaults if undefined
58-
priority: args.priority || 0,
53+
priority: parsePriority(args.priority),
5954
nextRunAt: args.nextRunAt || new Date(),
60-
type: args.type // || 'once'
55+
type: args.type
6156
};
6257
}
6358

@@ -86,7 +81,7 @@ export class Job<DATA = any | void> {
8681
repeatEvery(
8782
interval: string | number,
8883
options: { timezone?: string; skipImmediate?: boolean } = {}
89-
) {
84+
): this {
9085
this.attrs.repeatInterval = interval;
9186
this.attrs.repeatTimezone = options.timezone;
9287
if (options.skipImmediate) {
@@ -101,28 +96,28 @@ export class Job<DATA = any | void> {
10196
return this;
10297
}
10398

104-
repeatAt(time) {
99+
repeatAt(time: string): this {
105100
this.attrs.repeatAt = time;
106101
return this;
107102
}
108103

109-
disable() {
104+
disable(): this {
110105
this.attrs.disabled = true;
111106
return this;
112107
}
113108

114-
enable() {
109+
enable(): this {
115110
this.attrs.disabled = false;
116111
return this;
117112
}
118113

119-
unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']) {
114+
unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']): this {
120115
this.attrs.unique = unique;
121116
this.attrs.uniqueOpts = opts;
122117
return this;
123118
}
124119

125-
schedule(time) {
120+
schedule(time: string | Date): this {
126121
const d = new Date(time);
127122

128123
this.attrs.nextRunAt = Number.isNaN(d.getTime()) ? date(time) : d;
@@ -135,17 +130,13 @@ export class Job<DATA = any | void> {
135130
* @param {String} priority priority of when job should be queued
136131
* @returns {exports} instance of Job
137132
*/
138-
priority(priority: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number) {
133+
priority(priority: JobPriority): this {
139134
this.attrs.priority = parsePriority(priority);
140135
return this;
141136
}
142137

143-
fail(reason: Error | string) {
144-
if (reason instanceof Error) {
145-
reason = reason.message;
146-
}
147-
148-
this.attrs.failReason = reason;
138+
fail(reason: Error | string): this {
139+
this.attrs.failReason = reason instanceof Error ? reason.message : reason;
149140
this.attrs.failCount = (this.attrs.failCount || 0) + 1;
150141
const now = new Date();
151142
this.attrs.failedAt = now;
@@ -159,7 +150,7 @@ export class Job<DATA = any | void> {
159150
return this;
160151
}
161152

162-
isRunning() {
153+
isRunning(): boolean {
163154
if (!this.attrs.lastRunAt) {
164155
return false;
165156
}
@@ -178,7 +169,7 @@ export class Job<DATA = any | void> {
178169
return false;
179170
}
180171

181-
save() {
172+
async save(): Promise<Job> {
182173
return this.agenda.db.saveJob(this);
183174
}
184175

@@ -237,7 +228,7 @@ export class Job<DATA = any | void> {
237228
return this;
238229
}
239230

240-
async run() {
231+
async run(): Promise<void> {
241232
const definition = this.agenda.definitions[this.attrs.name];
242233

243234
this.attrs.lastRunAt = new Date();

src/JobDbRepository.ts

+23-14
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import {
66
MongoClient,
77
MongoClientOptions,
88
UpdateQuery,
9-
ObjectId
9+
ObjectId,
10+
SortOptionObject
1011
} from 'mongodb';
1112
import type { Job } from './Job';
12-
import { hasMongoProtocol } from './utils/mongodb';
13+
import { hasMongoProtocol } from './utils/hasMongoProtocol';
1314
import type { Agenda } from './index';
1415
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
1516
import { IJobParameters } from './types/JobParameters';
@@ -49,26 +50,32 @@ export class JobDbRepository {
4950
return !!connectOptions.db?.address;
5051
}
5152

52-
async getJobs(query: any, sort: any = {}, limit = 0, skip = 0) {
53+
async getJobs(
54+
query: FilterQuery<IJobParameters>,
55+
sort: SortOptionObject<IJobParameters> = {},
56+
limit = 0,
57+
skip = 0
58+
): Promise<IJobParameters[]> {
5359
return this.collection.find(query).sort(sort).limit(limit).skip(skip).toArray();
5460
}
5561

56-
async removeJobs(query: any) {
57-
return this.collection.deleteMany(query);
62+
async removeJobs(query: FilterQuery<IJobParameters>): Promise<number> {
63+
const result = await this.collection.deleteMany(query);
64+
return result.result.n || 0;
5865
}
5966

6067
async getQueueSize(): Promise<number> {
6168
return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } });
6269
}
6370

64-
async unlockJob(job) {
65-
await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } });
71+
async unlockJob(job: Job): Promise<void> {
72+
await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } });
6673
}
6774

6875
/**
6976
* Internal method to unlock jobs so that they can be re-run
7077
*/
71-
async unlockJobs(jobIds: ObjectId[]) {
78+
async unlockJobs(jobIds: ObjectId[]): Promise<void> {
7279
await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } });
7380
}
7481

@@ -143,7 +150,7 @@ export class JobDbRepository {
143150
return result.value;
144151
}
145152

146-
async connect() {
153+
async connect(): Promise<void> {
147154
const db = await this.createConnection();
148155
log('successful connection to MongoDB', db.options);
149156

@@ -184,8 +191,10 @@ export class JobDbRepository {
184191
}
185192

186193
private async database(url: string, options?: MongoClientOptions) {
187-
if (!hasMongoProtocol(url)) {
188-
url = `mongodb://${url}`;
194+
let connectionString = url;
195+
196+
if (!hasMongoProtocol(connectionString)) {
197+
connectionString = `mongodb://${connectionString}`;
189198
}
190199

191200
const client = await MongoClient.connect(url, {
@@ -197,7 +206,7 @@ export class JobDbRepository {
197206
return client.db();
198207
}
199208

200-
private processDbResult(job: Job, res: IJobParameters) {
209+
private processDbResult(job: Job, res: IJobParameters): Job {
201210
log(
202211
'processDbResult() called with success, checking whether to process job immediately or not'
203212
);
@@ -245,7 +254,7 @@ export class JobDbRepository {
245254
// Grab current time and set default query options for MongoDB
246255
const now = new Date();
247256
const protect: Partial<IJobParameters> = {};
248-
let update: UpdateQuery<any> = { $set: props };
257+
let update: UpdateQuery<IJobParameters> = { $set: props };
249258
log('current time stored as %s', now.toISOString());
250259

251260
// If the job already had an ID, then update the properties of the job
@@ -310,7 +319,7 @@ export class JobDbRepository {
310319

311320
if (job.attrs.unique) {
312321
// If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
313-
const query: FilterQuery<any> = job.attrs.unique;
322+
const query = job.attrs.unique;
314323
query.name = props.name;
315324
if (job.attrs.uniqueOpts?.insertOnly) {
316325
update = { $setOnInsert: props };

src/JobProcessingQueue.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ export class JobProcessingQueue {
1515
this._queue = [];
1616
}
1717

18-
get length() {
18+
get length(): number {
1919
return this._queue.length;
2020
}
2121

2222
/**
2323
* Pops and returns last queue element (next job to be processed) without checking concurrency.
2424
* @returns {Job} Next Job to be processed
2525
*/
26-
pop() {
26+
pop(): Job | undefined {
2727
return this._queue.pop();
2828
}
2929

@@ -32,11 +32,11 @@ export class JobProcessingQueue {
3232
* @param {Job} job job to add to queue
3333
* @returns {undefined}
3434
*/
35-
push(job: Job) {
35+
push(job: Job): void {
3636
this._queue.push(job);
3737
}
3838

39-
remove(job: Job) {
39+
remove(job: Job): void {
4040
let removeJobIndex = this._queue.indexOf(job);
4141
if (removeJobIndex === -1) {
4242
// lookup by id
@@ -58,7 +58,7 @@ export class JobProcessingQueue {
5858
* @param {Job} job job to add to queue
5959
* @returns {undefined}
6060
*/
61-
insert(job: Job) {
61+
insert(job: Job): void {
6262
const matchIndex = this._queue.findIndex(element => {
6363
if (
6464
element.attrs.nextRunAt &&

src/JobProcessor.ts

+11-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as debug from 'debug';
22
import { Job } from './Job';
3+
import { IAgendaStatus } from './types/AgendaStatus';
34
import { IJobDefinition } from './types/JobDefinition';
45
import { JobProcessingQueue } from './JobProcessingQueue';
56
import type { Agenda } from './index';
@@ -15,23 +16,18 @@ const log = debug('agenda:jobProcessor');
1516
*/
1617
export class JobProcessor {
1718
private jobStatus: {
18-
[name: string]:
19-
| {
20-
running: number;
21-
locked: number;
22-
}
23-
| undefined;
19+
[name: string]: { running: number; locked: number } | undefined;
2420
} = {};
2521

2622
private localQueueProcessing = 0;
2723

28-
async getStatus(fullDetails = false) {
24+
async getStatus(fullDetails = false): Promise<IAgendaStatus> {
2925
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
3026
const { version } = require('../package.json');
3127

3228
return {
3329
version,
34-
queueName: this.agenda.name,
30+
queueName: this.agenda.attrs.name,
3531
totalQueueSizeDB: await this.agenda.db.getQueueSize(),
3632
config: {
3733
totalLockLimit: this.totalLockLimit,
@@ -42,7 +38,7 @@ export class JobProcessor {
4238
Object.keys(this.jobStatus).map(job => [
4339
job,
4440
{
45-
...this.jobStatus[job],
41+
...this.jobStatus[job]!,
4642
config: this.agenda.definitions[job]
4743
}
4844
])
@@ -95,7 +91,7 @@ export class JobProcessor {
9591
}
9692

9793
// processJobs
98-
async process(extraJob?: Job) {
94+
async process(extraJob?: Job): Promise<void> {
9995
// Make sure an interval has actually been set
10096
// Prevents race condition with 'Agenda.stop' and already scheduled run
10197
if (!this.isRunning) {
@@ -138,7 +134,7 @@ export class JobProcessor {
138134
* @param {String} name name of job to check if we should lock or not
139135
* @returns {boolean} whether or not you should lock job
140136
*/
141-
shouldLock(name) {
137+
shouldLock(name: string): boolean {
142138
const jobDefinition = this.agenda.definitions[name];
143139
let shouldLock = true;
144140
// global lock limit
@@ -168,7 +164,7 @@ export class JobProcessor {
168164
* @param {boolean} inFront puts the job in front of queue if true
169165
* @returns {undefined}
170166
*/
171-
private enqueueJob(job: Job) {
167+
private enqueueJob(job: Job): void {
172168
this.jobQueue.insert(job);
173169
}
174170

@@ -178,7 +174,7 @@ export class JobProcessor {
178174
* We do this because sometimes jobs are scheduled but will be run before the next process time
179175
* @returns {undefined}
180176
*/
181-
async lockOnTheFly() {
177+
async lockOnTheFly(): Promise<void> {
182178
// Already running this? Return
183179
if (this.isLockingOnTheFly) {
184180
log.extend('lockOnTheFly')('already running, returning');
@@ -347,7 +343,7 @@ export class JobProcessor {
347343
return;
348344
}
349345

350-
this.localQueueProcessing++;
346+
this.localQueueProcessing += 1;
351347

352348
let jobEnqueued = false;
353349
try {
@@ -400,7 +396,7 @@ export class JobProcessor {
400396
);
401397
} */
402398
} finally {
403-
this.localQueueProcessing--;
399+
this.localQueueProcessing -= 1;
404400
}
405401
}
406402

0 commit comments

Comments
 (0)