sun feb1a0b280 feat(agent): 新增MCP Agent客户端和工具系统
- 实现了基于LangChain的MCP Agent,支持连接MCP服务器调用工具
- 添加了环境配置文件(.env),包含LLM模型和API配置信息
- 创建了完整的工具系统,包括BaseTool基类和Bash、Terminate、Add等工具
- 集成了天气查询工具,支持通过中国气象局API获取天气预报信息
- 实现了交互式对话功能,支持多轮工具调用和结果处理
- 添加了详细的CLAUDE.md开发指导文档
2026-02-25 18:04:48 +08:00

195 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import sys
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stderr)])
import argparse
import asyncio
import atexit
import json
from inspect import Parameter, Signature
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP
from app.utils.logger import logger
from app.tools.base import BaseTool
from app.tools.bash import Bash
# from app.tools.browser_use_tool import BrowserUseTool
# from app.tools.str_replace_editor import StrReplaceEditor
from app.tools.terminate import Terminate
from app.tools.add import Add
from app.tools.weather import GetWeatherByLocation
class MCPServer:
    """MCP server implementation with tool registration and management.

    Every tool stored in ``self.tools`` is wrapped in an async function whose
    name, docstring and signature are synthesized from the tool's
    ``to_param()["function"]`` metadata, and that wrapper is registered with
    the underlying ``FastMCP`` instance.
    """

    # JSON Schema type name -> Python annotation used in generated signatures.
    # Any type not listed here is annotated as ``Any``.
    _JSON_TYPE_ANNOTATIONS: Dict[str, Any] = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "object": dict,
        "array": list,
    }

    def __init__(self, name: str = "openmanus"):
        """Create the FastMCP instance and the standard tool set.

        Args:
            name: Name passed to ``FastMCP``.
        """
        self.server = FastMCP(name)
        self.tools: Dict[str, "BaseTool"] = {}
        # Initialize the standard tools (browser/editor are currently disabled).
        self.tools["bash"] = Bash()
        # self.tools["browser"] = BrowserUseTool()
        # self.tools["editor"] = StrReplaceEditor()
        self.tools["terminate"] = Terminate()
        self.tools["add"] = Add()
        self.tools["weather"] = GetWeatherByLocation()

    @staticmethod
    def _schema_parts(tool_function: dict) -> tuple:
        """Return ``(properties, required)`` from a tool's function metadata.

        A ``"parameters"``, ``"properties"`` or ``"required"`` entry that is
        missing *or explicitly ``None``* is treated as empty. A plain
        ``.get(key, {})`` only covers the missing-key case and would raise
        ``AttributeError`` on a ``None`` value.
        """
        schema = tool_function.get("parameters") or {}
        return schema.get("properties") or {}, schema.get("required") or []

    def register_tool(
        self, tool: "BaseTool", method_name: Optional[str] = None
    ) -> None:
        """Register one tool with the server, including its docs and signature.

        Args:
            tool: Tool to expose; its ``to_param()["function"]`` metadata
                drives the generated docstring and signature, and its
                ``execute(**kwargs)`` coroutine does the actual work.
            method_name: Name to register under; falls back to ``tool.name``.
        """
        tool_name = method_name or tool.name
        tool_function = tool.to_param()["function"]
        param_props, required_params = self._schema_parts(tool_function)

        # The async function that actually gets registered.
        async def tool_method(**kwargs):
            logger.info(f"Executing {tool_name}: {kwargs}")
            result = await tool.execute(**kwargs)
            logger.info(f"Result of {tool_name}: {result}")
            # Objects exposing model_dump() and plain dicts are returned as
            # JSON text; every other result is passed through unchanged.
            if hasattr(result, "model_dump"):
                return json.dumps(result.model_dump())
            if isinstance(result, dict):
                return json.dumps(result)
            return result

        # Replace the generic **kwargs metadata so that introspection
        # (e.g. inspect.signature) sees the tool's declared parameters.
        tool_method.__name__ = tool_name
        tool_method.__doc__ = self._build_docstring(tool_function)
        tool_method.__signature__ = self._build_signature(tool_function)

        # Keep the parameter schema on the function for code that reads it
        # programmatically.
        tool_method._parameter_schema = {
            param_name: {
                "description": param_details.get("description", ""),
                "type": param_details.get("type", "any"),
                "required": param_name in required_params,
            }
            for param_name, param_details in param_props.items()
        }

        # Register with the server.
        self.server.tool()(tool_method)
        logger.info(f"Registered tool: {tool_name}")

    def _build_docstring(self, tool_function: dict) -> str:
        """Build a formatted docstring from a tool's function metadata.

        Args:
            tool_function: Mapping with an optional ``"description"`` and an
                optional ``"parameters"`` schema.

        Returns:
            The description, followed by a ``Parameters:`` section with one
            line per parameter when the tool declares any.
        """
        description = tool_function.get("description", "")
        param_props, required_params = self._schema_parts(tool_function)
        if not param_props:
            return description

        lines = []
        for param_name, param_details in param_props.items():
            required_str = (
                "(required)" if param_name in required_params else "(optional)"
            )
            param_type = param_details.get("type", "any")
            param_desc = param_details.get("description", "")
            lines.append(
                f" {param_name} ({param_type}) {required_str}: {param_desc}\n"
            )
        return description + "\n\nParameters:\n" + "".join(lines)

    def _build_signature(self, tool_function: dict) -> Signature:
        """Build a function signature from a tool's function metadata.

        Every declared parameter becomes keyword-only. Required parameters
        have no default; optional ones default to ``None``.

        Args:
            tool_function: Mapping with an optional ``"parameters"`` schema.

        Returns:
            A ``Signature`` with one parameter per schema property, in
            schema order.
        """
        param_props, required_params = self._schema_parts(tool_function)
        parameters = []
        for param_name, param_details in param_props.items():
            param_type = param_details.get("type", "")
            # Only plain type names are looked up; a non-string "type"
            # (e.g. a list of alternatives) is unhashable and maps to Any.
            if isinstance(param_type, str):
                annotation = self._JSON_TYPE_ANNOTATIONS.get(param_type, Any)
            else:
                annotation = Any
            parameters.append(
                Parameter(
                    name=param_name,
                    kind=Parameter.KEYWORD_ONLY,
                    default=(
                        Parameter.empty if param_name in required_params else None
                    ),
                    annotation=annotation,
                )
            )
        return Signature(parameters=parameters)

    async def cleanup(self) -> None:
        """Clean up server resources.

        Only a tool stored under the ``"browser"`` key is cleaned up, and
        only if it exposes a ``cleanup`` attribute.
        """
        logger.info("Cleaning up resources")
        browser = self.tools.get("browser")
        if browser is not None and hasattr(browser, "cleanup"):
            await browser.cleanup()

    def register_all_tools(self) -> None:
        """Register every tool in ``self.tools`` with the server."""
        for tool in self.tools.values():
            self.register_tool(tool)

    def run(self, transport: str = "stdio") -> None:
        """Register all tools, install the exit hook and run the server.

        Args:
            transport: Transport name forwarded to ``FastMCP.run``.
        """
        self.register_all_tools()
        # cleanup() is a coroutine, so the exit hook drives it with asyncio.run.
        atexit.register(lambda: asyncio.run(self.cleanup()))
        logger.info(f"Starting OpenManus server ({transport} mode)")
        self.server.run(transport=transport)
def parse_args(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse the server's command-line arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is what the parameterless
            call always did; passing a list allows programmatic use.

    Returns:
        Namespace whose ``transport`` attribute is ``"stdio"`` (default)
        or ``"sse"``.
    """
    parser = argparse.ArgumentParser(description="OpenManus MCP Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
        help="通信方法stdio 或 sse (默认stdio)",
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    args = parse_args()
    server = MCPServer()
    # MCPServer.run() registers all tools, installs the atexit cleanup hook,
    # logs the start-up line and starts FastMCP with the chosen transport.
    # Delegating to it keeps both transports on one code path and gives the
    # SSE mode the same cleanup hook as stdio. Do not call
    # register_all_tools() here as well: run() already does it.
    server.run(transport=args.transport)