import os
import shutil
import time
from typing import Dict, List
import backoff
import httpx
import nest_asyncio
import textract
from deeplake.core.vectorstore import VectorStore
from minio import Minio
from neo4j import GraphDatabase
from openai import OpenAI
nest_asyncio.apply()

# MinIO client used only to make sure the "datalake" bucket exists before DeepLake writes to it.
minio_client = Minio(
    "host.docker.internal:9000",
    access_key="miniouser",
    secret_key="miniosecret",
    secure=False,
)
if minio_client.bucket_exists("datalake"):
    print("bucket exists")
else:
    print("make bucket")
    minio_client.make_bucket("datalake")

# Retry decorator for handling timeouts
@backoff.on_exception(
    backoff.expo, (httpx.ReadTimeout, httpx.ConnectTimeout), max_tries=5
)
def retry_request(func, *args, **kwargs):
    return func(*args, **kwargs)
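
# Illustrative usage of the helper above (it is not called elsewhere in this file):
# wrap any flaky call together with its arguments, e.g.
#   response = retry_request(client.chat.completions.create, model="...", messages=[...])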

# OpenAI-compatible client pointed at a local LM Studio server.
client = OpenAI(
    base_url="http://host.docker.internal:1234/v1", api_key="lm-studio", timeout=60.0
)

# DeepLake setup
vector_store = VectorStore(
    "s3://datalake",
    creds={
        "aws_access_key_id": "miniouser",
        "aws_secret_access_key": "miniosecret",
        "endpoint_url": "http://minio:9000",
    },
)

# Neo4j setup
neo4j_uri = "bolt://neo4j:7687"
neo4j_user = "neo4j"
neo4j_password = "securepassword"
neo4j_driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

# Embed one or more texts with the local embedding model via the OpenAI embeddings endpoint.
def embedding_function(texts, model="nomic-ai/nomic-embed-text-v1.5-GGUF"):
    if isinstance(texts, str):
        texts = [texts]
    texts = [t.replace("\n", " ") for t in texts]
    return [
        data.embedding
        for data in client.embeddings.create(input=texts, model=model).data
    ]
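
# Illustrative: embedding_function("hello world") returns a list containing one embedding
# vector, so the same callable can be passed as embedding_function to VectorStore.add/search.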

def add_to_vector_store(chunked_text, metadata, vector_store):
    vector_store.add(
        text=chunked_text,
        embedding_function=embedding_function,
        embedding_data=chunked_text,
        metadata=metadata,
    )

def initialize_vector_store(
    data_path: str = "/app/data", processed_path: str = "/app/processed_data"
):
    CHUNK_SIZE = 1000
    # Ensure the processed_path directory exists
    os.makedirs(processed_path, exist_ok=True)
    chunked_text = []
    metadata = []
    for dirpath, dirnames, filenames in os.walk(data_path):
        for file in filenames:
            try:
                full_path = os.path.join(dirpath, file)
                file_extension = os.path.splitext(file)[1].lower()
                if file_extension in [
                    ".txt",
                    ".doc",
                    ".docx",
                    ".pdf",
                    ".csv",
                    ".xls",
                    ".xlsx",
                ]:
                    # Use textract to extract text from the file
                    text = textract.process(full_path).decode("utf-8")
                    # Chunk the text into fixed-size character windows
                    new_chunked_text = [
                        text[i : i + CHUNK_SIZE]
                        for i in range(0, len(text), CHUNK_SIZE)
                    ]
                    chunked_text += new_chunked_text
                    metadata += [
                        {"filepath": full_path, "filetype": file_extension}
                        for _ in range(len(new_chunked_text))
                    ]
                    # Move the processed file
                    new_path = os.path.join(
                        processed_path, os.path.relpath(full_path, data_path)
                    )
                    os.makedirs(os.path.dirname(new_path), exist_ok=True)
                    shutil.move(full_path, new_path)
                    print(f"Moved processed file to: {new_path}")
                else:
                    print(f"Skipping unsupported file type: {full_path}")
            except Exception as e:
                print(f"Error processing file {full_path}: {e}")
    if chunked_text:
        add_to_vector_store(chunked_text, metadata, vector_store)
        file_count = len({m["filepath"] for m in metadata})
        print(
            f"Added {len(chunked_text)} chunks from {file_count} files to the vector store."
        )
    else:
        print("No valid documents found to add to the vector store.")

@backoff.on_exception(
    backoff.expo, (httpx.ReadTimeout, httpx.ConnectTimeout), max_tries=5
)
def query_vector_store(prompt: str) -> str:
    # VectorStore.search returns a dict of lists keyed by tensor name
    # (e.g. {"text": [...], "score": [...]}); join the matched texts into one context string.
    results = vector_store.search(
        embedding_data=prompt, embedding_function=embedding_function
    )
    texts = results.get("text", []) if results else []
    return "\n".join(str(t) for t in texts)

@backoff.on_exception(
    backoff.expo, (httpx.ReadTimeout, httpx.ConnectTimeout), max_tries=5
)
def generate_search_queries(context: str) -> List[str]:
    prompt = f"Based on the following context, generate 3 specific Google Dorks to research further:\n\n{context}\n\nGoogle Dorks:"
    stream = client.chat.completions.create(
        model="lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF",
        messages=[
            {
                "role": "system",
                "content": "Always output only the search queries, no other text.",
            },
            {"role": "user", "content": prompt},
        ],
        temperature=0.1,
        stream=True,
    )
    # Accumulate the streamed fragments, then split the full reply into one query per line.
    fragments = []
    for part in stream:
        content = part.choices[0].delta.content
        if content is not None:
            fragments.append(content)
            print(content, end="")
    print()
    return [line.strip() for line in "".join(fragments).splitlines() if line.strip()]

def analyze_graph_data(tx, query: str) -> List[Dict]:
    result = tx.run(query)
    return [dict(record) for record in result]

def disambiguate_data(data: List[Dict]) -> List[Dict]:
    # Implement disambiguation logic here
    return data
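
# The disambiguation step above is a stub. Below is a minimal sketch of one possible approach
# (assumption: each record carries a "name"-like property); it simply collapses records whose
# normalized names match. This is only illustrative, not the project's actual method.
def deduplicate_records_by_name(data: List[Dict]) -> List[Dict]:
    seen = set()
    unique: List[Dict] = []
    for record in data:
        # Normalize a best-effort "name" for the record; fall back to its string form.
        key = str(record.get("name", record)).strip().lower()
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique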

# Remove duplicate relationships: where several relationships of the same type connect the
# same pair of nodes in the same direction, keep one and delete the rest.
def resolve_edge_redundancies(tx):
    query = """
    MATCH (a)-[r1]->(b)<-[r2]-(a)
    WHERE type(r1) = type(r2)
    WITH a, b, collect(r1) + collect(r2) AS rels
    WHERE size(rels) > 1
    FOREACH (r IN tail(rels) | DELETE r)
    """
    tx.run(query)

def identify_enrichment_opportunities(data: List[Dict]) -> List[str]:
    # Implement logic to identify nodes/edges that need enrichment
    # (hardcoded placeholder targets for now)
    return ["Entity1", "Entity2", "Relationship1"]

# Merge a node labelled with the given entity name (the name is interpolated as a Cypher
# label, so it must be a valid label) and update its properties.
def enrich_graph_data(tx, entity: str, new_data: Dict):
    query = f"""
    MERGE (n:{entity})
    SET n += $props
    """
    tx.run(query, props=new_data)
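
# Steps 3a/3b of the loop below (web search and ingestion) are not implemented in this file.
# The helper below is only a sketch of what ingestion could look like, reusing the existing
# chunking and vector-store helpers; the documents argument and its "text"/"source" keys are
# assumptions, not part of the original pipeline.
def ingest_new_data(documents: List[Dict], chunk_size: int = 1000):
    chunked_text = []
    metadata = []
    for doc in documents:
        text = doc.get("text", "")
        chunks = [text[i : i + chunk_size] for i in range(0, len(text), chunk_size)]
        chunked_text += chunks
        metadata += [{"source": doc.get("source", "web")} for _ in chunks]
    if chunked_text:
        add_to_vector_store(chunked_text, metadata, vector_store)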

def main_research_loop():
    initialize_vector_store("/app/data", "/app/processed_data")
    while True:
        try:
            # 1. Analyze available data and formulate questions
            question = "What are the main unanswered questions in our current research?"
            context = query_vector_store(question)
            # 2. Generate search queries
            search_queries = generate_search_queries(context)
            print("Generated search queries:", search_queries)
            # 3. Perform web search and ingest new data (not implemented in this example)
            # new_data = perform_web_search(search_queries)
            # ingest_new_data(new_data)
            # 4. Analyze graph data
            with neo4j_driver.session() as session:
                graph_data = session.read_transaction(
                    analyze_graph_data, "MATCH (n) RETURN n"
                )
                # 5. Disambiguate data
                disambiguated_data = disambiguate_data(graph_data)
                # 6. Resolve edge redundancies
                session.write_transaction(resolve_edge_redundancies)
                # 7. Identify enrichment opportunities
                enrichment_targets = identify_enrichment_opportunities(
                    disambiguated_data
                )
                # 8. Enrich graph data
                for target in enrichment_targets:
                    enrichment_query = (
                        f"How can we enrich our knowledge about {target}?"
                    )
                    enrichment_info = query_vector_store(enrichment_query)
                    session.write_transaction(
                        enrich_graph_data, target, {"info": str(enrichment_info)}
                    )
            # Optional: Add a break condition or user input to stop the loop
            break
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            print("Retrying in 60 seconds...")
            time.sleep(60)


if __name__ == "__main__":
    main_research_loop()