Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/In-Mem LLMcache #1036

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/components/credentials/UpstashRedisApi.credential.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ class UpstashRedisApi implements INodeCredential {
label: string
name: string
version: number
description: string
inputs: INodeParams[]

constructor() {
this.label = 'Upstash Redis API'
this.name = 'upstashRedisApi'
this.version = 1.0
this.description =
'Refer to <a target="_blank" href="https://upstash.com/docs/redis/overall/getstarted">official guide</a> on how to create redis instance and get redis REST URL and Token'
this.inputs = [
{
label: 'Upstash Redis REST URL',
Expand Down
65 changes: 65 additions & 0 deletions packages/components/nodes/cache/InMemoryCache/InMemoryCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { BaseCache } from 'langchain/schema'
import hash from 'object-hash'

class InMemoryCache implements INode {
label: string
name: string
version: number
description: string
type: string
icon: string
category: string
baseClasses: string[]
inputs: INodeParams[]
credential: INodeParams

constructor() {
this.label = 'InMemory Cache'
this.name = 'inMemoryCache'
this.version = 1.0
this.type = 'InMemoryCache'
this.description = 'Cache LLM response in memory, will be cleared once app restarted'
this.icon = 'inmemorycache.png'
this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(InMemoryCacheExtended)]
this.inputs = []
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const memoryMap = options.cachePool.getLLMCache(options.chatflowid) ?? new Map()
const inMemCache = new InMemoryCacheExtended(memoryMap)

inMemCache.lookup = async (prompt: string, llmKey: string): Promise<any | null> => {
const memory = options.cachePool.getLLMCache(options.chatflowid) ?? inMemCache.cache
return Promise.resolve(memory.get(getCacheKey(prompt, llmKey)) ?? null)
}

inMemCache.update = async (prompt: string, llmKey: string, value: any): Promise<void> => {
inMemCache.cache.set(getCacheKey(prompt, llmKey), value)
options.cachePool.addLLMCache(options.chatflowid, inMemCache.cache)
}
return inMemCache
}
}

const getCacheKey = (...strings: string[]): string => hash(strings.join('_'))

class InMemoryCacheExtended extends BaseCache {
cache: Map<string, any>

constructor(map: Map<string, any>) {
super()
this.cache = map
}

lookup(prompt: string, llmKey: string): Promise<any | null> {
return Promise.resolve(this.cache.get(getCacheKey(prompt, llmKey)) ?? null)
}

async update(prompt: string, llmKey: string, value: any): Promise<void> {
this.cache.set(getCacheKey(prompt, llmKey), value)
}
}

module.exports = { nodeClass: InMemoryCache }
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MomentoCache implements INode {
this.name = 'momentoCache'
this.version = 1.0
this.type = 'MomentoCache'
this.description = 'Cache LLM response using Momento, a distributed, serverless cache'
this.icon = 'momento.png'
this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainMomentoCache)]
Expand Down
1 change: 1 addition & 0 deletions packages/components/nodes/cache/RedisCache/RedisCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class RedisCache implements INode {
this.name = 'redisCache'
this.version = 1.0
this.type = 'RedisCache'
this.description = 'Cache LLM response in Redis, useful for sharing cache across multiple processes or servers'
this.icon = 'redis.svg'
this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainRedisCache)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class UpstashRedisCache implements INode {
this.name = 'upstashRedisCache'
this.version = 1.0
this.type = 'UpstashRedisCache'
this.description = 'Cache LLM response in Upstash Redis, serverless data for Redis and Kafka'
this.icon = 'upstash.png'
this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainUpstashRedisCache)]
Expand Down
2 changes: 2 additions & 0 deletions packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"node-fetch": "^2.6.11",
"node-html-markdown": "^1.3.0",
"notion-to-md": "^3.1.1",
"object-hash": "^3.0.0",
"pdf-parse": "^1.1.1",
"pdfjs-dist": "^3.7.107",
"pg": "^8.11.2",
Expand All @@ -73,6 +74,7 @@
"devDependencies": {
"@types/gulp": "4.0.9",
"@types/node-fetch": "2.6.2",
"@types/object-hash": "^3.0.2",
"@types/pg": "^8.10.2",
"@types/ws": "^8.5.3",
"gulp": "^4.0.2",
Expand Down
53 changes: 53 additions & 0 deletions packages/server/src/CachePool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { IActiveCache } from './Interface'

/**
* This pool is to keep track of in-memory cache used for LLM and Embeddings
*/
export class CachePool {
activeLLMCache: IActiveCache = {}
activeEmbeddingCache: IActiveCache = {}

/**
* Add to the llm cache pool
* @param {string} chatflowid
* @param {Map<any, any>} value
*/
addLLMCache(chatflowid: string, value: Map<any, any>) {
this.activeLLMCache[chatflowid] = value
}

/**
* Add to the embedding cache pool
* @param {string} chatflowid
* @param {Map<any, any>} value
*/
addEmbeddingCache(chatflowid: string, value: Map<any, any>) {
this.activeEmbeddingCache[chatflowid] = value
}

/**
* Get item from llm cache pool
* @param {string} chatflowid
*/
getLLMCache(chatflowid: string): Map<any, any> | undefined {
return this.activeLLMCache[chatflowid]
}

/**
* Get item from embedding cache pool
* @param {string} chatflowid
*/
getEmbeddingCache(chatflowid: string): Map<any, any> | undefined {
return this.activeEmbeddingCache[chatflowid]
}
}

let cachePoolInstance: CachePool | undefined

export function getInstance(): CachePool {
if (cachePoolInstance === undefined) {
cachePoolInstance = new CachePool()
}

return cachePoolInstance
}
4 changes: 4 additions & 0 deletions packages/server/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ export interface IActiveChatflows {
}
}

export interface IActiveCache {
[key: string]: Map<any, any>
}

export interface IOverrideConfig {
node: string
nodeId: string
Expand Down
9 changes: 8 additions & 1 deletion packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ import { ChatMessage } from './database/entities/ChatMessage'
import { Credential } from './database/entities/Credential'
import { Tool } from './database/entities/Tool'
import { ChatflowPool } from './ChatflowPool'
import { CachePool } from './CachePool'
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'

export class App {
app: express.Application
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
AppDataSource = getDataSource()

constructor() {
Expand Down Expand Up @@ -91,6 +93,9 @@ export class App {
// Initialize Rate Limit
const AllChatFlow: IChatFlow[] = await getAllChatFlow()
await initializeRateLimiter(AllChatFlow)

// Initialize cache pool
this.cachePool = new CachePool()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
Expand Down Expand Up @@ -944,8 +949,10 @@ export class App {
incomingInput.question,
incomingInput.history,
chatId,
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig
incomingInput?.overrideConfig,
this.cachePool
)

const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
Expand Down
11 changes: 9 additions & 2 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ChatMessage } from '../database/entities/ChatMessage'
import { Credential } from '../database/entities/Credential'
import { Tool } from '../database/entities/Tool'
import { DataSource } from 'typeorm'
import { CachePool } from '../CachePool'

const QUESTION_VAR_PREFIX = 'question'
const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
Expand Down Expand Up @@ -197,8 +198,10 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
* @param {IComponentNodes} componentNodes
* @param {string} question
* @param {string} chatId
* @param {string} chatflowid
* @param {DataSource} appDataSource
* @param {ICommonObject} overrideConfig
* @param {CachePool} cachePool
*/
export const buildLangchain = async (
startingNodeIds: string[],
Expand All @@ -209,8 +212,10 @@ export const buildLangchain = async (
question: string,
chatHistory: IMessage[],
chatId: string,
chatflowid: string,
appDataSource: DataSource,
overrideConfig?: ICommonObject
overrideConfig?: ICommonObject,
cachePool?: CachePool
) => {
const flowNodes = cloneDeep(reactFlowNodes)

Expand Down Expand Up @@ -245,9 +250,11 @@ export const buildLangchain = async (
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
chatflowid,
appDataSource,
databaseEntities,
logger
logger,
cachePool
})
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
} catch (e: any) {
Expand Down