微信扫码
添加专属顾问
我要投稿
如何用LangChain和MCP打造能自动生成可视化图表的ChatBI智能体?本文手把手教你实现数据分析+可视化的完整流程。核心内容: 1. 基于COVID-19疫情数据构建ChatBI+可视化智能体的完整实现流程 2. 使用ModelScope的MCP-Server实现图表可视化功能 3. 通过FastMCP自定义实现Covid19查询服务的实战案例
在本专栏的前面文章中,介绍过基于 LangChain + MCP
和 SpringAI + MCP
构建 ChatBI
智能体,但未对数据进行可视化图表的分析,而可视化图表是数据分析的重要组成部分,它能够将复杂的数据转化为直观易懂的图形表示。因此本篇文章基于 LangChain + MCP
实验构建 ChatBI + 可视化图表 综合的智能体。
其中分析数据采用数据采用 COVID-19
测试案例,包括:美国 2021-01-28
号,各个县county
的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:
date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27
下载地址如下:
https://github.com/BIXUECHAO/covid-19-tes-data/blob/main/us-covid19-counties.csv
其中可视化图表能力采用 ModelScope
上公开的 MCP-Server
,地址为:
https://modelscope.cn/mcp/servers/@antvis/mcp-server-chart
连接方式:
{
"mcpServers": {
"mcp-server-chart": {
"type": "sse",
"url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse"
}
}
}
Covid19 Query MCP Server
则是通过 FastMCP
来自定义实现。大模型端采用 OpenAI
的 GPT-4.1
模型,你也可以换成其他支持工具调通的模型。
实验最终实现的效果如下所示:
提问: 找出确诊人数前五名的州,按饼图的方式绘制给我
提问: 根据每个县的确诊人数取Top10,以柱状图的方式呈现给我
本次实验所依赖的版本如下所示:
mcp==1.9.2
openai==1.75.0
langchain==0.3.25
langchain-openai==0.3.18
langgraph==0.4.7
pymysql==1.0.3
首先创建数据表,并导入上述数据:
CREATE TABLE `us_covid19_counties` (
`date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '日期',
`county` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '县',
`state` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '州',
`fips` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '县编码code',
`cases` int DEFAULT NULL COMMENT '累计确诊病例',
`deaths` int DEFAULT NULL COMMENT '累计死亡病例'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COVID-19 案例';
实现数据库操作,这里仅仅做了数据库交互,实际使用时应考虑很多性能细节的优化:
utils_db.py
import pymysql
defget_conn():
return pymysql.connect(
host="127.0.0.1",
port=3306,
database="test3",
user="root",
password="root",
autocommit=True
)
defquery(sql):
conn = get_conn()
cursor = conn.cursor()
cursor.execute(sql)
columns = [column[0] for column in cursor.description]
res = list()
for row in cursor.fetchall():
res.append(dict(zip(columns, row)))
cursor.close()
conn.close()
return res
创建数据查询 MCP Server
,这里实现了两个 MCP Tools
, 分别是获取表 Schema
和 执行SQL
两个操作:
import json
from mcp.server.fastmcp import FastMCP
import utils_db
mcp = FastMCP("COVID-19-DB-Query-Mcp-Server")
schema_sql = """
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'test3' AND TABLE_NAME = '{table}'
"""
effective_tables = ["us_covid19_counties"]
@mcp.tool()
defget_table_schema() -> str:
"""获取当前有效表的 MySQL Schema, 生成SQL之前,你应该使用该工具获取完整的表结构"""
table_schema = []
for table in effective_tables:
schemas = utils_db.query(schema_sql.format(table=table))
schemas = ", ".join([f"{s['COLUMN_NAME']} {s['DATA_TYPE']} COMMENT {s['COLUMN_COMMENT']}"for s in schemas])
table_schema.append(f"TABLE `{table}` ({schemas})")
return"\n\n".join(table_schema)
@mcp.tool()
defrun_sql(sql: str) -> str:
"""执行MySQL SQL语句查询数据,一次仅能执行一句SQL!"""
try:
returnstr(utils_db.query(sql))
except Exception as e:
returnf"执行SQL错误:{str(e)} ,请修正后重新发起。"
if __name__ == "__main__":
mcp.settings.port = 6030
mcp.run("sse")
启动 MCP Server
:
LangChain
官方关于 MCP
的集成介绍文档如下:
https://langchain-ai.github.io/langgraph/Agents/mcp/
关于MCP
的连接,通过 MultiServerMCPClient
抽取出所有的 MCP Tools
,然后交给 create_react_agent
构建智能体。
实现过程和本地控制台测试逻辑如下:
import os, config
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"## 支持openai协议的 API BASE
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"## 你的API KEY
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from colorama import Fore, Style
import asyncio
from langgraph.checkpoint.memory import InMemorySaver
asyncdefmain():
client = MultiServerMCPClient(
{
"chart": {
"url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse",
"transport": "sse"
},
"covid19-query": {
"url": "http://127.0.0.1:6030/sse",
"transport": "sse"
}
}
)
tools = await client.get_tools()
checkpointer = InMemorySaver()
agent = create_react_agent(
"openai:gpt-4.1",
tools,
checkpointer=checkpointer,
prompt="You are an AI assistant that helps people find information."
)
config = {
"configurable": {
"thread_id": "1"
}
}
whileTrue:
question = input("请输入:")
ifnot question:
continue
if question == "q":
break
asyncfor chunk in agent.astream({"messages": [{"role": "user", "content": question}]},
stream_mode="updates",
config=config):
if"agent"in chunk:
content = chunk["agent"]["messages"][0].content
tool_calls = chunk["agent"]["messages"][0].tool_calls
if tool_calls:
for tool in tool_calls:
print(Fore.YELLOW, Style.BRIGHT, f">>> Call MCP Server: {tool['name']} , args: {tool['args']}")
else:
print(Fore.BLACK, Style.BRIGHT, f"LLM: {content}")
elif"tools"in chunk:
content = chunk["tools"]["messages"][0].content
name = chunk["tools"]["messages"][0].name
print(Fore.GREEN, Style.BRIGHT, f"<<< {name} : {content}")
if __name__ == '__main__':
asyncio.run(main())
运行测试:
提问:统计的确诊数量前十名的州,以折线图的方式展示
上述已经通过控制台实现了完整的 Agent
交互过程,下面继续改造逻辑,使用 FastAPI
封装为 OpenAI
协议接口,便于在客户端使用。
注意:这里主要为实现功能,api_key
写死在程序中为: sk-da4b6cb4a41e4cascascasc9508deb556942
(随机生成的), 后续使用客户端连接时,需要填写该 api_key
。
import os
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"## 支持openai协议的 API BASE
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"## 你的API KEY
from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing importList, Dict, Any, Optional, Generator
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from datetime import datetime
import time, uuid
app = FastAPI(title="OpenAI Compatible Chat API")
api_key = "sk-da4b6cb4a41e4cascascasc9508deb556942"
agent = None
classChatCompletionRequest(BaseModel):
model: str
messages: List[Dict[str, Any]]
temperature: Optional[float] = 1.0
max_tokens: Optional[int] = None
stream: Optional[bool] = False
classChatCompletionChoice(BaseModel):
index: int
message: List[Dict[str, Any]]
finish_reason: str
classChatCompletionResponse(BaseModel):
id: str
object: str = "chat.completion"
created: int
model: str
choices: List[ChatCompletionChoice]
usage: Dict[str, int]
classChatCompletionChunk(BaseModel):
id: str
object: str = "chat.completion.chunk"
created: int
model: str
choices: List[Dict[str, Any]]
asyncdefcreate_agent():
'''创建MCP Agent'''
client = MultiServerMCPClient(
{
"chart": {
"url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse",
"transport": "sse"
},
"covid19-query": {
"url": "http://127.0.0.1:6030/sse",
"transport": "sse"
}
}
)
tools = await client.get_tools()
return create_react_agent(
"openai:gpt-4.1",,
tools,
prompt="You are an AI assistant that helps people find information.\n针对用户的问题,你的执行流程是1.获取表结构。2.生成SQL查询数据。3.生成可视化图表。4.总结结果回复用户"
)
asyncdefinvoke_agent(messages: []):
global agent
ifnot agent:
agent = await create_agent()
asyncfor chunk in agent.astream({"messages": messages}, stream_mode="updates"):
if"agent"in chunk:
content = chunk["agent"]["messages"][0].content
tool_calls = chunk["agent"]["messages"][0].tool_calls
if tool_calls:
for tool in tool_calls:
yieldf"> ```Call MCP Server: {tool['name']}```\n\n"
else:
yield content
asyncdefhandle_stream_response(messages: [],
model: str, request_id: str) -> Generator[str, None, None]:
# 执行 agent
asyncfor msg in invoke_agent(messages):
chunk_data = ChatCompletionChunk(
id=request_id,
created=int(time.time()),
model=model,
choices=[{
"index": 0,
"delta": {
"content": msg
},
"finish_reason": None
}]
)
yieldf"data: {chunk_data.model_dump_json()}\n\n"
final_chunk = ChatCompletionChunk(
id=request_id,
created=int(time.time()),
model=model,
choices=[{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
)
yieldf"data: {final_chunk.model_dump_json()}\n\n"
yield"data: [DONE]\n\n"
asyncdefverify_auth(authorization: Optional[str] = Header(None)) -> bool:
'''验证token'''
ifnot authorization:
returnFalse
if authorization.startswith("Bearer "):
token = authorization[7:]
else:
token = authorization
return token == api_key
@app.post("/v1/chat/completions")
asyncdefchat_completions(request: ChatCompletionRequest, auth_result: bool = Depends(verify_auth)):
# 检查身份验证结果
ifnot auth_result:
raise HTTPException(
status_code=401,
detail={
"error": {
"message": "Invalid authentication credentials",
"type": "invalid_request_error",
"param": None,
"code": "invalid_api_key"
}
},
headers={"WWW-Authenticate": "Bearer"}
)
## 暂不支持非流式返回
ifnot request.stream:
raise HTTPException(
status_code=400,
detail={
"error": {
"message": "Streaming responses are not implemented in this mock API",
"type": "invalid_request_error",
"param": "stream",
"code": "invalid_parameter"
}
}
)
try:
# 触发 agent 并流式返回
request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
return StreamingResponse(
handle_stream_response(request.messages, request.model, request_id),
media_type="text/plain",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream"
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/v1/models")
asyncdeflist_models():
"""列出可用模型"""
return {
"object": "list",
"data": [
{
"id": "agent_model",
"object": "model",
"created": int(time.time()),
"owned_by": "agent"
}
]
}
@app.get("/health")
asyncdefhealth_check():
"""健康检查"""
return {"status": "healthy", "timestamp": datetime.now().isoformat()}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
启动服务:
这里以 Cherry Studio
为例,你也可以换成 OpenWebUI
或者其他支持 OpenAI
协议的客户端:
提问:统计的确诊数量前十名的州,以折线图的方式展示
提问: 找出确诊人数前五名的州,按饼图的方式绘制给我
提问: 根据每个县的确诊人数取Top10,以柱状图的方式呈现给我
提问: 统计确诊数量最多的州占整体的百分比,以液体展示
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-05-29
2025-05-11
2025-05-09
2025-05-19
2025-05-08
2025-06-08
2025-05-12
2025-07-01
2025-05-09
2025-05-12