-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinteractive.py
115 lines (89 loc) · 4.5 KB
/
interactive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import asyncio
from openai import AsyncOpenAI
import requests
import json
import os
# Configure the OpenAI client with your API key
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
class Conversation:
    """Rolling chat history used to ask the model for ClickHouse SQL.

    History is kept per instance. (The original stored the list on the
    class itself and reassigned it in ``__init__``, so every
    ``Conversation`` instance shared — and clobbered — the same history.)
    """

    # Rough budget for the JSON-serialized history. NOTE: this counts
    # characters, not model tokens — it is a crude approximation.
    MAX_HISTORY_CHARS = 4096

    def __init__(self):
        # Per-instance history, seeded with the system prompt.
        self.messages = [
            {
                "role": "system",
                "content": (
                    "You are a ClickHouse expert specializing in OLAP databases, SQL format, and functions. You can produce SQL queries using knowledge of ClickHouse's architecture, data modeling, performance optimization, query execution, and advanced analytical functions."
                ),
            }
        ]

    async def answer(self, user_prompt):
        """Build the SQL-generation prompt, send the history to the model,
        record the assistant reply, and return it.

        Relies on the module-level ``client`` (AsyncOpenAI).
        """
        prompt = self.build_query_prompt(user_prompt)
        self._update("user", prompt)
        chat_completion = await client.chat.completions.create(
            messages=self.messages,
            model="gpt-3.5-turbo",
        )
        response_content = chat_completion.choices[0].message.content
        self._update("assistant", response_content)
        return response_content

    def _update(self, role, content):
        """Append a message, evicting oldest entries to stay under budget.

        The eviction loop may drop the system prompt itself if the new
        message is large enough — preserved from the original behavior.
        """
        new_message = {"role": role, "content": content}
        # Estimate size via serialized length (characters, not tokens).
        new_message_length = len(json.dumps(new_message))
        while self.messages and new_message_length + len(json.dumps(self.messages)) > self.MAX_HISTORY_CHARS:
            self.messages.pop(0)  # drop the oldest message first
        self.messages.append(new_message)

    def build_query_prompt(self, query):
        """Wrap the user's question in the ClickHouse SQL-generation instructions."""
        input_str = f"""
I want you to act as a ClickHouse expert specializing in OLAP databases, SQL format, and functions. You can produce SQL queries using knowledge of ClickHouse's architecture, data modeling, performance optimization, query execution, and advanced analytical functions.
I want you to generate an accurate ClickHouse SQL query for the question:
{query}
- Make sure the query is ClickHouse compatible
- Make sure ClickHouse SQL and ClickHouse functions are used
- Make sure the simplest ClickHouse SQL query is used and avoid complexity.
- Assume there are no tables in memory, data is always remote
- Load data from URLs containing http or https URLs using the url() ClickHouse function, for instance url('https://domain.com/file.csv')
- Load data from files using the file() ClickHouse function, for instance: file('data.csv')
- Make sure any file hosted on s3 is loaded using the s3() ClickHouse function
- Ensure case sensitivity
- Ensure NULL check
- When LIMIT is 1 or lower than 10 always select *.
- Do not write explanations. Just return the SQL query in a single code block.
The expected output is code only. Always use table name in column reference to avoid ambiguity.
"""
        return input_str
async def execute_sql_query(query: str):
    """Execute a SQL query against the remote chdb endpoint.

    Returns the parsed JSON payload on HTTP 200, an error string on a
    non-200 status, or the raw response text when the body is not JSON.

    The original called blocking ``requests.get`` directly inside an
    ``async def``, which stalls the event loop for the whole round-trip;
    the call now runs in the default thread-pool executor.
    """
    base_url = "https://chdb.fly.dev/"
    params = {"query": query, "default_format": "JSONCompact"}
    loop = asyncio.get_running_loop()
    # requests is synchronous — offload it so the event loop stays responsive.
    response = await loop.run_in_executor(
        None, lambda: requests.get(base_url, params=params)
    )
    try:
        data = response.json()
    except json.JSONDecodeError:
        # Non-JSON body (e.g. a plain-text server error): return it verbatim.
        return response.text
    if response.status_code == 200:
        return data
    return 'Query execution failed with status code {}: {}'.format(response.status_code, data)
async def generate_contextual_response(user_query, query_results):
    """Ask the model to summarize `query_results` as an answer to `user_query`.

    Uses the module-level ``client`` (AsyncOpenAI) with gpt-3.5-turbo.
    """
    summary_prompt = (
        f"Based on the following data: {query_results}, "
        f"how would you summarize the answer to this query: '{user_query}'?"
    )
    completion = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "system", "content": summary_prompt}],
    )
    return completion.choices[0].message.content
async def main():
    """Interactive loop: read a question, generate SQL, run it, summarize.

    Loops until the user types 'exit' or 'quit'. The original had a stray
    unconditional ``break`` at the end of the loop body, so the loop ran
    exactly once and the exit/quit branch was effectively dead — removed.
    """
    conversation = Conversation()
    print("\nHi, I'm chdbGPT, an AI assistant that can execute ClickHouse SQL queries for you.")
    while True:
        user_input = input("\nWhat would you like to know? => ").strip()
        if user_input.lower() in ['exit', 'quit']:
            print("Exiting SQLGPT. Goodbye!")
            break
        sql_query = await conversation.answer(user_input)
        query_results = await execute_sql_query(sql_query)
        contextual_response = await generate_contextual_response(user_input, query_results)
        print(contextual_response)
# Standard entry-point guard: run the REPL only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    asyncio.run(main())