@@ -77,6 +77,7 @@ export class JobProcessor {
77
77
) {
78
78
log ( 'creating interval to call processJobs every [%dms]' , processEvery ) ;
79
79
this . processInterval = setInterval ( ( ) => this . process ( ) , processEvery ) ;
80
+ this . process ( ) ;
80
81
}
81
82
82
83
stop ( ) : Job [ ] {
@@ -345,34 +346,33 @@ export class JobProcessor {
345
346
// Check if there is any job that is not blocked by concurrency
346
347
const job = this . jobQueue . returnNextConcurrencyFreeJob ( this . jobStatus ) ;
347
348
348
- if ( job ) {
349
+ if ( ! job ) {
350
+ log . extend ( 'jobProcessing' ) ( '[%s:%s] there is no job to process' ) ;
351
+ return ;
352
+ }
353
+
354
+ log . extend ( 'jobProcessing' ) ( '[%s:%s] there is a job to process' , job . attrs . name , job . attrs . _id ) ;
355
+
356
+ // If the 'nextRunAt' time is older than the current time, run the job
357
+ // Otherwise, setTimeout that gets called at the time of 'nextRunAt'
358
+ if ( job . attrs . nextRunAt <= now ) {
349
359
log . extend ( 'jobProcessing' ) (
350
- '[%s:%s] there is a job to process ' ,
360
+ '[%s:%s] nextRunAt is in the past, run the job immediately ' ,
351
361
job . attrs . name ,
352
362
job . attrs . _id
353
363
) ;
354
-
355
- // If the 'nextRunAt' time is older than the current time, run the job
356
- // Otherwise, setTimeout that gets called at the time of 'nextRunAt'
357
- if ( job . attrs . nextRunAt <= now ) {
358
- log . extend ( 'jobProcessing' ) (
359
- '[%s:%s] nextRunAt is in the past, run the job immediately' ,
360
- job . attrs . name ,
361
- job . attrs . _id
362
- ) ;
363
- this . runOrRetry ( job ) ;
364
- } else {
365
- const runIn = job . attrs . nextRunAt . getTime ( ) - now . getTime ( ) ;
366
- log . extend ( 'jobProcessing' ) (
367
- '[%s:%s] nextRunAt is in the future, calling setTimeout(%d)' ,
368
- job . attrs . name ,
369
- job . attrs . _id ,
370
- runIn
371
- ) ;
372
- setTimeout ( ( ) => {
373
- this . jobProcessing ( ) ;
374
- } , runIn ) ;
375
- }
364
+ this . runOrRetry ( job ) ;
365
+ } else {
366
+ const runIn = job . attrs . nextRunAt . getTime ( ) - now . getTime ( ) ;
367
+ log . extend ( 'jobProcessing' ) (
368
+ '[%s:%s] nextRunAt is in the future, calling setTimeout(%d)' ,
369
+ job . attrs . name ,
370
+ job . attrs . _id ,
371
+ runIn
372
+ ) ;
373
+ setTimeout ( ( ) => {
374
+ this . jobProcessing ( ) ;
375
+ } , runIn ) ;
376
376
}
377
377
}
378
378
@@ -437,6 +437,11 @@ export class JobProcessor {
437
437
log . extend ( 'runOrRetry' ) ( '[%s:%s] processing job' , job . attrs . name , job . attrs . _id ) ;
438
438
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
439
439
await job . run ( ) ;
440
+ log . extend ( 'runOrRetry' ) (
441
+ '[%s:%s] processing job successfull' ,
442
+ job . attrs . name ,
443
+ job . attrs . _id
444
+ ) ;
440
445
441
446
// Job isn't in running jobs so throw an error
442
447
if ( ! this . runningJobs . includes ( job ) ) {
@@ -449,6 +454,12 @@ export class JobProcessor {
449
454
) ;
450
455
}
451
456
} catch ( err ) {
457
+ log . extend ( 'runOrRetry' ) (
458
+ '[%s:%s] processing job failed' ,
459
+ job . attrs . name ,
460
+ job . attrs . _id ,
461
+ err
462
+ ) ;
452
463
job . agenda . emit ( 'error' , err ) ;
453
464
} finally {
454
465
// Remove the job from the running queue
0 commit comments