-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzozo-improved.py
373 lines (348 loc) · 18.4 KB
/
zozo-improved.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
import irc.bot
import openai
import json
import os
import requests
import pyshorteners
import imgbbpy
import time
from pylatexenc.latex2text import LatexNodes2Text
class ChatGPTBot(irc.bot.SingleServerIRCBot):
def __init__(self, config_file):
# Charger la configuration
with open(config_file, "r") as f:
config = json.load(f)
# Initialiser les variables
self.channel_list = [channel.strip() for channel in config["channels"].split(",")]
self.nickname = config["nickname"]
self.server = config["server"]
self.port = config["port"]
self.api_key = config["api_key"]
self.max_num_line = config["max_num_line"]
self.imgbb_api_key = config["imgbb_api_key"]
self.web_url = config["display_url"]
self.image_filename= config["image_filename"]
self.admin_user = config["admin_user"]
self.grok_api_key = config["grok_api_key"]
self.blocked_users = set()
self.user_contexts = [] # List to store context information
self.model = "gpt-4o-mini"
self.tag = ""
openai.api_key = self.api_key
self.imgbb_client = imgbbpy.SyncClient(self.imgbb_api_key)
if not os.path.exists("conversations"):
os.makedirs("conversations")
irc.bot.SingleServerIRCBot.__init__(self, [(self.server, self.port)], self.nickname, self.nickname)
def on_welcome(self, connection, event):
for channel in self.channel_list:
connection.join(channel)
def on_pubmsg(self, connection, event):
message = event.arguments[0]
user = event.source.nick
channel = event.target
bot_nickname = self.connection.get_nickname()
if message.startswith("grok"):
prompt = message.split("grok", 1)[1].strip()
self.update_context(channel, user, prompt)
response = self.generate_response_grok(channel, user, prompt)
self.send_message_in_chunks(connection, channel, response)
#connection.privmsg(channel, f"Grok: {response}")
elif message.strip().startswith(bot_nickname + ":"):
message = message[len(bot_nickname) + 1:].strip()
command, args = self.parse_command(message)
if command == "tag":
self.tag = args # Mettre à jour le tag
connection.privmsg(channel, f"Tag mis à jour: {self.tag}")
elif command == "help":
self.send_help_message(connection, channel)
elif command == "raz":
self.reset_user_context(channel, user)
connection.privmsg(channel, "Conversation oubliée ...")
elif command == "save":
self.save_user_context(channel, user, args)
elif command == "load":
self.load_user_context(channel, user, args)
elif command == "files":
self.list_user_files(channel, user)
elif command == "delete":
self.delete_user_context(channel, user, args)
elif command == "block" and user == self.admin_user:
self.block_user(args)
connection.privmsg(channel, f"Utilisateur {args} bloqué.")
elif command == "unblock" and user == self.admin_user:
self.unblock_user(args)
connection.privmsg(channel, f"Utilisateur {args} débloqué.")
elif command == "model":
self.change_model(channel, user, args)
elif command == "list-models":
self.list_models(channel)
elif command == "current":
connection.privmsg(channel, f"Modèle actuel utilisé : {self.model}")
elif command == "local":
self.generate_image_local(connection, channel, args)
elif command == "image":
self.generate_image_tiny(connection, channel, args)
elif command == "imgbb":
self.generate_image_imgbb(connection, channel, args)
elif command == "vision":
self.generate_image_description(connection, channel, args)
elif command == "url":
self.summarize_url(connection, channel, args)
else:
if user in self.blocked_users:
connection.privmsg(channel, "Vous êtes bloqué et ne pouvez pas recevoir de réponses.")
else:
self.update_context(channel, user, message)
response = self.generate_response(channel, user, message)
self.send_message_in_chunks(connection, channel, response)
def parse_command(self, message):
parts = message.strip().split(" ", 1)
command = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
return command, args
def send_help_message(self, connection, channel):
help_message = ("'raz' oublie la conversation, 'save [titre]', 'load [titre]', "
"'delete [titre]', 'files' liste les conversations, 'block [user]' "
"bloque un utilisateur, 'unblock [user]' débloque un utilisateur, "
"'model [model_name]' pour changer le modèle, 'list-models' liste les modèles valides, "
"'image [prompt]' pour générer une image, 'vision [image URL]' pour décrire une image. 'url [ URL]' pour décrire une page web.")
connection.privmsg(channel, help_message)
def update_context(self, channel, user, message):
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
if context_entry:
context = context_entry[2]
context.append(message + ".")
if len(context) > self.max_num_line:
context = context[-self.max_num_line:]
context_entry[2] = context
else:
self.user_contexts.append([channel, user, [message + "."]])
def generate_response(self, channel, user, message):
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
if context_entry:
context = "\n".join(context_entry[2][:-1])
last_message = context_entry[2][-1]
prompt_text = f"Contexte:\n{context}\n\nRépond seulement à la dernière ligne en tenant compte du contexte précédent.\nDernière ligne: {last_message}"
response = openai.ChatCompletion.create(
model=self.model,
messages=[{"role": "user", "content": prompt_text}]
)
generated_text = response.choices[0].message.content.strip()
#generated_text = f"{self.tag} {response.choices[0].message.content.strip()}" if self.tag else response.choices[0].message.content.strip()
# Convertir le texte LaTeX en texte lisible
readable_text = LatexNodes2Text().latex_to_text(generated_text)
return readable_text
return ""
def generate_response_grok (self, channel, user, message):
api_key = self.grok_api_key
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
headers = {'Authorization': f'Bearer {api_key}'}
if context_entry:
context = "\n".join(context_entry[2][:-1])
last_message = context_entry[2][-1]
prompt_text = f"Contexte:\n{context}\n\nRépond seulement à la dernière ligne en tenant compte du contexte précédent.\nDernière ligne: {last_message}"
data = {
"model": "grok-beta", # Remplacez par le modèle correct si nécessaire.
"messages": [{"role": "user", "content": prompt_text}]
}
response = requests.post('https://api.x.ai/v1/chat/completions', json=data, headers=headers)
#generated_text = response.choices[0].message.content.strip()
data = response.json()
if 'choices' in data and data['choices']:
generated_text = data['choices'][0]['message']['content']
else:
generated_text = "Erreur ou réponse non disponible."
readable_text = LatexNodes2Text().latex_to_text(generated_text)
return readable_text
return ""
def reset_user_context(self, channel, user):
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
if context_entry:
context_entry[2] = ["Bonjour"]
else:
self.user_contexts.append([channel, user, ["Bonjour"]])
def save_user_context(self, channel, user, title):
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
if context_entry:
context = context_entry[2]
filename = os.path.join("conversations", f"{user}.{title}.context.json")
with open(filename, "w") as file:
json.dump(context, file)
self.connection.privmsg(channel, f"Contexte utilisateur de {user} sauvegardé sous le titre {title} dans {filename}.")
else:
self.connection.privmsg(channel, f"Aucun contexte à sauvegarder pour {user}.")
def load_user_context(self, channel, user, title):
filename = os.path.join("conversations", f"{user}.{title}.context.json")
if os.path.exists(filename):
with open(filename, "r") as file:
context = json.load(file)
context_entry = next((entry for entry in self.user_contexts if entry[0] == channel and entry[1] == user), None)
if context_entry:
context_entry[2] = context
else:
self.user_contexts.append([channel, user, context])
self.connection.privmsg(channel, f"Contexte utilisateur de {user} avec le titre {title} chargé depuis {filename}.")
else:
self.connection.privmsg(channel, f"Aucun fichier de contexte trouvé pour {user} avec le titre {title}.")
def list_user_files(self, channel, user):
files = [filename.split(".")[1].replace(".context.json", "") for filename in os.listdir("conversations") if filename.startswith(user) and filename.endswith(".context.json")]
files.sort()
if files:
self.connection.privmsg(channel, f"Fichiers de contexte disponibles pour {user} :")
for file in files:
self.connection.privmsg(channel, f"- {file}")
else:
self.connection.privmsg(channel, f"Aucun fichier de contexte disponible pour {user}.")
def delete_user_context(self, channel, user, title):
filename = os.path.join("conversations", f"{user}.{title}.context.json")
if os.path.exists(filename):
os.remove(filename)
self.connection.privmsg(channel, f"Fichier de contexte '{title}' supprimé pour l'utilisateur {user}.")
else:
self.connection.privmsg(channel, f"Aucun fichier de contexte trouvé pour {user} avec le titre {title}.")
def block_user(self, user):
self.blocked_users.add(user)
def unblock_user(self, user):
self.blocked_users.discard(user)
def change_model(self, channel, user, model):
valid_models = ["gpt-3.5-turbo", "gpt-4", "o1-mini", "o1-preview","o1","o3-mini", "gpt-4o", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-3.5-turbo-16k", "gpt-4-16k"]
if model in valid_models:
self.model = model
self.connection.privmsg(channel, f"Modèle changé à {model}.")
else:
self.connection.privmsg(channel, f"Modèle {model} invalide. Modèles valides : {', '.join(valid_models)}.")
def list_models(self, channel):
valid_models = ["gpt-3.5-turbo", "gpt-4", "o1-mini", "o1-preview", "o1","o3-mini" ,"gpt-4o", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-3.5-turbo-16k", "gpt-4-16k"]
self.connection.privmsg(channel, f"Modèles valides : {', '.join(valid_models)}.")
def summarize_url(self, connection, channel, url):
try:
# Vérifier si l'URL commence par "http" pour éviter les erreurs
if not url.startswith("http"):
url = "http://" + url
# Récupérer le contenu de l'URL
response = requests.get(url, timeout=10)
response.raise_for_status() # Vérifie si la requête a réussi
content = response.text
# Envoyer le contenu à GPT pour le résumer
prompt = f"Voici le contenu d'une page web : {content[:4000]} \n\nRésume ce contenu en quelques phrases claires et concises."
ai_response = openai.ChatCompletion.create(
model=self.model,
messages=[{"role": "user", "content": prompt}]
)
# Extraire et envoyer le résumé
summary = ai_response['choices'][0]['message']['content'].strip()
self.send_message_in_chunks(connection, channel, f"Résumé de la page : {summary}")
except requests.exceptions.RequestException as e:
connection.privmsg(channel, f"Erreur lors de la récupération de l'URL : {e}")
except openai.error.OpenAIError as e:
connection.privmsg(channel, f"Erreur lors de la génération du résumé : {e}")
except Exception as e:
connection.privmsg(channel, f"Une erreur inattendue est survenue : {e}")
def generate_image_tiny(self, connection, channel, prompt):
try:
response = openai.Image.create(
model = "dall-e-3",
prompt=prompt,
n=1,
size="1024x1024"
)
image_url = response['data'][0]['url']
shortener = pyshorteners.Shortener()
short_url = shortener.tinyurl.short(image_url)
connection.privmsg(channel, short_url)
except openai.error.OpenAIError as e:
connection.privmsg(channel, f"Erreur lors de la génération de l'image: {str(e)}")
except Exception as e:
connection.privmsg(channel, f"Une erreur inattendue est survenue: {str(e)}")
def generate_image_imgbb(self, connection, channel, prompt):
try:
response = openai.Image.create(
model = "dall-e-3",
prompt=prompt,
n=1,
size="1024x1024"
)
image_url = response['data'][0]['url']
# Télécharger l'image
image_data = requests.get(image_url).content
# Enregistrer l'image temporairement
temp_image_path = 'temp_image.png'
with open(temp_image_path, 'wb') as f:
f.write(image_data)
imgbb_response = self.imgbb_client.upload(file=temp_image_path)
os.remove(temp_image_path)
short_url = imgbb_response.url
connection.privmsg(channel, short_url)
except openai.error.OpenAIError as e:
connection.privmsg(channel, f"Erreur lors de la génération de l'image: {str(e)}")
except requests.exceptions.RequestException as e:
connection.privmsg(channel, f"Erreur de téléchargement de l'image: {str(e)}")
except imgbbpy.exceptions.ImgBBError as e:
connection.privmsg(channel, f"Erreur lors du téléchargement sur ImgBB: {str(e)}")
except Exception as e:
connection.privmsg(channel, f"Une erreur inattendue est survenue: {str(e)}")
def generate_image_local(self, connection, channel, prompt):
try:
response = openai.Image.create(
model = "dall-e-3",
prompt=prompt,
n=1,
size="1024x1024"
)
image_url = response['data'][0]['url']
image_response = requests.get(image_url)
with open(self.image_filename, "wb") as image_file:
image_file.write(image_response.content)
connection.privmsg(channel, sef.web_url)
except openai.error.OpenAIError as e:
connection.privmsg(channel, f"Erreur lors de la génération de l'image: {str(e)}")
except requests.exceptions.RequestException as e:
connection.privmsg(channel, f"Erreur de téléchargement de l'image: {str(e)}")
except Exception as e:
connection.privmsg(channel, f"Une erreur inattendue est survenue: {str(e)}")
def generate_image_description(self, connection, channel, image_url):
try:
response = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "Décrit moi cette image en détails"},
{
"type": "image_url",
"image_url": {"url": image_url},
},
],
}
],
max_tokens=500,
)
description = response.choices[0].message["content"]
self.send_message_in_chunks(connection, channel, f"Description de l'image : {description}")
except openai.error.OpenAIError as e:
connection.privmsg(channel, f"Erreur lors de l'appel à l'API OpenAI: {str(e)}")
except Exception as e:
connection.privmsg(channel, f"Une erreur inattendue est survenue: {str(e)}")
def send_message_in_chunks(self, connection, target, message):
lines = message.split('\n')
for line in lines:
if len(line.encode('utf-8')) <= 392:
connection.privmsg(target, line.strip())
else:
while line:
if len(line.encode('utf-8')) > 392:
last_space_index = line[:392].rfind(' ')
if last_space_index == -1:
connection.privmsg(target, line[:392].strip())
line = line[392:]
else:
connection.privmsg(target, line[:last_space_index].strip())
line = line[last_space_index:].strip()
else:
connection.privmsg(target, line.strip())
line = ''
time.sleep(0.5)
if __name__ == "__main__":
bot = ChatGPTBot("zozo.json")
bot.start()