微信扫码
添加专属顾问
我要投稿
深入探索MCP Server|Client框架,实现工具服务与智能Agent的无缝集成。核心内容:1. 利用MCP和LangChain框架集成工具服务2. 创建天气查询和货币转换的MCP工具服务3. 构建Agent客户端,实现对MCP服务的智能调用
❝喜欢阅读,写作和摄影的「coder」
❞
上一篇介绍了基本的MPC Demo,今天,我们将展示如何使用「MCP」和「LangChain」框架,轻松地将多个独立运行的工具服务(通过 MCP 暴露)集成到一个 Agent 中,让 LLM 能够根据用户需求智能地调用这些工具。
「本文目标:」
ChatOpenAI
的 Agent 客户端。「准备工作:」
uv add mcp langchain-openai python-dotenv
ChatOpenAI
(通过智谱 AI 的 API 地址),你需要一个相应的 API Key。将你的 Key 存放在项目根目录下的 .env
文件中:OPENAI_API_KEY="YOUR_API_KEY_HERE"
(你也可以替换成其他 LangChain 支持的模型,例如本地运行的 Ollama)
MCP 工具本质上是可以通过网络调用的函数。我们将创建两个独立的 Python 脚本来运行我们的工具服务。
1.天气查询服务server_weather.py
这个服务提供一个 get_weather
工具,根据城市名称返回模拟的天气信息。
# server_weather.py
from mcp.server.fastmcp import FastMCP
# 初始化 FastMCP server,监听在 9990 端口
mcp = FastMCP("weather_service", host="localhost", port=9990) # 服务名称和地址
@mcp.tool()
asyncdef get_weather(city: str) -> str:
"""Get weather information for a city.
Args:
city: Name of the city
"""
# 注意:这里只是一个示例,返回固定信息
returnf"{city} is sunny, enjoy it!"
if __name__ == "__main__":
print("Weather service running on http://localhost:9990/sse")
# 使用 SSE (Server-Sent Events) 传输协议运行服务
mcp.run(transport='sse')
FastMCP(...)
: 初始化一个 MCP 服务实例,指定服务名称、主机和端口。@mcp.tool()
: 装饰器,将 get_weather
函数注册为一个 MCP 工具。函数签名和 docstring 会被用来生成工具的描述和输入模式 (schema),供 LLM 理解。mcp.run(transport='sse')
: 启动服务,使用 SSE 协议进行通信。2. 货币转换服务server_currency.py
这个服务提供一个 convert
工具,用于转换货币。
# server_currency.py
from mcp.server.fastmcp import FastMCP
# 初始化 FastMCP server,监听在 9991 端口
mcp = FastMCP("currency_service", host="localhost", port=9991) # 注意端口不同
@mcp.tool()
def convert(amount: float, currency_from: str, currency_to: str) -> float:
"""use latest exchange rate to convert currency
Args:
amount: the amount of currency to convert
currency_from: the currency to convert from
currency_to: the currency to convert to
"""
# 注意:这里只是一个示例,使用固定汇率
# 实际应用中,这里应该调用真实的汇率API
if currency_from.upper() == 'USD'and currency_to.upper() == 'CNY':
# 模拟一个简单的汇率,实际应查询API
exchange_rate = 7.2
elif currency_from.upper() == 'EUR'and currency_to.upper() == 'USD':
exchange_rate = 1.1
else:
# 默认或未知的汇率
exchange_rate = 0.8# 使用代码中原有的 0.8 作为默认值
return amount * exchange_rate
if __name__ == "__main__":
print("Currency service running on http://localhost:9991/sse")
# 使用 SSE 传输协议运行服务
mcp.run(transport='sse')
convert
函数使用了 固定 的模拟汇率。在实际应用中,你应该在这里调用真实的汇率 API 获取最新汇率。「运行服务:」
打开两个终端窗口,分别运行这两个脚本:
# 终端 1
uv run server_weather.py
# 输出: Weather service running on http://localhost:9990/sse
# 终端 2
uv run server_currency.py
# 输出: Currency service running on http://localhost:9991/sse
现在,你的两个 MCP 工具服务已经在后台运行了。
agent_client.py
接下来,我们编写客户端代码,该客户端将连接到这两个 MCP 服务,并使用 LangChain 来决定何时以及如何调用这些工具。
# agent_client.py
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import TextContent
# 加载 .env 文件中的 OPENAI_API_KEY
load_dotenv()
class MCPServerHTTP:
"""管理与单个 MCP SSE 服务器的连接和交互"""
def __init__(self, url: str):
self.server_url = url
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.tools = None# 存储从该服务器获取的工具列表 (LangChain 格式)
asyncdef __aenter__(self):
# 进入上下文时连接
await self.connect()
return self
asyncdef __aexit__(self, exc_type, exc_val, exc_tb):
# 退出上下文时清理连接
await self.cleanup()
asyncdef connect(self):
"""连接到SSE服务器并初始化会话,获取工具列表"""
sc = sse_client(url=self.server_url)
sse_transport = await self.exit_stack.enter_async_context(sc)
sse, write = sse_transport
cs = ClientSession(sse, write)
self.session = await self.exit_stack.enter_async_context(cs)
await self.session.initialize()
# 获取并转换工具列表为 LangChain 格式
self.tools = await self.list_tools()
print(f"Fetched tools from {self.server_url}: {[tool['function']['name'] for tool in self.tools]}")
return self
asyncdef list_tools(self) -> List[Dict[str, Any]]:
"""获取 MCP 工具并转换为 LangChain bind_tools 所需的字典列表格式"""
ifnot self.session:
raise RuntimeError("Not connected to server")
response = await self.session.list_tools()
converted_tools = []
for tool in response.tools:
tool_dict = {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema # MCP 的输入模式可以直接用
}
}
converted_tools.append(tool_dict)
return converted_tools
asyncdef tool_call(self, tool_name: str, args: dict) -> Any:# 返回原始结果或文本
"""调用 MCP 服务器上的特定工具"""
ifnot self.session:
raise RuntimeError("Not connected to server")
print(f"Calling tool '{tool_name}' on {self.server_url} with args: {args}")
response = await self.session.call_tool(tool_name, args)
print(f"Received response from tool '{tool_name}': {response.content}")
# 提取主要内容,这里简化处理,优先取 text
if response.content:
first_content = response.content[0]
if isinstance(first_content, TextContent):
return first_content.text
# 可以根据需要处理 ImageContent, EmbeddedResource 等
return str(first_content) # Fallback to string representation
returnNone# Or raise an error if no content
asyncdef cleanup(self):
"""关闭连接和清理资源"""
print(f"Cleaning up connection to {self.server_url}...")
await self.exit_stack.aclose()
print(f"Connection to {self.server_url} closed.")
@dataclass
class AgentResult:
"""封装 Agent 的运行结果"""
data: str
tool_used: Optional[str] = None
class Agent:
"""协调 LLM 和多个 MCP 服务器的 Agent"""
def __init__(self, model: BaseChatModel, mcp_servers: List[MCPServerHTTP] = None):
self.mcp_servers = mcp_servers or []
self.model = model # 基础 LLM
self.tool_model = None# 绑定了工具的 LLM
self.server_tool_map: Dict[str, MCPServerHTTP] = {} # 工具名 -> 服务器实例映射
@asynccontextmanager
asyncdef run_mcp_servers(self):
"""上下文管理器,用于连接所有 MCP 服务器并准备工具"""
exit_stack = AsyncExitStack()
try:
print("Starting MCP server connections...")
# 异步连接所有服务器
for server in self.mcp_servers:
await exit_stack.enter_async_context(server)
print("All MCP servers connected.")
# 收集所有服务器的工具并创建映射
all_tools = []
self.server_tool_map = {}
for server in self.mcp_servers:
if server.tools:
all_tools.extend(server.tools)
for tool in server.tools:
self.server_tool_map[tool['function']['name']] = server
# 如果有工具,将它们绑定到模型
if all_tools:
print(f"Binding tools to LLM: {[tool['function']['name'] for tool in all_tools]}")
self.tool_model = self.model.bind_tools(all_tools, tool_choice="auto")
else:
print("No tools found to bind.")
self.tool_model = self.model # 使用原始模型
yield self # 进入运行时
finally:
print("Closing MCP server connections...")
# 退出时自动清理所有连接
await exit_stack.aclose()
self.tool_model = None# 重置 tool_model
self.server_tool_map = {}
print("All MCP server connections closed.")
asyncdef run(self, query: str) -> AgentResult:
"""
运行 Agent 处理用户查询。
包含 LLM 调用、工具选择、工具执行和(可选的)最终结果生成。
"""
ifnot self.tool_model:
# 如果没有工具或未在上下文中运行,直接查询基础模型
print("No tool model available, querying base model directly.")
response = await self.model.ainvoke([HumanMessage(content=query)])
return AgentResult(data=response.content)
print(f"\nProcessing query: '{query}'")
# 第一次调用 LLM,让其决定是否需要调用工具
message = HumanMessage(content=query)
ai_response = await self.tool_model.ainvoke([message])
tool_calls = ai_response.tool_calls
ifnot tool_calls:
# LLM 认为不需要调用工具,直接返回其回答
print("LLM decided no tool call is needed.")
return AgentResult(data=ai_response.content)
# LLM 决定调用一个或多个工具
tool_messages = [] # 存储工具调用的结果消息
tool_results_data = [] # 存储工具调用的原始结果文本
for tool_call in tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
tool_call_id = tool_call["id"] # Langchain 需要 ID 来匹配结果
# 查找能执行此工具的服务器
server = self.server_tool_map.get(tool_name)
if server:
try:
# 调用 MCP 服务器上的工具
tool_result = await server.tool_call(tool_name, tool_args)
result_str = str(tool_result) # 确保是字符串
tool_results_data.append(result_str)
# 创建 ToolMessage,包含工具调用的结果,供后续 LLM 使用
tool_messages.append(ToolMessage(content=result_str, tool_call_id=tool_call_id))
print(f"Tool '{tool_name}' executed successfully. Result: {result_str}")
except Exception as e:
print(f"Error calling tool '{tool_name}': {e}")
# 发生错误时,也创建一个 ToolMessage 告知 LLM
tool_messages.append(ToolMessage(content=f"Error executing tool: {e}", tool_call_id=tool_call_id))
tool_results_data.append(f"Error calling {tool_name}")
else:
print(f"Error: Tool '{tool_name}' not found on any connected server.")
tool_messages.append(ToolMessage(content=f"Error: Tool '{tool_name}' not found.", tool_call_id=tool_call_id))
tool_results_data.append(f"Error: {tool_name} not found")
# --- 重要改进:将工具结果反馈给 LLM 进行最终回复 ---
# 将原始用户消息、LLM 的工具调用决定以及工具执行结果一起发送给 LLM
final_response = await self.tool_model.ainvoke([
message, # 原始用户查询
ai_response, # LLM 的首次回复(包含工具调用请求)
*tool_messages # 所有工具的执行结果
])
print(f"LLM generated final response after tool calls: {final_response.content}")
# 返回 LLM 基于工具结果生成的最终答案
return AgentResult(data=final_response.content, tool_used=", ".join([tc["name"] for tc in tool_calls]))
asyncdef main():
# 1. 配置 MCP 服务器连接信息
weather_server = MCPServerHTTP(url='http://localhost:9990/sse')
currency_server = MCPServerHTTP(url='http://localhost:9991/sse')
# 2. 初始化 LangChain LLM 模型
# 使用 ZhipuAI GLM-4-Flash 作为示例
llm = ChatOpenAI(
base_url='https://open.bigmodel.cn/api/paas/v4/', # ZhipuAI API 地址
model='glm-4-flash', # 模型名称
temperature=0.0, # 温度设低一点,保证工具调用更稳定
)
# 3. 创建 Agent 实例
agent = Agent(model=llm, mcp_servers=[weather_server, currency_server])
# 4. 使用 async context manager 运行 Agent
asyncwith agent.run_mcp_servers():
# 在这里,Agent 会自动连接服务器、获取工具并绑定到 LLM
# 示例查询 1: 天气
print("--- Query 1: Weather ---")
result1 = await agent.run('BeiJing天气怎么样?') # 使用中文查询
print(f"Agent Result 1:\nData: {result1.data}\nTool Used: {result1.tool_used}\n")
# 示例查询 2: 货币转换
print("--- Query 2: Currency Conversion ---")
result2 = await agent.run('我想知道5美元等于多少人民币?') # 使用中文查询
print(f"Agent Result 2:\nData: {result2.data}\nTool Used: {result2.tool_used}\n")
if __name__ == "__main__":
# 运行主异步函数
asyncio.run(main())
「代码解释:」
MCPServerHTTP
Class: 封装了与 单个 MCP 服务器通过 SSE 进行通信的逻辑。
__aenter__
/ __aexit__
: 使用 asynccontextmanager
的标准模式,在进入时连接 (connect
),退出时清理 (cleanup
)。connect
: 使用 mcp.client.sse.sse_client
建立 SSE 连接,初始化 ClientSession
,并调用 list_tools
获取工具信息。list_tools
: 调用 MCP 的 session.list_tools()
,然后将返回的 MCP Tool
对象列表转换成 LangChain bind_tools
方法期望的字典列表格式。tool_call
: 调用 MCP 的 session.call_tool()
来执行指定的工具,并简化处理返回结果(提取文本内容)。cleanup
: 使用 AsyncExitStack
来确保所有异步资源(如 SSE 连接、会话)被正确关闭。「Agent
Class:」 负责管理 多个 MCPServerHTTP
实例,并与 LangChain 模型交互。
MCPServerHTTP
实例(使用 server_tool_map
)。tool_call
方法执行工具。ToolMessage
。__init__
: 接收 LangChain 模型和 MCPServerHTTP
实例列表。run_mcp_servers
: 这是一个异步上下文管理器。进入时,它会并行连接所有配置的 MCP 服务器,收集所有可用的工具,创建一个从工具名到对应服务器实例的映射server_tool_map
,然后使用model.bind_tools()
创建一个新的模型self.tool_model
,这个模型知道如何以及何时请求调用这些工具。退出时,它会负责断开所有连接。run
: 这是 Agent 的核心处理逻辑。query
) 发送给绑定了工具的 tool_model
。ai_response
) 是否包含 tool_calls
。tool_calls
,说明 LLM 认为不需要工具,直接返回 LLM 的回复内容。tool_calls
,遍历每个调用请求:ToolMessage
再次 发送给 tool_model
。这样LLM 基于工具的实际执行结果来生成最终的、更连贯、更准确的回答。「main
Function:」
MCPServerHTTP
实例,指向我们之前运行的两个服务地址。ChatOpenAI
模型(配置了智谱 AI 的接入点)。Agent
实例。async with agent.run_mcp_servers():
来确保在执行查询前,所有服务器都已连接且工具已绑定。agent.run()
发起几个不同的查询,演示 Agent 如何根据问题选择调用天气工具、货币转换工具或不调用任何工具。确保你的两个MCP服务server_weather.py
和server_currency.py
在运行。然后,在另一个终端中运行 Agent 客户端脚本:
uv run agent_client.py
「看到以下输出:」
Starting MCP server connections...
Fetched tools from http://localhost:9990/sse: ['get_weather']
Fetched tools from http://localhost:9991/sse: ['convert']
All MCP servers connected.
Binding tools to LLM: ['get_weather', 'convert']
--- Query 1: Weather ---
Processing query: 'BeiJing天气怎么样?'
Calling tool 'get_weather' on http://localhost:9990/sse with args: {'city': '北京'}
Received response from tool 'get_weather': [TextContent(type='text', text='北京 is sunny, enjoy it!', annotations=None)]
Tool 'get_weather' executed successfully. Result: 北京 is sunny, enjoy it!
LLM generated final response after tool calls: 北京现在是晴天,您可以尽情享受!
Agent Result 1:
Data: 北京现在是晴天,您可以尽情享受!
Tool Used: get_weather
--- Query 2: Currency Conversion ---
Processing query: '我想知道5美元等于多少人民币?'
Calling tool 'convert' on http://localhost:9991/sse with args: {'amount': 5, 'currency_from': 'USD', 'currency_to': 'CNY'}
Received response from tool 'convert': [TextContent(type='text', text='36.0', annotations=None)]
Tool 'convert' executed successfully. Result: 36.0
LLM generated final response after tool calls: 根据最新的汇率,5美元等于36.0人民币。
Agent Result 2:
Data: 根据最新的汇率,5美元等于36.0人民币。
Tool Used: convert
Closing MCP server connections...
Cleaning up connection to http://localhost:9991/sse...
Connection to http://localhost:9991/sse closed.
Cleaning up connection to http://localhost:9990/sse...
Connection to http://localhost:9990/sse closed.
All MCP server connections closed.
正如你所看到的,Agent 成功地:
get_weather
工具。convert
工具。通过结合 MCP 和 LangChain,我们构建了一个能够与多个外部工具交互的智能 Agent。
MCP 负责将工具函数封装成标准化的网络服务,而 LangChain 则利用 LLM 的理解能力来充当“调度中心”,决定何时调用哪个工具。
「这个架构的优势:」
希望这篇教程能帮助你入门使用 MCP 和 LangChain 构建自己的多工具 Agent!快去动手试试吧!
关注不迷路。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-05-12
LlamaIndex获得Databricks和KPMG LLP 战略投资
2025-05-10
跨网页解析实战:基于LangChain的多段文本分析与结构化抽取
2025-05-09
一张图讲清楚:AI Agent工程演进趋势
2025-05-08
LangChain脚本如何调度及提效?
2025-05-07
AI Agent 十问十答,降低认知摩擦
2025-05-06
Spring AI和Langchain4j对比,java栈到底用哪个好?
2025-04-30
通过langchain访问大模型并实现简单的查询
2025-04-27
AI Agent 十问十答,降低认知摩擦
2024-10-10
2024-07-13
2024-06-03
2024-04-08
2024-09-04
2024-04-08
2024-08-18
2024-03-28
2024-06-24
2024-07-10
2025-05-08
2025-05-06
2025-04-22
2025-04-18
2025-03-22
2025-03-22
2025-03-15
2025-02-05