支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


精通 MCP Server | Client 2

发布日期:2025-05-12 22:18:54 浏览次数: 1516 作者:半夏决明
推荐语

深入探索MCP Server|Client框架,实现工具服务与智能Agent的无缝集成。

核心内容:
1. 利用MCP和LangChain框架集成工具服务
2. 创建天气查询和货币转换的MCP工具服务
3. 构建Agent客户端,实现对MCP服务的智能调用

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

喜欢阅读,写作和摄影的「coder」


精通MCP Server|Client 2

上一篇介绍了基本的MPC Demo,今天,我们将展示如何使用「MCP」「LangChain」框架,轻松地将多个独立运行的工具服务(通过 MCP 暴露)集成到一个 Agent 中,让 LLM 能够根据用户需求智能地调用这些工具。

「本文目标:」

  • 创建两个简单的 MCP 工具服务:一个用于查询天气,一个用于货币转换。
  • 构建一个使用 LangChain 和 ChatOpenAI 的 Agent 客户端。
  • 让 Agent 能够连接到这两个 MCP 服务,理解用户意图,并调用合适的工具来回答问题。

「准备工作:」

  1. 「安装必要的库:」
uv add mcp langchain-openai python-dotenv
  1. 「API Key」:本教程使用ChatOpenAI(通过智谱 AI 的 API 地址),你需要一个相应的 API Key。将你的 Key 存放在项目根目录下的 .env 文件中:
OPENAI_API_KEY="YOUR_API_KEY_HERE"

(你也可以替换成其他 LangChain 支持的模型,例如本地运行的 Ollama)



第一步:创建 MCP 工具服务

MCP 工具本质上是可以通过网络调用的函数。我们将创建两个独立的 Python 脚本来运行我们的工具服务。

1.天气查询服务server_weather.py

这个服务提供一个 get_weather 工具,根据城市名称返回模拟的天气信息。

# server_weather.py
from mcp.server.fastmcp import FastMCP

# 初始化 FastMCP server,监听在 9990 端口
mcp = FastMCP("weather_service", host="localhost", port=9990# 服务名称和地址

@mcp.tool()
asyncdef get_weather(city: str) -> str:
    """Get weather information for a city.

    Args:
        city: Name of the city
    """

    # 注意:这里只是一个示例,返回固定信息
    returnf"{city} is sunny, enjoy it!"


if __name__ == "__main__":
    print("Weather service running on http://localhost:9990/sse")
    # 使用 SSE (Server-Sent Events) 传输协议运行服务
    mcp.run(transport='sse')
  • FastMCP(...): 初始化一个 MCP 服务实例,指定服务名称、主机和端口。
  • @mcp.tool(): 装饰器,将 get_weather 函数注册为一个 MCP 工具。函数签名和 docstring 会被用来生成工具的描述和输入模式 (schema),供 LLM 理解。
  • mcp.run(transport='sse'): 启动服务,使用 SSE 协议进行通信。

2. 货币转换服务server_currency.py

这个服务提供一个 convert 工具,用于转换货币。

# server_currency.py
from mcp.server.fastmcp import FastMCP

# 初始化 FastMCP server,监听在 9991 端口
mcp = FastMCP("currency_service", host="localhost", port=9991# 注意端口不同

@mcp.tool()
def convert(amount: float, currency_from: str, currency_to: str) -> float:
    """use latest exchange rate to convert currency

    Args:
      amount: the amount of currency to convert
      currency_from: the currency to convert from
      currency_to: the currency to convert to
    """

    # 注意:这里只是一个示例,使用固定汇率
    # 实际应用中,这里应该调用真实的汇率API
    if currency_from.upper() == 'USD'and currency_to.upper() == 'CNY':
         # 模拟一个简单的汇率,实际应查询API
         exchange_rate = 7.2
    elif currency_from.upper() == 'EUR'and currency_to.upper() == 'USD':
         exchange_rate = 1.1
    else:
         # 默认或未知的汇率
         exchange_rate = 0.8# 使用代码中原有的 0.8 作为默认值
    return amount * exchange_rate


if __name__ == "__main__":
    print("Currency service running on http://localhost:9991/sse")
    # 使用 SSE 传输协议运行服务
    mcp.run(transport='sse')

  • 「重要提示:」 这个 convert 函数使用了 固定 的模拟汇率。在实际应用中,你应该在这里调用真实的汇率 API 获取最新汇率。

「运行服务:」

打开两个终端窗口,分别运行这两个脚本:

# 终端 1
uv run server_weather.py
# 输出: Weather service running on http://localhost:9990/sse

# 终端 2
uv run server_currency.py
# 输出: Currency service running on http://localhost:9991/sse

现在,你的两个 MCP 工具服务已经在后台运行了。



第二步:构建 Agent 客户端agent_client.py

接下来,我们编写客户端代码,该客户端将连接到这两个 MCP 服务,并使用 LangChain 来决定何时以及如何调用这些工具。

# agent_client.py
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
from typing import Optional, List, Dict, Any

from dotenv import load_dotenv
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import TextContent

# 加载 .env 文件中的 OPENAI_API_KEY
load_dotenv()

class MCPServerHTTP:
    """管理与单个 MCP SSE 服务器的连接和交互"""
    def __init__(self, url: str):
        self.server_url = url
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.tools = None# 存储从该服务器获取的工具列表 (LangChain 格式)

    asyncdef __aenter__(self):
        # 进入上下文时连接
        await self.connect()
        return self

    asyncdef __aexit__(self, exc_type, exc_val, exc_tb):
        # 退出上下文时清理连接
        await self.cleanup()

    asyncdef connect(self):
        """连接到SSE服务器并初始化会话,获取工具列表"""
        sc = sse_client(url=self.server_url)
        sse_transport = await self.exit_stack.enter_async_context(sc)
        sse, write = sse_transport
        cs = ClientSession(sse, write)
        self.session = await self.exit_stack.enter_async_context(cs)
        await self.session.initialize()

        # 获取并转换工具列表为 LangChain 格式
        self.tools = await self.list_tools()
        print(f"Fetched tools from {self.server_url}{[tool['function']['name'for tool in self.tools]}")
        return self

    asyncdef list_tools(self) -> List[Dict[str, Any]]:
        """获取 MCP 工具并转换为 LangChain bind_tools 所需的字典列表格式"""
        ifnot self.session:
            raise RuntimeError("Not connected to server")

        response = await self.session.list_tools()

        converted_tools = []
        for tool in response.tools:
            tool_dict = {
                "type""function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema # MCP 的输入模式可以直接用
                }
            }
            converted_tools.append(tool_dict)
        return converted_tools

    asyncdef tool_call(self, tool_name: str, args: dict) -> Any:# 返回原始结果或文本
        """调用 MCP 服务器上的特定工具"""
        ifnot self.session:
            raise RuntimeError("Not connected to server")

        print(f"Calling tool '{tool_name}' on {self.server_url} with args: {args}")
        response = await self.session.call_tool(tool_name, args)
        print(f"Received response from tool '{tool_name}': {response.content}")

        # 提取主要内容,这里简化处理,优先取 text
        if response.content:
            first_content = response.content[0]
            if isinstance(first_content, TextContent):
                return first_content.text
            # 可以根据需要处理 ImageContent, EmbeddedResource 等
            return str(first_content) # Fallback to string representation
        returnNone# Or raise an error if no content

    asyncdef cleanup(self):
        """关闭连接和清理资源"""
        print(f"Cleaning up connection to {self.server_url}...")
        await self.exit_stack.aclose()
        print(f"Connection to {self.server_url} closed.")


@dataclass
class AgentResult:
    """封装 Agent 的运行结果"""
    data: str
    tool_used: Optional[str] = None


class Agent:
    """协调 LLM 和多个 MCP 服务器的 Agent"""
    def __init__(self, model: BaseChatModel, mcp_servers: List[MCPServerHTTP] = None):
        self.mcp_servers = mcp_servers or []
        self.model = model # 基础 LLM
        self.tool_model = None# 绑定了工具的 LLM
        self.server_tool_map: Dict[str, MCPServerHTTP] = {} # 工具名 -> 服务器实例映射

    @asynccontextmanager
    asyncdef run_mcp_servers(self):
        """上下文管理器,用于连接所有 MCP 服务器并准备工具"""
        exit_stack = AsyncExitStack()
        try:
            print("Starting MCP server connections...")
            # 异步连接所有服务器
            for server in self.mcp_servers:
                await exit_stack.enter_async_context(server)
            print("All MCP servers connected.")

            # 收集所有服务器的工具并创建映射
            all_tools = []
            self.server_tool_map = {}
            for server in self.mcp_servers:
                if server.tools:
                    all_tools.extend(server.tools)
                    for tool in server.tools:
                        self.server_tool_map[tool['function']['name']] = server

            # 如果有工具,将它们绑定到模型
            if all_tools:
                print(f"Binding tools to LLM: {[tool['function']['name'for tool in all_tools]}")
                self.tool_model = self.model.bind_tools(all_tools, tool_choice="auto")
            else:
                print("No tools found to bind.")
                self.tool_model = self.model # 使用原始模型

            yield self # 进入运行时
        finally:
            print("Closing MCP server connections...")
            # 退出时自动清理所有连接
            await exit_stack.aclose()
            self.tool_model = None# 重置 tool_model
            self.server_tool_map = {}
            print("All MCP server connections closed.")

    asyncdef run(self, query: str) -> AgentResult:
        """
        运行 Agent 处理用户查询。
        包含 LLM 调用、工具选择、工具执行和(可选的)最终结果生成。
        """

        ifnot self.tool_model:
            # 如果没有工具或未在上下文中运行,直接查询基础模型
            print("No tool model available, querying base model directly.")
            response = await self.model.ainvoke([HumanMessage(content=query)])
            return AgentResult(data=response.content)

        print(f"\nProcessing query: '{query}'")
        # 第一次调用 LLM,让其决定是否需要调用工具
        message = HumanMessage(content=query)
        ai_response = await self.tool_model.ainvoke([message])

        tool_calls = ai_response.tool_calls
        ifnot tool_calls:
            # LLM 认为不需要调用工具,直接返回其回答
            print("LLM decided no tool call is needed.")
            return AgentResult(data=ai_response.content)

        # LLM 决定调用一个或多个工具
        tool_messages = [] # 存储工具调用的结果消息
        tool_results_data = [] # 存储工具调用的原始结果文本

        for tool_call in tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            tool_call_id = tool_call["id"# Langchain 需要 ID 来匹配结果

            # 查找能执行此工具的服务器
            server = self.server_tool_map.get(tool_name)
            if server:
                try:
                    # 调用 MCP 服务器上的工具
                    tool_result = await server.tool_call(tool_name, tool_args)
                    result_str = str(tool_result) # 确保是字符串
                    tool_results_data.append(result_str)
                    # 创建 ToolMessage,包含工具调用的结果,供后续 LLM 使用
                    tool_messages.append(ToolMessage(content=result_str, tool_call_id=tool_call_id))
                    print(f"Tool '{tool_name}' executed successfully. Result: {result_str}")
                except Exception as e:
                    print(f"Error calling tool '{tool_name}': {e}")
                    # 发生错误时,也创建一个 ToolMessage 告知 LLM
                    tool_messages.append(ToolMessage(content=f"Error executing tool: {e}", tool_call_id=tool_call_id))
                    tool_results_data.append(f"Error calling {tool_name}")
            else:
                print(f"Error: Tool '{tool_name}' not found on any connected server.")
                tool_messages.append(ToolMessage(content=f"Error: Tool '{tool_name}' not found.", tool_call_id=tool_call_id))
                tool_results_data.append(f"Error: {tool_name} not found")

        # --- 重要改进:将工具结果反馈给 LLM 进行最终回复 ---
        # 将原始用户消息、LLM 的工具调用决定以及工具执行结果一起发送给 LLM
        final_response = await self.tool_model.ainvoke([
            message, # 原始用户查询
            ai_response, # LLM 的首次回复(包含工具调用请求)
            *tool_messages # 所有工具的执行结果
        ])

        print(f"LLM generated final response after tool calls: {final_response.content}")
        # 返回 LLM 基于工具结果生成的最终答案
        return AgentResult(data=final_response.content, tool_used=", ".join([tc["name"for tc in tool_calls]))


asyncdef main():
    # 1. 配置 MCP 服务器连接信息
    weather_server = MCPServerHTTP(url='http://localhost:9990/sse')
    currency_server = MCPServerHTTP(url='http://localhost:9991/sse')

    # 2. 初始化 LangChain LLM 模型
    # 使用 ZhipuAI GLM-4-Flash 作为示例
    llm = ChatOpenAI(
        base_url='https://open.bigmodel.cn/api/paas/v4/'# ZhipuAI API 地址
        model='glm-4-flash',                            # 模型名称
        temperature=0.0,                                # 温度设低一点,保证工具调用更稳定
    )

    # 3. 创建 Agent 实例
    agent = Agent(model=llm, mcp_servers=[weather_server, currency_server])

    # 4. 使用 async context manager 运行 Agent
    asyncwith agent.run_mcp_servers():
        # 在这里,Agent 会自动连接服务器、获取工具并绑定到 LLM

        # 示例查询 1: 天气
        print("--- Query 1: Weather ---")
        result1 = await agent.run('BeiJing天气怎么样?'# 使用中文查询
        print(f"Agent Result 1:\nData: {result1.data}\nTool Used: {result1.tool_used}\n")

        # 示例查询 2: 货币转换
        print("--- Query 2: Currency Conversion ---")
        result2 = await agent.run('我想知道5美元等于多少人民币?'# 使用中文查询
        print(f"Agent Result 2:\nData: {result2.data}\nTool Used: {result2.tool_used}\n")


if __name__ == "__main__":
    # 运行主异步函数
    asyncio.run(main())

「代码解释:」

  • MCPServerHTTP Class: 封装了与 单个 MCP 服务器通过 SSE 进行通信的逻辑。

    • __aenter__ / __aexit__: 使用 asynccontextmanager 的标准模式,在进入时连接 (connect),退出时清理 (cleanup)。
    • connect: 使用 mcp.client.sse.sse_client 建立 SSE 连接,初始化 ClientSession,并调用 list_tools 获取工具信息。
    • list_tools: 调用 MCP 的 session.list_tools(),然后将返回的 MCP Tool 对象列表转换成 LangChain bind_tools 方法期望的字典列表格式。
    • tool_call: 调用 MCP 的 session.call_tool() 来执行指定的工具,并简化处理返回结果(提取文本内容)。
    • cleanup: 使用 AsyncExitStack 来确保所有异步资源(如 SSE 连接、会话)被正确关闭。
  • Agent Class:」 负责管理 多个 MCPServerHTTP 实例,并与 LangChain 模型交互。

    • 找到能执行该工具的 MCPServerHTTP 实例(使用 server_tool_map)。
    • 调用服务器的 tool_call 方法执行工具。
    • 将工具执行结果封装成 ToolMessage
    • __init__: 接收 LangChain 模型和 MCPServerHTTP 实例列表。
    • run_mcp_servers: 这是一个异步上下文管理器。进入时,它会并行连接所有配置的 MCP 服务器,收集所有可用的工具,创建一个从工具名到对应服务器实例的映射server_tool_map,然后使用model.bind_tools()创建一个新的模型self.tool_model,这个模型知道如何以及何时请求调用这些工具。退出时,它会负责断开所有连接。
    • run: 这是 Agent 的核心处理逻辑。
  1. 将用户查询 (query) 发送给绑定了工具的 tool_model
  2. 检查 LLM 的响应 (ai_response) 是否包含 tool_calls
  3. 如果没有 tool_calls,说明 LLM 认为不需要工具,直接返回 LLM 的回复内容。
  4. 如果  tool_calls,遍历每个调用请求:
  5. (重要),这也是langchain等框架所做的封装原理:将原始用户消息、LLM 的首次响应(包含工具调用请求)以及所有 ToolMessage 再次 发送给 tool_model。这样LLM 基于工具的实际执行结果来生成最终的、更连贯、更准确的回答。
  6. 返回 LLM 的最终回答,并附带使用了哪个工具的信息。
  • main Function:」

    • 创建 MCPServerHTTP 实例,指向我们之前运行的两个服务地址。
    • 初始化 ChatOpenAI 模型(配置了智谱 AI 的接入点)。
    • 创建 Agent 实例。
    • 使用 async with agent.run_mcp_servers(): 来确保在执行查询前,所有服务器都已连接且工具已绑定。
    • 调用 agent.run() 发起几个不同的查询,演示 Agent 如何根据问题选择调用天气工具、货币转换工具或不调用任何工具。


    第三步:运行 Agent 客户端

    确保你的两个MCP服务server_weather.pyserver_currency.py在运行。然后,在另一个终端中运行 Agent 客户端脚本:

    uv run agent_client.py

    「看到以下输出:」

    Starting MCP server connections...
    Fetched tools from http://localhost:9990/sse: ['get_weather']
    Fetched tools from http://localhost:9991/sse: ['convert']
    All MCP servers connected.
    Binding tools to LLM: ['get_weather', 'convert']
    --- Query 1: Weather ---

    Processing query: 'BeiJing天气怎么样?'
    Calling tool 'get_weather' on http://localhost:9990/sse with args: {'city': '北京'}
    Received response from tool 'get_weather': [TextContent(type='text', text='北京 is sunny, enjoy it!', annotations=None)]
    Tool 'get_weather' executed successfully. Result: 北京 is sunny, enjoy it!
    LLM generated final response after tool calls: 北京现在是晴天,您可以尽情享受!
    Agent Result 1:
    Data: 北京现在是晴天,您可以尽情享受!
    Tool Used: get_weather

    --- Query 2: Currency Conversion ---

    Processing query: '我想知道5美元等于多少人民币?'
    Calling tool 'convert' on http://localhost:9991/sse with args: {'amount': 5, 'currency_from': 'USD', 'currency_to': 'CNY'}
    Received response from tool 'convert': [TextContent(type='text', text='36.0', annotations=None)]
    Tool 'convert' executed successfully. Result: 36.0
    LLM generated final response after tool calls: 根据最新的汇率,5美元等于36.0人民币。
    Agent Result 2:
    Data: 根据最新的汇率,5美元等于36.0人民币。
    Tool Used: convert

    Closing MCP server connections...
    Cleaning up connection to http://localhost:9991/sse...
    Connection to http://localhost:9991/sse closed.
    Cleaning up connection to http://localhost:9990/sse...
    Connection to http://localhost:9990/sse closed.
    All MCP server connections closed.

    正如你所看到的,Agent 成功地:

    1. 连接到了两个独立的 MCP 工具服务。
    2. 根据用户的中文提问,理解了意图。
    3. 为天气问题调用了 get_weather 工具。
    4. 为货币转换问题调用了 convert 工具。
    5. 对于不需要工具的问题,直接给出了回答。
    6. 将工具的执行结果反馈给 LLM,生成了最终的自然语言回答。


    总结与展望

    通过结合 MCP 和 LangChain,我们构建了一个能够与多个外部工具交互的智能 Agent。

    MCP 负责将工具函数封装成标准化的网络服务,而 LangChain 则利用 LLM 的理解能力来充当“调度中心”,决定何时调用哪个工具。

    「这个架构的优势:」

    • 「解耦:」 工具服务可以独立开发、部署和扩展,无需修改 Agent 核心逻辑。
    • 「标准化:」 MCP 提供了一种标准方式来描述和调用工具。
    • 「灵活性:」 可以轻松添加或移除工具服务。
    • 「利用 LangChain 生态:」 可以方便地集成 LangChain 的各种模型、提示工程和 Agent 功能。

    希望这篇教程能帮助你入门使用 MCP 和 LangChain 构建自己的多工具 Agent!快去动手试试吧!

    关注不迷路。

    53AI,企业落地大模型首选服务商

    产品:场景落地咨询+大模型应用平台+行业解决方案

    承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

    联系我们

    售前咨询
    186 6662 7370
    预约演示
    185 8882 0121

    微信扫码

    添加专属顾问

    回到顶部

    加载中...

    扫码咨询