1
1
import asyncio
2
- import threading
3
2
import uuid
4
- import webbrowser
5
3
from datetime import datetime
6
4
from json import dumps
7
5
12
10
from fastapi .templating import Jinja2Templates
13
11
from pydantic import BaseModel
14
12
15
-
16
13
app = FastAPI ()
17
14
18
15
app .mount ("/static" , StaticFiles (directory = "static" ), name = "static" )
26
23
allow_headers = ["*" ],
27
24
)
28
25
29
-
30
26
class Task (BaseModel ):
31
27
id : str
32
28
prompt : str
@@ -36,10 +32,9 @@ class Task(BaseModel):
36
32
37
33
def model_dump (self , * args , ** kwargs ):
38
34
data = super ().model_dump (* args , ** kwargs )
39
- data [" created_at" ] = self .created_at .isoformat ()
35
+ data [' created_at' ] = self .created_at .isoformat ()
40
36
return data
41
37
42
-
43
38
class TaskManager :
44
39
def __init__ (self ):
45
40
self .tasks = {}
@@ -48,55 +43,61 @@ def __init__(self):
48
43
def create_task (self , prompt : str ) -> Task :
49
44
task_id = str (uuid .uuid4 ())
50
45
task = Task (
51
- id = task_id , prompt = prompt , created_at = datetime .now (), status = "pending"
46
+ id = task_id ,
47
+ prompt = prompt ,
48
+ created_at = datetime .now (),
49
+ status = "pending"
52
50
)
53
51
self .tasks [task_id ] = task
54
52
self .queues [task_id ] = asyncio .Queue ()
55
53
return task
56
54
57
- async def update_task_step (
58
- self , task_id : str , step : int , result : str , step_type : str = "step"
59
- ):
55
+ async def update_task_step (self , task_id : str , step : int , result : str , step_type : str = "step" ):
60
56
if task_id in self .tasks :
61
57
task = self .tasks [task_id ]
62
58
task .steps .append ({"step" : step , "result" : result , "type" : step_type })
63
- await self .queues [task_id ].put (
64
- {"type" : step_type , "step" : step , "result" : result }
65
- )
66
- await self .queues [task_id ].put (
67
- {"type" : "status" , "status" : task .status , "steps" : task .steps }
68
- )
59
+ await self .queues [task_id ].put ({
60
+ "type" : step_type ,
61
+ "step" : step ,
62
+ "result" : result
63
+ })
64
+ await self .queues [task_id ].put ({
65
+ "type" : "status" ,
66
+ "status" : task .status ,
67
+ "steps" : task .steps
68
+ })
69
69
70
70
async def complete_task (self , task_id : str ):
71
71
if task_id in self .tasks :
72
72
task = self .tasks [task_id ]
73
73
task .status = "completed"
74
- await self .queues [task_id ].put (
75
- {"type" : "status" , "status" : task .status , "steps" : task .steps }
76
- )
74
+ await self .queues [task_id ].put ({
75
+ "type" : "status" ,
76
+ "status" : task .status ,
77
+ "steps" : task .steps
78
+ })
77
79
await self .queues [task_id ].put ({"type" : "complete" })
78
80
79
81
async def fail_task (self , task_id : str , error : str ):
80
82
if task_id in self .tasks :
81
83
self .tasks [task_id ].status = f"failed: { error } "
82
- await self .queues [task_id ].put ({"type" : "error" , "message" : error })
83
-
84
+ await self .queues [task_id ].put ({
85
+ "type" : "error" ,
86
+ "message" : error
87
+ })
84
88
85
89
task_manager = TaskManager ()
86
90
87
-
88
91
@app .get ("/" , response_class = HTMLResponse )
89
92
async def index (request : Request ):
90
93
return templates .TemplateResponse ("index.html" , {"request" : request })
91
94
92
-
93
95
@app .post ("/tasks" )
94
96
async def create_task (prompt : str = Body (..., embed = True )):
95
97
task = task_manager .create_task (prompt )
96
98
asyncio .create_task (run_task (task .id , prompt ))
97
99
return {"task_id" : task .id }
98
100
99
-
100
101
from app .agent .manus import Manus
101
102
102
103
@@ -107,21 +108,17 @@ async def run_task(task_id: str, prompt: str):
107
108
agent = Manus (
108
109
name = "Manus" ,
109
110
description = "A versatile agent that can solve various tasks using multiple tools" ,
110
- max_steps = 30 ,
111
+ max_steps = 30
111
112
)
112
113
113
114
async def on_think (thought ):
114
115
await task_manager .update_task_step (task_id , 0 , thought , "think" )
115
116
116
117
async def on_tool_execute (tool , input ):
117
- await task_manager .update_task_step (
118
- task_id , 0 , f"Executing tool: { tool } \n Input: { input } " , "tool"
119
- )
118
+ await task_manager .update_task_step (task_id , 0 , f"Executing tool: { tool } \n Input: { input } " , "tool" )
120
119
121
120
async def on_action (action ):
122
- await task_manager .update_task_step (
123
- task_id , 0 , f"Executing action: { action } " , "act"
124
- )
121
+ await task_manager .update_task_step (task_id , 0 , f"Executing action: { action } " , "act" )
125
122
126
123
async def on_run (step , result ):
127
124
await task_manager .update_task_step (task_id , step , result , "run" )
@@ -136,7 +133,7 @@ async def __call__(self, message):
136
133
import re
137
134
138
135
# 提取 - 后面的内容
139
- cleaned_message = re .sub (r" ^.*? - " , "" , message )
136
+ cleaned_message = re .sub (r' ^.*? - ' , '' , message )
140
137
141
138
event_type = "log"
142
139
if "✨ Manus's thoughts:" in cleaned_message :
@@ -150,9 +147,7 @@ async def __call__(self, message):
150
147
elif "🏁 Special tool" in cleaned_message :
151
148
event_type = "complete"
152
149
153
- await task_manager .update_task_step (
154
- self .task_id , 0 , cleaned_message , event_type
155
- )
150
+ await task_manager .update_task_step (self .task_id , 0 , cleaned_message , event_type )
156
151
157
152
sse_handler = SSELogHandler (task_id )
158
153
logger .add (sse_handler )
@@ -163,7 +158,6 @@ async def __call__(self, message):
163
158
except Exception as e :
164
159
await task_manager .fail_task (task_id , str (e ))
165
160
166
-
167
161
@app .get ("/tasks/{task_id}/events" )
168
162
async def task_events (task_id : str ):
169
163
async def event_generator ():
@@ -175,9 +169,11 @@ async def event_generator():
175
169
176
170
task = task_manager .tasks .get (task_id )
177
171
if task :
178
- message = {"type" : "status" , "status" : task .status , "steps" : task .steps }
179
- json_message = dumps (message )
180
- yield f"event: status\n data: { json_message } \n \n "
172
+ yield f"event: status\n data: { dumps ({
173
+ 'type' : 'status' ,
174
+ 'status' : task .status ,
175
+ 'steps' : task .steps
176
+ })} \n \n "
181
177
182
178
while True :
183
179
try :
@@ -195,13 +191,11 @@ async def event_generator():
195
191
elif event ["type" ] == "step" :
196
192
task = task_manager .tasks .get (task_id )
197
193
if task :
198
- message = {
199
- "type" : "status" ,
200
- "status" : task .status ,
201
- "steps" : task .steps ,
202
- }
203
- json_message = dumps (message )
204
- yield f"event: status\n data: { json_message } \n \n "
194
+ yield f"event: status\n data: { dumps ({
195
+ 'type' : 'status' ,
196
+ 'status' : task .status ,
197
+ 'steps' : task .steps
198
+ })} \n \n "
205
199
yield f"event: { event ['type' ]} \n data: { formatted_event } \n \n "
206
200
elif event ["type" ] in ["think" , "tool" , "act" , "run" ]:
207
201
yield f"event: { event ['type' ]} \n data: { formatted_event } \n \n "
@@ -222,42 +216,35 @@ async def event_generator():
222
216
headers = {
223
217
"Cache-Control" : "no-cache" ,
224
218
"Connection" : "keep-alive" ,
225
- "X-Accel-Buffering" : "no" ,
226
- },
219
+ "X-Accel-Buffering" : "no"
220
+ }
227
221
)
228
222
229
-
230
223
@app .get ("/tasks" )
231
224
async def get_tasks ():
232
225
sorted_tasks = sorted (
233
- task_manager .tasks .values (), key = lambda task : task .created_at , reverse = True
226
+ task_manager .tasks .values (),
227
+ key = lambda task : task .created_at ,
228
+ reverse = True
234
229
)
235
230
return JSONResponse (
236
231
content = [task .model_dump () for task in sorted_tasks ],
237
- headers = {"Content-Type" : "application/json" },
232
+ headers = {"Content-Type" : "application/json" }
238
233
)
239
234
240
-
241
235
@app .get ("/tasks/{task_id}" )
242
236
async def get_task (task_id : str ):
243
237
if task_id not in task_manager .tasks :
244
238
raise HTTPException (status_code = 404 , detail = "Task not found" )
245
239
return task_manager .tasks [task_id ]
246
240
247
-
248
241
@app .exception_handler (Exception )
249
242
async def generic_exception_handler (request : Request , exc : Exception ):
250
243
return JSONResponse (
251
- status_code = 500 , content = {"message" : f"Server error: { str (exc )} " }
244
+ status_code = 500 ,
245
+ content = {"message" : f"Server error: { str (exc )} " }
252
246
)
253
247
254
-
255
- def open_local_browser ():
256
- webbrowser .open_new_tab ("http://localhost:5172" )
257
-
258
-
259
248
if __name__ == "__main__" :
260
- threading .Timer (3 , open_local_browser ).start ()
261
249
import uvicorn
262
-
263
250
uvicorn .run (app , host = "localhost" , port = 5172 )
0 commit comments