mcp_test/app/tools/weather.py
sun feb1a0b280 feat(agent): 新增MCP Agent客户端和工具系统
- 实现了基于LangChain的MCP Agent,支持连接MCP服务器调用工具
- 添加了环境配置文件(.env),包含LLM模型和API配置信息
- 创建了完整的工具系统,包括BaseTool基类和Bash、Terminate、Add等工具
- 集成了天气查询工具,支持通过中国气象局API获取天气预报信息
- 实现了交互式对话功能,支持多轮工具调用和结果处理
- 添加了详细的CLAUDE.md开发指导文档
2026-02-25 18:04:48 +08:00

159 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
from typing import Dict, Union
from app.tools.base import BaseTool, ToolResult
_WEATHER_DESCRIPTION = """
调用中国气象局天气预报 API获取指定省份和地点的完整当日天气信息含基础预报、实时数据、气象预警
### 参数说明
- user_id: str - 接口调用身份标识,需从 http://www.apihz.cn 注册获取,不可为空
- user_key: str - 接口通讯秘钥,与 user_id 对应,注册后获取,不可为空
- province: str - 查询省份/直辖市,建议去除""""后缀
- place: str - 查询城市/区/县,建议去除""""""后缀
### 返回说明
成功时返回包含基础预报、实时天气和气象预警的完整信息;失败时返回错误码和错误消息。
"""
class GetWeatherByLocation(BaseTool):
"""天气预报查询工具"""
name: str = "get_weather_by_location"
description: str = _WEATHER_DESCRIPTION
parameters: dict = {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "接口调用身份标识,需从 http://www.apihz.cn 注册获取",
},
"user_key": {
"type": "string",
"description": "接口通讯秘钥,与 user_id 对应",
},
"province": {
"type": "string",
"description": "查询省份/直辖市,建议去除''''后缀",
},
"place": {
"type": "string",
"description": "查询城市/区/县,建议去除''''''后缀",
},
},
"required": ["user_id", "user_key", "province", "place"],
}
async def execute(
self, user_id: str, user_key: str, province: str, place: str
) -> ToolResult:
"""执行天气查询"""
api_url = "https://cn.apihz.cn/api/tianqi/tqyb.php"
clean_province = province.replace("", "").replace("", "").strip()
clean_place = place.replace("", "").replace("", "").replace("", "").strip()
final_user_id = os.getenv("APIHZ_ID", user_id)
final_user_key = os.getenv("APIHZ_KEY", user_key)
params = {
"id": final_user_id,
"key": final_user_key,
"sheng": clean_province,
"place": clean_place,
}
try:
# 发送请求
response = requests.get(api_url, params=params, timeout=10)
response.raise_for_status()
# 解析响应
response_text = response.text.strip()
if not response_text:
return self.fail_response("API 响应为空,无法解析天气数据")
weather_data = response.json()
if not isinstance(weather_data, dict):
return self.fail_response(
f"API 响应格式错误,不是有效字典(实际类型:{type(weather_data).__name__}"
)
# 处理 API 返回的错误状态
if weather_data.get("code") == 400:
return self.fail_response(
weather_data.get("msg", "API 返回错误,原因未知")
)
elif weather_data.get("code") != 200:
return self.fail_response(
f"API 返回非成功状态码:{weather_data.get('code', '未知')}"
)
# 提取嵌套数据
now_info = weather_data.get("nowinfo", {})
if not isinstance(now_info, dict):
now_info = {}
alarm_info = weather_data.get("alarm", {})
if not isinstance(alarm_info, dict):
alarm_info = {}
# 构造最终返回结果
result = {
# 基础预报字段
"code": 200,
"guo": weather_data.get("guo", ""),
"sheng": weather_data.get("sheng", ""),
"shi": weather_data.get("shi", ""),
"name": weather_data.get("name", ""),
"weather1": weather_data.get("weather1", ""),
"weather2": weather_data.get("weather2", ""),
"wd1": weather_data.get("wd1", ""),
"wd2": weather_data.get("wd2", ""),
"winddirection1": weather_data.get("winddirection1", ""),
"winddirection2": weather_data.get("winddirection2", ""),
"windleve1": weather_data.get("windleve1", ""),
"windleve2": weather_data.get("windleve2", ""),
"weather1img": weather_data.get("weather1img", ""),
"weather2img": weather_data.get("weather2img", ""),
"lon": weather_data.get("lon", ""),
"lat": weather_data.get("lat", ""),
"uptime": weather_data.get("uptime", ""),
# 实时天气字段
"now_precipitation": float(now_info.get("precipitation", 0.0)),
"now_temperature": float(now_info.get("temperature", 0.0)),
"now_pressure": int(now_info.get("pressure", 0)),
"now_humidity": int(now_info.get("humidity", 0)),
"now_windDirection": now_info.get("windDirection", ""),
"now_windDirectionDegree": int(now_info.get("windDirectionDegree", 0)),
"now_windSpeed": float(now_info.get("windSpeed", 0.0)),
"now_windScale": now_info.get("windScale", ""),
"now_feelst": float(now_info.get("feelst", 0.0)),
"now_uptime": now_info.get("uptime", ""),
# 预警字段
"alarm_id": alarm_info.get("id", ""),
"alarm_title": alarm_info.get("title", ""),
"alarm_signaltype": alarm_info.get("signaltype", ""),
"alarm_signallevel": alarm_info.get("signallevel", ""),
"alarm_effective": alarm_info.get("effective", ""),
"alarm_eventType": alarm_info.get("eventType", ""),
"alarm_severity": alarm_info.get("severity", ""),
"alarm_type": alarm_info.get("type", ""),
}
return self.success_response(result)
except requests.exceptions.HTTPError as e:
status_code = (
response.status_code if "response" in locals() else "未知"
)
return self.fail_response(f"HTTP 请求错误(状态码:{status_code}{str(e)}")
except requests.exceptions.ConnectionError:
return self.fail_response("网络连接错误:无法连接到天气 API 服务器")
except requests.exceptions.Timeout:
return self.fail_response("请求超时API 服务器 10 秒内未响应")
except ValueError as e:
return self.fail_response(f"API 响应解析失败JSON 格式错误):{str(e)}")
except Exception as e:
return self.fail_response(f"未知错误:{str(e)}(错误类型:{type(e).__name__}")