|
| 1 | +import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' |
| 2 | +import { Embeddings } from 'langchain/embeddings/base' |
| 3 | +import { Document } from 'langchain/document' |
| 4 | +import { DataSourceOptions } from 'typeorm' |
| 5 | +import { TypeORMVectorStore, TypeORMVectorStoreDocument } from 'langchain/vectorstores/typeorm' |
| 6 | +import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' |
| 7 | +import { Pool } from 'pg' |
| 8 | + |
| 9 | +class Postgres_Existing_VectorStores implements INode { |
| 10 | + label: string |
| 11 | + name: string |
| 12 | + version: number |
| 13 | + description: string |
| 14 | + type: string |
| 15 | + icon: string |
| 16 | + category: string |
| 17 | + baseClasses: string[] |
| 18 | + inputs: INodeParams[] |
| 19 | + credential: INodeParams |
| 20 | + outputs: INodeOutputsValue[] |
| 21 | + |
| 22 | + constructor() { |
| 23 | + this.label = 'Postgres Load Existing Index' |
| 24 | + this.name = 'postgresExistingIndex' |
| 25 | + this.version = 1.0 |
| 26 | + this.type = 'Postgres' |
| 27 | + this.icon = 'postgres.svg' |
| 28 | + this.category = 'Vector Stores' |
| 29 | + this.description = 'Load existing index from Postgres using pgvector (i.e: Document has been upserted)' |
| 30 | + this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever'] |
| 31 | + this.credential = { |
| 32 | + label: 'Connect Credential', |
| 33 | + name: 'credential', |
| 34 | + type: 'credential', |
| 35 | + credentialNames: ['PostgresApi'] |
| 36 | + } |
| 37 | + this.inputs = [ |
| 38 | + { |
| 39 | + label: 'Embeddings', |
| 40 | + name: 'embeddings', |
| 41 | + type: 'Embeddings' |
| 42 | + }, |
| 43 | + { |
| 44 | + label: 'Host', |
| 45 | + name: 'host', |
| 46 | + type: 'string' |
| 47 | + }, |
| 48 | + { |
| 49 | + label: 'Database', |
| 50 | + name: 'database', |
| 51 | + type: 'string' |
| 52 | + }, |
| 53 | + { |
| 54 | + label: 'Port', |
| 55 | + name: 'port', |
| 56 | + type: 'number', |
| 57 | + placeholder: '6432', |
| 58 | + optional: true |
| 59 | + }, |
| 60 | + { |
| 61 | + label: 'Table Name', |
| 62 | + name: 'tableName', |
| 63 | + type: 'string', |
| 64 | + placeholder: 'documents', |
| 65 | + additionalParams: true, |
| 66 | + optional: true |
| 67 | + }, |
| 68 | + { |
| 69 | + label: 'Top K', |
| 70 | + name: 'topK', |
| 71 | + description: 'Number of top results to fetch. Default to 4', |
| 72 | + placeholder: '4', |
| 73 | + type: 'number', |
| 74 | + additionalParams: true, |
| 75 | + optional: true |
| 76 | + } |
| 77 | + ] |
| 78 | + this.outputs = [ |
| 79 | + { |
| 80 | + label: 'Postgres Retriever', |
| 81 | + name: 'retriever', |
| 82 | + baseClasses: this.baseClasses |
| 83 | + }, |
| 84 | + { |
| 85 | + label: 'Postgres Vector Store', |
| 86 | + name: 'vectorStore', |
| 87 | + baseClasses: [this.type, ...getBaseClasses(TypeORMVectorStore)] |
| 88 | + } |
| 89 | + ] |
| 90 | + } |
| 91 | + |
| 92 | + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> { |
| 93 | + const credentialData = await getCredentialData(nodeData.credential ?? '', options) |
| 94 | + const user = getCredentialParam('user', credentialData, nodeData) |
| 95 | + const password = getCredentialParam('password', credentialData, nodeData) |
| 96 | + const _tableName = nodeData.inputs?.tableName as string |
| 97 | + const tableName = _tableName ? _tableName : 'documents' |
| 98 | + const embeddings = nodeData.inputs?.embeddings as Embeddings |
| 99 | + const output = nodeData.outputs?.output as string |
| 100 | + const topK = nodeData.inputs?.topK as string |
| 101 | + const k = topK ? parseFloat(topK) : 4 |
| 102 | + |
| 103 | + const postgresConnectionOptions = { |
| 104 | + type: 'postgres', |
| 105 | + host: nodeData.inputs?.host as string, |
| 106 | + port: nodeData.inputs?.port as number, |
| 107 | + username: user, |
| 108 | + password: password, |
| 109 | + database: nodeData.inputs?.database as string |
| 110 | + } |
| 111 | + |
| 112 | + const args = { |
| 113 | + postgresConnectionOptions: postgresConnectionOptions as DataSourceOptions, |
| 114 | + tableName: tableName |
| 115 | + } |
| 116 | + |
| 117 | + const vectorStore = await TypeORMVectorStore.fromDataSource(embeddings, args) |
| 118 | + |
| 119 | + // Rewrite the method to use pg pool connection instead of the default connection |
| 120 | + /* Otherwise a connection error is displayed when the chain tries to execute the function |
| 121 | + [chain/start] [1:chain:ConversationalRetrievalQAChain] Entering Chain run with input: { "question": "what the document is about", "chat_history": [] } |
| 122 | + [retriever/start] [1:chain:ConversationalRetrievalQAChain > 2:retriever:VectorStoreRetriever] Entering Retriever run with input: { "query": "what the document is about" } |
| 123 | + [ERROR]: uncaughtException: Illegal invocation TypeError: Illegal invocation at Socket.ref (node:net:1524:18) at Connection.ref (.../node_modules/pg/lib/connection.js:183:17) at Client.ref (.../node_modules/pg/lib/client.js:591:21) at BoundPool._pulseQueue (/node_modules/pg-pool/index.js:148:28) at .../node_modules/pg-pool/index.js:184:37 at process.processTicksAndRejections (node:internal/process/task_queues:77:11) |
| 124 | + */ |
| 125 | + vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: any) => { |
| 126 | + const embeddingString = `[${query.join(',')}]` |
| 127 | + const _filter = filter ?? '{}' |
| 128 | + |
| 129 | + const queryString = ` |
| 130 | + SELECT *, embedding <=> $1 as "_distance" |
| 131 | + FROM ${tableName} |
| 132 | + WHERE metadata @> $2 |
| 133 | + ORDER BY "_distance" ASC |
| 134 | + LIMIT $3;` |
| 135 | + |
| 136 | + const poolOptions = { |
| 137 | + host: postgresConnectionOptions.host, |
| 138 | + port: postgresConnectionOptions.port, |
| 139 | + user: postgresConnectionOptions.username, |
| 140 | + password: postgresConnectionOptions.password, |
| 141 | + database: postgresConnectionOptions.database |
| 142 | + } |
| 143 | + const pool = new Pool(poolOptions) |
| 144 | + const conn = await pool.connect() |
| 145 | + |
| 146 | + const documents = await conn.query(queryString, [embeddingString, _filter, k]) |
| 147 | + |
| 148 | + conn.release() |
| 149 | + |
| 150 | + const results = [] as [TypeORMVectorStoreDocument, number][] |
| 151 | + for (const doc of documents.rows) { |
| 152 | + if (doc._distance != null && doc.pageContent != null) { |
| 153 | + const document = new Document(doc) as TypeORMVectorStoreDocument |
| 154 | + document.id = doc.id |
| 155 | + results.push([document, doc._distance]) |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | + return results |
| 160 | + } |
| 161 | + |
| 162 | + if (output === 'retriever') { |
| 163 | + const retriever = vectorStore.asRetriever(k) |
| 164 | + return retriever |
| 165 | + } else if (output === 'vectorStore') { |
| 166 | + ;(vectorStore as any).k = k |
| 167 | + return vectorStore |
| 168 | + } |
| 169 | + return vectorStore |
| 170 | + } |
| 171 | +} |
| 172 | + |
| 173 | +module.exports = { nodeClass: Postgres_Existing_VectorStores } |
0 commit comments