Skip to content

Commit d661578

Browse files
authored
Merge pull request #377 from Feige-cn/main
add manus web-ui
2 parents 8b955c7 + 825855d commit d661578

File tree

5 files changed

+1212
-0
lines changed

5 files changed

+1212
-0
lines changed

app.py

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
from fastapi import FastAPI, Request, Body, HTTPException
2+
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
3+
from fastapi.staticfiles import StaticFiles
4+
from fastapi.templating import Jinja2Templates
5+
from fastapi.middleware.cors import CORSMiddleware
6+
from pydantic import BaseModel
7+
from datetime import datetime
8+
import asyncio
9+
import uuid
10+
from json import dumps
11+
12+
app = FastAPI()
13+
14+
app.mount("/static", StaticFiles(directory="static"), name="static")
15+
templates = Jinja2Templates(directory="templates")
16+
17+
app.add_middleware(
18+
CORSMiddleware,
19+
allow_origins=["*"],
20+
allow_credentials=True,
21+
allow_methods=["*"],
22+
allow_headers=["*"],
23+
)
24+
25+
class Task(BaseModel):
26+
id: str
27+
prompt: str
28+
created_at: datetime
29+
status: str
30+
steps: list = []
31+
32+
def model_dump(self, *args, **kwargs):
33+
data = super().model_dump(*args, **kwargs)
34+
data['created_at'] = self.created_at.isoformat()
35+
return data
36+
37+
class TaskManager:
38+
def __init__(self):
39+
self.tasks = {}
40+
self.queues = {}
41+
42+
def create_task(self, prompt: str) -> Task:
43+
task_id = str(uuid.uuid4())
44+
task = Task(
45+
id=task_id,
46+
prompt=prompt,
47+
created_at=datetime.now(),
48+
status="pending"
49+
)
50+
self.tasks[task_id] = task
51+
self.queues[task_id] = asyncio.Queue()
52+
return task
53+
54+
async def update_task_step(self, task_id: str, step: int, result: str, step_type: str = "step"):
55+
if task_id in self.tasks:
56+
task = self.tasks[task_id]
57+
task.steps.append({"step": step, "result": result, "type": step_type})
58+
await self.queues[task_id].put({
59+
"type": step_type,
60+
"step": step,
61+
"result": result
62+
})
63+
await self.queues[task_id].put({
64+
"type": "status",
65+
"status": task.status,
66+
"steps": task.steps
67+
})
68+
69+
async def complete_task(self, task_id: str):
70+
if task_id in self.tasks:
71+
task = self.tasks[task_id]
72+
task.status = "completed"
73+
await self.queues[task_id].put({
74+
"type": "status",
75+
"status": task.status,
76+
"steps": task.steps
77+
})
78+
await self.queues[task_id].put({"type": "complete"})
79+
80+
async def fail_task(self, task_id: str, error: str):
81+
if task_id in self.tasks:
82+
self.tasks[task_id].status = f"failed: {error}"
83+
await self.queues[task_id].put({
84+
"type": "error",
85+
"message": error
86+
})
87+
88+
task_manager = TaskManager()
89+
90+
@app.get("/", response_class=HTMLResponse)
91+
async def index(request: Request):
92+
return templates.TemplateResponse("index.html", {"request": request})
93+
94+
@app.post("/tasks")
95+
async def create_task(prompt: str = Body(..., embed=True)):
96+
task = task_manager.create_task(prompt)
97+
asyncio.create_task(run_task(task.id, prompt))
98+
return {"task_id": task.id}
99+
100+
from app.agent.toolcall import ToolCallAgent
101+
102+
async def run_task(task_id: str, prompt: str):
103+
try:
104+
task_manager.tasks[task_id].status = "running"
105+
106+
agent = ToolCallAgent(
107+
name="TaskAgent",
108+
description="Agent for handling task execution",
109+
max_steps=30
110+
)
111+
112+
async def on_think(thought):
113+
await task_manager.update_task_step(task_id, 0, thought, "think")
114+
115+
async def on_tool_execute(tool, input):
116+
await task_manager.update_task_step(task_id, 0, f"执行工具: {tool}\n输入: {input}", "tool")
117+
118+
async def on_action(action):
119+
await task_manager.update_task_step(task_id, 0, f"执行动作: {action}", "act")
120+
121+
async def on_run(step, result):
122+
await task_manager.update_task_step(task_id, step, result, "run")
123+
124+
from app.logger import logger
125+
126+
class SSELogHandler:
127+
def __init__(self, task_id):
128+
self.task_id = task_id
129+
130+
async def __call__(self, message):
131+
import re
132+
# 提取 - 后面的内容
133+
cleaned_message = re.sub(r'^.*? - ', '', message)
134+
135+
event_type = "log"
136+
if "✨ TaskAgent's thoughts:" in cleaned_message:
137+
event_type = "think"
138+
elif "🛠️ TaskAgent selected" in cleaned_message:
139+
event_type = "tool"
140+
elif "🎯 Tool" in cleaned_message:
141+
event_type = "act"
142+
elif "📝 Oops!" in cleaned_message:
143+
event_type = "error"
144+
elif "🏁 Special tool" in cleaned_message:
145+
event_type = "complete"
146+
147+
await task_manager.update_task_step(self.task_id, 0, cleaned_message, event_type)
148+
149+
sse_handler = SSELogHandler(task_id)
150+
logger.add(sse_handler)
151+
152+
result = await agent.run(prompt)
153+
await task_manager.update_task_step(task_id, 1, result, "result")
154+
await task_manager.complete_task(task_id)
155+
except Exception as e:
156+
await task_manager.fail_task(task_id, str(e))
157+
158+
@app.get("/tasks/{task_id}/events")
159+
async def task_events(task_id: str):
160+
async def event_generator():
161+
if task_id not in task_manager.queues:
162+
yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
163+
return
164+
165+
queue = task_manager.queues[task_id]
166+
167+
task = task_manager.tasks.get(task_id)
168+
if task:
169+
yield f"event: status\ndata: {dumps({
170+
'type': 'status',
171+
'status': task.status,
172+
'steps': task.steps
173+
})}\n\n"
174+
175+
while True:
176+
try:
177+
event = await queue.get()
178+
formatted_event = dumps(event)
179+
180+
yield ": heartbeat\n\n"
181+
182+
if event["type"] == "complete":
183+
yield f"event: complete\ndata: {formatted_event}\n\n"
184+
break
185+
elif event["type"] == "error":
186+
yield f"event: error\ndata: {formatted_event}\n\n"
187+
break
188+
elif event["type"] == "step":
189+
task = task_manager.tasks.get(task_id)
190+
if task:
191+
yield f"event: status\ndata: {dumps({
192+
'type': 'status',
193+
'status': task.status,
194+
'steps': task.steps
195+
})}\n\n"
196+
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
197+
elif event["type"] in ["think", "tool", "act", "run"]:
198+
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
199+
else:
200+
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
201+
202+
except asyncio.CancelledError:
203+
print(f"Client disconnected for task {task_id}")
204+
break
205+
except Exception as e:
206+
print(f"Error in event stream: {str(e)}")
207+
yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
208+
break
209+
210+
return StreamingResponse(
211+
event_generator(),
212+
media_type="text/event-stream",
213+
headers={
214+
"Cache-Control": "no-cache",
215+
"Connection": "keep-alive",
216+
"X-Accel-Buffering": "no"
217+
}
218+
)
219+
220+
@app.get("/tasks")
221+
async def get_tasks():
222+
sorted_tasks = sorted(
223+
task_manager.tasks.values(),
224+
key=lambda task: task.created_at,
225+
reverse=True
226+
)
227+
return JSONResponse(
228+
content=[task.model_dump() for task in sorted_tasks],
229+
headers={"Content-Type": "application/json"}
230+
)
231+
232+
@app.get("/tasks/{task_id}")
233+
async def get_task(task_id: str):
234+
if task_id not in task_manager.tasks:
235+
raise HTTPException(status_code=404, detail="Task not found")
236+
return task_manager.tasks[task_id]
237+
238+
@app.exception_handler(Exception)
239+
async def generic_exception_handler(request: Request, exc: Exception):
240+
return JSONResponse(
241+
status_code=500,
242+
content={"message": f"服务器内部错误: {str(exc)}"}
243+
)
244+
245+
if __name__ == "__main__":
246+
import uvicorn
247+
uvicorn.run(app, host="0.0.0.0", port=8000)

run.bat

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
@echo off
2+
venv\Scripts\python.exe app.py
3+
pause

0 commit comments

Comments
 (0)