Skip to content

Commit

Permalink
feat(core): Add DB-based buffer storage support to DefaultJobQueuePlugin
Browse files Browse the repository at this point in the history
Relates to #1137
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent 6f4a89f commit f26ad4b
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 28 deletions.
1 change: 0 additions & 1 deletion packages/core/src/job-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ export * from './job-buffer/in-memory-job-buffer-storage-strategy';
export * from './job-buffer/job-buffer';
export * from './job-buffer/job-buffer';
export * from './job-buffer/job-buffer-storage-strategy';
export * from './job-buffer/sql-job-buffer-storage-strategy';
export * from './job';
export * from './job-queue';
export * from './job-queue.service';
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';

import { JobRecordBuffer } from './job-record-buffer.entity';
import { JobRecord } from './job-record.entity';
import { SqlJobBufferStorageStrategy } from './sql-job-buffer-storage-strategy';
import { SqlJobQueueStrategy } from './sql-job-queue-strategy';

/**
Expand All @@ -21,6 +23,17 @@ export interface DefaultJobQueueOptions {
concurrency?: number;
backoffStrategy?: BackoffStrategy;
setRetries?: (queueName: string, job: Job) => number;
/**
* @description
* If set to `true`, the database will be used to store buffered jobs. This is
* recommended for production.
*
* When enabled, a new `JobRecordBuffer` database entity will be defined which will
* require a migration when first enabling this option.
*
* @since 1.3.0
*/
useDatabaseForBuffer?: boolean;
}

/**
Expand Down Expand Up @@ -115,7 +128,10 @@ export interface DefaultJobQueueOptions {
*/
@VendurePlugin({
imports: [PluginCommonModule],
entities: [JobRecord],
entities: () =>
DefaultJobQueuePlugin.options.useDatabaseForBuffer === true
? [JobRecord, JobRecordBuffer]
: [JobRecord],
configuration: config => {
const { pollInterval, concurrency, backoffStrategy, setRetries } =
DefaultJobQueuePlugin.options ?? {};
Expand All @@ -125,12 +141,15 @@ export interface DefaultJobQueueOptions {
backoffStrategy,
setRetries,
});
if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) {
config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();
}
return config;
},
})
export class DefaultJobQueuePlugin {
/** @internal */
static options: DefaultJobQueueOptions;
static options: DefaultJobQueueOptions = {};

static init(options: DefaultJobQueueOptions): Type<DefaultJobQueuePlugin> {
DefaultJobQueuePlugin.options = options;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { DeepPartial } from '@vendure/common/lib/shared-types';
import { Column, Entity } from 'typeorm';

import { VendureEntity } from '../../entity/base/base.entity';
import { JobConfig } from '../../job-queue/types';

@Entity()
export class JobRecordBuffer extends VendureEntity {
constructor(input: DeepPartial<JobRecordBuffer>) {
super(input);
}

@Column()
bufferId: string;

@Column('simple-json')
job: JobConfig<any>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { Injector } from '../../common/injector';
import { TransactionalConnection } from '../../connection/transactional-connection';
import { Job } from '../../job-queue/job';
import { JobBufferStorageStrategy } from '../../job-queue/job-buffer/job-buffer-storage-strategy';
import { JobConfig } from '../../job-queue/types';

import { JobRecordBuffer } from './job-record-buffer.entity';

/**
* @description
* This stores the buffered jobs in the database.
*/
export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
private connection: TransactionalConnection;

init(injector: Injector) {
this.connection = injector.get(TransactionalConnection);
}

async add(bufferId: string, job: Job): Promise<Job> {
await this.connection.getRepository(JobRecordBuffer).save(
new JobRecordBuffer({
bufferId,
job: this.toJobConfig(job),
}),
);

return job;
}

async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
const rows = await this.connection
.getRepository(JobRecordBuffer)
.createQueryBuilder('record')
.select(`COUNT(*)`, 'count')
.addSelect(`record.bufferId`, 'bufferId')
.groupBy('record.bufferId')
.getRawMany();

const result: { [bufferId: string]: number } = {};
for (const row of rows) {
result[row.bufferId] = +row.count;
}
return result;
}

async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
const selectQb = this.connection.getRepository(JobRecordBuffer).createQueryBuilder('record');
if (bufferIds?.length) {
selectQb.where(`record.bufferId IN (:...bufferIds)`, { bufferIds });
}
const rows = await selectQb.getMany();
const result: { [bufferId: string]: Job[] } = {};
for (const row of rows) {
if (!result[row.bufferId]) {
result[row.bufferId] = [];
}
result[row.bufferId].push(new Job(row.job));
}
const deleteQb = this.connection.rawConnection.createQueryBuilder().delete().from(JobRecordBuffer);
if (bufferIds?.length) {
deleteQb.where(`bufferId IN (:...bufferIds)`, { bufferIds });
}
await deleteQb.execute();
return result;
}

private toJobConfig(job: Job<any>): JobConfig<any> {
return {
...job,
data: job.data,
id: job.id ?? undefined,
};
}
}
2 changes: 2 additions & 0 deletions packages/core/src/plugin/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export * from './default-search-plugin/default-search-plugin';
export * from './default-job-queue-plugin/default-job-queue-plugin';
export * from './default-job-queue-plugin/job-record-buffer.entity';
export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';
export * from './vendure-plugin';
export * from './plugin-common.module';
export * from './plugin-utils';

0 comments on commit f26ad4b

Please sign in to comment.