-
Couldn't load subscription status.
- Fork 858
Description
MCP server does not support typewriter style streaming return codes
I want to encapsulate the streaming return of a model, but the MCP tool directly returns all at once, not word by word
server code:
import requests
import json
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio
import aiohttp
import io
from fastapi_mcp import FastApiMCP
# 创建FastAPI应用
app = FastAPI(title="AI Chat API", description="AI聊天接口,支持流式和非流式返回", version="1.0.0")
# 创建 MCP 服务器
mcp = FastApiMCP(
app,
name="My API", # MCP 名称
describe_all_responses=True, # 展示所有响应模型
describe_full_response_schema=True # 展示完整 JSON 模式
)
# 挂载 MCP
mcp.mount_sse()
# 请求模型
class ChatRequest(BaseModel):
msg: str
sessionId: Optional[str] = ""
stream: str = "false" # "true" 或 "false"
channel: str = "test__1111"
# 响应模型
class ChatResponse(BaseModel):
success: bool
data: str
message: str
# 原始API配置
API_URL = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
HEADERS = {
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Content-Type': 'application/json'
}
@app.post("/chat", response_model=ChatResponse, operation_id='chat')
async def chat(request: ChatRequest):
"""
AI聊天接口
- **msg**: 聊天消息内容
- **sessionId**: 会话ID(可选)
- **stream**: 是否流式返回,"true"为流式,"false"为非流式
- **channel**: 渠道标识
"""
try:
# 构建请求数据
payload = {
"msg": request.msg,
"sessionId": request.sessionId,
"stream": request.stream,
"channel": request.channel
}
if request.stream == "true":
# 流式返回
return StreamingResponse(
stream_chat_response(payload),
media_type="text/event-stream"
)
else:
# 非流式返回
async with aiohttp.ClientSession() as session:
async with session.post(
API_URL,
headers=HEADERS,
json=payload
) as response:
if response.status == 200:
data = await response.text()
return ChatResponse(
success=True,
data=data,
message="请求成功"
)
else:
raise HTTPException(
status_code=response.status,
detail=f"API请求失败: {response.status}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
async def stream_chat_response(payload: dict):
"""
流式返回聊天响应
"""
try:
async with aiohttp.ClientSession() as session:
async with session.post(
API_URL,
headers=HEADERS,
json=payload
) as response:
if response.status == 200:
async for chunk in response.content.iter_chunked(1024):
if chunk:
yield chunk.decode('utf-8', errors='ignore')
else:
yield f"错误: API请求失败,状态码: {response.status}"
except Exception as e:
yield f"错误: {str(e)}"
# 刷新 MCP 服务器
mcp.setup_server()
def print_routes():
print("=== Registered routes ===")
for route in app.routes:
try:
methods = getattr(route, "methods", None)
path = getattr(route, "path", None)
name = getattr(route, "name", None)
print(f"{methods} {path} (name={name})")
except Exception:
pass
print("=========================")
print_routes()
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
client code:
mcp_client.py
import asyncio
import json
from mcp import ClientSession
from mcp.client.sse import sse_client
SSE_URL = "http://127.0.0.1:8000/sse" # 与服务端路由一致
async def main():
# 1) 建立 SSE 传输的连接(sse_client 返回一对 streams:read, write)
print("连接到 MCP SSE 服务器:", SSE_URL)
async with sse_client(url=SSE_URL) as streams:
# streams 通常是 (read_stream, write_stream) 或能被 ClientSession 解包的流对
async with ClientSession(*streams) as session:
# 2) 初始化 MCP 会话(发送 initialize 等)
await session.initialize()
print("初始化完成")
# 3) 列出可用工具
tools = await session.list_tools()
print("发现工具(示例前三项):")
try:
# tools 的格式可能是一个列表或包含 tools 属性,按情况打印
if hasattr(tools, "tools"):
tools_list = tools.tools
else:
tools_list = tools
for t in tools_list[:10]:
# 每个 t 的具体结构可能不同,尝试提取 name / id / description
name = t.get("name") if isinstance(t, dict) else getattr(t, "name", str(t))
desc = t.get("description") if isinstance(t, dict) else getattr(t, "description", "")
print(f" - {name}: {desc}")
except Exception:
print("tools raw:", tools)
# 4) 调用 chat 工具
params = {
"msg": "北京到南京的火车票",
"sessionId": "",
"stream": "true",
"channel": "test__1111"
}
print("调用 chat 工具,params =", params)
# call_tool 接口:传入工具名(或 operation_id)和参数 dict
result = await session.call_tool("chat", params)
# print("工具返回(raw):", result)
# 5) 尝试从返回对象中提取常见字段做友好打印
try:
# SDK 不同版本返回结构差异,这里尽可能地去兼容几种常见结构
if hasattr(result, "content"):
# 常见:result.content 是一个列表,各项可能有 text / structured content
print("result.content:")
for item in result.content:
# item 可能是对象、字典或简单字符串
if hasattr(item, "text"):
print(" -", item.text)
elif isinstance(item, dict) and "text" in item:
print(" -", item["text"])
else:
print(" -", item)
elif isinstance(result, dict):
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print("未识别结构,raw:", result)
except Exception as e:
print("解析返回时出错:", e)
if name == "main":
asyncio.run(main())