Skip to content

Commit d20a970

Browse files
xmaiconxMaicon Matsubara
and
Maicon Matsubara
authored
Bugfix/redis connection is closed (#3591)
* Added redis open connection if it is closed * Removed unecessary modification * Added check connection in all methods * Renamed method * added await on method call * Refactor Redis connection handling: remove singleton pattern, ensure connections are opened and closed per operation. --------- Co-authored-by: Maicon Matsubara <maicon@fullwise.com.br>
1 parent 3b804d7 commit d20a970

File tree

1 file changed

+58
-114
lines changed

1 file changed

+58
-114
lines changed

packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts

+58-114
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import { Redis, RedisOptions } from 'ioredis'
2-
import { isEqual } from 'lodash'
32
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
4-
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from '@langchain/community/stores/message/ioredis'
53
import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from '@langchain/core/messages'
64
import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface'
75
import {
@@ -12,42 +10,6 @@ import {
1210
mapChatMessageToBaseMessage
1311
} from '../../../src/utils'
1412

15-
let redisClientSingleton: Redis
16-
let redisClientOption: RedisOptions
17-
let redisClientUrl: string
18-
19-
const getRedisClientbyOption = (option: RedisOptions) => {
20-
if (!redisClientSingleton) {
21-
// if client doesn't exists
22-
redisClientSingleton = new Redis(option)
23-
redisClientOption = option
24-
return redisClientSingleton
25-
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
26-
// if client exists but option changed
27-
redisClientSingleton.quit()
28-
redisClientSingleton = new Redis(option)
29-
redisClientOption = option
30-
return redisClientSingleton
31-
}
32-
return redisClientSingleton
33-
}
34-
35-
const getRedisClientbyUrl = (url: string) => {
36-
if (!redisClientSingleton) {
37-
// if client doesn't exists
38-
redisClientSingleton = new Redis(url)
39-
redisClientUrl = url
40-
return redisClientSingleton
41-
} else if (redisClientSingleton && url !== redisClientUrl) {
42-
// if client exists but option changed
43-
redisClientSingleton.quit()
44-
redisClientSingleton = new Redis(url)
45-
redisClientUrl = url
46-
return redisClientSingleton
47-
}
48-
return redisClientSingleton
49-
}
50-
5113
class RedisBackedChatMemory_Memory implements INode {
5214
label: string
5315
name: string
@@ -114,11 +76,11 @@ class RedisBackedChatMemory_Memory implements INode {
11476
}
11577

11678
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
117-
return await initalizeRedis(nodeData, options)
79+
return await initializeRedis(nodeData, options)
11880
}
11981
}
12082

121-
const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
83+
const initializeRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
12284
const sessionTTL = nodeData.inputs?.sessionTTL as number
12385
const memoryKey = nodeData.inputs?.memoryKey as string
12486
const sessionId = nodeData.inputs?.sessionId as string
@@ -127,120 +89,102 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
12789
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
12890
const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData)
12991

130-
let client: Redis
131-
132-
if (!redisUrl || redisUrl === '') {
133-
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
134-
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
135-
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
136-
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)
137-
const sslEnabled = getCredentialParam('redisCacheSslEnabled', credentialData, nodeData)
138-
139-
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
140-
141-
client = getRedisClientbyOption({
142-
port: portStr ? parseInt(portStr) : 6379,
143-
host,
144-
username,
145-
password,
146-
...tlsOptions
147-
})
148-
} else {
149-
client = getRedisClientbyUrl(redisUrl)
150-
}
151-
152-
let obj: RedisChatMessageHistoryInput = {
153-
sessionId,
154-
client
155-
}
156-
157-
if (sessionTTL) {
158-
obj = {
159-
...obj,
160-
sessionTTL
161-
}
162-
}
163-
164-
const redisChatMessageHistory = new RedisChatMessageHistory(obj)
92+
const redisOptions = redisUrl
93+
? redisUrl
94+
: ({
95+
port: parseInt(getCredentialParam('redisCachePort', credentialData, nodeData) || '6379'),
96+
host: getCredentialParam('redisCacheHost', credentialData, nodeData),
97+
username: getCredentialParam('redisCacheUser', credentialData, nodeData),
98+
password: getCredentialParam('redisCachePwd', credentialData, nodeData),
99+
tls: getCredentialParam('redisCacheSslEnabled', credentialData, nodeData) ? { rejectUnauthorized: false } : undefined
100+
} as RedisOptions)
165101

166102
const memory = new BufferMemoryExtended({
167103
memoryKey: memoryKey ?? 'chat_history',
168-
chatHistory: redisChatMessageHistory,
169104
sessionId,
170105
windowSize,
171-
redisClient: client,
172-
sessionTTL
106+
sessionTTL,
107+
redisOptions
173108
})
174109

175110
return memory
176111
}
177112

178113
interface BufferMemoryExtendedInput {
179-
redisClient: Redis
180114
sessionId: string
181115
windowSize?: number
182116
sessionTTL?: number
117+
redisOptions: RedisOptions | string
183118
}
184119

185120
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
186121
sessionId = ''
187-
redisClient: Redis
188122
windowSize?: number
189123
sessionTTL?: number
124+
redisOptions: RedisOptions | string
190125

191126
constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
192127
super(fields)
193128
this.sessionId = fields.sessionId
194-
this.redisClient = fields.redisClient
195129
this.windowSize = fields.windowSize
196130
this.sessionTTL = fields.sessionTTL
131+
this.redisOptions = fields.redisOptions
132+
}
133+
134+
private async withRedisClient<T>(fn: (client: Redis) => Promise<T>): Promise<T> {
135+
const client = typeof this.redisOptions === 'string' ? new Redis(this.redisOptions) : new Redis(this.redisOptions)
136+
try {
137+
return await fn(client)
138+
} finally {
139+
await client.quit()
140+
}
197141
}
198142

199143
async getChatMessages(
200144
overrideSessionId = '',
201145
returnBaseMessages = false,
202146
prependMessages?: IMessage[]
203147
): Promise<IMessage[] | BaseMessage[]> {
204-
if (!this.redisClient) return []
205-
206-
const id = overrideSessionId ? overrideSessionId : this.sessionId
207-
const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1)
208-
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
209-
const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage)
210-
if (prependMessages?.length) {
211-
baseMessages.unshift(...(await mapChatMessageToBaseMessage(prependMessages)))
212-
}
213-
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
148+
return this.withRedisClient(async (client) => {
149+
const id = overrideSessionId ? overrideSessionId : this.sessionId
150+
const rawStoredMessages = await client.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1)
151+
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
152+
const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage)
153+
if (prependMessages?.length) {
154+
baseMessages.unshift(...(await mapChatMessageToBaseMessage(prependMessages)))
155+
}
156+
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
157+
})
214158
}
215159

216160
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
217-
if (!this.redisClient) return
218-
219-
const id = overrideSessionId ? overrideSessionId : this.sessionId
220-
const input = msgArray.find((msg) => msg.type === 'userMessage')
221-
const output = msgArray.find((msg) => msg.type === 'apiMessage')
222-
223-
if (input) {
224-
const newInputMessage = new HumanMessage(input.text)
225-
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
226-
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
227-
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
228-
}
161+
await this.withRedisClient(async (client) => {
162+
const id = overrideSessionId ? overrideSessionId : this.sessionId
163+
const input = msgArray.find((msg) => msg.type === 'userMessage')
164+
const output = msgArray.find((msg) => msg.type === 'apiMessage')
165+
166+
if (input) {
167+
const newInputMessage = new HumanMessage(input.text)
168+
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
169+
await client.lpush(id, JSON.stringify(messageToAdd[0]))
170+
if (this.sessionTTL) await client.expire(id, this.sessionTTL)
171+
}
229172

230-
if (output) {
231-
const newOutputMessage = new AIMessage(output.text)
232-
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
233-
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
234-
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
235-
}
173+
if (output) {
174+
const newOutputMessage = new AIMessage(output.text)
175+
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
176+
await client.lpush(id, JSON.stringify(messageToAdd[0]))
177+
if (this.sessionTTL) await client.expire(id, this.sessionTTL)
178+
}
179+
})
236180
}
237181

238182
async clearChatMessages(overrideSessionId = ''): Promise<void> {
239-
if (!this.redisClient) return
240-
241-
const id = overrideSessionId ? overrideSessionId : this.sessionId
242-
await this.redisClient.del(id)
243-
await this.clear()
183+
await this.withRedisClient(async (client) => {
184+
const id = overrideSessionId ? overrideSessionId : this.sessionId
185+
await client.del(id)
186+
await this.clear()
187+
})
244188
}
245189
}
246190

0 commit comments

Comments
 (0)