
Building a Universal MCP Client: Seamlessly Connecting Local and Remote AI Tools

Published: 2025-08-21 14:47:33  Views: 1524
Posted by: 活水智能


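Summary

Learn how to build a universal MCP client that connects seamlessly to both local and remote AI tools and speeds up development.

Key topics:
1. Design philosophy and architecture of a universal MCP client
2. Implementation based on Google ADK and Gemini
3. A practical use case and demonstration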


Author: Piyush Agnihotri

Compiled by: 活水智能


 

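Unlike traditional clients that support only a single transport type, this article walks you through building a universal MCP (Model Context Protocol) client that can connect both to local Stdio servers and to remote Streamable HTTP servers. A single client can then reach local compute tools, remote API services, and everything in between through one unified interface.

We will build this universal MCP client, with Streamable HTTP transport support, on top of Google ADK (Agent Development Kit) and Gemini.

Introduction

MCP (Model Context Protocol) is reshaping how we build AI agents that interact with external tools and services. With the introduction of the Streamable HTTP transport, MCP has taken another key step: it replaces the earlier HTTP + SSE (Server-Sent Events) transport with a more robust option for remote deployment.

This article shows how to build a universal MCP client that works with both Stdio servers (running locally) and Streamable HTTP servers (deployed remotely). Using Google ADK and Gemini as the foundation, we demonstrate the power and flexibility of modern MCP end to end.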

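What We Are Building

Before diving into the code, let's look at what this client can do. Once the servers are running, the client automatically loads tools from multiple sources:

  • Temperature server (HTTP): provides the tools celsius_to_fahrenheit, fahrenheit_to_celsius, celsius_to_kelvin, kelvin_to_celsius, fahrenheit_to_kelvin, and kelvin_to_fahrenheit.
  • Terminal server (Stdio): provides the run_command tool for local file operations.

For example, given the request "Convert 25°C to Fahrenheit and Kelvin, then convert 100°F to Celsius, and write all results to a nicely formatted file", the client performs these steps in order:

  1. Call celsius_to_fahrenheit(25)
  2. Call celsius_to_kelvin(25)
  3. Call fahrenheit_to_celsius(100)
  4. Use run_command to write the results to a formatted file

Throughout, the client displays every function call and response as streamed events, making the agent's decision process transparent.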
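The arithmetic behind that example request can be checked by hand. The sketch below is illustrative only: it re-implements three of the conversions as plain local functions (an assumption for demonstration; the real tools run on the temperature server and are called over MCP) and rounds to two decimals, as the server listing later in the article does.

```python
# Local stand-ins for the server-side tools. These plain functions are
# assumptions for illustration, not the MCP tools themselves.

def celsius_to_fahrenheit(celsius: float) -> float:
    """°F = (°C × 9/5) + 32"""
    return round(celsius * 9 / 5 + 32, 2)


def celsius_to_kelvin(celsius: float) -> float:
    """K = °C + 273.15"""
    return round(celsius + 273.15, 2)


def fahrenheit_to_celsius(fahrenheit: float) -> float:
    """°C = (°F - 32) × 5/9"""
    return round((fahrenheit - 32) * 5 / 9, 2)


if __name__ == "__main__":
    print(celsius_to_fahrenheit(25))   # 77.0
    print(celsius_to_kelvin(25))       # 298.15
    print(fahrenheit_to_celsius(100))  # 37.78
```

These are the values the agent would be expected to write to the output file in step 4.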

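Architecture Overview

Our universal MCP client has four main parts:

1. Command-line interface (cli/main.py): the user-facing layer. It provides natural-language interaction, handles input, manages the chat loop, and displays streamed responses from the agent.

2. Client layer (src/client/mcp_client.py): defines the MCP client class, creates the session, builds the agent, and routes queries from the UI to the agent. Think of it as the traffic controller between the UI and the "brain".

3. Agent core (src/agent/agent_wrapper.py): instantiates the agent using Google ADK (powered by Gemini) and uses ADK's MCP toolset to load all available tools from the configuration.

4. Configuration (config/servers.json): manages server endpoints in one place, including Streamable HTTP server URLs and local Stdio server locations. It is the central declaration of which tools the client can reach.

Figure: sequence diagram of the universal MCP client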

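Sequence Diagram Walkthrough

The following walks through the sequence diagram to explain how the universal MCP client works internally.

The sequence starts with a simple scenario: the user types a request into the CLI: "Convert 100°F to Celsius and Kelvin, and save the result to temp_conv.txt". This natural-language command triggers a chain of actions.

Phase 1: System initialization and discovery

The CLI forwards the user request to the MCP client, which acts as the hub. The MCP client asks the config loader for the list of server configurations. This is where the system's universality shows: the list contains both HTTP servers and Stdio servers.

Next comes the key step, "build the agent from the loaded tools". The MCP client talks to the agent core, which works with both kinds of servers to discover the available tools, in effect taking inventory of everything the system can do.

Phase 2: Agent planning

The agent core analyzes the user request, draws up an execution plan, and identifies three key operations:

  • fahrenheit_to_celsius
  • fahrenheit_to_kelvin
  • run_command (write the file)

Phase 3: Parallel temperature conversion

The agent core issues two calls in parallel:

  • fahrenheit_to_celsius(100) to the HTTP server
  • fahrenheit_to_kelvin(100) to the HTTP server

The two conversions run independently and return:

  • 100°F = 37.78°C
  • 100°F = 310.93K

Because the two calls are independent, running them in parallel takes roughly as long as the slower call rather than the sum of both.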
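To make the parallel step concrete, here is a minimal sketch using asyncio.gather. The two coroutines are hypothetical stand-ins for remote tool calls (the sleep simulates network latency); how the agent framework actually schedules tool calls is not shown here.

```python
import asyncio
from typing import Tuple


async def fahrenheit_to_celsius(fahrenheit: float) -> float:
    # Hypothetical stand-in for a remote tool call; the sleep simulates latency.
    await asyncio.sleep(0.05)
    return round((fahrenheit - 32) * 5 / 9, 2)


async def fahrenheit_to_kelvin(fahrenheit: float) -> float:
    # Hypothetical stand-in for a second, independent remote tool call.
    await asyncio.sleep(0.05)
    return round((fahrenheit - 32) * 5 / 9 + 273.15, 2)


async def convert_in_parallel(fahrenheit: float) -> Tuple[float, float]:
    # gather() runs both coroutines concurrently and returns their results
    # in the order the awaitables were passed in.
    celsius, kelvin = await asyncio.gather(
        fahrenheit_to_celsius(fahrenheit),
        fahrenheit_to_kelvin(fahrenheit),
    )
    return celsius, kelvin


if __name__ == "__main__":
    print(asyncio.run(convert_in_parallel(100)))  # (37.78, 310.93)
```

With both calls in flight at once, the total wait is about one simulated round trip instead of two.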

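Phase 4: Result streaming and file operation

The agent core returns the message "100°F = 37.78°C and 310.93K" to the MCP client as a streamed event. It then prepares the file-write command and sends it to the Stdio server:

run_command("echo '100°F = 37.78°C = 310.93K' > temp_conv.txt")

The Stdio server runs the command in the restricted workspace directory and returns exit code 0 to confirm success.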
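The effect of running a command with the workspace as its working directory can be sketched with the standard library alone. This is an illustration under assumptions, not the server's code: the helper names are hypothetical, a temporary directory stands in for the workspace, and a short Python snippet replaces the echo command so the example behaves the same on every platform.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def run_in_workspace(workspace: Path, code: str) -> int:
    """Run a short Python snippet with `workspace` as the working directory."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        cwd=workspace,        # relative paths resolve against this directory
        capture_output=True,
        text=True,
        timeout=30,
    )
    return result.returncode


def demo() -> Tuple[int, str]:
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        code = (
            "from pathlib import Path; "
            "Path('temp_conv.txt').write_text('100F = 37.78C = 310.93K')"
        )
        exit_code = run_in_workspace(workspace, code)
        # The relative file name ended up inside the workspace directory.
        content = (workspace / "temp_conv.txt").read_text()
        return exit_code, content


if __name__ == "__main__":
    print(demo())
```

An exit code of 0 together with the file appearing under the workspace is the success signal described above.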

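Phase 5: Wrap-up and feedback

The system confirms that the file was saved: "I have saved the converted temperatures to temp_conv.txt". That turn of the chat loop ends, and the user can enter a new request or exit normally.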

Project Structure and Environment Setup

Let's start with the project structure and the development environment.

Project structure

build-universal-mcp-client/

├── .env                          # Environment variables
├── .gitignore                    # Git ignore patterns
├── README.md                     # Project documentation
├── requirements.txt              # Python dependencies

├── config/                      # Configuration files
│   ├── __init__.py
│   └── servers.json            # MCP server configurations

├── src/                        # Source code
│   ├── __init__.py
│   ├── agent/                  # Agent implementation
│   │   ├── __init__.py
│   │   └── agent_wrapper.py    # Agent logic and toolset management
│   ├── client/                 # Client implementation
│   │   ├── __init__.py
│   │   └── mcp_client.py       # Main client interface
│   └── utils/                  # Utilities and helpers
│       ├── __init__.py
│       ├── config_loader.py    # Configuration management
│       └── formatters.py       # Response formatting utilities

├── servers/                    # MCP server implementations
│   ├── __init__.py
│   ├── http/                   # HTTP/Streamable servers
│   │   ├── __init__.py
│   │   ├── temperature_server.py  # Temperature conversion server
│   │   └── server_launcher.py     # HTTP server launcher
│   └── stdio/                  # Stdio servers
│       ├── __init__.py
│       └── terminal_server.py  # Terminal command server

├── cli/                        # Command line interface
│   ├── __init__.py
│   └── main.py                 # CLI entry point

├── workspace/                  # Working directory for file operations
│   └── .gitkeep

└── tests/                      # Test files (for future expansion)
    ├── __init__.py
    ├── test_agent.py
    ├── test_client.py
    └── test_servers.py

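Environment setup

The project uses a virtual environment; the examples use uv, a fast Python package manager.

Start by cloning the repository and creating the virtual environment.

Step 1: Create the virtual environment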

# clone the repo and move to the project directory
git clone https://github.com/piyushagni5/langgraph-ai.git
cd langgraph-ai/mcp/04-build-streammable-http-mcp-client

# create virtual env using uv package manager
uv venv

# Activate virtual environment (On macOS/Linux)
source .venv/bin/activate   # On Windows: .venv\Scripts\activate

Step 2: Install dependencies

With the virtual environment created, install the dependencies listed in requirements.txt:

# AI and MCP Core
google-adk>=1.5.0
google-cloud-aiplatform>=1.38.0
google-generativeai>=0.3.0
mcp>=1.0.0
fastmcp>=0.9.0
# Data and Validation
pydantic>=2.5.0
# CLI and UI
click>=8.1.7
rich>=13.7.0
# Environment and Configuration
python-dotenv>=1.0.0
# Async Support
asyncio-mqtt>=0.13.0
# Development (optional)
pytest>=7.4.0
black>=23.0.0
flake8>=6.1.0

Install the dependencies:

uv pip install -r requirements.txt

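Step 3: Environment variables

Create a .env file to hold sensitive values such as API keys. This article uses two free services: the Google Gemini API and the Tavily Search API. Get the former from Google AI Studio (sign in with your Gmail account → "Get API Key" → accept the terms → copy the key into .env); get the latter by registering on the Tavily website.

Create the .env file: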

# Google AI API Configuration

GOOGLE_API_KEY=your_google_api_key_here

# MCP Configuration

MCP_CONFIG_PATH=config/servers.json

# Logging

LOG_LEVEL=INFO

# Workspace

WORKSPACE_DIR=workspace

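Implementation Details

The architecture consists of several scripts. The sections below go through the updated, production-oriented code one piece at a time:

  • Temperature conversion server (HTTP)
  • Stdio server implementation
  • Configuration management
  • Agent implementation
  • Client implementation
  • CLI implementation with enhanced debugging

Stdio server implementation

The project implements two kinds of server: Stdio and HTTP. We start with the Stdio MCP server. It provides local command execution, communicates with the client over standard input/output, and runs commands in a dedicated workspace directory.

servers/stdio/terminal_server.py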

"""
Terminal MCP Server
Provides secure local command execution capabilities through MCP stdio transport.
"""


import os
import subprocess
import logging
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

# Configure logging for stdio server
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastMCP server for stdio transport (no HTTP, uses stdin/stdout)
mcp = FastMCP("terminal")

# Get workspace directory from environment or use default
WORKSPACE_DIR = Path(os.getenv("WORKSPACE_DIR", "workspace")).resolve()

# Ensure workspace exists and log location
WORKSPACE_DIR.mkdir(exist_ok=True)
logger.info(f"Terminal server workspace: {WORKSPACE_DIR}")

class CommandInput(BaseModel):
    """Input model for terminal commands with validation."""
    command: str = Field(
        ...,
        description="Shell command to execute in the workspace directory",
        min_length=1
    )

class CommandResult(BaseModel):
    """Output model for command execution results with full details."""
    command: str = Field(..., description="The command that was executed")
    exit_code: int = Field(..., description="Process exit code (0 = success)")
    stdout: str = Field(..., description="Standard output from the command")
    stderr: str = Field(..., description="Standard error from the command")
    working_directory: str = Field(..., description="Directory where command was executed")

@mcp.tool(
    description="Execute a shell command in the secure workspace directory. Use for file operations, text processing, and system tasks.",
    title="Terminal Command Executor"
)
async def run_command(params: CommandInput) -> CommandResult:
    """
    Execute a terminal command within the workspace directory.

    Security features:
    - Commands execute only within the workspace directory
    - 30-second timeout to prevent hanging processes
    - Full command output captured for transparency

    Args:
        params: CommandInput containing the command to execute

    Returns:
        CommandResult with execution details and output
    """

    command = params.command.strip()

    # Log the command execution for debugging
    logger.info(f"Executing command: {command}")

    try:
        # Execute command in workspace directory with timeout
        result = subprocess.run(
            command,
            shell=True,
            cwd=WORKSPACE_DIR,  # Sandbox to workspace directory
            capture_output=True,  # Capture both stdout and stderr
            text=True,  # Return strings instead of bytes
            timeout=30  # 30 second timeout for safety
        )

        # Prepare structured result
        cmd_result = CommandResult(
            command=command,
            exit_code=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            working_directory=str(WORKSPACE_DIR)
        )

        # Log result summary for monitoring
        status = "SUCCESS" if result.returncode == 0 else "ERROR"
        logger.info(f"{status}: Command '{command}' completed with exit code {result.returncode}")

        return cmd_result

    except subprocess.TimeoutExpired:
        logger.error(f"Command '{command}' timed out after 30 seconds")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr="Command timed out after 30 seconds",
            working_directory=str(WORKSPACE_DIR)
        )
    except Exception as e:
        logger.error(f"Error executing command '{command}': {e}")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Execution error: {str(e)}",
            working_directory=str(WORKSPACE_DIR)
        )

if __name__ == "__main__":
    logger.info("Starting Terminal MCP Server (stdio transport)")
    logger.info(f"Workspace: {WORKSPACE_DIR}")
    # Run with stdio transport (communicates via stdin/stdout)
    mcp.run(transport="stdio")

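Key points: FastMCP("terminal") creates the server, and mcp.run(transport="stdio") makes it communicate over standard input/output. WORKSPACE_DIR defines the workspace that commands run in. The Pydantic models (CommandInput, CommandResult) keep inputs and outputs structured and validated; @mcp.tool exposes run_command as an MCP tool that clients can discover and call. subprocess.run() executes the command and captures its output, and the 30-second timeout stops runaway processes. cwd=WORKSPACE_DIR starts every command in the workspace directory; note that this sets only the working directory, so with shell=True a command that uses absolute paths can still reach files outside it.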
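The timeout branch is easy to exercise in isolation. The following is a small sketch, not the server's implementation: run_with_timeout is a hypothetical helper that mirrors the listing's pattern of mapping subprocess.TimeoutExpired to an exit code of -1, and the "slow command" is simply a Python process that sleeps.

```python
import subprocess
import sys
from typing import Dict, List, Union


def run_with_timeout(args: List[str], timeout: float) -> Dict[str, Union[int, str]]:
    """Run a command and report a structured result, even when it times out."""
    try:
        result = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
        return {"exit_code": result.returncode, "stderr": result.stderr}
    except subprocess.TimeoutExpired:
        # subprocess.run() kills and waits for the child before raising.
        return {"exit_code": -1, "stderr": f"Command timed out after {timeout} seconds"}


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(5)"]
    fast = [sys.executable, "-c", "print('ok')"]
    print(run_with_timeout(slow, timeout=0.5))
    print(run_with_timeout(fast, timeout=10))
```

Returning a result object instead of raising keeps the tool's response shape the same for success, failure, and timeout, which is convenient for the calling agent.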

Temperature conversion server (HTTP)

This Streamable HTTP server is built on FastMCP and provides a full set of temperature conversion tools with input validation.

servers/http/temperature_server.py

"""
Temperature Conversion MCP Server
Provides comprehensive temperature conversion tools supporting Celsius, Fahrenheit, and Kelvin.
"""

import click
import logging
from typing import Union
from pydantic import BaseModel, Field, validator
from mcp.server.fastmcp import FastMCP

@click.command()
@click.option("--port", default=8000, help="Port to run the server on")
@click.option("--host", default="localhost", help="Host to bind the server to")
@click.option("--log-level", default="INFO", help="Logging level")
def main(port: int, host: str, log_level: str) -> None:
    """Launch the Temperature Conversion MCP Server."""

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, log_level.upper(), logging.INFO),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger(__name__)
    logger.info("Starting Temperature Conversion MCP Server...")

    # Create FastMCP server with streamable HTTP transport
    mcp = FastMCP(
        "Temperature Converter",
        host=host,
        port=port,
        stateless_http=True  # Stateless sessions; the transport is chosen in mcp.run()
    )

    # Input/Output Models for type safety and validation
    class TemperatureInput(BaseModel):
        """Input model for temperature conversion with validation."""
        temperature: float = Field(
            ...,
            description="Temperature value to convert"
        )

        @validator('temperature')
        def validate_temperature_range(cls, v):
            """Validate temperature is within physically reasonable bounds."""
            if v < -273.15:  # Below absolute zero in Celsius
                raise ValueError("Temperature cannot be below absolute zero (-273.15°C)")
            return v

    class TemperatureResult(BaseModel):
        """Output model for temperature conversion results."""
        original_value: float = Field(..., description="Original temperature value")
        original_scale: str = Field(..., description="Original temperature scale")
        converted_value: float = Field(..., description="Converted temperature value")
        converted_scale: str = Field(..., description="Target temperature scale")
        formula: str = Field(..., description="Conversion formula used")

    # Core conversion functions (business logic)
    def celsius_to_fahrenheit_calc(celsius: float) -> float:
        """Convert Celsius to Fahrenheit using standard formula."""
        return (celsius * 9/5) + 32

    def fahrenheit_to_celsius_calc(fahrenheit: float) -> float:
        """Convert Fahrenheit to Celsius using standard formula."""
        return (fahrenheit - 32) * 5/9

    def celsius_to_kelvin_calc(celsius: float) -> float:
        """Convert Celsius to Kelvin by adding absolute zero offset."""
        return celsius + 273.15

    def kelvin_to_celsius_calc(kelvin: float) -> float:
        """Convert Kelvin to Celsius by subtracting absolute zero offset."""
        return kelvin - 273.15

    def fahrenheit_to_kelvin_calc(fahrenheit: float) -> float:
        """Convert Fahrenheit to Kelvin via Celsius intermediate."""
        celsius = fahrenheit_to_celsius_calc(fahrenheit)
        return celsius_to_kelvin_calc(celsius)

    def kelvin_to_fahrenheit_calc(kelvin: float) -> float:
        """Convert Kelvin to Fahrenheit via Celsius intermediate."""
        celsius = kelvin_to_celsius_calc(kelvin)
        return celsius_to_fahrenheit_calc(celsius)

    # MCP Tool Registrations - These become available to clients

    @mcp.tool(
        description="Convert temperature from Celsius to Fahrenheit",
        title="Celsius to Fahrenheit Converter"
    )
    def celsius_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
        """Convert Celsius to Fahrenheit with validation and formula info."""
        converted = celsius_to_fahrenheit_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Celsius",
            converted_value=round(converted, 2),
            converted_scale="Fahrenheit",
            formula="°F = (°C × 9/5) + 32"
        )

    @mcp.tool(
        description="Convert temperature from Fahrenheit to Celsius",
        title="Fahrenheit to Celsius Converter"
    )
    def fahrenheit_to_celsius(params: TemperatureInput) -> TemperatureResult:
        """Convert Fahrenheit to Celsius with additional validation."""
        # Additional validation for Fahrenheit absolute zero
        if params.temperature < -459.67:  # Below absolute zero in Fahrenheit
            raise ValueError("Temperature cannot be below absolute zero (-459.67°F)")

        converted = fahrenheit_to_celsius_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Fahrenheit",
            converted_value=round(converted, 2),
            converted_scale="Celsius",
            formula="°C = (°F - 32) × 5/9"
        )

    @mcp.tool(
        description="Convert temperature from Celsius to Kelvin",
        title="Celsius to Kelvin Converter"
    )
    def celsius_to_kelvin(params: TemperatureInput) -> TemperatureResult:
        """Convert Celsius to Kelvin - simple offset addition."""
        converted = celsius_to_kelvin_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Celsius",
            converted_value=round(converted, 2),
            converted_scale="Kelvin",
            formula="K = °C + 273.15"
        )

    @mcp.tool(
        description="Convert temperature from Kelvin to Celsius",
        title="Kelvin to Celsius Converter"
    )
    def kelvin_to_celsius(params: TemperatureInput) -> TemperatureResult:
        """Convert Kelvin to Celsius with non-negative validation."""
        # Kelvin cannot be negative by definition
        if params.temperature < 0:
            raise ValueError("Kelvin temperature cannot be negative")

        converted = kelvin_to_celsius_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Kelvin",
            converted_value=round(converted, 2),
            converted_scale="Celsius",
            formula="°C = K - 273.15"
        )

    @mcp.tool(
        description="Convert temperature from Fahrenheit to Kelvin",
        title="Fahrenheit to Kelvin Converter"
    )
    def fahrenheit_to_kelvin(params: TemperatureInput) -> TemperatureResult:
        """Convert Fahrenheit to Kelvin via two-step conversion."""
        if params.temperature < -459.67:
            raise ValueError("Temperature cannot be below absolute zero (-459.67°F)")

        converted = fahrenheit_to_kelvin_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Fahrenheit",
            converted_value=round(converted, 2),
            converted_scale="Kelvin",
            formula="K = (°F - 32) × 5/9 + 273.15"
        )

    @mcp.tool(
        description="Convert temperature from Kelvin to Fahrenheit",
        title="Kelvin to Fahrenheit Converter"
    )
    def kelvin_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
        """Convert Kelvin to Fahrenheit via two-step conversion."""
        if params.temperature < 0:
            raise ValueError("Kelvin temperature cannot be negative")

        converted = kelvin_to_fahrenheit_calc(params.temperature)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale="Kelvin",
            converted_value=round(converted, 2),
            converted_scale="Fahrenheit",
            formula="°F = (K - 273.15) × 9/5 + 32"
        )

    # Server startup with error handling
    try:
        logger.info(f"Temperature server running on {host}:{port}")
        logger.info("Available conversions: °C↔°F, °C↔K, °F↔K")
        mcp.run(transport="streamable-http")  # Use new streamable HTTP transport
    except KeyboardInterrupt:
        logger.info("Server shutting down gracefully...")
    except Exception as e:
        logger.error(f"Server error: {e}")
        raise
    finally:
        logger.info("Temperature conversion server stopped")

if __name__ == "__main__":
    main()

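Key points: Click decorators build the launch command line, with configurable host, port, and log level. The server is created with FastMCP(stateless_http=True) and started with mcp.run(transport="streamable-http"), which serves it over Streamable HTTP. The built-in temperature validation rejects inputs below absolute zero. Every conversion function is exposed as a tool with @mcp.tool, and the TemperatureResult model guarantees structured output containing the original value, the converted value, both scales, and the formula.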
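In the listing, the shared validator compares every input against -273.15 (absolute zero in Celsius), and the Fahrenheit and Kelvin tools add their own checks. One way to express the same rule per scale is sketched below. It is an illustration only: it uses a standard-library dataclass instead of Pydantic so it runs without extra packages, and the scale field and the ABSOLUTE_ZERO table are assumptions that do not appear in the original model.

```python
from dataclasses import dataclass

# Absolute zero expressed in each scale (hypothetical lookup table).
ABSOLUTE_ZERO = {"celsius": -273.15, "fahrenheit": -459.67, "kelvin": 0.0}


@dataclass(frozen=True)
class ScaledTemperatureInput:
    """Stand-in for the article's TemperatureInput, with an explicit scale."""
    temperature: float
    scale: str

    def __post_init__(self) -> None:
        if self.scale not in ABSOLUTE_ZERO:
            raise ValueError(f"Unknown scale: {self.scale}")
        lower_bound = ABSOLUTE_ZERO[self.scale]
        if self.temperature < lower_bound:
            raise ValueError(
                f"Temperature cannot be below absolute zero ({lower_bound} in {self.scale})"
            )


if __name__ == "__main__":
    # -300 is a valid Fahrenheit reading but an impossible Celsius one.
    print(ScaledTemperatureInput(-300.0, "fahrenheit"))
    try:
        ScaledTemperatureInput(-300.0, "celsius")
    except ValueError as exc:
        print(f"rejected: {exc}")
```

Keeping the bound next to the scale means each reading is checked against the limit that applies to its own unit.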

servers/http/server_launcher.py

A utility for managing the lifecycle of HTTP MCP servers: startup, health checks, and shutdown.

"""
HTTP Server Launcher
Provides utilities to start and manage HTTP MCP servers with health monitoring.
"""

import subprocess
import sys
import time
import requests
import logging
from pathlib import Path
from typing import List, Optional

logger = logging.getLogger(__name__)

class ServerLauncher:
    """Manages HTTP MCP server lifecycle with health monitoring."""

    def __init__(self):
        self.processes: List[subprocess.Popen] = []

    def start_temperature_server(self, port: int = 8000, host: str = "localhost") -> bool:
        """Start the temperature conversion server with health monitoring."""
        try:
            server_path = Path(__file__).parent / "temperature_server.py"

            cmd = [
                sys.executable,
                str(server_path),
                "--port", str(port),
                "--host", host,
                "--log-level", "INFO"
            ]

            logger.info(f"Starting temperature server on {host}:{port}")
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            self.processes.append(process)

            # Wait for server to be ready and verify health
            return self._wait_for_server(host, port)

        except Exception as e:
            logger.error(f"Failed to start temperature server: {e}")
            return False

    def _wait_for_server(self, host: str, port: int, timeout: int = 10) -> bool:
        """Wait for server to become available with health checking."""
        url = f"http://{host}:{port}/mcp"
        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                # Try to connect to the MCP endpoint
                # We expect a 406 "Not Acceptable" response which means the server is running
                # but needs proper MCP headers (this confirms the MCP server is active)
                response = requests.get(url, timeout=1)
                if response.status_code == 406:  # MCP server expects proper headers
                    logger.info(f"Server ready at {host}:{port}")
                    return True
            except requests.RequestException:
                pass

            time.sleep(0.5)

        logger.warning(f"Server at {host}:{port} not ready within {timeout}s")
        return False

    def stop_all_servers(self) -> None:
        """Stop all managed server processes gracefully."""
        for process in self.processes:
            try:
                process.terminate()  # Send SIGTERM for graceful shutdown
                process.wait(timeout=5)  # Wait up to 5 seconds
                logger.info(f"Stopped server process {process.pid}")
            except Exception as e:
                logger.error(f"Error stopping process {process.pid}: {e}")
                try:
                    process.kill()  # Force kill if graceful shutdown fails
                except Exception:
                    pass

        self.processes.clear()

# Global launcher instance
launcher = ServerLauncher()

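Key points:

  • Process management: tracks started server processes so they can be cleaned up
  • Health check: makes sure the server responds before reporting success
  • Graceful shutdown: tries SIGTERM first and falls back to SIGKILL when needed
  • Timeout handling: avoids hanging at startup
  • HTTP status check: treats a 406 response as the sign that the MCP server is up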
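The health-check and timeout points boil down to a poll-until-ready loop. Below is a minimal, generic sketch of that pattern; wait_until and the fake probe are hypothetical names introduced for illustration, and a real probe would issue the HTTP request shown in the launcher instead.

```python
import time
from typing import Callable


def wait_until(is_ready: Callable[[], bool], timeout: float = 10.0, interval: float = 0.5) -> bool:
    """Poll `is_ready` until it returns True or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout  # monotonic clock ignores wall-clock jumps
    while time.monotonic() < deadline:
        try:
            if is_ready():
                return True
        except OSError:
            pass  # connection refused and similar errors mean "not ready yet"
        time.sleep(interval)
    return False


if __name__ == "__main__":
    attempts = {"count": 0}

    def fake_probe() -> bool:
        # Pretend the server comes up on the third attempt.
        attempts["count"] += 1
        return attempts["count"] >= 3

    print(wait_until(fake_probe, timeout=2.0, interval=0.01))  # True
    print(attempts["count"])                                   # 3
```

Separating the loop from the probe makes the readiness rule (for example, "status code is 406") easy to swap or test without starting a server.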

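Configuration management

config/servers.json

Defines every MCP server the client can connect to, including the transport type (HTTP or stdio), connection parameters, and a description.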

{
  "mcpServers":{
    "temperature_converter":{
      "type":"http",
      "url":"http://localhost:8000/mcp/",
      "description":"Temperature conversion utilities supporting Celsius, Fahrenheit, and Kelvin"
    },
    "terminal":{
      "type":"stdio",
      "command":"python",
      "args":["servers/stdio/terminal_server.py"],
      "description":"Local terminal command execution for file operations"
    }
  }
}

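Notes:

  • temperature_converter: an HTTP server running on localhost:8000 that provides the temperature conversion tools
  • terminal: a Stdio server that communicates with the client over standard input/output
  • Each entry contains a type, connection parameters, and a human-readable description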
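As a quick illustration of how a client might read this file, the sketch below parses the same structure and groups server names by transport type. The group_by_transport helper is hypothetical, and the embedded JSON is a trimmed copy of the config above with the descriptions omitted.

```python
import json
from typing import Dict, List

# Trimmed copy of config/servers.json (descriptions left out for brevity).
CONFIG_TEXT = """
{
  "mcpServers": {
    "temperature_converter": {"type": "http", "url": "http://localhost:8000/mcp/"},
    "terminal": {"type": "stdio", "command": "python",
                 "args": ["servers/stdio/terminal_server.py"]}
  }
}
"""


def group_by_transport(config: dict) -> Dict[str, List[str]]:
    """Map each transport type to the names of the servers that use it."""
    groups: Dict[str, List[str]] = {}
    for name, server in config.get("mcpServers", {}).items():
        groups.setdefault(server.get("type", "unknown"), []).append(name)
    return groups


if __name__ == "__main__":
    print(group_by_transport(json.loads(CONFIG_TEXT)))
    # {'http': ['temperature_converter'], 'stdio': ['terminal']}
```

The type field is what lets one client decide, per entry, whether to open an HTTP connection or spawn a local process.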

src/utils/config_loader.py

Loads and validates the server configuration, managing it in one place with thorough error handling.

"""Configuration management utilities."""

import os
import json
import logging
from pathlib import Path
from typing import Dict, Any
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

logger = logging.getLogger(__name__)

class ConfigLoader:
    """Handles loading and validation of MCP server configurations."""

    def __init__(self, config_path: str = None):
        """Initialize with optional config path override."""
        self.config_path = self._resolve_config_path(config_path)
        self._config_cache = None

    def _resolve_config_path(self, config_path: str = None) -> Path:
        """Resolve configuration file path with fallbacks."""
        if config_path:
            return Path(config_path)

        # Try environment variable
        env_path = os.getenv("MCP_CONFIG_PATH")
        if env_path:
            return Path(env_path)

        # Default to config/servers.json
        project_root = Path(__file__).parent.parent.parent
        return project_root / "config" / "servers.json"

    def load_config(self) -> Dict[str, Any]:
        """Load and cache configuration from JSON file."""
        if self._config_cache is not None:
            return self._config_cache

        try:
            if not self.config_path.exists():
                raise FileNotFoundError(f"Config file not found: {self.config_path}")

            with open(self.config_path, 'r', encoding='utf-8') as f:
                self._config_cache = json.load(f)

            logger.info(f"Configuration loaded from: {self.config_path}")
            return self._config_cache

        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            raise

    def get_servers(self) -> Dict[str, Dict[str, Any]]:
        """Get MCP server configurations."""
        config = self.load_config()
        return config.get("mcpServers", {})

    def validate_server_config(self, server_name: str, server_config: Dict[str, Any]) -> bool:
        """Validate individual server configuration."""
        required_fields = ["type"]

        for field in required_fields:
            if field not in server_config:
                logger.error(f"Server '{server_name}' missing required field: {field}")
                return False

        server_type = server_config["type"]

        if server_type == "http":
            if "url" not in server_config:
                logger.error(f"HTTP server '{server_name}' missing 'url' field")
                return False

        elif server_type == "stdio":
            if "command" not in server_config:
                logger.error(f"Stdio server '{server_name}' missing 'command' field")
                return False
        else:
            logger.error(f"Server '{server_name}' has unsupported type: {server_type}")
            return False

        return True

# Global config loader instance
config_loader = ConfigLoader()

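Key points: ConfigLoader resolves the config path from several places in order: the constructor argument, then the MCP_CONFIG_PATH environment variable, then the default path. The built-in cache (_config_cache) avoids re-reading the file from disk. validate_server_config() checks the required fields for each transport type: an HTTP server must have a URL and a Stdio server must have a command, which prevents runtime errors caused by bad configuration.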
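The lookup order can be shown as a small standalone function. This is a sketch of the precedence rule only, not the ConfigLoader class itself: resolve_config_path and its env parameter are hypothetical, and the environment is passed in as a mapping so the behavior is easy to check without touching the real process environment.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

DEFAULT_CONFIG = Path("config") / "servers.json"


def resolve_config_path(
    explicit: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Path:
    """Pick the config path: explicit argument, then MCP_CONFIG_PATH, then default."""
    env = os.environ if env is None else env
    if explicit:
        return Path(explicit)
    env_path = env.get("MCP_CONFIG_PATH")
    if env_path:
        return Path(env_path)
    return DEFAULT_CONFIG


if __name__ == "__main__":
    fake_env = {"MCP_CONFIG_PATH": "from_env.json"}
    print(resolve_config_path("from_arg.json", fake_env))  # explicit argument wins
    print(resolve_config_path(None, fake_env))             # falls back to the env var
    print(resolve_config_path(None, {}))                   # falls back to the default
```

Putting the explicit argument first lets tests and one-off scripts override the path without changing the environment.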

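Agent implementation

This module manages the Google ADK agent and coordinates the connections to multiple MCP servers.

Responsibilities: server discovery, tool loading, and a unified interface for the client.

src/agent/agent_wrapper.py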

"""
Agent Wrapper
Manages the Google ADK agent and MCP toolset connections with enhanced error handling and debugging.
"""


import asyncio
import logging
import os
from typing import List, Optional, Dict, Any
from pathlib import Path

# Google ADK imports
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPServerParams
from google.adk.tools.mcp_tool import StdioConnectionParams
from mcp import StdioServerParameters

# Local imports
from src.utils.config_loader import config_loader
from src.utils.formatters import formatter

logger = logging.getLogger(__name__)

class AgentWrapper:
    """
    Enhanced wrapper for Google ADK agent with MCP toolset management.

    This class orchestrates the connection between the ADK agent and multiple MCP servers,
    providing automatic server discovery, connection health monitoring, and tool filtering.
    """


    def __init__(self, tool_filter: Optional[List[str]] = None):
        """
        Initialize the agent wrapper.

        Args:
            tool_filter: Optional list of tool names to allow. If None, all tools are loaded.
        """

        self.tool_filter = tool_filter
        self.agent: Optional[LlmAgent] = None
        self.toolsets: List[MCPToolset] = []
        self.server_status: Dict[str, str] = {}

        logger.info("AgentWrapper initialized")
        if tool_filter:
            logger.info(f"Tool filter active: {tool_filter}")

    async def build(self) -> None:
        """
        Build the ADK agent with MCP toolsets.

        This method orchestrates the entire agent building process:
        1. Loads server configurations from config file
        2. Establishes connections to each configured server
        3. Discovers and filters available tools from each server
        4. Creates the ADK agent with all loaded toolsets
        """

        logger.info("Building agent with MCP toolsets...")

        try:
            # Load toolsets from all configured servers
            toolsets = await self._load_toolsets()

            if not toolsets:
                logger.warning("No toolsets loaded - agent will have no tools available")

            # Create the ADK agent with Gemini 2.0 Flash Exp
            self.agent = LlmAgent(
                model="gemini-2.0-flash-exp",  # Use latest Gemini model
                name="universal_mcp_assistant",
                instruction=self._get_agent_instruction(),
                tools=toolsets  # Provide all loaded toolsets
            )

            self.toolsets = toolsets
            logger.info(f"Agent built successfully with {len(toolsets)} toolsets")

        except Exception as e:
            logger.error(f"Failed to build agent: {e}")
            raise

    def _get_agent_instruction(self) -> str:
        """Get the system instruction that defines the agent's behavior and capabilities."""
        return """You are a helpful assistant with access to temperature conversion tools and local file operations.

Your capabilities include:
- Converting temperatures between Celsius, Fahrenheit, and Kelvin
- Executing local commands for file operations
- Providing detailed explanations of conversions and formulas

When handling temperature conversions:
- Always validate input values for physical reasonableness
- Show the conversion formula used
- Round results to appropriate precision
- Handle multiple conversions in sequence when requested

When working with files:
- Use the terminal tools to create, read, and modify files
- Format output clearly and professionally
- Confirm successful file operations

Be precise, helpful, and educational in your responses. Show your work and explain the steps you're taking."""


    async def _load_toolsets(self) -> List[MCPToolset]:
        """
        Load toolsets from all configured MCP servers.

        This method iterates through all configured servers, attempts to connect
        to each one, and loads their available tools into MCPToolset instances.

        Returns:
            List of successfully connected MCPToolset instances.
        """

        servers = config_loader.get_servers()
        toolsets = []

        logger.info(f"Loading toolsets from {len(servers)} configured servers...")

        for server_name, server_config in servers.items():
            try:
                # Validate server configuration first
                if not config_loader.validate_server_config(server_name, server_config):
                    self.server_status[server_name] = "invalid_config"
                    continue

                # Create connection parameters based on server type
                connection_params = await self._create_connection_params(
                    server_name, server_config
                )

                if not connection_params:
                    self.server_status[server_name] = "connection_failed"
                    continue

                # Create MCPToolset and connect to server
                toolset = MCPToolset(
                    connection_params=connection_params,
                    tool_filter=self.tool_filter  # Apply tool filtering if specified
                )

                # Test connection by attempting to get available tools
                tools = await toolset.get_tools()
                tool_names = [tool.name for tool in tools]

                if tools:
                    toolsets.append(toolset)
                    self.server_status[server_name] = "connected"
                    formatter.print_tool_summary(server_name, tool_names)
                    logger.info(f"Connected to {server_name}: {len(tool_names)} tools loaded")
                else:
                    logger.warning(f"No tools found on server '{server_name}'")
                    self.server_status[server_name] = "no_tools"

            except Exception as e:
                logger.error(f"Failed to connect to server '{server_name}': {e}")
                self.server_status[server_name] = f"error: {str(e)}"
                continue

        logger.info(f"Successfully loaded {len(toolsets)} toolsets")
        return toolsets

    async def _create_connection_params(
        self,
        server_name: str,
        server_config: Dict[str, Any]
    ) -> Optional[Any]:
        """
        Create appropriate connection parameters based on server transport type.

        Args:
            server_name: Name of the server for logging
            server_config: Server configuration dictionary

        Returns:
            Connection parameters object or None if creation failed
        """

        server_type = server_config["type"]

        try:
            if server_type == "http":
                # Create HTTP connection parameters for streamable HTTP servers
                return StreamableHTTPServerParams(url=server_config["url"])

            elif server_type == "stdio":
                # Create stdio connection parameters for local process servers
                command = server_config["command"]
                args = server_config.get("args", [])

                # Resolve relative paths to absolute paths
                if args:
                    project_root = Path(__file__).parent.parent.parent
                    resolved_args = []
                    for arg in args:
                        if not os.path.isabs(arg) and arg.endswith('.py'):
                            resolved_args.append(str(project_root / arg))
                        else:
                            resolved_args.append(arg)
                    args = resolved_args

                return StdioConnectionParams(
                    server_params=StdioServerParameters(
                        command=command,
                        args=args
                    ),
                    timeout=10  # Connection timeout
                )
            else:
                raise ValueError(f"Unsupported server type: {server_type}")

        except Exception as e:
            logger.error(f"Error creating connection params for '{server_name}': {e}")
            return None

    async def close(self) -> None:
        """
        Gracefully close all toolset connections and cleanup resources.
        """

        logger.info("Shutting down agent and closing toolset connections...")

        for i, toolset in enumerate(self.toolsets):
            try:
                await toolset.close()
                logger.debug(f"Closed toolset {i+1}")
            except Exception as e:
                logger.error(f"Error closing toolset {i+1}: {e}")

        self.toolsets.clear()
        self.agent = None

        # Small delay to ensure cleanup completes
        await asyncio.sleep(0.5)
        logger.info("Agent shutdown complete")

    def get_server_status(self) -> Dict[str, str]:
        """Get the current connection status of all configured servers."""
        return self.server_status.copy()

    def is_ready(self) -> bool:
        """Check if the agent is properly initialized and ready for use."""
        return self.agent is not None

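Key points: AgentWrapper is the central coordinator that connects the ADK Agent to multiple MCP servers. _load_toolsets() iterates over the configured servers, attempts to connect to each one, and collects their tools; MCPToolset wraps each connection so the Agent can use it. _create_connection_params() builds connection parameters by transport type (a URL for HTTP, a command plus arguments for Stdio), and its path resolution keeps relative script paths valid no matter where the client is launched from. LlmAgent assembles the model, the system prompt, and the toolsets into an Agent that can draw on several servers at once.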
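The path-resolution rule mentioned above can be isolated into a small helper. This is a minimal sketch, not the project's code: resolve_script_args, the example project root, and the example script path are hypothetical names introduced for illustration. Only the rule itself comes from _create_connection_params(): relative arguments ending in .py are joined to the project root, and everything else passes through unchanged.

```python
from pathlib import Path
from typing import List


def resolve_script_args(args: List[str], project_root: Path) -> List[str]:
    """Return args with relative '.py' paths made absolute under project_root.

    Hypothetical helper that mirrors the rule in _create_connection_params().
    """
    resolved = []
    for arg in args:
        if arg.endswith(".py") and not Path(arg).is_absolute():
            # Relative script path: anchor it at the project root
            resolved.append(str(project_root / arg))
        else:
            # Flags, absolute paths, and non-script arguments pass through
            resolved.append(arg)
    return resolved


if __name__ == "__main__":
    root = Path("/opt/project")  # assumed location, for illustration only
    print(resolve_script_args(["servers/terminal_server.py", "--verbose"], root))
```

Anchoring at the project root rather than the current working directory is what lets the Stdio server start correctly whether the CLI is run from the repository root or from a subdirectory.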

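Client Implementation

The client is the coordination layer between the UI and the Agent. It provides session management, streaming responses, and fine-grained debugging.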

src/client/mcp_client.py

"""
MCP Client
Enhanced client interface with detailed debugging of MCP interactions.
"""

import logging
from typing import Optional, List, AsyncGenerator, Any
from google.genai.types import Content, Part
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from src.agent.agent_wrapper import AgentWrapper
from src.utils.formatters import formatter

logger = logging.getLogger(__name__)

class MCPClient:
    """
    Enhanced MCP client with session management, streaming responses, and detailed debugging.

    This client provides a high-level interface for interacting with the ADK agent
    while showing detailed information about MCP server interactions.
    """

    def __init__(
        self,
        app_name: str = "universal_mcp_client",
        user_id: str = "default_user",
        session_id: str = "default_session",
        tool_filter: Optional[List[str]] = None,
        debug_mode: bool = False
    ):
        """
        Initialize the MCP client with debugging capabilities.

        Args:
            app_name: Application identifier for ADK
            user_id: User identifier for session context
            session_id: Session identifier for conversation context
            tool_filter: Optional list of allowed tool names
            debug_mode: Enable detailed debugging of MCP interactions
        """
        self.app_name = app_name
        self.user_id = user_id
        self.session_id = session_id
        self.debug_mode = debug_mode

        # Initialize core components
        self.session_service = InMemorySessionService()
        self.agent_wrapper = AgentWrapper(tool_filter=tool_filter)
        self.runner: Optional[Runner] = None

        # State tracking
        self.is_initialized = False

        logger.info(f"MCPClient initialized for user '{user_id}', session '{session_id}'")
        if debug_mode:
            logger.info("Debug mode enabled - detailed MCP interactions will be shown")

    async def initialize(self) -> None:
        """
        Initialize the client session and agent.

        This method must be called before using send_message().
        It sets up the session, builds the agent, and prepares the runner.
        """
        if self.is_initialized:
            logger.warning("Client already initialized")
            return

        try:
            logger.info("Initializing MCP client...")

            # Create ADK session
            await self.session_service.create_session(
                app_name=self.app_name,
                user_id=self.user_id,
                session_id=self.session_id
            )
            logger.debug("ADK session created")

            # Build agent with all MCP toolsets
            await self.agent_wrapper.build()

            if not self.agent_wrapper.is_ready():
                raise RuntimeError("Agent failed to initialize properly")

            # Create runner to handle agent execution
            self.runner = Runner(
                agent=self.agent_wrapper.agent,
                app_name=self.app_name,
                session_service=self.session_service
            )

            self.is_initialized = True
            logger.info("MCP client initialized successfully")

            # Print server status summary
            status = self.agent_wrapper.get_server_status()
            connected = sum(1 for s in status.values() if s == "connected")
            logger.info(f"Server status: {connected}/{len(status)} servers connected")

        except Exception as e:
            logger.error(f"Failed to initialize MCP client: {e}")
            await self.shutdown()
            raise

    async def send_message(self, message: str) -> AsyncGenerator[Any, None]:
        """
        Send a message to the agent and stream the response with detailed debugging.

        Args:
            message: User message to send to the agent

        Yields:
            Streaming response events from the agent with MCP interaction details

        Raises:
            RuntimeError: If client is not initialized
            ValueError: If the message is empty
        """
        if not self.is_initialized:
            raise RuntimeError("Client not initialized. Call initialize() first.")

        if not message.strip():
            raise ValueError("Message cannot be empty")

        logger.info(f"Sending message: {message[:100]}{'...' if len(message) > 100 else ''}")

        try:
            # Create content object for ADK
            content = Content(
                role="user",
                parts=[Part(text=message)]
            )

            event_count = 0
            # Send to agent and yield streaming responses with debugging
            async for event in self.runner.run_async(
                user_id=self.user_id,
                session_id=self.session_id,
                new_message=content
            ):
                event_count += 1

                # Show detailed debugging information
                if self.debug_mode:
                    formatter.print_json_response(event, f"Event #{event_count}")
                    self._analyze_event(event, event_count)

                yield event

        except Exception as e:
            logger.error(f"Error sending message: {e}")
            raise

    def _analyze_event(self, event: Any, event_count: int) -> None:
        """
        Analyze and display detailed information about MCP events.

        Args:
            event: The event object from the agent
            event_count: Sequential event number for tracking
        """
        try:
            # Check if this is a tool-related event
            if hasattr(event, 'tool_calls') and event.tool_calls:
                for tool_call in event.tool_calls:
                    formatter.print_mcp_interaction(
                        "tool_call",
                        {
                            "tool_name": tool_call.name if hasattr(tool_call, 'name') else "Unknown",
                            "parameters": tool_call.args if hasattr(tool_call, 'args') else {},
                            "server": "MCP Server",
                            "event_number": event_count
                        }
                    )

            # Check for tool responses
            if hasattr(event, 'tool_responses') and event.tool_responses:
                for tool_response in event.tool_responses:
                    formatter.print_mcp_interaction(
                        "tool_response",
                        {
                            "tool_name": getattr(tool_response, 'name', 'Unknown'),
                            "result": str(tool_response.content) if hasattr(tool_response, 'content') else "No result",
                            "status": "success" if not hasattr(tool_response, 'error') else "error",
                            "event_number": event_count
                        }
                    )

            # Check for agent thinking/processing
            if hasattr(event, 'content') and hasattr(event.content, 'parts'):
                if event.content.parts and not getattr(event, 'is_final_response', lambda: False)():
                    formatter.print_mcp_interaction(
                        "agent_thinking",
                        {
                            "content": event.content.parts[0].text if event.content.parts else "Processing...",
                            "event_number": event_count
                        }
                    )

            # Check for final response
            if hasattr(event, 'is_final_response') and event.is_final_response():
                content = ""
                if hasattr(event, 'content') and hasattr(event.content, 'parts') and event.content.parts:
                    content = event.content.parts[0].text

                formatter.print_mcp_interaction(
                    "final_response",
                    {
                        "content": content,
                        "event_number": event_count
                    }
                )

        except Exception as e:
            logger.debug(f"Error analyzing event {event_count}: {e}")

    def toggle_debug_mode(self) -> bool:
        """Toggle debug mode on/off and return new state."""
        self.debug_mode = not self.debug_mode
        logger.info(f"Debug mode {'enabled' if self.debug_mode else 'disabled'}")
        return self.debug_mode

    async def shutdown(self) -> None:
        """
        Gracefully shutdown the client and cleanup all resources.
        """
        logger.info("Shutting down MCP client...")

        try:
            if self.agent_wrapper:
                await self.agent_wrapper.close()

            # Reset state
            self.runner = None
            self.is_initialized = False

            logger.info("MCP client shutdown complete")

        except Exception as e:
            logger.error(f"Error during shutdown: {e}")

    def get_status(self) -> dict:
        """
        Get comprehensive client status information.

        Returns:
            Dictionary with detailed client status information
        """
        status = {
            "initialized": self.is_initialized,
            "debug_mode": self.debug_mode,
            "app_name": self.app_name,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "agent_ready": self.agent_wrapper.is_ready() if self.agent_wrapper else False,
            "server_status": self.agent_wrapper.get_server_status() if self.agent_wrapper else {}
        }

        return status

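Key points: MCPClient manages the session lifecycle: it creates the ADK session, builds the Agent, and handles cleanup. send_message() does the real-time streaming: it wraps the user input in an ADK Content object, sends it to the Agent through the Runner, and yields events as they occur. Debug mode exposes tool calls, their parameters, and the responses. _analyze_event() inspects each streamed event to pick out tool calls, tool responses, and the Agent's intermediate "thinking", which makes the decisions and the data flow easier to follow.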
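The streaming pattern behind send_message() can be tried without ADK or any server. The sketch below is an illustration under stated assumptions: FakeEvent and fake_run_async are hypothetical stand-ins for ADK's event objects and Runner.run_async(), and they model only the one method the CLI relies on, is_final_response().

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event; not the real class."""
    text: str
    final: bool = False

    def is_final_response(self) -> bool:
        return self.final


async def fake_run_async(message: str) -> AsyncGenerator[FakeEvent, None]:
    """Stand-in for Runner.run_async(): yields events one at a time."""
    yield FakeEvent(f"calling a tool for: {message}")
    await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
    yield FakeEvent("tool returned a result")
    yield FakeEvent("done", final=True)


async def collect_until_final(message: str) -> List[str]:
    """Consume the stream and stop at the first final response."""
    seen = []
    async for event in fake_run_async(message):
        seen.append(event.text)
        if event.is_final_response():
            break
    return seen


if __name__ == "__main__":
    print(asyncio.run(collect_until_final("convert 100F")))
```

Because the producer is an async generator, the consumer sees each event as soon as it is yielded instead of waiting for the whole run to finish, which is what allows the debug view to print tool calls while the Agent is still working.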

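CLI Implementation with Enhanced Debugging

This entry point provides an interactive command-line interface that shows the interactions between the MCP client and the servers in real time.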

cli/main.py

"""
CLI Main Entry Point
Enhanced command-line interface with detailed MCP interaction debugging.
"""


import asyncio
import logging
import os
import sys
from pathlib import Path
from typing import Optional

# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.client.mcp_client import MCPClient
from src.utils.formatters import formatter
from servers.http.server_launcher import launcher

# Configuration
DEFAULT_TOOLS = [
    'celsius_to_fahrenheit',
    'fahrenheit_to_celsius',
    'celsius_to_kelvin',
    'kelvin_to_celsius',
    'fahrenheit_to_kelvin',
    'kelvin_to_fahrenheit',
    'run_command'
]

# Configure logging with detailed format
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("mcp_client.log")
    ]
)

# Suppress verbose logs from external libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("google").setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

class MCPClientCLI:
    """Enhanced CLI for MCP Client with detailed debugging and server management."""

    def __init__(self):
        self.client: Optional[MCPClient] = None
        self.server_started = False
        self.debug_mode = True  # Enable debug mode by default

    async def start_servers(self) -> bool:
        """Start required HTTP servers with health monitoring."""
        try:
            logger.info("Starting HTTP servers...")

            # Start temperature server
            if launcher.start_temperature_server(port=8000):
                self.server_started = True
                logger.info("Temperature server started successfully")
                return True
            else:
                logger.error("Failed to start temperature server")
                return False

        except Exception as e:
            logger.error(f"Error starting servers: {e}")
            return False

    async def initialize_client(self) -> bool:
        """Initialize the MCP client with debugging enabled."""
        try:
            logger.info("Initializing MCP client...")

            self.client = MCPClient(
                app_name="universal_mcp_client",
                user_id="cli_user_001",
                session_id="cli_session_001",
                tool_filter=DEFAULT_TOOLS,
                debug_mode=self.debug_mode  # Enable debugging
            )

            await self.client.initialize()
            logger.info("MCP client initialized")
            return True

        except Exception as e:
            logger.error(f"Failed to initialize client: {e}")
            formatter.print_error("Client initialization failed", e)
            return False

    async def chat_loop(self) -> None:
        """Main interactive chat loop with enhanced debugging."""
        formatter.print_welcome_banner()

        print("\nChat started. Commands:")
        print("  - Type your requests in natural language")
        print("  - 'status' - Show system status")
        print("  - 'debug on/off' - Toggle detailed debugging")
        print("  - 'help' - Show example requests")
        print("  - 'quit', 'exit', ':q' - Exit the application\n")

        try:
            while True:
                try:
                    # Get user input
                    user_input = input("You: ").strip()

                    if not user_input:
                        continue

                    # Handle special commands
                    if user_input.lower() in ['quit', 'exit', ':q']:
                        print("Goodbye!")
                        break
                    elif user_input.lower() == 'status':
                        self._show_status()
                        continue
                    elif user_input.lower().startswith('debug'):
                        self._handle_debug_command(user_input)
                        continue
                    elif user_input.lower() == 'help':
                        self._show_help()
                        continue

                    # Send message to agent with debugging
                    await self._handle_user_message(user_input)

                except KeyboardInterrupt:
                    print("\n\nInterrupted by user. Goodbye!")
                    break
                except EOFError:
                    print("\n\nEnd of input. Goodbye!")
                    break
                except Exception as e:
                    logger.error(f"Error in chat loop: {e}")
                    formatter.print_error("An error occurred", e)

        except Exception as e:
            logger.error(f"Critical error in chat loop: {e}")
            formatter.print_error("Critical error occurred", e)

    async def _handle_user_message(self, message: str) -> None:
        """Handle a user message with detailed MCP interaction debugging."""
        event_count = 0
        final_response = None

        try:
            print(f"\nAssistant: Processing your request...")
            if self.debug_mode:
                print("[DEBUG MODE] Showing detailed MCP interactions:\n")

            async for event in self.client.send_message(message):
                event_count += 1

                # In debug mode, detailed interactions are shown by the client
                # Here we just count events and look for the final response
                if hasattr(event, 'is_final_response') and event.is_final_response():
                    final_response = event
                    break

            # Display final response
            if final_response and hasattr(final_response, 'content'):
                if hasattr(final_response.content, 'parts') and final_response.content.parts:
                    response_text = final_response.content.parts[0].text
                    print(f"\nFinal Response:\n{response_text}\n")
                else:
                    print("Task completed (no text response)\n")
            else:
                print("No final response received\n")

            if self.debug_mode:
                print(f"Total events processed: {event_count}")

        except Exception as e:
            logger.error(f"Error handling message: {e}")
            formatter.print_error("Failed to process message", e)

    def _handle_debug_command(self, command: str) -> None:
        """Handle debug mode toggle commands."""
        parts = command.split()
        if len(parts) > 1:
            mode = parts[1].lower()
            if mode == 'on':
                self.debug_mode = True
                if self.client:
                    self.client.debug_mode = True
                print("Debug mode enabled - detailed MCP interactions will be shown")
            elif mode == 'off':
                self.debug_mode = False
                if self.client:
                    self.client.debug_mode = False
                print("Debug mode disabled - only final responses will be shown")
            else:
                print("Usage: debug on/off")
        else:
            current_state = "enabled" if self.debug_mode else "disabled"
            print(f"Debug mode is currently {current_state}")

    def _show_status(self) -> None:
        """Show current system status with detailed information."""
        if not self.client:
            print("Client not initialized")
            return

        status = self.client.get_status()

        print("\nSystem Status:")
        print(f"  - Client initialized: {'[OK]' if status['initialized'] else '[FAIL]'}")
        print(f"  - Agent ready: {'[OK]' if status['agent_ready'] else '[FAIL]'}")
        print(f"  - Debug mode: {'ON' if status['debug_mode'] else 'OFF'}")
        print(f"  - User: {status['user_id']}")
        print(f"  - Session: {status['session_id']}")

        print("\nServer Status:")
        for server_name, server_status in status['server_status'].items():
            status_icon = "[OK]" if server_status == "connected" else "[FAIL]"
            print(f"  - {server_name}: {status_icon} {server_status}")
        print()

    def _show_help(self) -> None:
        """Show example requests and debugging tips."""
        examples = [
            "Convert 25 degrees Celsius to Fahrenheit",
            "What is 100°F in Celsius and Kelvin?",
            "Convert 300 Kelvin to Celsius and Fahrenheit",
            "Convert 0°C to all other temperature scales",
            "Convert room temperature (20°C) to Fahrenheit and save to file",
            "Create a temperature conversion table for 0, 25, 50, 75, 100°C"
        ]

        debug_tips = [
            "Use 'debug on' to see detailed MCP server interactions",
            "Watch for tool calls, parameters, and server responses",
            "Each event shows the communication between client and servers",
            "Use 'debug off' to see only final responses"
        ]

        print("\nExample Requests:")
        for i, example in enumerate(examples, 1):
            print(f"  {i}. {example}")

        print("\nDebug Mode Tips:")
        for i, tip in enumerate(debug_tips, 1):
            print(f"  {i}. {tip}")
        print()

    async def cleanup(self) -> None:
        """Cleanup resources and stop servers."""
        try:
            if self.client:
                await self.client.shutdown()

            if self.server_started:
                launcher.stop_all_servers()

        except Exception as e:
            logger.error(f"Error during cleanup: {e}")

async def main():
    """Main entry point for the CLI application with enhanced debugging."""
    cli = MCPClientCLI()

    try:
        # Start servers
        print("Starting Universal MCP Client with Debug Mode...")

        if not await cli.start_servers():
            print("Failed to start required servers")
            return 1

        # Initialize client
        if not await cli.initialize_client():
            print("Failed to initialize client")
            return 1

        # Start chat loop with debugging
        await cli.chat_loop()

        return 0

    except KeyboardInterrupt:
        print("\nInterrupted by user")
        return 130
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        formatter.print_error("Unexpected error occurred", e)
        return 1
    finally:
        await cli.cleanup()

if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except asyncio.CancelledError:
        # Suppress cancelled error messages during shutdown
        logger.debug("Main coroutine cancelled during shutdown")
        sys.exit(0)

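Key points: MCPClientCLI ties everything together. start_servers() starts the required HTTP server with health monitoring; chat_loop() provides a REPL with built-in commands such as status, debug on/off, and help; _handle_user_message() streams the input to the client and collects events until the final response arrives; _handle_debug_command() switches between verbose and concise output at any time; cleanup() releases resources on exit.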
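The built-in command handling in chat_loop() boils down to classifying one line of input before anything is sent to the Agent. The following sketch pulls that decision into a pure function so it can be tested on its own; classify_input and the action labels are hypothetical names introduced here, while the recognized commands are the ones listed in the CLI above.

```python
from typing import Tuple

EXIT_COMMANDS = {"quit", "exit", ":q"}


def classify_input(user_input: str) -> Tuple[str, str]:
    """Map raw input to an (action, argument) pair.

    Hypothetical helper; the actions mirror the branches in chat_loop().
    """
    text = user_input.strip()
    lowered = text.lower()
    if not text:
        return ("ignore", "")
    if lowered in EXIT_COMMANDS:
        return ("exit", "")
    if lowered == "status":
        return ("status", "")
    if lowered == "help":
        return ("help", "")
    if lowered.startswith("debug"):
        # Same prefix check as the CLI; the second token is the requested mode
        parts = lowered.split()
        return ("debug", parts[1] if len(parts) > 1 else "")
    # Anything else is a natural-language request for the Agent
    return ("message", text)


if __name__ == "__main__":
    for line in ["status", "debug off", ":q", "Convert 25 degrees Celsius to Fahrenheit"]:
        print(line, "->", classify_input(line))
```

One consequence of the prefix check is that a natural-language request beginning with the word "debug" is treated as a debug command rather than forwarded to the Agent; comparing only the first token against "debug" would narrow that, though requests of that shape would still be intercepted.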

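Response Formatting with Debug Information

This module pretty-prints the different response types, including the debug output that shows client-server interaction details.

We use the Rich library to produce clearly color-coded console output. See src/utils/formatters.py for details.

Testing the Final MCP Client Workflow

Let's try the universal MCP client with a real example:

Starting the System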

cd mcp/04-build-streammable-http-mcp-client
source .venv/bin/activate

uv run python cli/main.py

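The system automatically:

  1. Starts the temperature-conversion HTTP server
  2. Connects to the Stdio terminal server
  3. Loads all available tools
  4. Presents the interactive chat interface in debug mode

Once the connections to the servers are established, you will see output like the following in the terminal:

Terminal output after the connections are established

Example Interaction with Debug Output

Multiple conversions written to a file: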

You: Convert 100°F to Celsius and Kelvin, then save results to a file temp_conv.txt

Expected response:


**Assistant:** Processing your request...

**[DEBUG MODE]** Showing detailed MCP interactions:

## Event #1 - Parallel Function Calls
```json
{
  "content": {
    "parts": [
      {
        "function_call": {
          "id": "adk-3e153461-264d-4242-aabf-d796468119bb",
          "args": {
            "params": {
              "temperature": 100
            }
          },
          "name": "fahrenheit_to_celsius"
        }
      },
      {
        "function_call": {
          "id": "adk-6dc14eb3-d76e-41a4-96cc-b9d9113b1903",
          "args": {
            "params": {
              "temperature": 100
            }
          },
          "name": "fahrenheit_to_kelvin"
        }
      }
    ],
    "role": "model"
  },
  "usage_metadata": {
    "candidates_token_count": 20,
    "prompt_token_count": 613,
    "total_token_count": 633
  },
  "timestamp": 1754508151.836657
}
```

## Event #2 - Conversion Results

```json
{
  "content": {
    "parts": [
      {
        "function_response": {
          "id": "adk-3e153461-264d-4242-aabf-d796468119bb",
          "name": "fahrenheit_to_celsius",
          "response": {
            "result": {
              "structuredContent": {
                "original_value": 100.0,
                "original_scale": "Fahrenheit",
                "converted_value": 37.78,
                "converted_scale": "Celsius",
                "formula": "°C = (°F - 32) × 5/9"
              },
              "isError": false
            }
          }
        }
      },
      {
        "function_response": {
          "id": "adk-6dc14eb3-d76e-41a4-96cc-b9d9113b1903",
          "name": "fahrenheit_to_kelvin",
          "response": {
            "result": {
              "structuredContent": {
                "original_value": 100.0,
                "original_scale": "Fahrenheit",
                "converted_value": 310.93,
                "converted_scale": "Kelvin",
                "formula": "K = (°F - 32) × 5/9 + 273.15"
              },
              "isError": false
            }
          }
        }
      }
    ],
    "role": "user"
  },
  "timestamp": 1754508152.818855
}
```

## Event #3 - File Save Command

```json
{
  "content": {
    "parts": [
      {
        "text": "Okay, 100°F is equal to 37.78°C and 310.93K.\n\nHere's the conversion information:\n* Fahrenheit to Celsius: °C = (°F - 32) × 5/9\n* Fahrenheit to Kelvin: K = (°F - 32) × 5/9 + 273.15\n\nNow, I will save the results to a file named `temp_conv.txt`."
      },
      {
        "function_call": {
          "id": "adk-696edb80-7cad-4815-a3ca-11b7d1884aae",
          "args": {
            "params": {
              "command": "echo '100°F = 37.78°C = 310.93K' > temp_conv.txt"
            }
          },
          "name": "run_command"
        }
      }
    ],
    "role": "model"
  },
  "usage_metadata": {
    "candidates_token_count": 147,
    "prompt_token_count": 894,
    "total_token_count": 1041
  },
  "timestamp": 1754508152.854623
}
```

**Agent Thinking:** Okay, 100°F is equal to 37.78°C and 310.93K.

Here's the conversion information:

- Fahrenheit to Celsius: °C = (°F - 32) × 5/9
- Fahrenheit to Kelvin: K = (°F - 32) × 5/9 + 273.15

Now, I will save the results to a file named `temp_conv.txt`.

## Event #4 - Command Execution Result

```json
{
  "content": {
    "parts": [
      {
        "function_response": {
          "id": "adk-696edb80-7cad-4815-a3ca-11b7d1884aae",
          "name": "run_command",
          "response": {
            "result": {
              "structuredContent": {
                "command": "echo '100°F = 37.78°C = 310.93K' > temp_conv.txt",
                "exit_code": 0,
                "stdout": "",
                "stderr": "",
                "working_directory": "/Users/piyushagni5/Desktop/github/langgraph-ai/mcp/04-build-streammable-http-mcp-client/workspace"
              },
              "isError": false
            }
          }
        }
      }
    ],
    "role": "user"
  },
  "timestamp": 1754508155.788237
}
```

## Event #5 - Final Response

```json
{
  "content": {
    "parts": [
      {
        "text": "I have saved the converted temperatures to the file `temp_conv.txt`.\n"
      }
    ],
    "role": "model"
  },
  "usage_metadata": {
    "candidates_token_count": 17,
    "prompt_token_count": 1241,
    "total_token_count": 1258
  },
  "timestamp": 1754508155.805442
}
```

---

## Final Response

**I have saved the converted temperatures to the file `temp_conv.txt`.**

**Conversion Results:**

- 100°F = 37.78°C = 310.93K

**Formulas Used:**

- Fahrenheit to Celsius: °C = (°F - 32) × 5/9
- Fahrenheit to Kelvin: K = (°F - 32) × 5/9 + 273.15

---

**Summary:** 5 total events processed

Below is a screenshot of the final response after the query ran successfully:

Expected response after all events have been processed
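The debug events printed in the sample run share one shape: a content object holding a list of parts, where each part carries a function_call, a function_response, or text. A short sketch of walking that structure follows. It operates on the JSON form shown in the debug output, not on ADK's event objects, and extract_function_calls is a hypothetical helper name; the sample data is abridged from Event #1.

```python
from typing import Any, Dict, List, Tuple


def extract_function_calls(event: Dict[str, Any]) -> List[Tuple[str, Dict[str, Any]]]:
    """Return (tool name, args) for every function_call part in an event dict."""
    calls = []
    for part in event.get("content", {}).get("parts", []):
        call = part.get("function_call")
        if call:
            calls.append((call["name"], call.get("args", {})))
    return calls


# Event #1 from the sample run, with ids and usage metadata omitted
event_1 = {
    "content": {
        "parts": [
            {"function_call": {"name": "fahrenheit_to_celsius",
                               "args": {"params": {"temperature": 100}}}},
            {"function_call": {"name": "fahrenheit_to_kelvin",
                               "args": {"params": {"temperature": 100}}}},
        ],
        "role": "model",
    }
}

if __name__ == "__main__":
    for name, args in extract_function_calls(event_1):
        print(name, args)
```

Event #1 contains two function_call parts in a single model turn, which is why both conversions appear together and their responses come back together in Event #2.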

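Advanced Features

To keep debugging smooth and validation strict, the system provides several additional capabilities:

Server Health Monitoring

The client tracks the health of each server and reports its connection status:

  • connected: the server is healthy and responding normally
  • connection_failed: the connection could not be established
  • invalid_config: the server configuration is invalid
  • no_tools: the server is connected but exposes no tools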
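These status strings are stored per server in a plain dictionary, so a summary is a simple count. The sketch below is illustrative only: summarize_status and the server names in the example are hypothetical. It also folds in the "error: <message>" value that _load_toolsets() records when connecting raises an exception.

```python
from collections import Counter
from typing import Dict


def summarize_status(server_status: Dict[str, str]) -> Dict[str, int]:
    """Count servers per state, folding 'error: ...' messages into 'error'."""
    counts: Counter = Counter()
    for state in server_status.values():
        key = "error" if state.startswith("error:") else state
        counts[key] += 1
    return dict(counts)


if __name__ == "__main__":
    # Hypothetical server names, for illustration only
    example = {
        "temperature_server": "connected",
        "terminal_server": "connected",
        "legacy_server": "error: connection refused",
    }
    print(summarize_status(example))
```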

Tool Filtering

You can restrict the set of tools the Agent is allowed to access:


DEFAULT_TOOLS = [
    'celsius_to_fahrenheit',
    'fahrenheit_to_celsius',
    'run_command'  # Only allow specific tools
]
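Conceptually, the filter is an allow-list applied to the tool names a server advertises. How ADK applies tool_filter internally is not shown in this article, so the following is only a sketch of the idea: filter_tools is a hypothetical function, and treating None as "no filtering" is an assumption based on tool_filter being optional in MCPClient.

```python
from typing import Iterable, List, Optional


def filter_tools(available: Iterable[str], allowed: Optional[List[str]]) -> List[str]:
    """Keep only the tool names present in the allow-list.

    Assumption: None means no filter is configured, so every tool passes.
    """
    if allowed is None:
        return list(available)
    allowed_set = set(allowed)
    # Preserve the order in which the server advertised its tools
    return [name for name in available if name in allowed_set]


if __name__ == "__main__":
    advertised = ["celsius_to_fahrenheit", "celsius_to_kelvin", "run_command"]
    print(filter_tools(advertised, ["celsius_to_fahrenheit", "run_command"]))
```

A narrow allow-list is especially relevant for a tool such as run_command, since it decides whether the Agent can execute shell commands at all.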

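Error Handling

The system handles several kinds of failures gracefully:

  • Server timeouts: command execution times out after 30 seconds by default
  • Invalid temperatures: validation rejects physically impossible values
  • Connection failures: failed servers are skipped automatically and the client keeps running
  • Tool errors: detailed error information is printed in debug mode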
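The temperature server's own validation code is not shown in this section, so the following is a sketch of what rejecting a physically impossible value can look like. The function names reuse the tool names from the article and the formulas are the ones printed in the sample output; the absolute-zero threshold check and the rounding to two decimals are assumptions chosen to match that output.

```python
ABSOLUTE_ZERO_F = -459.67  # 0 K expressed in degrees Fahrenheit


def _check_fahrenheit(temperature: float) -> None:
    """Reject values below absolute zero (assumed validation rule)."""
    if temperature < ABSOLUTE_ZERO_F:
        raise ValueError(
            f"{temperature}°F is below absolute zero ({ABSOLUTE_ZERO_F}°F)"
        )


def fahrenheit_to_celsius(temperature: float) -> float:
    """°C = (°F - 32) × 5/9, rounded to two decimals."""
    _check_fahrenheit(temperature)
    return round((temperature - 32) * 5 / 9, 2)


def fahrenheit_to_kelvin(temperature: float) -> float:
    """K = (°F - 32) × 5/9 + 273.15, rounded to two decimals."""
    _check_fahrenheit(temperature)
    return round((temperature - 32) * 5 / 9 + 273.15, 2)


if __name__ == "__main__":
    print(fahrenheit_to_celsius(100), fahrenheit_to_kelvin(100))
    try:
        fahrenheit_to_celsius(-500)
    except ValueError as exc:
        print("rejected:", exc)
```

Working the 100°F case by hand gives (100 - 32) × 5/9 = 340/9 ≈ 37.78°C, and adding 273.15 gives ≈ 310.93 K, which agrees with the values reported in Event #2.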

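Conclusion

The universal MCP client shows how flexible the MCP ecosystem is. Because it supports both the HTTP and Stdio transports, it brings local tools and remote services under a single interface and can carry out complex multi-step operations. The enhanced debugging output lays out the client-server interactions in detail, which helps with:

  • Understanding the flow: seeing how a user request turns into tool calls
  • Debugging issues: pinpointing where in the MCP chain a problem occurs
  • Monitoring performance: tracking response times and server health
  • Learning MCP: understanding the protocol through real interactions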
Acknowledgments: Anthropic mcp-server





 
