From 9f6cd5a11f63fbbc4ddea3dc70903a85170c96ae Mon Sep 17 00:00:00 2001 From: mairh Date: Fri, 30 Jun 2023 14:58:28 +0300 Subject: [PATCH] Implement user creation/updates subscription and enhance n8n interact --- .../n8n-webhook-manager.interface.ts | 4 +- .../n8n-webhook-manager.service.ts | 14 ++-- src/modules/n8n/n8n.module.ts | 4 +- .../dto/subscriptions/user-created.dto.ts | 9 +++ .../dto/subscriptions/user-updated.dto.ts | 12 +++ src/modules/user/user.constants.ts | 2 + src/modules/user/user.module.ts | 4 +- src/modules/user/user.n8n-subscription.ts | 81 ++++++++++++++----- src/modules/user/user.resolver.ts | 44 +++++++++- 9 files changed, 135 insertions(+), 39 deletions(-) create mode 100644 src/modules/user/dto/subscriptions/user-created.dto.ts create mode 100644 src/modules/user/dto/subscriptions/user-updated.dto.ts create mode 100644 src/modules/user/user.constants.ts diff --git a/src/modules/n8n-webhook-manager/n8n-webhook-manager.interface.ts b/src/modules/n8n-webhook-manager/n8n-webhook-manager.interface.ts index d18f720e..a9b42cf5 100644 --- a/src/modules/n8n-webhook-manager/n8n-webhook-manager.interface.ts +++ b/src/modules/n8n-webhook-manager/n8n-webhook-manager.interface.ts @@ -1,5 +1,5 @@ -type Unsubscribe = () => Promise; +type Unsubscribe = () => Promise | void; export interface N8nWebhookSubscriptionService { - subscribe(): Unsubscribe; + subscribe(): Promise | Unsubscribe; } diff --git a/src/modules/n8n-webhook-manager/n8n-webhook-manager.service.ts b/src/modules/n8n-webhook-manager/n8n-webhook-manager.service.ts index 6df3bdab..b3cfcd94 100644 --- a/src/modules/n8n-webhook-manager/n8n-webhook-manager.service.ts +++ b/src/modules/n8n-webhook-manager/n8n-webhook-manager.service.ts @@ -10,15 +10,13 @@ export class N8nWebhookManagerService implements N8nWebhookSubscriptionService { this.n8nSubscriptions = n8nSubscriptions; } - subscribe() { - const unsubscribes = this.n8nSubscriptions.map((subscriber) => - subscriber.subscribe(), + async subscribe() { + const unsubscribes = await Promise.all( + this.n8nSubscriptions.map((subscriber) => subscriber.subscribe()), ); - return async () => - unsubscribes.reduce( - (promise, unsubscribe) => promise.then(unsubscribe), - Promise.resolve(), - ); + return async () => { + await Promise.all(unsubscribes.map((unsubscribe) => unsubscribe())); + }; } } diff --git a/src/modules/n8n/n8n.module.ts b/src/modules/n8n/n8n.module.ts index 49027eca..020fe4fb 100644 --- a/src/modules/n8n/n8n.module.ts +++ b/src/modules/n8n/n8n.module.ts @@ -8,9 +8,9 @@ import { N8nWebhookManagerService } from '../n8n-webhook-manager/n8n-webhook-man export class N8nModule { constructor(private n8nWebhookManagerService: N8nWebhookManagerService) {} - onApplicationBootstrap() { + async onApplicationBootstrap() { N8nWebhookManagerModule.unsubscribe = - this.n8nWebhookManagerService.subscribe(); + await this.n8nWebhookManagerService.subscribe(); } async beforeApplicationShutdown() { diff --git a/src/modules/user/dto/subscriptions/user-created.dto.ts b/src/modules/user/dto/subscriptions/user-created.dto.ts new file mode 100644 index 00000000..3628c606 --- /dev/null +++ b/src/modules/user/dto/subscriptions/user-created.dto.ts @@ -0,0 +1,9 @@ +import { Field, ObjectType } from '@nestjs/graphql'; + +import { UserDto } from '../user/user.dto'; + +@ObjectType() +export class UserCreatedDto { + @Field(() => UserDto) + user: UserDto; +} diff --git a/src/modules/user/dto/subscriptions/user-updated.dto.ts b/src/modules/user/dto/subscriptions/user-updated.dto.ts new file mode 100644 index 00000000..8b261141 --- /dev/null +++ b/src/modules/user/dto/subscriptions/user-updated.dto.ts @@ -0,0 +1,12 @@ +import { Field, ObjectType } from '@nestjs/graphql'; + +import { UserDto } from '../user/user.dto'; + +@ObjectType() +export class UserUpdatedDto { + @Field(() => UserDto) + user: UserDto; + + // only for server (n8n module optimization) + beforeUpdateUser: UserDto; +} diff --git a/src/modules/user/user.constants.ts b/src/modules/user/user.constants.ts new file mode 100644 index 00000000..6dad3ec4 --- /dev/null +++ b/src/modules/user/user.constants.ts @@ -0,0 +1,2 @@ +export const USER_UPDATED_SUBSCRIPTION = 'userUpdated'; +export const USER_CREATED_SUBSCRIPTION = 'userCreated'; diff --git a/src/modules/user/user.module.ts b/src/modules/user/user.module.ts index 8a5ce9ce..ae5b2230 100644 --- a/src/modules/user/user.module.ts +++ b/src/modules/user/user.module.ts @@ -4,12 +4,14 @@ import { UserService } from './user.service'; import { FirebaseModule } from '../firebase/firebase.module'; import { N8nWebhookManagerModule } from '../n8n-webhook-manager/n8n-webhook-manager.module'; import { UserN8nSubscription } from './user.n8n-subscription'; +import { GraphQLPubsubModule } from '../graphql-pubsub/graphql-pubsub.module'; @Module({ imports: [ FirebaseModule, + GraphQLPubsubModule, N8nWebhookManagerModule.forFeature({ - imports: [FirebaseModule], + imports: [GraphQLPubsubModule], subscriptions: [UserN8nSubscription], }), ], diff --git a/src/modules/user/user.n8n-subscription.ts b/src/modules/user/user.n8n-subscription.ts index 844a5c29..4c389251 100644 --- a/src/modules/user/user.n8n-subscription.ts +++ b/src/modules/user/user.n8n-subscription.ts @@ -1,12 +1,18 @@ -import { Injectable, Logger } from '@nestjs/common'; +import axios, { isAxiosError } from 'axios'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { isEqual } from 'lodash'; +import { PubSub } from 'graphql-subscriptions'; import { N8nWebhookSubscriptionService } from '../n8n-webhook-manager/n8n-webhook-manager.interface'; -import { FirebaseService } from '../firebase/firebase.service'; -import { subscribeOnFirebaseEvent } from '../firebase/firebase.utils'; -import { FirebaseDatabasePath } from '../firebase/firebase.constants'; -import { ConfigService } from '@nestjs/config'; -import axios, { isAxiosError } from 'axios'; -import { DataSnapshot } from 'firebase-admin/database'; +import { PUBSUB_PROVIDER } from '../graphql-pubsub/graphql-pubsub.constants'; +import { + USER_CREATED_SUBSCRIPTION, + USER_UPDATED_SUBSCRIPTION, +} from './user.constants'; +import { UserUpdatedDto } from './dto/subscriptions/user-updated.dto'; +import { UserDto } from './dto/user/user.dto'; +import { UserCreatedDto } from './dto/subscriptions/user-created.dto'; @Injectable() export class UserN8nSubscription implements N8nWebhookSubscriptionService { @@ -14,8 +20,8 @@ export class UserN8nSubscription implements N8nWebhookSubscriptionService { private templateWebhookUrl: string; constructor( - private firebaseService: FirebaseService, private configService: ConfigService, + @Inject(PUBSUB_PROVIDER) private pubSub: PubSub, ) { const baseUrl = this.configService.getOrThrow('n8n.webhookUrl'); const contactCreationPath = this.configService.getOrThrow( @@ -25,27 +31,45 @@ export class UserN8nSubscription implements N8nWebhookSubscriptionService { this.templateWebhookUrl = `${baseUrl}${contactCreationPath}`; } - subscribe(): () => Promise { - const app = this.firebaseService.getDefaultApp(); - const usersRef = app.database().ref(FirebaseDatabasePath.USERS); + public async subscribe(): Promise<() => Promise> { + const subscriptionIds = await Promise.all([ + this.subscribeOnUserUpdates(), + this.subscribeOnUserCreation(), + ]); - const unsubscribe = subscribeOnFirebaseEvent( - usersRef, - 'child_changed', - this.firebaseEventHandler, - ); + return async () => { + for (const subscriptionId of subscriptionIds) { + await this.pubSub.unsubscribe(subscriptionId); + } + }; + } - return async () => unsubscribe(); + private async subscribeOnUserUpdates() { + return this.pubSub.subscribe( + USER_UPDATED_SUBSCRIPTION, + ({ user, beforeUpdateUser }: UserUpdatedDto) => { + if (this.shouldCallWebhook(user, beforeUpdateUser)) { + this.callWebhook(user.id); + } + }, + ); } - private firebaseEventHandler = async (snapshot: DataSnapshot) => { - if (snapshot.key) { - await this.callWebhook(snapshot.key); - } - }; + private async subscribeOnUserCreation() { + return this.pubSub.subscribe( + USER_CREATED_SUBSCRIPTION, + ({ user }: UserCreatedDto) => { + this.callWebhook(user.id); + }, + ); + } private async callWebhook(userId: string) { try { + if (!userId) { + throw new Error('User id should be provided'); + } + const webhookUrl = this.templateWebhookUrl.replace(':userId', userId); await axios.get(webhookUrl); @@ -60,4 +84,17 @@ export class UserN8nSubscription implements N8nWebhookSubscriptionService { this.logger.error(error); } } + + private shouldCallWebhook(user: UserDto, previous: UserDto) { + return !isEqual( + this.getCompareValues(user), + this.getCompareValues(previous), + ); + } + + private getCompareValues = (compareUser: UserDto) => [ + compareUser.data.email, + compareUser.data.firstName, + compareUser.data.lastName, + ]; } diff --git a/src/modules/user/user.resolver.ts b/src/modules/user/user.resolver.ts index 2ad65247..a60fadd6 100644 --- a/src/modules/user/user.resolver.ts +++ b/src/modules/user/user.resolver.ts @@ -1,4 +1,4 @@ -import { UseFilters } from '@nestjs/common'; +import { Inject, UseFilters } from '@nestjs/common'; import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { FirebaseApp } from '../firebase/firebase.decorator'; @@ -10,10 +10,22 @@ import { UserService } from './user.service'; import { UserCreationArgs } from './dto/creation/user-creation.args'; import { UserDataEditingArgs } from './dto/editing/user-data-editing.args'; import { FirebaseExceptionFilter } from '../firebase/firebase.exception.filter'; +import { createSubscriptionEventEmitter } from '../graphql-pubsub/graphql-pubsub.utils'; +import { + USER_CREATED_SUBSCRIPTION, + USER_UPDATED_SUBSCRIPTION, +} from './user.constants'; +import { PubSub } from 'graphql-subscriptions'; +import { PUBSUB_PROVIDER } from '../graphql-pubsub/graphql-pubsub.constants'; +import { UserUpdatedDto } from './dto/subscriptions/user-updated.dto'; +import { UserCreatedDto } from './dto/subscriptions/user-created.dto'; @Resolver() export class UserResolver { - constructor(private userService: UserService) {} + constructor( + private userService: UserService, + @Inject(PUBSUB_PROVIDER) private pubSub: PubSub, + ) {} @Query(() => UserDto, { nullable: true }) @UseFilters(FirebaseExceptionFilter) @@ -48,7 +60,14 @@ export class UserResolver { @Args() args: UserCreationArgs, @FirebaseApp() app: FirebaseAppInstance, ): Promise { - return this.userService.createUser(args, app); + const emitUserCreated = createSubscriptionEventEmitter( + USER_CREATED_SUBSCRIPTION, + ); + const user = await this.userService.createUser(args, app); + + emitUserCreated(this.pubSub, { user }); + + return user; } @Mutation(() => UserDto) @@ -57,7 +76,24 @@ export class UserResolver { @Args() args: UserDataEditingArgs, @FirebaseApp() app: FirebaseAppInstance, ): Promise { - return this.userService.editUserData(args, app); + const emitUserUpdated = createSubscriptionEventEmitter( + USER_UPDATED_SUBSCRIPTION, + ); + + const beforeUpdateUser = await this.userService.getUserById( + args.userId, + app, + ); + + if (!beforeUpdateUser) { + throw new Error(`User not found`); + } + + const user = await this.userService.editUserData(args, app); + + emitUserUpdated(this.pubSub, { user, beforeUpdateUser }); + + return user; } @Mutation(() => Boolean)