微信扫码
添加专属顾问
我要投稿
一文掌握如何构建通用MCP客户端,无缝连接本地与远程AI工具,提升开发效率。核心内容: 1. 通用MCP客户端的设计理念与架构组成 2. 基于Google ADK与Gemini的实现方法 3. 实际应用场景与效果演示
点击上方↗️「活水智能」,关注 + 星标🌟
作者:Piyush Agnihotri
编译:活水智能
与传统只支持单一传输类型的客户端不同,本文将带你构建一个通用的 MCP(Model Context Protocol,模型上下文协议)客户端,既能连接本地的 Stdio 服务器,也能连接远程的 可流式 HTTP 服务器。单一客户端即可通过统一接口访问本地计算工具、远程 API 服务以及介于两者之间的各类能力。
我们将基于 Google ADK(Agent Development Kit,代理开发套件)与 Gemini,构建一个支持可流式 HTTP 传输的通用 MCP 客户端。
MCP(模型上下文协议)正在重塑我们构建可与外部工具和服务交互的 AI Agent 的方式。随着可流式 HTTP 传输协议的引入,MCP 又迈出关键一步——以更稳健的远程部署方案取代早期的 HTTP + SSE(服务器发送事件)。
本文将带你构建一个同时兼容 Stdio 服务器(本地运行) 与 可流式 HTTP 服务器(远程部署) 的通用 MCP 客户端。我们将以 Google ADK 与 Gemini 为基础,完整展示现代 MCP 的强大与灵活。
在深入代码之前,先看看这个客户端能做什么。服务器启动后,客户端会自动从多个来源加载工具:
celsius_to_fahrenheit
、fahrenheit_to_celsius
、celsius_to_kelvin
、kelvin_to_celsius
、fahrenheit_to_kelvin
、kelvin_to_fahrenheit
等工具。run_command
工具。例如,当你提出“把 25°C 转为华氏和开尔文,再把 100°F 转为摄氏,并把所有结果按漂亮格式写入文件”的请求,客户端会依次完成:
celsius_to_fahrenheit(25)
celsius_to_kelvin(25)
fahrenheit_to_celsius(100)
run_command
把结果写入格式化文件整个过程中,客户端会以流事件的形式展示每一次函数调用与响应,让 Agent 的决策过程透明可见。
我们的通用 MCP 客户端由四个主要部分组成:
1. 命令行界面(cli/main.py):作为用户界面,提供自然语言交互,负责处理输入、管理聊天循环并展示来自 Agent 的流式响应。
2. 客户端层(src/client/mcp_client.py):定义 MCP 客户端类,创建会话、构建 Agent,并把来自 UI 的查询路由给 Agent,可视为 UI 与“大脑”之间的交通指挥。
3. Agent 核心(src/agent/agent_wrapper.py):基于 Google ADK(由 Gemini 驱动)实例化 Agent,并借助 ADK 的 MCP 工具集从配置中加载所有可用工具。
4. 配置(config/servers.json):统一管理服务器端点,包括可流式 HTTP 服务器 URL 与本地 Stdio 服务器位置,是声明客户端可访问工具的中心。
下面结合序列图,详细说明这个通用 MCP 客户端的内部工作机制。
序列从一个简单场景开始:用户在 CLI 中输入请求:“将 100°F 转为摄氏与开尔文,并保存到 temp_conv.txt”。这一自然语言命令触发一连串动作。
CLI 将用户请求转发给充当中枢的 MCP 客户端。MCP 客户端查询配置加载器获取服务器配置清单——这里正体现系统的通用性:得到的既包含 HTTP 服务器,也包含 Stdio 服务器。
随后进入关键步骤“基于已加载工具构建 Agent”。MCP 客户端与 Agent 核心通信,后者与两类服务器协作发现可用工具,等同于为系统盘点所有能力。
Agent 核心分析用户请求,制定执行计划,识别出三项关键操作:
fahrenheit_to_celsius
fahrenheit_to_kelvin
run_command
(写文件)Agent 核心并行调用:
fahrenheit_to_celsius(100)
fahrenheit_to_kelvin(100)
两个转换各自独立执行并返回:
相较串行处理,并行显著提升性能。
Agent 核心以流事件向 MCP 客户端返回消息“100°F = 37.78°C 和 310.93K”。随后,它准备写文件命令并发送给 Stdio 服务器:
run_command("echo '100°F = 37.78°C = 310.93K' > temp_conv.txt")
Stdio 服务器在受限的工作区目录执行命令,返回退出码 0 以确认成功。
系统给出“文件已保存”的确认信息:“我已将转换后的温度保存到 temp_conv.txt”。聊天循环结束,用户可继续输入新请求或正常退出。
让我们从项目结构与开发环境开始。
build-universal-mcp-client/
│
├── .env # Environment variables
├── .gitignore # Git ignore patterns
├── README.md # Project documentation
├── requirements.txt # Python dependencies
│
├── config/ # Configuration files
│ ├── __init__.py
│ └── servers.json # MCP server configurations
│
├── src/ # Source code
│ ├── __init__.py
│ ├── agent/ # Agent implementation
│ │ ├── __init__.py
│ │ └── agent_wrapper.py # Agent logic and toolset management
│ ├── client/ # Client implementation
│ │ ├── __init__.py
│ │ └── mcp_client.py # Main client interface
│ └── utils/ # Utilities and helpers
│ ├── __init__.py
│ ├── config_loader.py # Configuration management
│ └── formatters.py # Response formatting utilities
│
├── servers/ # MCP server implementations
│ ├── __init__.py
│ ├── http/ # HTTP/Streamable servers
│ │ ├── __init__.py
│ │ ├── temperature_server.py # Temperature conversion server
│ │ └── server_launcher.py # HTTP server launcher
│ └── stdio/ # Stdio servers
│ ├── __init__.py
│ └── terminal_server.py # Terminal command server
│
├── cli/ # Command line interface
│ ├── __init__.py
│ └── main.py # CLI entry point
│
├── workspace/ # Working directory for file operations
│ └── .gitkeep
│
└── tests/ # Test files (for future expansion)
├── __init__.py
├── test_agent.py
├── test_client.py
└── test_servers.py
项目使用虚拟环境,示例中采用 uv(快速的 Python 包管理器)。
第一步,克隆仓库并创建虚拟环境。
步骤 1:创建虚拟环境
# clone the repo and move to the project directory
git clone https://github.com/piyushagni5/langgraph-ai.git
cd mcp/04-build-streammable-http-mcp-client
# create virtual env using uv package manager
uv venv
# Activate virtual environment (On macOS/Linux)
source .venv/bin/activate # On Windows: .venv\Scripts\activate
步骤 2:安装依赖
虚拟环境创建完成后,安装 requirements.txt
中列出的依赖:
# AI and MCP Core
google-adk>=1.5.0
google-cloud-aiplatform>=1.38.0
google-generativeai>=0.3.0
mcp>=1.0.0
fastmcp>=0.9.0
# Data and Validation
pydantic>=2.5.0
# CLI and UI
click>=8.1.7
rich>=13.7.0
# Environment and Configuration
python-dotenv>=1.0.0
# Async Support
asyncio-mqtt>=0.13.0
# Development (optional)
pytest>=7.4.0
black>=23.0.0
flake8>=6.1.0
安装依赖:
uv pip install -r requirements.txt
步骤 3:环境变量
创建 .env
文件保存敏感信息(如 API Key)。本文使用两项免费服务:Google Gemini API 与 Tavily Search API。前者可在 Google AI Studio 申请(登录 Gmail → “Get API Key” → 同意条款 → 复制密钥至 .env
);后者在 Tavily 官网注册获取。
创建 .env
文件:
# Google AI API Configuration
GOOGLE_API_KEY=your_google_api_key_here
# MCP Configuration
MCP_CONFIG_PATH=config/servers.json
# Logging
LOG_LEVEL=INFO
# Workspace
WORKSPACE_DIR=workspace
该架构由多个脚本组成,下面结合更新后的可用于生产的代码逐一讲解:
本项目实现两类服务器:Stdio 与 HTTP。先来看 Stdio MCP 服务器。它提供安全的本地命令执行,通过标准输入/输出与客户端通信,并在沙盒化的工作区目录中运行命令。
servers/stdio/terminal_server.py
"""
Terminal MCP Server
Provides secure local command execution capabilities through MCP stdio transport.
"""
import os
import subprocess
import logging
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
# Configure logging for stdio server
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create FastMCP server for stdio transport (no HTTP, uses stdin/stdout)
mcp = FastMCP("terminal")
# Get workspace directory from environment or use default
WORKSPACE_DIR = Path(os.getenv("WORKSPACE_DIR", "workspace")).resolve()
# Ensure workspace exists and log location
WORKSPACE_DIR.mkdir(exist_ok=True)
logger.info(f"Terminal server workspace: {WORKSPACE_DIR}")
classCommandInput(BaseModel):
"""Input model for terminal commands with validation."""
command: str = Field(
...,
description="Shell command to execute in the workspace directory",
min_length=1
)
classCommandResult(BaseModel):
"""Output model for command execution results with full details."""
command: str = Field(..., description="The command that was executed")
exit_code: int = Field(..., description="Process exit code (0 = success)")
stdout: str = Field(..., description="Standard output from the command")
stderr: str = Field(..., description="Standard error from the command")
working_directory: str = Field(..., description="Directory where command was executed")
@mcp.tool(
description="Execute a shell command in the secure workspace directory. Use for file operations, text processing, and system tasks.",
title="Terminal Command Executor"
)
asyncdefrun_command(params: CommandInput) -> CommandResult:
"""
Execute a terminal command within the workspace directory.
Security features:
- Commands execute only within the workspace directory
- 30-second timeout to prevent hanging processes
- Full command output captured for transparency
Args:
params: CommandInput containing the command to execute
Returns:
CommandResult with execution details and output
"""
command = params.command.strip()
# Log the command execution for debugging
logger.info(f"Executing command: {command}")
try:
# Execute command in workspace directory with timeout
result = subprocess.run(
command,
shell=True,
cwd=WORKSPACE_DIR, # Sandbox to workspace directory
capture_output=True, # Capture both stdout and stderr
text=True, # Return strings instead of bytes
timeout=30# 30 second timeout for safety
)
# Prepare structured result
cmd_result = CommandResult(
command=command,
exit_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
working_directory=str(WORKSPACE_DIR)
)
# Log result summary for monitoring
status = "SUCCESS"if result.returncode == 0else"ERROR"
logger.info(f"{status}: Command '{command}' completed with exit code {result.returncode}")
return cmd_result
except subprocess.TimeoutExpired:
logger.error(f"Command '{command}' timed out after 30 seconds")
return CommandResult(
command=command,
exit_code=-1,
stdout="",
stderr="Command timed out after 30 seconds",
working_directory=str(WORKSPACE_DIR)
)
except Exception as e:
logger.error(f"Error executing command '{command}': {e}")
return CommandResult(
command=command,
exit_code=-1,
stdout="",
stderr=f"Execution error: {str(e)}",
working_directory=str(WORKSPACE_DIR)
)
if __name__ == "__main__":
logger.info("Starting Terminal MCP Server (stdio transport)")
logger.info(f"Workspace: {WORKSPACE_DIR}")
# Run with stdio transport (communicates via stdin/stdout)
mcp.run(transport="stdio")
要点:FastMCP("terminal")
创建通过标准输入/输出通信的服务器;WORKSPACE_DIR
负责构建安全沙盒,防止访问敏感系统目录。Pydantic 模型(CommandInput
、CommandResult
)保证输入输出的结构化与有效性;@mcp.tool
将 run_command
暴露为可被客户端发现与调用的 MCP 工具。subprocess.run()
执行命令并捕获输出,30 秒超时避免失控,cwd=WORKSPACE_DIR
限制命令只能在沙盒内执行。
基于 FastMCP 构建支持流式传输的 HTTP 服务器,提供完整的温度转换工具,并包含参数校验。
servers/http/temperature_server.py
"""
Temperature Conversion MCP Server
Provides comprehensive temperature conversion tools supporting Celsius, Fahrenheit, and Kelvin.
"""
import click
import logging
from typing import Union
from pydantic import BaseModel, Field, validator
from mcp.server.fastmcp import FastMCP
@click.command()
@click.option("--port", default=8000, help="Port to run the server on")
@click.option("--host", default="localhost", help="Host to bind the server to")
@click.option("--log-level", default="INFO", help="Logging level")
defmain(port: int, host: str, log_level: str) -> None:
"""Launch the Temperature Conversion MCP Server."""
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
logger.info("Starting Temperature Conversion MCP Server...")
# Create FastMCP server with streamable HTTP transport
mcp = FastMCP(
"Temperature Converter",
host=host,
port=port,
stateless_http=True# Enable streamable HTTP protocol
)
# Input/Output Models for type safety and validation
classTemperatureInput(BaseModel):
"""Input model for temperature conversion with validation."""
temperature: float = Field(
...,
description="Temperature value to convert"
)
@validator('temperature')
defvalidate_temperature_range(cls, v):
"""Validate temperature is within physically reasonable bounds."""
if v < -273.15: # Below absolute zero in Celsius
raiseValueError("Temperature cannot be below absolute zero (-273.15°C)")
return v
classTemperatureResult(BaseModel):
"""Output model for temperature conversion results."""
original_value: float = Field(..., description="Original temperature value")
original_scale: str = Field(..., description="Original temperature scale")
converted_value: float = Field(..., description="Converted temperature value")
converted_scale: str = Field(..., description="Target temperature scale")
formula: str = Field(..., description="Conversion formula used")
# Core conversion functions (business logic)
defcelsius_to_fahrenheit_calc(celsius: float) -> float:
"""Convert Celsius to Fahrenheit using standard formula."""
return (celsius * 9/5) + 32
deffahrenheit_to_celsius_calc(fahrenheit: float) -> float:
"""Convert Fahrenheit to Celsius using standard formula."""
return (fahrenheit - 32) * 5/9
defcelsius_to_kelvin_calc(celsius: float) -> float:
"""Convert Celsius to Kelvin by adding absolute zero offset."""
return celsius + 273.15
defkelvin_to_celsius_calc(kelvin: float) -> float:
"""Convert Kelvin to Celsius by subtracting absolute zero offset."""
return kelvin - 273.15
deffahrenheit_to_kelvin_calc(fahrenheit: float) -> float:
"""Convert Fahrenheit to Kelvin via Celsius intermediate."""
celsius = fahrenheit_to_celsius_calc(fahrenheit)
return celsius_to_kelvin_calc(celsius)
defkelvin_to_fahrenheit_calc(kelvin: float) -> float:
"""Convert Kelvin to Fahrenheit via Celsius intermediate."""
celsius = kelvin_to_celsius_calc(kelvin)
return celsius_to_fahrenheit_calc(celsius)
# MCP Tool Registrations - These become available to clients
@mcp.tool(
description="Convert temperature from Celsius to Fahrenheit",
title="Celsius to Fahrenheit Converter"
)
defcelsius_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
"""Convert Celsius to Fahrenheit with validation and formula info."""
converted = celsius_to_fahrenheit_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Celsius",
converted_value=round(converted, 2),
converted_scale="Fahrenheit",
formula="°F = (°C × 9/5) + 32"
)
@mcp.tool(
description="Convert temperature from Fahrenheit to Celsius",
title="Fahrenheit to Celsius Converter"
)
deffahrenheit_to_celsius(params: TemperatureInput) -> TemperatureResult:
"""Convert Fahrenheit to Celsius with additional validation."""
# Additional validation for Fahrenheit absolute zero
if params.temperature < -459.67: # Below absolute zero in Fahrenheit
raiseValueError("Temperature cannot be below absolute zero (-459.67°F)")
converted = fahrenheit_to_celsius_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Fahrenheit",
converted_value=round(converted, 2),
converted_scale="Celsius",
formula="°C = (°F - 32) × 5/9"
)
@mcp.tool(
description="Convert temperature from Celsius to Kelvin",
title="Celsius to Kelvin Converter"
)
defcelsius_to_kelvin(params: TemperatureInput) -> TemperatureResult:
"""Convert Celsius to Kelvin - simple offset addition."""
converted = celsius_to_kelvin_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Celsius",
converted_value=round(converted, 2),
converted_scale="Kelvin",
formula="K = °C + 273.15"
)
@mcp.tool(
description="Convert temperature from Kelvin to Celsius",
title="Kelvin to Celsius Converter"
)
defkelvin_to_celsius(params: TemperatureInput) -> TemperatureResult:
"""Convert Kelvin to Celsius with non-negative validation."""
# Kelvin cannot be negative by definition
if params.temperature < 0:
raiseValueError("Kelvin temperature cannot be negative")
converted = kelvin_to_celsius_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Kelvin",
converted_value=round(converted, 2),
converted_scale="Celsius",
formula="°C = K - 273.15"
)
@mcp.tool(
description="Convert temperature from Fahrenheit to Kelvin",
title="Fahrenheit to Kelvin Converter"
)
deffahrenheit_to_kelvin(params: TemperatureInput) -> TemperatureResult:
"""Convert Fahrenheit to Kelvin via two-step conversion."""
if params.temperature < -459.67:
raiseValueError("Temperature cannot be below absolute zero (-459.67°F)")
converted = fahrenheit_to_kelvin_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Fahrenheit",
converted_value=round(converted, 2),
converted_scale="Kelvin",
formula="K = (°F - 32) × 5/9 + 273.15"
)
@mcp.tool(
description="Convert temperature from Kelvin to Fahrenheit",
title="Kelvin to Fahrenheit Converter"
)
defkelvin_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
"""Convert Kelvin to Fahrenheit via two-step conversion."""
if params.temperature < 0:
raiseValueError("Kelvin temperature cannot be negative")
converted = kelvin_to_fahrenheit_calc(params.temperature)
returnTemperatureResult(
original_value=params.temperature,
original_scale="Kelvin",
converted_value=round(converted, 2),
converted_scale="Fahrenheit",
formula="°F = (K - 273.15) × 9/5 + 32"
)
# Server startup with error handling
try:
logger.info(f"Temperature server running on {host}:{port}")
logger.info("Available conversions: °C↔°F, °C↔K, °F↔K")
mcp.run(transport="streamable-http") # Use new streamable HTTP transport
except KeyboardInterrupt:
logger.info("Server shutting down gracefully...")
except Exception as e:
logger.error(f"Server error: {e}")
raise
finally:
logger.info("Temperature conversion server stopped")
if __name__ == "__main__":
main()
要点:使用 Click 装饰器构建启动命令行,可配置主机、端口与日志级别;FastMCP(stateless_http=True)
启用可流式 HTTP 传输。内置的“温度合法性验证”会拒绝低于绝对零度的输入。所有转换函数以 @mcp.tool
暴露为工具;TemperatureResult
模型确保返回包括原值、目标值、温标与公式在内的结构化数据。
servers/http/server_launcher.py
这是一个用于管理 HTTP MCP 服务器生命周期的工具,涵盖启动、健康检查与关闭。
"""
HTTP Server Launcher
Provides utilities to start and manage HTTP MCP servers with health monitoring.
"""
import subprocess
import sys
import time
import requests
import logging
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger(__name__)
classServerLauncher:
"""Manages HTTP MCP server lifecycle with health monitoring."""
def__init__(self):
self.processes:List[subprocess.Popen] = []
defstart_temperature_server(self, port: int = 8000, host: str = "localhost") -> bool:
"""Start the temperature conversion server with health monitoring."""
try:
server_path = Path(__file__).parent / "temperature_server.py"
cmd = [
sys.executable,
str(server_path),
"--port", str(port),
"--host", host,
"--log-level", "INFO"
]
logger.info(f"Starting temperature server on {host}:{port}")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.processes.append(process)
# Wait for server to be ready and verify health
returnself._wait_for_server(host, port)
except Exception as e:
logger.error(f"Failed to start temperature server: {e}")
returnFalse
def_wait_for_server(self, host: str, port: int, timeout: int = 10) -> bool:
"""Wait for server to become available with health checking."""
url = f"http://{host}:{port}/mcp"
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Try to connect to the MCP endpoint
# We expect a 406 "Not Acceptable" response which means the server is running
# but needs proper MCP headers (this confirms the MCP server is active)
response = requests.get(url, timeout=1)
if response.status_code == 406: # MCP server expects proper headers
logger.info(f"Server ready at {host}:{port}")
returnTrue
except requests.RequestException:
pass
time.sleep(0.5)
logger.warning(f"Server at {host}:{port} not ready within {timeout}s")
returnFalse
defstop_all_servers(self) -> None:
"""Stop all managed server processes gracefully."""
for process inself.processes:
try:
process.terminate() # Send SIGTERM for graceful shutdown
process.wait(timeout=5) # Wait up to 5 seconds
logger.info(f"Stopped server process {process.pid}")
except Exception as e:
logger.error(f"Error stopping process {process.pid}: {e}")
try:
process.kill() # Force kill if graceful shutdown fails
except:
pass
self.processes.clear()
# Global launcher instance
launcher = ServerLauncher()
要点:
config/servers.json
定义客户端可连接的所有 MCP 服务器,包含传输类型(HTTP 或 stdio)、连接参数与描述。
{
"mcpServers":{
"temperature_converter":{
"type":"http",
"url":"http://localhost:8000/mcp/",
"description":"Temperature conversion utilities supporting Celsius, Fahrenheit, and Kelvin"
},
"terminal":{
"type":"stdio",
"command":"python",
"args":["servers/stdio/terminal_server.py"],
"description":"Local terminal command execution for file operations"
}
}
}
说明:
temperature_converter
:运行于 localhost:8000
的 HTTP 服务器,提供温度转换工具terminal
:通过标准输入/输出与客户端通信的 Stdio 服务器src/utils/config_loader.py
负责加载与校验服务器配置,集中式管理并具备完备的错误处理。
"""Configuration management utilities."""
import os
import json
import logging
from pathlib import Path
from typing import Dict, Any
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class ConfigLoader:
"""Handles loading and validation of MCP server configurations."""
def __init__(self, config_path: str = None):
"""Initialize with optional config path override."""
self.config_path = self._resolve_config_path(config_path)
self._config_cache = None
def _resolve_config_path(self, config_path: str = None) -> Path:
"""Resolve configuration file path with fallbacks."""
if config_path:
returnPath(config_path)
# Try environment variable
env_path = os.getenv("MCP_CONFIG_PATH")
if env_path:
returnPath(env_path)
# Default to config/servers.json
project_root = Path(__file__).parent.parent.parent
return project_root / "config" / "servers.json"
def load_config(self) -> Dict[str, Any]:
"""Load and cache configuration from JSON file."""
ifself._config_cache is not None:
returnself._config_cache
try:
if not self.config_path.exists():
raise FileNotFoundError(f"Config file not found: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
self._config_cache = json.load(f)
logger.info(f"Configuration loaded from: {self.config_path}")
return self._config_cache
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
raise
def get_servers(self) -> Dict[str, Dict[str, Any]]:
"""Get MCP server configurations."""
config = self.load_config()
return config.get("mcpServers", {})
def validate_server_config(self, server_name: str, server_config: Dict[str, Any]) -> bool:
"""Validate individual server configuration."""
required_fields = ["type"]
for field in required_fields:
if field not in server_config:
logger.error(f"Server '{server_name}' missing required field: {field}")
return False
server_type = server_config["type"]
if server_type == "http":
if "url" not in server_config:
logger.error(f"HTTP server '{server_name}' missing 'url' field")
return False
elif server_type == "stdio":
if "command" not in server_config:
logger.error(f"Stdio server '{server_name}' missing 'command' field")
return False
else:
logger.error(f"Server '{server_name}' has unsupported type: {server_type}")
return False
return True
# Global config loader instance
config_loader = ConfigLoader()
要点:ConfigLoader
采用“智能路径解析”,从函数参数、环境变量、默认路径多处查找配置;内置缓存(_config_cache
)避免重复读盘。validate_server_config()
根据传输类型校验必填字段:HTTP 必须有 URL,Stdio 必须有命令,避免因配置不当导致的运行时错误。
该模块负责管理 Google ADK Agent,并协调连接多个 MCP 服务器。
职责:服务器发现、工具加载,并向客户端提供统一接口。
src/agent/agent_wrapper.py
"""
Agent Wrapper
Manages the Google ADK agent and MCP toolset connections with enhanced error handling and debugging.
"""
import asyncio
import logging
import os
from typing importList, Optional, Dict, Any
from pathlib import Path
# Google ADK imports
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPServerParams
from google.adk.tools.mcp_tool import StdioConnectionParams
from mcp import StdioServerParameters
# Local imports
from src.utils.config_loader import config_loader
from src.utils.formatters import formatter
logger = logging.getLogger(__name__)
classAgentWrapper:
"""
Enhanced wrapper for Google ADK agent with MCP toolset management.
This class orchestrates the connection between the ADK agent and multiple MCP servers,
providing automatic server discovery, connection health monitoring, and tool filtering.
"""
def__init__(self, tool_filter: Optional[List[str]] = None):
"""
Initialize the agent wrapper.
Args:
tool_filter: Optional list of tool names to allow. If None, all tools are loaded.
"""
self.tool_filter = tool_filter
self.agent: Optional[LlmAgent] = None
self.toolsets: List[MCPToolset] = []
self.server_status: Dict[str, str] = {}
logger.info("AgentWrapper initialized")
if tool_filter:
logger.info(f"Tool filter active: {tool_filter}")
asyncdefbuild(self) -> None:
"""
Build the ADK agent with MCP toolsets.
This method orchestrates the entire agent building process:
1. Loads server configurations from config file
2. Establishes connections to each configured server
3. Discovers and filters available tools from each server
4. Creates the ADK agent with all loaded toolsets
"""
logger.info("Building agent with MCP toolsets...")
try:
# Load toolsets from all configured servers
toolsets = awaitself._load_toolsets()
ifnot toolsets:
logger.warning("No toolsets loaded - agent will have no tools available")
# Create the ADK agent with Gemini 2.0 Flash Exp
self.agent = LlmAgent(
model="gemini-2.0-flash-exp", # Use latest Gemini model
name="universal_mcp_assistant",
instruction=self._get_agent_instruction(),
tools=toolsets # Provide all loaded toolsets
)
self.toolsets = toolsets
logger.info(f"Agent built successfully with {len(toolsets)} toolsets")
except Exception as e:
logger.error(f"Failed to build agent: {e}")
raise
def_get_agent_instruction(self) -> str:
"""Get the system instruction that defines the agent's behavior and capabilities."""
return"""You are a helpful assistant with access to temperature conversion tools and local file operations.
Your capabilities include:
- Converting temperatures between Celsius, Fahrenheit, and Kelvin
- Executing local commands for file operations
- Providing detailed explanations of conversions and formulas
When handling temperature conversions:
- Always validate input values for physical reasonableness
- Show the conversion formula used
- Round results to appropriate precision
- Handle multiple conversions in sequence when requested
When working with files:
- Use the terminal tools to create, read, and modify files
- Format output clearly and professionally
- Confirm successful file operations
Be precise, helpful, and educational in your responses. Show your work and explain the steps you're taking."""
asyncdef_load_toolsets(self) -> List[MCPToolset]:
"""
Load toolsets from all configured MCP servers.
This method iterates through all configured servers, attempts to connect
to each one, and loads their available tools into MCPToolset instances.
Returns:
List of successfully connected MCPToolset instances.
"""
servers = config_loader.get_servers()
toolsets = []
logger.info(f"Loading toolsets from {len(servers)} configured servers...")
for server_name, server_config in servers.items():
try:
# Validate server configuration first
ifnot config_loader.validate_server_config(server_name, server_config):
self.server_status[server_name] = "invalid_config"
continue
# Create connection parameters based on server type
connection_params = awaitself._create_connection_params(
server_name, server_config
)
ifnot connection_params:
self.server_status[server_name] = "connection_failed"
continue
# Create MCPToolset and connect to server
toolset = MCPToolset(
connection_params=connection_params,
tool_filter=self.tool_filter # Apply tool filtering if specified
)
# Test connection by attempting to get available tools
tools = await toolset.get_tools()
tool_names = [tool.name for tool in tools]
if tools:
toolsets.append(toolset)
self.server_status[server_name] = "connected"
formatter.print_tool_summary(server_name, tool_names)
logger.info(f"Connected to {server_name}: {len(tool_names)} tools loaded")
else:
logger.warning(f"No tools found on server '{server_name}'")
self.server_status[server_name] = "no_tools"
except Exception as e:
logger.error(f"Failed to connect to server '{server_name}': {e}")
self.server_status[server_name] = f"error: {str(e)}"
continue
logger.info(f"Successfully loaded {len(toolsets)} toolsets")
return toolsets
asyncdef_create_connection_params(
self,
server_name: str,
server_config: Dict[str, Any]
) -> Optional[Any]:
"""
Create appropriate connection parameters based on server transport type.
Args:
server_name: Name of the server for logging
server_config: Server configuration dictionary
Returns:
Connection parameters object or None if creation failed
"""
server_type = server_config["type"]
try:
if server_type == "http":
# Create HTTP connection parameters for streamable HTTP servers
return StreamableHTTPServerParams(url=server_config["url"])
elif server_type == "stdio":
# Create stdio connection parameters for local process servers
command = server_config["command"]
args = server_config.get("args", [])
# Resolve relative paths to absolute paths
if args:
project_root = Path(__file__).parent.parent.parent
resolved_args = []
for arg in args:
ifnot os.path.isabs(arg) and arg.endswith('.py'):
resolved_args.append(str(project_root / arg))
else:
resolved_args.append(arg)
args = resolved_args
return StdioConnectionParams(
server_params=StdioServerParameters(
command=command,
args=args
),
timeout=10# Connection timeout
)
else:
raise ValueError(f"Unsupported server type: {server_type}")
except Exception as e:
logger.error(f"Error creating connection params for '{server_name}': {e}")
returnNone
asyncdefclose(self) -> None:
"""
Gracefully close all toolset connections and cleanup resources.
"""
logger.info("Shutting down agent and closing toolset connections...")
for i, toolset inenumerate(self.toolsets):
try:
await toolset.close()
logger.debug(f"Closed toolset {i+1}")
except Exception as e:
logger.error(f"Error closing toolset {i+1}: {e}")
self.toolsets.clear()
self.agent = None
# Small delay to ensure cleanup completes
await asyncio.sleep(0.5)
logger.info("Agent shutdown complete")
defget_server_status(self) -> Dict[str, str]:
"""Get the current connection status of all configured servers."""
returnself.server_status.copy()
defis_ready(self) -> bool:
"""Check if the agent is properly initialized and ready for use."""
returnself.agent isnot None
要点:AgentWrapper
是中枢协调器,把 ADK Agent 连接到多个 MCP 服务器;_load_toolsets()
迭代配置的服务器,尝试连接并收集工具,MCPToolset
封装连接以供 Agent 使用;_create_connection_params()
按传输类型构造连接参数(HTTP 用 URL,Stdio 用命令参数),路径解析确保相对路径在不同运行位置都有效;LlmAgent
将模型、系统提示与工具集装配成能无缝使用多服务器的智能 Agent。
客户端作为 UI 与 Agent 的协调层,提供会话管理、流式响应与细粒度调试。
src/client/mcp_client.py
"""
MCP Client
Enhanced client interface with detailed debugging of MCP interactions.
"""
import logging
from typing import Optional, List, AsyncGenerator, Any
from google.genai.types import Content, Part
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from src.agent.agent_wrapper import AgentWrapper
from src.utils.formatters import formatter
logger = logging.getLogger(__name__)
class MCPClient:
"""
Enhanced MCP client with session management, streaming responses, and detailed debugging.
This client provides a high-level interface for interacting with the ADK agent
while showing detailed information about MCP server interactions.
"""
def __init__(
self,
app_name: str = "universal_mcp_client",
user_id: str = "default_user",
session_id: str = "default_session",
tool_filter: Optional[List[str]] = None,
debug_mode: bool = False
):
"""
Initialize the MCP client with debugging capabilities.
Args:
app_name: Application identifier for ADK
user_id: User identifier for session context
session_id: Session identifier for conversation context
tool_filter: Optional list of allowed tool names
debug_mode: Enable detailed debugging of MCP interactions
"""
self.app_name = app_name
self.user_id = user_id
self.session_id = session_id
self.debug_mode = debug_mode
# Initialize core components
self.session_service = InMemorySessionService()
self.agent_wrapper = AgentWrapper(tool_filter=tool_filter)
self.runner: Optional[Runner] = None
# State tracking
self.is_initialized = False
logger.info(f"MCPClient initialized for user '{user_id}', session '{session_id}'")
if debug_mode:
logger.info("Debug mode enabled - detailed MCP interactions will be shown")
async def initialize(self) ->None:
"""
Initialize the client session and agent.
This method must be called before using send_message().
It sets up the session, builds the agent, and prepares the runner.
"""
ifself.is_initialized:
logger.warning("Client already initialized")
return
try:
logger.info("Initializing MCP client...")
# Create ADK session
awaitself.session_service.create_session(
app_name=self.app_name,
user_id=self.user_id,
session_id=self.session_id
)
logger.debug("ADK session created")
# Build agent with all MCP toolsets
awaitself.agent_wrapper.build()
if not self.agent_wrapper.is_ready():
raise RuntimeError("Agent failed to initialize properly")
# Create runner to handle agent execution
self.runner = Runner(
agent=self.agent_wrapper.agent,
app_name=self.app_name,
session_service=self.session_service
)
self.is_initialized = True
logger.info("MCP client initialized successfully")
# Print server status summary
status = self.agent_wrapper.get_server_status()
connected = sum(1forsin status.values() if s == "connected")
logger.info(f"Server status: {connected}/{len(status)} servers connected")
except Exception as e:
logger.error(f"Failed to initialize MCP client: {e}")
awaitself.shutdown()
raise
async def send_message(self, message: str) -> AsyncGenerator[Any, None]:
"""
Send a message to the agent and stream the response with detailed debugging.
Args:
message: User message to send to the agent
Yields:
Streaming response events from the agent with MCP interaction details
Raises:
RuntimeError: If client is not initialized
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized. Call initialize() first.")
if not message.strip():
raise ValueError("Message cannot be empty")
logger.info(f"Sending message: {message[:100]}{'...' if len(message) > 100 else ''}")
try:
# Create content object forADK
content = Content(
role="user",
parts=[Part(text=message)]
)
event_count = 0
# Send to agent and yield streaming responses with debugging
asyncforeventinself.runner.run_async(
user_id=self.user_id,
session_id=self.session_id,
new_message=content
):
event_count += 1
# Show detailed debugging information
ifself.debug_mode:
formatter.print_json_response(event, f"Event #{event_count}")
self._analyze_event(event, event_count)
yield event
except Exception as e:
logger.error(f"Error sending message: {e}")
raise
def _analyze_event(self, event: Any, event_count: int) ->None:
"""
Analyze and display detailed information about MCP events.
Args:
event: The event object from the agent
event_count: Sequential event number for tracking
"""
try:
# Check if this is a tool-related event
ifhasattr(event, 'tool_calls') and event.tool_calls:
for tool_call in event.tool_calls:
formatter.print_mcp_interaction(
"tool_call",
{
"tool_name": tool_call.name if hasattr(tool_call, 'name') else "Unknown",
"parameters": tool_call.args if hasattr(tool_call, 'args') else {},
"server": "MCP Server",
"event_number": event_count
}
)
# Check for tool responses
if hasattr(event, 'tool_responses') and event.tool_responses:
for tool_response in event.tool_responses:
formatter.print_mcp_interaction(
"tool_response",
{
"tool_name": getattr(tool_response, 'name', 'Unknown'),
"result": str(tool_response.content) if hasattr(tool_response, 'content') else "No result",
"status": "success" if not hasattr(tool_response, 'error') else "error",
"event_number": event_count
}
)
# Check for agent thinking/processing
if hasattr(event, 'content') and hasattr(event.content, 'parts'):
if event.content.parts and not getattr(event, 'is_final_response', lambda: False)():
formatter.print_mcp_interaction(
"agent_thinking",
{
"content": event.content.parts[0].text if event.content.parts else "Processing...",
"event_number": event_count
}
)
# Check for final response
if hasattr(event, 'is_final_response') and event.is_final_response():
content = ""
if hasattr(event, 'content') and hasattr(event.content, 'parts') and event.content.parts:
content = event.content.parts[0].text
formatter.print_mcp_interaction(
"final_response",
{
"content": content,
"event_number": event_count
}
)
except Exception as e:
logger.debug(f"Error analyzing event {event_count}: {e}")
def toggle_debug_mode(self) -> bool:
"""Toggle debug mode on/off and return new state."""
self.debug_mode = not self.debug_mode
logger.info(f"Debug mode {'enabled' if self.debug_mode else 'disabled'}")
return self.debug_mode
async def shutdown(self) -> None:
"""
Gracefully shutdown the client and cleanup all resources.
"""
logger.info("Shutting down MCP client...")
try:
if self.agent_wrapper:
await self.agent_wrapper.close()
# Reset state
self.runner = None
self.is_initialized = False
logger.info("MCP client shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
def get_status(self) -> dict:
"""
Get comprehensive client status information.
Returns:
Dictionary with detailed client status information
"""
status = {
"initialized": self.is_initialized,
"debug_mode": self.debug_mode,
"app_name": self.app_name,
"user_id": self.user_id,
"session_id": self.session_id,
"agent_ready": self.agent_wrapper.is_ready() if self.agent_wrapper else False,
"server_status": self.agent_wrapper.get_server_status() if self.agent_wrapper else {}
}
return status
要点:MCPClient
管理会话生命周期,创建 ADK 会话、构建 Agent 并负责清理;send_message()
负责实时流式传输,将用户输入转为 ADK Content
,经 Runner
发送给 Agent,并随着事件发生滚动输出;调试模式可“透视”工具调用、参数与响应。_analyze_event()
解析流事件,定位工具调用、响应与“思考过程”,帮助理解决策与数据流。
该入口提供交互式命令行界面,实时展示 MCP 客户端与服务器的交互细节。
cli/main.py
"""
CLI Main Entry Point
Enhanced command-line interface with detailed MCP interaction debugging.
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
from typing importOptional
# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.client.mcp_client import MCPClient
from src.utils.formatters import formatter
from servers.http.server_launcher import launcher
# Configuration
DEFAULT_TOOLS = [
'celsius_to_fahrenheit',
'fahrenheit_to_celsius',
'celsius_to_kelvin',
'kelvin_to_celsius',
'fahrenheit_to_kelvin',
'kelvin_to_fahrenheit',
'run_command'
]
# Configure logging with detailed format
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("mcp_client.log")
]
)
# Suppress verbose logs from external libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("google").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
classMCPClientCLI:
"""Enhanced CLI for MCP Client with detailed debugging and server management."""
def__init__(self):
self.client: Optional[MCPClient] = None
self.server_started = False
self.debug_mode = True# Enable debug mode by default
asyncdefstart_servers(self) -> bool:
"""Start required HTTP servers with health monitoring."""
try:
logger.info("Starting HTTP servers...")
# Start temperature server
if launcher.start_temperature_server(port=8000):
self.server_started = True
logger.info("Temperature server started successfully")
returnTrue
else:
logger.error("Failed to start temperature server")
returnFalse
except Exception as e:
logger.error(f"Error starting servers: {e}")
returnFalse
asyncdefinitialize_client(self) -> bool:
"""Initialize the MCP client with debugging enabled."""
try:
logger.info("Initializing MCP client...")
self.client = MCPClient(
app_name="universal_mcp_client",
user_id="cli_user_001",
session_id="cli_session_001",
tool_filter=DEFAULT_TOOLS,
debug_mode=self.debug_mode # Enable debugging
)
awaitself.client.initialize()
logger.info("MCP client initialized")
returnTrue
except Exception as e:
logger.error(f"Failed to initialize client: {e}")
formatter.print_error("Client initialization failed", e)
returnFalse
asyncdefchat_loop(self) -> None:
"""Main interactive chat loop with enhanced debugging."""
formatter.print_welcome_banner()
print("\nChat started. Commands:")
print(" - Type your requests in natural language")
print(" - 'status' - Show system status")
print(" - 'debug on/off' - Toggle detailed debugging")
print(" - 'help' - Show example requests")
print(" - 'quit', 'exit', ':q' - Exit the application\n")
try:
whileTrue:
try:
# Get user input
user_input = input("You: ").strip()
ifnot user_input:
continue
# Handle special commands
if user_input.lower() in ['quit', 'exit', ':q']:
print("Goodbye!")
break
elif user_input.lower() == 'status':
self._show_status()
continue
elif user_input.lower().startswith('debug'):
self._handle_debug_command(user_input)
continue
elif user_input.lower() == 'help':
self._show_help()
continue
# Send message to agent with debugging
awaitself._handle_user_message(user_input)
except KeyboardInterrupt:
print("\n\nInterrupted by user. Goodbye!")
break
except EOFError:
print("\n\nEnd of input. Goodbye!")
break
except Exception as e:
logger.error(f"Error in chat loop: {e}")
formatter.print_error("An error occurred", e)
except Exception as e:
logger.error(f"Critical error in chat loop: {e}")
formatter.print_error("Critical error occurred", e)
asyncdef_handle_user_message(self, message: str) -> None:
"""Handle a user message with detailed MCP interaction debugging."""
event_count = 0
final_response = None
try:
print(f"\nAssistant: Processing your request...")
ifself.debug_mode:
print("[DEBUG MODE] Showing detailed MCP interactions:\n")
asyncfor event inself.client.send_message(message):
event_count += 1
# In debug mode, detailed interactions are shown by the client
# Here we just count events and look for the final response
ifhasattr(event, 'is_final_response') and event.is_final_response():
final_response = event
break
# Display final response
if final_response andhasattr(final_response, 'content'):
ifhasattr(final_response.content, 'parts') and final_response.content.parts:
response_text = final_response.content.parts[0].text
print(f"\nFinal Response:\n{response_text}\n")
else:
print("Task completed (no text response)\n")
else:
print("No final response received\n")
ifself.debug_mode:
print(f"Total events processed: {event_count}")
except Exception as e:
logger.error(f"Error handling message: {e}")
formatter.print_error("Failed to process message", e)
def_handle_debug_command(self, command: str) -> None:
"""Handle debug mode toggle commands."""
parts = command.split()
iflen(parts) > 1:
mode = parts[1].lower()
if mode == 'on':
self.debug_mode = True
ifself.client:
self.client.debug_mode = True
print("Debug mode enabled - detailed MCP interactions will be shown")
elif mode == 'off':
self.debug_mode = False
ifself.client:
self.client.debug_mode = False
print("Debug mode disabled - only final responses will be shown")
else:
print("Usage: debug on/off")
else:
current_state = "enabled"ifself.debug_mode else"disabled"
print(f"Debug mode is currently {current_state}")
def_show_status(self) -> None:
"""Show current system status with detailed information."""
ifnotself.client:
print("Client not initialized")
return
status = self.client.get_status()
print("\nSystem Status:")
print(f" - Client initialized: {'[OK]' if status['initialized'] else '[FAIL]'}")
print(f" - Agent ready: {'[OK]' if status['agent_ready'] else '[FAIL]'}")
print(f" - Debug mode: {'ON' if status['debug_mode'] else 'OFF'}")
print(f" - User: {status['user_id']}")
print(f" - Session: {status['session_id']}")
print("\nServer Status:")
for server_name, server_status in status['server_status'].items():
status_icon = "[OK]"if server_status == "connected"else"[FAIL]"
print(f" - {server_name}: {status_icon} {server_status}")
print()
def_show_help(self) -> None:
"""Show example requests and debugging tips."""
examples = [
"Convert 25 degrees Celsius to Fahrenheit",
"What is 100°F in Celsius and Kelvin?",
"Convert 300 Kelvin to Celsius and Fahrenheit",
"Convert 0°C to all other temperature scales",
"Convert room temperature (20°C) to Fahrenheit and save to file",
"Create a temperature conversion table for 0, 25, 50, 75, 100°C"
]
debug_tips = [
"Use 'debug on' to see detailed MCP server interactions",
"Watch for tool calls, parameters, and server responses",
"Each event shows the communication between client and servers",
"Use 'debug off' to see only final responses"
]
print("\nExample Requests:")
for i, example inenumerate(examples, 1):
print(f" {i}. {example}")
print("\nDebug Mode Tips:")
for i, tip inenumerate(debug_tips, 1):
print(f" {i}. {tip}")
print()
asyncdefcleanup(self) -> None:
"""Cleanup resources and stop servers."""
try:
ifself.client:
awaitself.client.shutdown()
ifself.server_started:
launcher.stop_all_servers()
except Exception as e:
logger.error(f"Error during cleanup: {e}")
asyncdefmain():
"""Main entry point for the CLI application with enhanced debugging."""
cli = MCPClientCLI()
try:
# Start servers
print("Starting Universal MCP Client with Debug Mode...")
ifnotawait cli.start_servers():
print("Failed to start required servers")
return1
# Initialize client
ifnotawait cli.initialize_client():
print("Failed to initialize client")
return1
# Start chat loop with debugging
await cli.chat_loop()
return0
except KeyboardInterrupt:
print("\nInterrupted by user")
return130
except Exception as e:
logger.error(f"Unexpected error: {e}")
formatter.print_error("Unexpected error occurred", e)
return1
finally:
await cli.cleanup()
if __name__ == "__main__":
try:
exit_code = asyncio.run(main())
sys.exit(exit_code)
except asyncio.CancelledError:
# Suppress cancelled error messages during shutdown
logger.debug("Main coroutine cancelled during shutdown")
sys.exit(0)
要点:MCPClientCLI
串起全局;start_servers()
启动所需 HTTP 服务并持续监控;chat_loop()
提供 REPL,支持 status
、debug on/off
、help
等“内置命令”;_handle_user_message()
将输入流式发送到客户端并汇聚事件;_handle_debug_command
支持随时切换详细/简洁模式;cleanup()
在退出时回收资源。
该模块用于将不同类型的响应美观化输出,包括展示客户端-服务器交互细节的调试信息。
我们使用 Rich 库生成配色清晰的控制台输出。详见 src/utils/formatter.py
。
通过实际示例来体验通用 MCP 客户端:
cd mcp/04-build-streammable-http-mcp-client
source .venv/bin/activate
uv run python cli/main.py
系统将自动:
一旦与服务器建立连接,你将在终端看到如下输出。
多重转换并写入文件:
You: Convert 100°F to Celsius and Kelvin, then save results to a file temp_conv.txt
预期响应:
**Assistant**: Processing your request...
**[DEBUGMODE]**Showing detailed MCP interactions:
## Event #1-ParallelFunctionCalls
```json
{
"content": {
"parts": [
{
"function_call": {
"id": "adk-3e153461-264d-4242-aabf-d796468119bb",
"args": {
"params": {
"temperature": 100
}
},
"name": "fahrenheit_to_celsius"
}
},
{
"function_call": {
"id": "adk-6dc14eb3-d76e-41a4-96cc-b9d9113b1903",
"args": {
"params": {
"temperature": 100
}
},
"name": "fahrenheit_to_kelvin"
}
}
],
"role": "model"
},
"usage_metadata": {
"candidates_token_count": 20,
"prompt_token_count": 613,
"total_token_count": 633
},
"timestamp": 1754508151.836657
}
```
## Event #2-ConversionResults
```json
{
"content": {
"parts": [
{
"function_response": {
"id": "adk-3e153461-264d-4242-aabf-d796468119bb",
"name": "fahrenheit_to_celsius",
"response": {
"result": {
"structuredContent": {
"original_value": 100.0,
"original_scale": "Fahrenheit",
"converted_value": 37.78,
"converted_scale": "Celsius",
"formula": "°C = (°F - 32) × 5/9"
},
"isError": false
}
}
}
},
{
"function_response": {
"id": "adk-6dc14eb3-d76e-41a4-96cc-b9d9113b1903",
"name": "fahrenheit_to_kelvin",
"response": {
"result": {
"structuredContent": {
"original_value": 100.0,
"original_scale": "Fahrenheit",
"converted_value": 310.93,
"converted_scale": "Kelvin",
"formula": "K = (°F - 32) × 5/9 + 273.15"
},
"isError": false
}
}
}
}
],
"role": "user"
},
"timestamp": 1754508152.818855
}
```
## Event #3-FileSaveCommand
```json
{
"content": {
"parts": [
{
"text": "Okay, 100°F is equal to 37.78°C and 310.93K.\n\nHere's the conversion information:\n* Fahrenheit to Celsius: °C = (°F - 32) × 5/9\n* Fahrenheit to Kelvin: K = (°F - 32) × 5/9 + 273.15\n\nNow, I will save the results to a file named `temp_conv.txt`."
},
{
"function_call": {
"id": "adk-696edb80-7cad-4815-a3ca-11b7d1884aae",
"args": {
"params": {
"command": "echo '100°F = 37.78°C = 310.93K' > temp_conv.txt"
}
},
"name": "run_command"
}
}
],
"role": "model"
},
"usage_metadata": {
"candidates_token_count": 147,
"prompt_token_count": 894,
"total_token_count": 1041
},
"timestamp": 1754508152.854623
}
```
**AgentThinking**: Okay, 100°Fis equal to 37.78°C and 310.93K.
Here's the conversion information:
-Fahrenheit to Celsius: °C= (°F-32) ×5/9
-Fahrenheit to Kelvin: K= (°F-32) ×5/9+273.15
Now, I will save the results to a file named `temp_conv.txt`.
## Event #4-CommandExecutionResult
```json
{
"content": {
"parts": [
{
"function_response": {
"id": "adk-696edb80-7cad-4815-a3ca-11b7d1884aae",
"name": "run_command",
"response": {
"result": {
"structuredContent": {
"command": "echo '100°F = 37.78°C = 310.93K' > temp_conv.txt",
"exit_code": 0,
"stdout": "",
"stderr": "",
"working_directory": "/Users/piyushagni5/Desktop/github/langgraph-ai/mcp/04-build-streammable-http-mcp-client/workspace"
},
"isError": false
}
}
}
}
],
"role": "user"
},
"timestamp": 1754508155.788237
}
```
## Event #5-FinalResponse
```json
{
"content": {
"parts": [
{
"text": "I have saved the converted temperatures to the file `temp_conv.txt`.\n"
}
],
"role": "model"
},
"usage_metadata": {
"candidates_token_count": 17,
"prompt_token_count": 1241,
"total_token_count": 1258
},
"timestamp": 1754508155.805442
}
```
---
## FinalResponse
**I have saved the converted temperatures to the file `temp_conv.txt`.**
**ConversionResults:**
-100°F=37.78°C=310.93K
**FormulasUsed:**
-Fahrenheit to Celsius: °C= (°F-32) ×5/9
-Fahrenheit to Kelvin: K= (°F-32) ×5/9+273.15
---
**Summary**: 5 total events processed
下面是成功执行查询后得到的最终响应截图:
为保证调试顺畅与校验严谨,系统提供多项增强能力:
服务器健康监控
客户端持续监控服务器健康并显示连接状态:
connected
:服务器健康且响应正常connection_failed
:连接失败invalid_config
:配置无效no_tools
:服务器已连接,但没有可用工具工具过滤
可限制 Agent 可访问的工具集合:
DEFAULT_TOOLS = [
'celsius_to_fahrenheit',
'fahrenheit_to_celsius',
'run_command' # Only allow specific tools
]
错误处理
系统能优雅处理多类异常:
通用 MCP 客户端充分展示了 MCP 生态的灵活性。得益于同时支持 HTTP 与 Stdio 两种传输,它能把本地工具与远程服务无缝纳入统一接口,胜任复杂的多步操作。增强的调试能力清晰呈现客户-服务器的交互细节,便于:
致谢: Anthropic, mcp-server
读完这篇信息量爆炸的“通用客户端”构建指南,你是否一方面为MCP的强大潜力感到震撼,另一方面又对其中复杂的组件(CLI、Client、Agent Core、Config...)感到一丝畏惧,不知从何学起?
别担心,你不是一个人。任何强大的系统都是从坚实的基础开始的。
这篇文章为你展示了山顶的风景,而活水智能的 「MCP101:扩展你的AI大模型能力」 课程,则为你铺设了一条通往山顶的坚实台阶。我们将从最基础的MCP概念(CH01)讲起,逐步带你了解其工作原理(CH02)和生态(CH03),并最终在课程的最后一章(CH05),亲手带你从0到1,开发出自己的第一个MCP。
■
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-21
别再用老地图,寻找AI这片新大陆丨AI时刻
2025-08-21
GPT-5 提示工程指南
2025-08-21
Skywork技术论坛一手分享:Agent与多模态的落地真相|甲子光年
2025-08-21
AI发展进入能动性阶段:上下文工程与模型能力并重 | 奇绩大模型笔记 #7
2025-08-21
企业微信,「奔向」AI一体化
2025-08-21
DeepSeekV3.1 提到的 UE8M0 FP8 Scale 是什么?下一代国产芯片设计?
2025-08-21
快手发布SeamlessFlow框架:完全解耦Trainer与Agent,时空复用实现无空泡的工业级RL训练!
2025-08-21
携程旅游的 AI 网关落地实践
2025-08-21
2025-05-29
2025-06-01
2025-06-21
2025-06-07
2025-06-12
2025-06-19
2025-06-13
2025-05-28
2025-07-29
2025-08-21
2025-08-20
2025-08-19
2025-08-19
2025-08-18
2025-08-18
2025-08-18
2025-08-15