Skip to content

Commit e7e082a

Browse files
committed
idempotency was added + refactoring
1 parent f911507 commit e7e082a

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Controller } from '@nestjs/common';
1+
import { Controller, Inject } from '@nestjs/common';
22
import { Ctx, EventPattern, KafkaContext } from '@nestjs/microservices';
33
import { config } from '../../infrastructure/config/config';
44
import { getMessageByContext } from '../../infrastructure/shared/utils/getMessageByContext';
@@ -9,23 +9,50 @@ import {
99
} from './dto/input/createSalesProductEventPayload.dto';
1010
import { SalesACLService } from './salesACL.service';
1111
import { InventoryItemCreateService } from '../application/inventoryItem/services/inventoryItemCreate.service';
12+
import { TRANSACTION_SERVICE } from '../../infrastructure/transaction/shared/constants';
13+
import { ITransactionService } from '../../infrastructure/transaction/ITransaction.service';
14+
import { CreateInventoryItemOutputDto } from '../application/inventoryItem/dto/output/createInventoryItemOutput.dto';
15+
import { ITransaction } from '../../infrastructure/transaction/shared/types/ITransaction';
16+
import { Message } from '../../infrastructure/shared/utils/extractMessage';
17+
import {
18+
ISalesProductIdempMessagesService,
19+
} from '../application/services/interfaces/ISalesProductIdempMessagesService';
20+
import { SALES_PRODUCT_IDEMP_MESSAGES_SERVICE } from '../../infrastructure/idempotency/constants';
1221

1322
@Controller()
1423
export class SalesMessagesController {
1524
constructor(
1625
private readonly salesACLService: SalesACLService,
1726
private readonly inventoryItemCreateService: InventoryItemCreateService,
27+
@Inject(TRANSACTION_SERVICE)
28+
private readonly transactionService: ITransactionService,
29+
@Inject(SALES_PRODUCT_IDEMP_MESSAGES_SERVICE)
30+
private readonly idempService: ISalesProductIdempMessagesService,
1831
) {}
1932

33+
// TODO: handle errors
2034
@EventPattern(config.kafka.kafkaProductsTopic)
2135
async handleSalesProductEvent(@Ctx() context: KafkaContext): Promise<void> {
2236
const message = getMessageByContext(context);
2337
const plainPayload = JSON.parse(message.value.payload);
38+
const { messageId } = message.headers;
39+
40+
await this.transactionService.withTransaction('READ COMMITTED', async transaction => {
41+
await this.idempService.assertMessageIsNotAlreadyProcessed(messageId, transaction);
42+
await this.idempService.insertMessage(messageId, transaction);
43+
await this.handle(message, plainPayload, transaction);
44+
});
45+
}
2446

47+
private async handle(message: Message, plainPayload: any, transaction: ITransaction): Promise<void> {
2548
if (message.headers.messageName === MessageNamesEnum.ProductCreated) {
26-
const event = await validatePayload(CreateSalesProductEventPayload, plainPayload);
27-
const dto = await this.salesACLService.mapCreateSalesProductPayloadToCreateInventoryItem(event);
28-
await this.inventoryItemCreateService.runTransaction(dto);
49+
await this.handleProductCreated(plainPayload, transaction);
2950
}
3051
}
52+
53+
private async handleProductCreated(plain: any, transaction: ITransaction): Promise<CreateInventoryItemOutputDto> {
54+
const event = await validatePayload(CreateSalesProductEventPayload, plain);
55+
const dto = await this.salesACLService.mapCreateSalesProductPayloadToCreateInventoryItem(event);
56+
return this.inventoryItemCreateService.create(dto, transaction);
57+
}
3158
}

0 commit comments

Comments
 (0)