支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


LangChain + MCP 构建带可视化图表功能的ChatBI智能体

发布日期:2025-07-27 21:48:21 浏览次数: 1520
作者:狂热JAVA小毕超

微信搜一搜,关注“狂热JAVA小毕超”

推荐语

如何用LangChain和MCP打造能自动生成可视化图表的ChatBI智能体?本文手把手教你实现数据分析+可视化的完整流程。

核心内容:
1. 基于COVID-19疫情数据构建ChatBI+可视化智能体的完整实现流程
2. 使用ModelScope的MCP-Server实现图表可视化功能
3. 通过FastMCP自定义实现Covid19查询服务的实战案例

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

 

一、实现流程介绍

在本专栏的前面文章中,介绍过基于 LangChain + MCP 和 SpringAI + MCP 构建 ChatBI 智能体,但未对数据进行可视化图表的分析,而可视化图表是数据分析的重要组成部分,它能够将复杂的数据转化为直观易懂的图形表示。因此本篇文章基于 LangChain + MCP 实验构建 ChatBI + 可视化图表 综合的智能体。

使用数据说明

其中分析数据采用数据采用 COVID-19 测试案例,包括:美国 2021-01-28 号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:

date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27

下载地址如下:

https://github.com/BIXUECHAO/covid-19-tes-data/blob/main/us-covid19-counties.csv

整体实现过程如下所示:

其中可视化图表能力采用 ModelScope 上公开的 MCP-Server,地址为:

https://modelscope.cn/mcp/servers/@antvis/mcp-server-chart

连接方式:

{
  "mcpServers": {
    "mcp-server-chart": {
      "type": "sse",
      "url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse"
    }
  }
}

Covid19 Query MCP Server 则是通过 FastMCP 来自定义实现。大模型端采用 OpenAI 的 GPT-4.1 模型,你也可以换成其他支持工具调通的模型。

实验最终实现的效果如下所示:

提问: 找出确诊人数前五名的州,按饼图的方式绘制给我

提问: 根据每个县的确诊人数取Top10,以柱状图的方式呈现给我

本次实验所依赖的版本如下所示:

mcp==1.9.2
openai==1.75.0
langchain==0.3.25
langchain-openai==0.3.18
langgraph==0.4.7
pymysql==1.0.3

二、搭建 数据查询 MCP Server

首先创建数据表,并导入上述数据:

CREATE TABLE `us_covid19_counties` (
  `datevarchar(255CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '日期',
  `county` varchar(255CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '县',
  `state` varchar(255CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '州',
  `fips` varchar(255CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '县编码code',
  `cases` int DEFAULT NULL COMMENT '累计确诊病例',
  `deaths` int DEFAULT NULL COMMENT '累计死亡病例'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COVID-19 案例';

实现数据库操作,这里仅仅做了数据库交互,实际使用时应考虑很多性能细节的优化:

utils_db.py

import pymysql

defget_conn():
    return pymysql.connect(
        host="127.0.0.1",
        port=3306,
        database="test3",
        user="root",
        password="root",
        autocommit=True
    )

defquery(sql):
    conn = get_conn()
    cursor = conn.cursor()
    cursor.execute(sql)
    columns = [column[0for column in cursor.description]
    res = list()
    for row in cursor.fetchall():
        res.append(dict(zip(columns, row)))
    cursor.close()
    conn.close()
    return res

创建数据查询 MCP Server,这里实现了两个 MCP Tools, 分别是获取表 Schema 和 执行SQL 两个操作:

import json

from mcp.server.fastmcp import FastMCP
import utils_db

mcp = FastMCP("COVID-19-DB-Query-Mcp-Server")

schema_sql = """
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_SCHEMA = 'test3' AND TABLE_NAME = '{table}'
"""


effective_tables = ["us_covid19_counties"]

@mcp.tool()
defget_table_schema() -> str:
    """获取当前有效表的 MySQL Schema, 生成SQL之前,你应该使用该工具获取完整的表结构"""
    table_schema = []
    for table in effective_tables:
        schemas = utils_db.query(schema_sql.format(table=table))
        schemas = ", ".join([f"{s['COLUMN_NAME']} {s['DATA_TYPE']} COMMENT {s['COLUMN_COMMENT']}"for s in schemas])
        table_schema.append(f"TABLE `{table}` ({schemas})")
    return"\n\n".join(table_schema)


@mcp.tool()
defrun_sql(sql: str) -> str:
    """执行MySQL SQL语句查询数据,一次仅能执行一句SQL!"""
    try:
        returnstr(utils_db.query(sql))
    except Exception as e:
        returnf"执行SQL错误:{str(e)} ,请修正后重新发起。"


if __name__ == "__main__":
    mcp.settings.port = 6030
    mcp.run("sse")

启动 MCP Server

运行效果图

三、LangChain MCP Client 智能体构建

LangChain 官方关于 MCP 的集成介绍文档如下:

https://langchain-ai.github.io/langgraph/Agents/mcp/

关于MCP的连接,通过 MultiServerMCPClient 抽取出所有的 MCP Tools ,然后交给 create_react_agent 构建智能体。

实现过程和本地控制台测试逻辑如下:

import os, config

os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"## 支持openai协议的 API BASE
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"## 你的API KEY
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from colorama import Fore, Style
import asyncio
from langgraph.checkpoint.memory import InMemorySaver

asyncdefmain():
    client = MultiServerMCPClient(
        {
            "chart": {
                "url""https://mcp.api-inference.modelscope.net/530c948576c248/sse",
                "transport""sse"
            },
            "covid19-query": {
                "url""http://127.0.0.1:6030/sse",
                "transport""sse"
            }
        }
    )
    tools = await client.get_tools()
    checkpointer = InMemorySaver()
    agent = create_react_agent(
        "openai:gpt-4.1",
        tools,
        checkpointer=checkpointer,
        prompt="You are an AI assistant that helps people find information."
    )
    config = {
        "configurable": {
            "thread_id""1"
        }
    }
    whileTrue:
        question = input("请输入:")
        ifnot question:
            continue
        if question == "q":
            break
        asyncfor chunk in agent.astream({"messages": [{"role""user""content": question}]},
                                         stream_mode="updates",
                                         config=config):
            if"agent"in chunk:
                content = chunk["agent"]["messages"][0].content
                tool_calls = chunk["agent"]["messages"][0].tool_calls
                if tool_calls:
                    for tool in tool_calls:
                        print(Fore.YELLOW, Style.BRIGHT, f">>> Call MCP Server: {tool['name']} , args: {tool['args']}")
                else:
                    print(Fore.BLACK, Style.BRIGHT, f"LLM: {content}")
            elif"tools"in chunk:
                content = chunk["tools"]["messages"][0].content
                name = chunk["tools"]["messages"][0].name
                print(Fore.GREEN, Style.BRIGHT, f"<<< {name} : {content}")


if __name__ == '__main__':
    asyncio.run(main())

运行测试:

提问:统计的确诊数量前十名的州,以折线图的方式展示



四、封装为 OpenAI 协议接口

上述已经通过控制台实现了完整的 Agent 交互过程,下面继续改造逻辑,使用 FastAPI 封装为 OpenAI 协议接口,便于在客户端使用。

注意:这里主要为实现功能,api_key 写死在程序中为: sk-da4b6cb4a41e4cascascasc9508deb556942 (随机生成的), 后续使用客户端连接时,需要填写该 api_key 。

import os
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"## 支持openai协议的 API BASE
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"## 你的API KEY

from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing importListDictAnyOptional, Generator
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from datetime import datetime
import time, uuid

app = FastAPI(title="OpenAI Compatible Chat API")
api_key = "sk-da4b6cb4a41e4cascascasc9508deb556942"
agent = None


classChatCompletionRequest(BaseModel):
    model: str
    messages: List[Dict[strAny]]
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = None
    stream: Optional[bool] = False


classChatCompletionChoice(BaseModel):
    index: int
    message: List[Dict[strAny]]
    finish_reason: str


classChatCompletionResponse(BaseModel):
    idstr
    objectstr = "chat.completion"
    created: int
    model: str
    choices: List[ChatCompletionChoice]
    usage: Dict[strint]


classChatCompletionChunk(BaseModel):
    idstr
    objectstr = "chat.completion.chunk"
    created: int
    model: str
    choices: List[Dict[strAny]]


asyncdefcreate_agent():
    '''创建MCP Agent'''
    client = MultiServerMCPClient(
        {
            "chart": {
                "url""https://mcp.api-inference.modelscope.net/530c948576c248/sse",
                "transport""sse"
            },
            "covid19-query": {
                "url""http://127.0.0.1:6030/sse",
                "transport""sse"
            }
        }
    )
    tools = await client.get_tools()
    return create_react_agent(
        "openai:gpt-4.1",,
        tools,
        prompt="You are an AI assistant that helps people find information.\n针对用户的问题,你的执行流程是1.获取表结构。2.生成SQL查询数据。3.生成可视化图表。4.总结结果回复用户"
    )


asyncdefinvoke_agent(messages: []):
    global agent
    ifnot agent:
        agent = await create_agent()
    asyncfor chunk in agent.astream({"messages": messages}, stream_mode="updates"):
        if"agent"in chunk:
            content = chunk["agent"]["messages"][0].content
            tool_calls = chunk["agent"]["messages"][0].tool_calls
            if tool_calls:
                for tool in tool_calls:
                    yieldf"> ```Call MCP Server: {tool['name']}```\n\n"
            else:
                yield content


asyncdefhandle_stream_response(messages: [],
                                 model: str, request_id: str
) -> Generator[strNoneNone]:
    # 执行 agent
    asyncfor msg in invoke_agent(messages):
        chunk_data = ChatCompletionChunk(
            id=request_id,
            created=int(time.time()),
            model=model,
            choices=[{
                "index"0,
                "delta": {
                    "content": msg
                },
                "finish_reason"None
            }]
        )
        yieldf"data: {chunk_data.model_dump_json()}\n\n"

    final_chunk = ChatCompletionChunk(
        id=request_id,
        created=int(time.time()),
        model=model,
        choices=[{
            "index"0,
            "delta": {},
            "finish_reason""stop"
        }]
    )
    yieldf"data: {final_chunk.model_dump_json()}\n\n"
    yield"data: [DONE]\n\n"


asyncdefverify_auth(authorization: Optional[str] = Header(None)) -> bool:
    '''验证token'''
    ifnot authorization:
        returnFalse
    if authorization.startswith("Bearer "):
        token = authorization[7:]
    else:
        token = authorization
    return token == api_key


@app.post("/v1/chat/completions")
asyncdefchat_completions(request: ChatCompletionRequest, auth_result: bool = Depends(verify_auth)):
    # 检查身份验证结果
    ifnot auth_result:
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message""Invalid authentication credentials",
                    "type""invalid_request_error",
                    "param"None,
                    "code""invalid_api_key"
                }
            },
            headers={"WWW-Authenticate""Bearer"}
        )
    ## 暂不支持非流式返回
    ifnot request.stream:
        raise HTTPException(
            status_code=400,
            detail={
                "error": {
                    "message""Streaming responses are not implemented in this mock API",
                    "type""invalid_request_error",
                    "param""stream",
                    "code""invalid_parameter"
                }
            }
        )
    try:
        # 触发 agent 并流式返回
        request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
        return StreamingResponse(
            handle_stream_response(request.messages, request.model, request_id),
            media_type="text/plain",
            headers={
                "Cache-Control""no-cache",
                "Connection""keep-alive",
                "Content-Type""text/event-stream"
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/v1/models")
asyncdeflist_models():
    """列出可用模型"""
    return {
        "object""list",
        "data": [
            {
                "id""agent_model",
                "object""model",
                "created"int(time.time()),
                "owned_by""agent"
            }
        ]
    }


@app.get("/health")
asyncdefhealth_check():
    """健康检查"""
    return {"status""healthy""timestamp": datetime.now().isoformat()}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

启动服务:

五、使用 OpenAI 协议客户端进行测试

这里以 Cherry Studio 为例,你也可以换成 OpenWebUI 或者其他支持 OpenAI协议的客户端:

配置连接:

对话测试:

提问:统计的确诊数量前十名的州,以折线图的方式展示

提问: 找出确诊人数前五名的州,按饼图的方式绘制给我

提问: 根据每个县的确诊人数取Top10,以柱状图的方式呈现给我

提问: 统计确诊数量最多的州占整体的百分比,以液体展示

 

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询