微信扫码
添加专属顾问
我要投稿
探索如何让AI智能体像人类一样拥有持久记忆,LangGraph框架下的长短期记忆管理实践指南为你揭秘。核心内容: 1. Agent Memory的基本概念与核心作用 2. LangGraph框架下短期记忆与长期记忆的实现方法 3. 基于MCP协议构建真实Multi-Agent系统的实战案例
作者:adacyang
如何让AI智能体(Agent)像人类一样拥有持久的记忆,从而在复杂的连续任务中保持上下文感知和深度理解?这已成为构建高级智能体的核心挑战。本文将深入探讨Agent Memory的核心概念,并聚焦于LangGraph框架下的长短期记忆实现,详解短期会话与长期知识的存储、管理、语义检索等技巧。更进一步地,我们将通过一个引入MCP协议的实战案例,手把手带你构建一个真实的融合长记忆机制的Multi-Agent系统,直观展示中断、记忆与协作的融合。
基于大语言模型(LLM)的智能体(Agent)系统中,记忆机制是实现持续、连贯和个性化交互的核心基石,通过记忆,可以让Agent记住过往的交互,保持上下文的一致性,并能从反馈中学习,适应用户的偏好。
本文核心要点概述:
1.介绍Agent Memory的基本情况
2.LangGraph长短期记忆详解及案例说明:包含短期记忆实现、管理方法,长期记忆的实现方法,以及搭建了融合postgres数据库、集成Embedding服务进行语义搜索等可用于生产环境的真实案例。
3.引入MCP协议构建真实的Agent长记忆应用:搭建一个基于supervisor架构,集成中断机制、长短期记忆机制的multi-agent系统。
上图中(来源于Mem0[1]),左边是没有Memory的agent,右边是有Memory的agent,后者可以根据用户的过往信息(素食主义者、不喜欢乳制品)给出更合理的响应(不含乳制品的素食菜单),而前者的回答显然是不合适的。
简单来说,Memory是赋予Agent记忆能力的技术和架构,能够让Agent像人一样记住过去的交互、学到的知识、执行过的任务及未来的计划,是将一个LLM转变为能够执行复杂、长期任务的真正”智能体“的核心所在。
如何获取记忆:通过和用户交互、环境交互...
怎么组织记忆:模型参数、模型上下文、数据库
怎么利用记忆:RAG、Few-shot...
关于Memory的分类,有许多种分类体系,这里我们只讨论最常见及最易于理解的。
正如人类利用长短期记忆进行有效的交互和学习一样,Agent的记忆机制通常划分为短期记忆(short-term memory)和长期记忆(long-term memory),短期记忆决定了Agent在微观任务上的即时表现,而长期记忆则作为持久知识库,决定了Agent在宏观时间尺度上的智能深度和个性化水平,通过两者配合,Agent才能表现出连贯性、上下文感知能力,才会显得更智能。
Agent通常通过以下几步来有效地管理记忆,使得每次于用户的交互都更加精准智能:
LangGraph[2]作为一款面向多智能体协作与状态管理的框架,其设计了巧妙的记忆管理系统,旨在为Agent提供在不同交互中存储、检索和利用信息的能力。它区分了两种主要的记忆类型:短期记忆和长期记忆。在实际使用中,通过这两种记忆协同,既能保障实时任务的高效执行,又支持了跨任务、跨周期的经验复用。
● short-term memory(通过Checkpointer实现):针对单个对话线程,核心价值在于保障对话的临时性,使得Agent能够跟踪会话中的多轮对话,可以在该线程内的任何时刻被回忆。
● long-term memory(通过Store实现):可以跨对话线程共享,可以在任何时间,任何线程中被回忆,而不像短期记忆局限于单个对话。
通过下表,可以更清晰的看到两者的区别:
要想更好的理解LangGraph中的记忆机制,首先需要理解其支持双轨记忆系统的核心概念。
LangGraph有一个内置的持久化(Persistence)层,通过checkpointer实现,能够持久化存储图状态,这使得开发记忆功能和人类干预功能成为可能。
当使用检查点编译一个图时,检查点会在每个super-step保存图状态的checkpoint,这些checkpoint被保存到一个thread中,可以在图执行后访问。因为threads允许在执行后访问图的状态,所以可以实现记忆、人机协作、时间旅行、容错等多种强大的功能。
工作流程:
用户输入 → [节点 1] → 💾 保存状态 → [节点 2] → 💾 保存状态 → 输出
↓ ↓
Checkpoint 1 Checkpoint 2
为了管理多个独立的对话,LangGraph使用了thread的概念。thread_id是由checkpointer保存的每个checkpoint的唯一id,是激活和区分不同对话线程的唯一key。在调用图的invoke或stream方法时,通过configurable字典传入一个thread_id,就代表这次操作属于thread_id这个特定的对话。
如上所述,图状态可以由checkpointer在每个super-step写入线程,从而实现状态的持久化。但是,如果想在多个线程之间保留一些信息的话,那么就需要用到Store。Store本质上是一个暴露给图节点和工具的键值数据库,与checkpointer的自动化快照不同,Store需要显式和主动的进行操作。
Store中的数据通常通过更持久的标识来组织。user_id是最常见的,用于关联用户的所有信息,此外,namespace提供了一种数据隔离机制,例如,使用使用 (“memories”, user_id) 这样的元组作为命名空间,可以将用户的记忆与其他类型的数据(如用户偏好 (“preferences”, user_id))清晰地分离开来,避免数据冲突,保持知识库的整洁有序。
对于开发、原型设计和测试阶段,最简单快捷的方式是使用InMemorySaver。它将所有的对话状态存储在内存中的一个Python字典里。
1.设置记忆管理检查点
from langchain_openai import ChatOpenAI
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
# 初始化检查点保存器
checkpointer = InMemorySaver()
2.定义大模型并创建agent
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
agent = create_react_agent(
model=model,
tools=[],
# 传入检查点,是将持久化能力“注入”图的关键步骤。编译后的graph对象现在具备了状态管理的能力。
checkpointer=checkpointer
)
如果是底层自定义api在图构建阶段传入检查点的代码是graph = builder.compile(checkpointer=checkpointer)。
3.短期记忆-内存后端
config = {"configurable": {"thread_id": "1"}} # 激活记忆机制的核心。如果没有提供thread_id,每次invoke调用都将是无状态的,只要使用相同的thread_id,LangGraph就会在多次调用之间维持对话状态
response = agent.invoke(
{"messages": [{"role": "user", "content": "你好,我叫ada!"}]},
config
)
print(f"thread1_bot_answer:{response['messages'][-1].content}")
response = agent.invoke(
{"messages": [{"role": "user", "content": "你好,请问你还记得我叫什么名字么?"}]},
config
)
print('------------线程1------------------')
print(f"thread1_bot_answer:{response['messages'][-1].content}")
new_config = {"configurable": {"thread_id": "2"}}
response = agent.invoke(
{"messages": [{"role": "user", "content": "你好,请问你还记得我叫什么名字么?"}]},
new_config
)
print('------------线程2------------------')
print(f"thread2_bot_answer:{response['messages'][-1].content}")
执行上面代码,可以看到输出如下:
thread1_bot_answer:你好,Ada!很高兴认识你!😊
这是一个很美的名字呢!有什么我可以帮助你的吗?无论是想聊聊天,还是有任何问题需要解答,我都很乐意为你提供帮助。
------------线程1------------------
thread1_bot_answer:当然记得!你刚才告诉我你叫 Ada~很高兴再次和你打招呼!😊
------------线程2------------------
thread2_bot_answer:你好!很抱歉,我无法记住之前对话中的个人信息,比如你的名字。这是为了保护你的隐私,所以我不会保留这类数据。你可以告诉我你的名字,或者任何你想让我称呼你的方式,我会很乐意在这次的对话中使用它!😊
短期记忆与线程相关,在对话时,需要在配置中传入thread_id。通过上面的结果我们可以看到,当我们传入相同的thread_id时,agent就可以记住用户的名字,然而当我们更换thread_id时,agent就不记得用户的名字了。
需要注意的是,InMemorySaver将所有状态都保存在内存中,一旦程序终止,那么所有对话历史都会消失。
可以发现,上面一小节的代码在应用程序结束后再启动,记忆就又消失了。这是因为InMemorySaver仅仅是把记忆保存在内存中,应用程序结束后释放内存记忆就消失了。在生产环境中常常使用数据库支持的检查点记录器持久化保存记忆,以保证数据的可靠性和服务的连续性。
这里我们以postgres数据库为例来说明,怎么持久化地保存记忆数据。
1.首先安装以下依赖:
pip install -U "psycopg[binary,pool]" langgraph-checkpoint-postgres
2.安装postgres数据库,具体的安装方法可以参考:Linux下安装PostgreSQL_linux安装postgresql-CSDN博客。这里选择以源码的方式进行安装,安装包从官网(PostgreSQL: Downloads)下载,选择最新的postgresql-18.0.tar.gz。
3.安装数据库成功后,编码如下代码。
DB_URI是数据库连接的URL。想要自动保存在数据库中的State需要在PostgresSaver.from_conn_string(DB_URI)上下文中操作。
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres import PostgresSaver
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup() # 第一次调用时必须要setup()
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": "1"
}
}
response = graph.invoke(
{"messages": [{"role": "user", "content": "你好,我叫ada!"}]},
config
)
print(response['messages'][-1].content)
response = graph.invoke(
{"messages": [{"role": "user", "content": "你好,请问你还记得我叫什么名字么?"}]},
config
)
print(response['messages'][-1].content)
运行一次上述代码后,关闭应用程序后重启,再次运行上述代码,print结果如下:
bot_answer_1:你好,Ada!很高兴再次见到你!😊
你的名字真动听!今天有什么我可以帮你解答或者想聊的话题吗?
bot_answer_2:当然记得!你告诉我你叫 **Ada**。很高兴再次和你打招呼!😊
可以看到,记忆已经被保存了。我们检查数据库可以发现,postgres数据库中出现了四个表:
上述表中,checkpoints表是”状态快照“表,每当程序执行一个step时,它就会在这张表中创建一条新记录,这条记录就是一个检查点的快照。查询该表,可以得到如下结果:
接下来,我们来分析每一列的含义:
理解了上面checkpoints表后,那么不禁会问,真正的消息内容被存到了哪里呢?真正的消息内容存储在checkpoint_writes表中,如下:
除了PostgreSQL之外,LangGraph还支持MongoDB、Redis等数据库。
当构建复杂的、由多个子图嵌套而成的应用时,需要更灵活的记忆管理策略。
● 记忆继承(默认):默认情况下,子图会继承其父图的checkpointer。这意味着整个嵌套图共享同一个对话状态,数据可以在父子图之间无缝流动。这对于将一个大型任务分解为多个模块化子任务非常有用。
● 记忆隔离:在某些场景下,例如构建多智能体系统,希望每个智能体(由一个子图表示)拥有自己独立的内存空间,互不干扰。此时,可以在编译子图时设置checkpointer=True。
如下代码,可以在子图中直接使用父图的短期记忆:
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict
class State(TypedDict):
foo: str
# 子图
def subgraph_node_1(state: State):
return {"foo": state["foo"] + "bar"}
subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph = subgraph_builder.compile()
# 父图
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
如果子图希望使用自己的短期记忆,那么需要在编译子图时,显示传入子图的检查点:
subgraph_builder = StateGraph(...)
subgraph = subgraph_builder.compile(checkpointer=True)
在工具中读取状态:
LangGraph允许工具直接访问和读取当前的图状态,使其具备上下文感知能力。
核心机制:state: Annotated[CustomState, InjectedState],InjectedState的作用是在调用这个工具时,将当前的完整状态对象作为第一个参数传递到工具中,使得这个工具能根据当前状态来执行更智能的操作。
from typing import Annotated
from langchain.chat_models import init_chat_model
from langgraph.prebuilt import InjectedState, create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
class CustomState(AgentState):
user_id: str
def get_user_info(
state: Annotated[CustomState, InjectedState]
) -> str:
"""查询用户信息"""
user_id = state["user_id"]
return"ada"if state["user_id"] == "user_ada"else"Unknown"
agent = create_react_agent(
model=model,
tools=[get_user_info],
state_schema=CustomState,
)
agent.invoke({
"messages": "查询用户信息",
"user_id": "user_ada"
})
返回结果如下:
根据查询结果,您的用户信息显示用户名为:**ada**\n\n这是您的用户信息查询结果。如果您需要了解其他信息或有其他需求,请告诉我。
在工具中写入状态:
如果要在工具执行期间修改图的记忆,那么可以直接从工具返回状态更新。这对于持久化中间结果、传递信息给后续工具等非常有用。
核心机制:工具返回Command对象。此时,LangGraph会将其返回值解释为对状态的直接修改指令。Command(update={...})中的字典定义了要更新的状态字段及其新值。这允许工具在完成其主要任务的同时,将结果写回智能体的短期记忆中,从而影响后续的决策。
from typing import Annotated
from langchain_core.tools import InjectedToolCallId
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import ToolMessage
from langgraph.prebuilt import InjectedState, create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.types import Command
class CustomState(AgentState):
user_name: str
def update_user_info(
tool_call_id: Annotated[str, InjectedToolCallId],
config: RunnableConfig
) -> Command:
"""查询并更新用户信息"""
user_id = config["configurable"].get("user_id")
name = "ada"if user_id == "user_123"else"Unknown user"
return Command(update={
"user_name": name,
# 更新消息历史记录
"messages": [
ToolMessage(
"成功查询到用户信息",
tool_call_id=tool_call_id
)
]
})
def greet(
state: Annotated[CustomState, InjectedState]
) -> str:
"""找到用户信息后,使用此方式向用户问好。"""
user_name = state["user_name"]
return f"你好 {user_name}!"
agent = create_react_agent(
model=model,
tools=[update_user_info, greet],
state_schema=CustomState
)
agent.invoke(
{"messages": [{"role": "user", "content": "向用户打招呼"}]},
config={"configurable": {"user_id": "user_123"}}
)
输出结果如下:
LangGraph中的长期记忆允许系统在不同对话中保留信息,是跨对话线程共享的,可以在任何时间、任何线程中被回忆。与短期记忆不同,长期记忆保存在自定义的命名空间中,每个记忆都组织在一个自定义的namespace和一个唯一的key下。
记忆存储:LangGraph将长期记忆存储为JSON文档,使用Store进行管理,允许存储结构化和非结构化的数据。
记忆更新时机:
● 热路径(Hot Path):在应用程序逻辑运行时实时创建记忆(store.put()),优点是实时更新,但可能增加程序复杂性、延迟等问题。
● 后台(Background):作为单独的异步任务创建记忆(store.put()),优点是避免主应用延迟、逻辑分离,难点在于确定更新频率和触发时机。
记忆检索:
● store.get():根据命名空间和键精确获取记忆。
● store.search():在指定命名空间内实现灵活记忆检索,不但可以通过命名空间和标识符,更可以通过语义检索到记忆内容。通常需要Store配置一个embed来支持语义搜索。
记忆的应用:
● 语义记忆:存储事实和概念。分为以下两种情况:Profile:将关于用户、组织或代理自身的特定信息存储为一个持续更新的JSON文档,需要模型来生成新的Profile或更新已有JSON档案;Collection:将记忆存储为一组独立的文档,易于生成,但检索和更新较为复杂,且可能难以捕获记忆间的完整上下文。在应用时,可以将检索到的记忆作为上下文或系统指令的一部分传递给LLM,用于个性化响应和回答事实性问题。
● 情景记忆:存储过去的事件或行为经验。通常通过few-shot example prompt来实现,以指导模型完成任务。
● 程序记忆:存储执行任务的规则或指令。通常通过修改代码自身的prompt来实现,将其应用于LLM。
与Checkpointer类似,InMemoryStore用于快速开发和原型验证。它将所有数据存储在内存中。
from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
store = InMemoryStore()
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
store.put(
("users",), # 命名空间:元组类型,类比文件系统中的文件夹,支持分层组织结构
"user_123", # 键: 字符串,是命名空间内的唯一标识符,一般推荐使用uuid库生成唯一标识符
{
"name": "ada",
"language": "中文",
} # 值:Python字典类型,比如保存公共角色资料时可以是包含姓名、偏好等键值对的字典
)
def get_user_info(config: RunnableConfig) -> str:
"""查找用户信息的函数,可以查看长期记忆中储存的用户信息"""
store = get_store() # 获取上下文中可用的store实例
user_id = config["configurable"].get("user_id")
user_info = store.get(("users",), user_id) # 输入命名空间和键进行精确查询
return str(user_info.value) if user_info else"Unknown user"
agent = create_react_agent(
model=model,
tools=[get_user_info],
# 传入store
store=store
)
response = agent.invoke(
{"messages": [{"role": "user", "content": "帮我查找长期记忆中储存的用户信息"}]},
config={"configurable": {"user_id": "user_123"}}
)
print(response['messages'])
输出结果如下,可以看到在工具函数中成功调用store查找到了保存的用户信息:
为了让记忆真正”长期“,生产环境必须使用数据库支持的Store,LangGraph目前主要支持PostgresStore和RedisStore。我们以PostgresStore为例来进行说明。
pip install -U "psycopg[binary,pool]" langgraph-checkpoint-postgres
Postgres数据库的安装,请参考上文。
接下来进行示例说明。整体过程与Checkpointer类似,关键区别在于Store是怎样在节点内部被访问和使用的。
import uuid
from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.store.base import BaseStore
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
with (
PostgresStore.from_conn_string(DB_URI) as store,
PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):
store.setup() # 第一次调用时必须要setup()
#checkpointer.setup()
# 声明store参数
def call_model(
state: MessagesState,
config: RunnableConfig,
*,
store: BaseStore, # 在节点中访问store的标准方式,需要在函数签名上,加一个store
):
# 从store中读取记忆
user_id = config["configurable"]["user_id"]
namespace = ("memories", user_id)
memories = store.search(namespace, query=str(state["messages"][-1].content))
info = "\n".join([d.value["data"] for d in memories])
system_msg = f"你是一个与人类交流的小助手,用户信息: {info}"
# 向store中写入记忆
last_message = state["messages"][-1]
if"记住"in last_message.content.lower():
memory = "用户名字是ada"
store.put(namespace, str(uuid.uuid4()), {"data": memory})
response = model.invoke([{"role": "system", "content": system_msg}] + state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer, store=store) # agent同时配备了短期记忆和长期记忆能力
# 第一次对话,告诉agent用户的名字
config = {
"configurable": {
"thread_id": "3",
"user_id": "1",
}
}
response = graph.invoke(
{"messages": [{"role": "user", "content": "你好,我叫ada!记住这个名字呦~"}]},
config
)
print(response['messages'][-1].content)
# 第二次对话,新线程,询问agent记不记得用户的名字
config = {
"configurable": {
"thread_id": "4",
"user_id": "1",
}
}
response = graph.invoke(
{"messages": [{"role": "user", "content": "我的名字是什么?"}]},
config
)
print(response['messages'][-1].content)
输出结果如下:
# ------------------第一次对话----------------------
你好ada!很高兴认识你~我已经记住你的名字啦!✨
有什么我可以帮你的吗?
# ------------------第二次对话----------------------
你的名字是ada
在第一次对话时,对话线程id为3,agent被要求记住用户的名字,并且根据代码逻辑,用户名字信息是通过store.put()写入数据库的。第二次对话时,线程id为4,当被问起用户的名字时,agent通过store.search()办法从数据库中检索到了这个信息,并成功回答,这展示了Store的跨记忆存储能力。
在工具中读取长期记忆:
参考上文中,长期记忆-InMemoryStore中的示例。
其中,核心在于store = get_store() ,这个函数是一个上下文感知的辅助函数,能够在工具执行时,自动获取并返回compile或create_react_agent中传入的store实例。
在工具中写入长期记忆:
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.config import get_store
from langchain_core.runnables import RunnableConfig
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
class UserInfo(TypedDict):
name: str
def save_user_info(user_info: UserInfo, config: RunnableConfig) -> str:
"""将用户信息保存到store"""
store = get_store()
user_id = config["configurable"].get("user_id")
store.put(("users",), user_id, user_info)
return"成功保存了用户信息"
agent = create_react_agent(
model=model,
tools=[save_user_info],
store=store
)
agent.invoke(
{"messages": [{"role": "user", "content": "我叫ada!请你记住我的名字"}]},
config={"configurable": {"user_id": "user_123"}}
)
首先需要定义要存储的数据内容,即UserInfo,它为LLM提供了一个清晰的结构化输出格式,当LLM决定调用save_user_info的工具时,会自动生成一个包含name字段的字典。然后调用store.put()方法,将数据存储下来。
store.get(("users",), "user_123").value
# 输出:{'name': 'ada'}
Store最强大的功能之一是支持语义搜索,这能将Store从一个简单的键值数据库,转变为一个功能完备的向量数据库。智能体不再只能通过精确的关键词来检索记忆,而是能够根据概念的相似性来查找相关信息。这实际就是一套RAG流程。
下面我们将基于自定义部署的Embedding服务,来演示如何进行长期记忆语义搜索。特别说明的是,代码仅供演示使用,实际使用可以参考下面代码,编写更规范的代码。
1.首先我们需要自己部署一个Embedding服务,这里我们以Qwen3-Embedding-4B为例。
2.创建自定义Embedding类,这个类需要继承自langchain.embeddings.base.Embeddings,这个类的作用是负责与Embedding服务进行通信。
import requests
from typing import List, Optional, Dict
from langchain.embeddings.base import Embeddings
class SelfAPIEmbeddings(Embeddings):
"""
一个自定义的 Embedding 类,用于调用自部署的 embedding API 服务。
"""
def __init__(self):
"""
初始化函数。
"""
self.token = ""
self.url = ""
self.model_id = ""
def _call_api(self, texts: List[str]) -> List[List[float]]:
"""
内部方法,用于调用 API。
*** 您需要根据您自己服务的实际 API 格式来修改这部分 ***
"""
try:
payload = {
'model': self.model_id,
'input': texts
}
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.token}'
}
response = requests.post(self.url, headers=headers, data=json.dumps(payload))
# 判断是否异常
if response.status_code != 200:
print(response.json())
exit()
res = response.json()
return res['data']
except requests.exceptions.RequestException as e:
print(f"Error calling embedding API: {e}")
# 可以选择返回空列表或重新抛出异常
raise e
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""
为文档列表生成 embeddings。
"""
if not texts:
return []
# 为了避免请求体过大,可以分批处理
# 这里为了简单起见,一次性发送所有文本
new_texts = []
for i in texts:
i = eval(i)
new_texts.append(i['text'])
res = self._call_api(new_texts)
new_res = []
for i in res:
new_res.append(i['embedding'])
return new_res
def embed_query(self, text: str) -> List[float]:
"""
为单个查询文本生成 embedding。
"""
if not text:
return []
result = self._call_api([text])
return result[0]['embedding']
3.将自定义的Embedding类集成到工作流中,通过在Store中配置index来启用语义搜索。
from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStore
custom_embeddings = SelfAPIEmbeddings()
store = InMemoryStore(
index={
"embed": custom_embeddings,
"dims": 2560,
}
)
4.语义查询测试。
store.put(("user_123", "memories"), "1", {"text": "我喜欢吃披萨"})
store.put(("user_123", "memories"), "2", {"text": "我是一名程序员"})
items = store.search(
("user_123", "memories"), query="我肚子饿了", limit=1
)
输出如下,尽管查询没有”披萨“这个词,但是通过Embedding模型计算,知道披萨和饿了是相近的语义,因此成功检索出了相关的记忆。
[Item(namespace=['user_123', 'memories'], key='1', value={'text': '我喜欢吃披萨'}, created_at='2025-11-12T09:59:55.097931+00:00', updated_at='2025-11-12T09:59:55.097937+00:00', score=0.6804530799409887)]
随着对话的进行,短期记忆(对话历史)会不断增长,可能会超出LLM的上下文窗口,导致请求调用失败,或者使LLM反应变慢、变差。这时,就需要对记忆进行管理了。常见的解决办法有:
● 修剪消息(trim messages):移除前 N 条或后 N 条消息(在调用 LLM 之前)。最简单直接,但信息丢失严重,适合短期任务、无状态问答机器人、近期上下文最重要的应用。
● 删除消息(delete messages):从LangGraph状态中永久删除消息。可以精确的控制移除内容,但需要自定义逻辑来判断哪些消息需要删除,适合用于移除不再需要的冗余系统消息、工具输出或错误信息。
● 总结消息(summarize messages):汇总历史记录中的早期消息并将其替换为摘要。保留了核心语义信息,但计算成本高,实现相对复杂,适合用于长期连续对话、需要维持深度长期上下文的智能体。
● 自定义策略:例如消息过滤等。
管理对话历史的一个重要概念是限制传递给模型的消息数量,trim_messages就是LangChain提供的一个实用函数,它根据指定的策略、token限制、模型要求以及是否包含系统消息等来裁剪消息列表,它的主要目的是确保对话历史不会超出模型的上下文窗口大小。
它的解决策略是:当消息历史过长时,从开头或结尾丢弃一部分消息,以确保总长度符合限制。
from langchain_core.messages.utils import (
trim_messages,
count_tokens_approximately
)
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, MessagesState
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
summarization_model = model.bind(max_tokens=128)
def call_model(state: MessagesState):
# 保留最近消息,总 token ≤ 128
messages = trim_messages(
state["messages"],
strategy="last", # 保留最后的消息
token_counter=count_tokens_approximately,
max_tokens=128,
start_on="human", # 确保第一条消息(不包括系统消息)是从human消息开始保留
end_on=("human", "tool"), # 保留到human或tool消息为止
allow_partial=False, # 不允许分割消息内容
include_system=True # 保留system prompt
)
# --- 在这里打印传入模型的内容 ---
print("-" * 20)
print(f"Messages being sent to the model (trimmed to <= 128 tokens): {len(messages)}")
for msg in messages:
print(f" [{msg.type.upper()}]: {msg.content}")
print("-" * 20)
response = model.invoke(messages)
return {"messages": [response]}
checkpointer = InMemorySaver()
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
# 如果需要在agent中修剪,那么需要将pre_model_hook和trim_messages结合使用
"""
def call_model(state)
messages = trim_messages(...)
agent = create_react_agent(
model,
tools,
pre_model_hook=call_model,
checkpointer=checkpointer,
)
"""
发起请求如下:
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"messages": "你好,我叫ada"}, config)
graph.invoke({"messages": "请写一首诗,关于小狗的"}, config)
graph.invoke({"messages": "再写一首关于小猫的"}, config)
final_response = graph.invoke({"messages": "我叫什么名字呢?"}, config)
输出打印如下:
--------------------
Messages being sent to the model (trimmed to <= 128 tokens): 1
[HUMAN]: 你好,我叫ada
--------------------
--------------------
Messages being sent to the model (trimmed to <= 128 tokens): 3
[HUMAN]: 你好,我叫ada
[AI]: 你好,Ada!很高兴认识你!😊
这是一个很美的名字呢!有什么我可以帮助你的吗?无论是想聊天、有问题需要解答,还是需要任何形式的帮助,我都很乐意为你服务。
[HUMAN]: 请写一首诗,关于小狗的
--------------------
--------------------
Messages being sent to the model (trimmed to <= 128 tokens): 5
[HUMAN]: 你好,我叫ada
[AI]: 你好,Ada!很高兴认识你!😊
这是一个很美的名字呢!有什么我可以帮助你的吗?无论是想聊天、有问题需要解答,还是需要任何形式的帮助,我都很乐意为你服务。
[HUMAN]: 请写一首诗,关于小狗的
[AI]: 好的,Ada!为你写一首关于小狗的可爱小诗,希望你喜欢:
**《小狗的约定》**
..... (此处省略诗的内容)
这首诗捕捉了小狗的活泼、忠诚和它们带给我们的温暖。你觉得怎么样?😊
[HUMAN]: 再写一首关于小猫的
--------------------
--------------------
Messages being sent to the model (trimmed to <= 128 tokens): 3
[HUMAN]: 再写一首关于小猫的
[AI]: 好的,Ada!这首关于小猫的诗,希望同样能带给你一丝轻盈与温柔:
**《小猫的遐想》**
..... (此处省略诗的内容)
希望你喜欢这首小诗!🐾
[HUMAN]: 我叫什么名字呢?
--------------------
模型最终的输出为:
可以看出,传递给模型的消息内容已经被裁剪,修剪的过程为:
1.保留系统消息,include_system=True
2.strategy="last",反转消息列表,以便从最新的消息开始处理
3.累积token数量,当达到max_tokens限制,那么进行修剪
4.修剪时,由于allow_partial=False,因此,保留的消息都是完整的;且start_on="human",所以修剪后第一条非system prompt是用户消息
虽然对传递给模型的历史消息进行了裁剪,但是查询state可以发现,历史记录仍被完整的保留在内存中,没有被删除。
print("\n" + "="*30)
print(" 查看 thread_id='1' 的完整对话历史")
print("="*30)
current_state = graph.get_state(config)
conversation_history = current_state.values["messages"]
for message in conversation_history:
print(f"[{message.type.upper()}]: {message.content}")
这种方法允许从状态中永久移除特定的消息。要删除消息,不能直接从状态的messages列表中移除,而是使用RemoveMessage函数,从graph state中直接删除消息来管理对话历史。为了让RemoveMessage生效,需要使用带有add_messages reducer的状态键,例如MessagesState。
删除特定消息:
from langchain_core.messages import RemoveMessage
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, MessagesState
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
def delete_messages(state):
messages = state["messages"]
if len(messages) > 2:
# 删除最早的两条消息
return {"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_sequence([call_model, delete_messages])
builder.add_edge(START, "call_model")
checkpointer = InMemorySaver()
app = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for event in app.stream(
{"messages": [{"role": "user", "content": "你好,我是ada"}]},
config,
stream_mode="values"
):
print([(message.type, message.content) for message in event["messages"]])
for event in app.stream(
{"messages": [{"role": "user", "content": "我叫什么名字"}]},
config,
stream_mode="values"
):
print([(message.type, message.content) for message in event["messages"]])
输出如下,当请求完成时,如果消息数量>2,那么最早的两条消息会被删除。
清空所有消息:
from langgraph.graph.message import REMOVE_ALL_MESSAGES
def clear_messages(state):
return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)]}
builder = StateGraph(MessagesState)
builder.add_sequence([call_model, clear_messages])
builder.add_edge(START, "call_model")
checkpointer = InMemorySaver()
app = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for event in app.stream(
{"messages": [{"role": "user", "content": "你好,我是ada"}]},
config,
stream_mode="values"
):
print([(message.type, message.content) for message in event["messages"]])
for event in app.stream(
{"messages": [{"role": "user", "content": "我叫什么名字"}]},
config,
stream_mode="values"
):
print([(message.type, message.content) for message in event["messages"]])
输出如下,请求完成后,会立即删除所有消息记录。
通过修剪、删除来管理历史消息,会有丢失信息的问题。为了避免这个问题,可以进行消息总结,也就是通过调用LLM对历史对话进行摘要,并将摘要作为新的上下文传入,以在减少消息数量的同时保留关键信息。
1.首先,安装LangMem,这是一个由LangChain维护的库,提供了用于在agent中管理记忆的工具。
pip install -U langmem
2.langmem库提供了一个预构建的SummarizationNode,可以极大地简化实现过程:
import tiktoken
from typing import Any, TypedDict
from langchain.chat_models import init_chat_model
from langchain_core.messages import AnyMessage, BaseMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langmem.short_term import SummarizationNode, RunningSummary
summary_prompt = ChatPromptTemplate.from_messages(
[
MessagesPlaceholder(variable_name="messages"),
# 使用 HumanMessage 模拟用户在最后发出总结指令
("human", "请根据以上对话,生成一段简洁、连贯的中文摘要,注意不要丢失细节"),
]
)
update_summary_prompt = ChatPromptTemplate.from_messages(
[
MessagesPlaceholder(variable_name="messages"),
# 使用 HumanMessage 模拟用户在最后发出总结指令
("human", "以下是目前为止的对话摘要:{existing_summary}\n\n请根据以上新消息扩展此摘要:"),
]
)
BASE_URL=""
TOKEN=""
MODEL_NAME=""
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
summarization_model = model.bind(max_tokens=128)
# count_tokens_approximately更适合英文分词,中文这里使用tiktoken来计算token数量
encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens_accurately(messages: list[BaseMessage]) -> int:
"""使用 tiktoken 精确计算消息列表的 token 总数"""
# 注意:langmem 的 token_counter 期望的输入是消息列表
text_content = " ".join([msg.content for msg in messages if isinstance(msg.content, str)])
return len(encoding.encode(text_content))
class State(MessagesState):
context: dict[str, RunningSummary]
class LLMInputState(TypedDict):
summarized_messages: list[AnyMessage]
context: dict[str, RunningSummary]
summarization_node = SummarizationNode(
#token_counter=count_tokens_approximately,
token_counter=count_tokens_accurately, # 更换为自定义的token计算工具
model=summarization_model,
max_tokens=256,
max_tokens_before_summary=256,
max_summary_tokens=128,
initial_summary_prompt=summary_prompt, # 使用自定义prompt,默认为英文
existing_summary_prompt=update_summary_prompt
)
def call_model(state: LLMInputState):
response = model.invoke(state["summarized_messages"])
return {"messages": [response]}
checkpointer = InMemorySaver()
builder = StateGraph(State)
builder.add_node(call_model)
builder.add_node("summarize", summarization_node)
builder.add_edge(START, "summarize")
builder.add_edge("summarize", "call_model")
graph = builder.compile(checkpointer=checkpointer)
# Invoke the graph
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"messages": "你好,我叫ada"}, config)
graph.invoke({"messages": "请写一首诗,关于小狗的"}, config)
graph.invoke({"messages": "再写一首关于小猫的"}, config)
final_response = graph.invoke({"messages": "我叫什么名字?"}, config)
final_response["messages"][-1].pretty_print()
# 检查摘要是否生成
if"running_summary"in final_response["context"]:
print("\n生成的摘要:", final_response["context"]["running_summary"].summary)
else:
print("\n对话较短,尚未生成摘要。")
输出如下:
如果需要在agent中实现的话,那么将SummarizationNode传入pre_model_hook即可。SummarizationNode会自动检查历史消息的长度,当token数量超过阈值时,触发一次总结,然后将”摘要 + 最新消息“的组合传递给model。其中,initial_summary_prompt用于第一次生成摘要时的prompt模板,existing_summary_prompt用于更新现有摘要的prompt模板,final_prompt是将摘要与剩余的消息合并后的prompt模板。
State中的context字段用于存储运行中的摘要信息, 避免在每次调用时都重复总结。
对有状态的agent的记忆进行检查、管理和重置,对于监控agent和提高用户使用体验都必不可少,LangGraph提供了以下的一些工具,用来对检查点进行管理。
查看最近的短期记忆,也就是最近的检查点的状态:
graph.get_state(config=config)
查看线程的所有短期记忆,会按时间顺序返回这个线程所有的历史检查点:
graph.get_state_history(config=config)
删除一个线程的所有短期记忆,一般用于重启对话场景:
checkpointer.delete_thread(thread_id)
本节将介绍如何基于Model Context Protocol(MCP)协议,使用LangGraph-Supervisor框架构建一个实用的、集成中断机制、有长记忆的多Agent系统。MCP是一种社区共建的AI开放协议,它标准化了应用向AI提供上下文的方式,极大简化了工具集成过程。
接下来,我们从头开始搭建multi-agent系统,模拟一个用户去进行旅游信息查询,并进行酒店预定,然后酒店管理侧可以查询用户的预定信息。整个Demo我们将展示Supervisor框架的搭建、人工介入、长短期记忆的应用等。
我们将逐步构建multi-agent工作流的每个组件,它包含三个子智能体,三个专门的 ReAct(推理和行动)子智能体,然后它们将组合起来创建一个包含额外步骤的多智能体工作流。
我们的工作流从以下开始:
1.human_input:用户输入;admin_input:管理员输入
2.supervisor协调三个子agent,根据input内容,选择合适的agent进行工作
3.当supervisor选择调用search_assistant的时候,那么查询信息,并将结果返回
4.当supervisor选择调用hotel_assistant的时候,那么把用户的预定信息,更新到Store中
5.当supervisor选择调用booking_info_assistant,会先进行verify_info,中断图的执行以请求管理员ID,当输入管理员ID后,接着判断管理员ID是否符合要求,如果不符合,那么不进行记忆查询,如果符合,则查询记忆,并返回。
pip install langchain-mcp-adapters
BASE_URL=""
TOKEN=""
MODEL_NAME=""
from langchain_openai import ChatOpenAI
from langchain.chat_models import init_chat_model
model = init_chat_model(
model=MODEL_NAME,
model_provider="openai",
base_url=BASE_URL,
api_key=TOKEN,
temperature=0,
)
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
store = InMemoryStore()
checkpointer = InMemorySaver()
from typing import List
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langchain_core.runnables import RunnableConfig
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_store
from langgraph.graph import StateGraph, END
from langgraph.types import interrupt
from langchain_core.messages import AIMessage, ToolMessage, HumanMessage
# 搜索功能
url = ''
TOKEN = ''
search_client = MultiServerMCPClient(
{
"other_search": {
"url": url,
"headers": {
"Authorization": f"Bearer {TOKEN}"
},
"transport": "sse"
}
}
)
search_tools = await search_client.get_tools()
search_agent = create_react_agent(
model,
search_tools,
name="search_assistant",
prompt="你是一个能搜索各种信息的助手。"
)
定义图节点之间流动的共享数据结构,将需要存储的记忆格式化。
class UserInfo(TypedDict):
user_id: str
hotel_name: str
date: str
num_guests: int
定义预定酒店子智能体,并将用户预定历史存储下来。
def book_hotel(user_info: UserInfo, config: RunnableConfig):
"""处理酒店预订并更新长期记忆"""
user_id = config["configurable"].get("user_id")
print(user_info)
hotel_name = user_info.get("hotel_name")
date = user_info.get("date")
num_guests = user_info.get("num_guests")
# 存储用户个人预订历史
namespace = ("user_bookings",)
user_bookings = store.get(namespace, user_id) or []
user_bookings.append(user_info)
store.put(namespace, user_id, user_bookings)
# 更新总预订计数
namespace = ("total_hotel_bookings",)
total_bookings = store.get(namespace, 'total_bookings_num') or 0
store.put(namespace, 'total_bookings_num', total_bookings + 1)
return f"成功为用户 {user_id} 预订了 {hotel_name},入住日期:{date},入住人数:{num_guests}"
book_hotel_agent = create_react_agent(
model=model,
tools=[book_hotel],
store=store,
name="hotel_assistant",
prompt="你是一个酒店预定助手。不需要用户ID、身份证号码、姓名和联系方式,就可以预定,请直接预定!"
)
用户的预定信息都是需要保密的,只有特定的管理员才可以进行查询,因此,需要设计中断机制,审核请求查询用户的权限是否符号要求,即:在正式查询前,先中断一下,要求输入管理员ID信息;等输入后,接着执行图,再去判断管理员ID信息是否符合要求;只有符合才能正常进行用户信息查询。
1.查询工具定义:
def query_booking_from_store(config: RunnableConfig) -> str:
"""根据用户ID从存储中查询酒店预订信息。"""
store = get_store()
user_id = config["configurable"].get("user_id")
booking_info = store.get(("user_bookings",), user_id)
if booking_info and booking_info.value:
return f"已找到预订信息:{str(booking_info.value)}"
else:
return "未找到该用户的预订信息"
2.定义子图的状态:
class SubgraphState(TypedDict):
messages: List[BaseMessage]
3.创建新的图节点,负责中断和验证:
def authentication_and_query_node(state: SubgraphState, config: RunnableConfig):
"""
这个节点首先中断图的执行以请求管理员ID,
然后在恢复后验证ID,并调用工具查询信息。
"""
# 核心:调用 interrupt() 来暂停图的执行
admin_input = interrupt("请输入管理员id,如需退出查询,请输入exit")
# 当图被恢复时,admin_input 将会获得传入的值
if admin_input == "exit":
result = "用户已退出查询。"
elif admin_input == "admin_123":
# 验证成功,调用真正的查询工具
result = query_booking_from_store(config)
else:
# 验证失败
result = f"没有权限查询:admin_id 不匹配 (输入为: '{admin_input}')"
return {"messages": [AIMessage(content=result)]}
4.构建包含中断节点的子agent:
query_workflow = StateGraph(SubgraphState)
query_workflow.add_node("auth_and_query", authentication_and_query_node)
query_workflow.set_entry_point("auth_and_query")
query_workflow.add_edge("auth_and_query", END)
booking_query_subgraph = query_workflow.compile(checkpointer=checkpointer, store=store)
# 为子图命名,以便Supervisor可以调用它
booking_query_subgraph.name = "booking_info_assistant"
基于上述组件,我们将构建一个完整的Supervisor架构工作流。
from langgraph_supervisor import create_supervisor
workflow = create_supervisor(
[search_agent, book_hotel_agent, booking_query_subgraph],
model=model,
prompt=(
"您是团队主管,负责管理信息搜索助手、酒店预订助手、以及用户信息查询助手。"
"如需搜索各种信息,请交由 search_assistant 处理。"
"如需预定酒店,请交由 hotel_assistant 处理。"
"如需查询用户的酒店预定信息,请交由 booking_info_assistant 处理。"
"**注意**,你每次只能调用一个助理agent!"
),
)
supervisor = workflow.compile(checkpointer=checkpointer, store=store)
config = {"configurable": {"thread_id": "1", "user_id": "user_123"}}
print("--- 第一次交互 :查询北京火锅店---")
async for chunk in supervisor.astream(
{"messages": [("user", "北京最出名的老北京火锅是哪家?")]},
config
):
# 打印每个数据块的内容
for key, value in chunk.items():
print(f"Node: '{key}'")
if value:
print(" value:")
print(value)
print("----")
输出:
北京最出名的老北京火锅有很多家,其中比较有名的包括:\n\n1. **东来顺**:东来顺是北京最著名的老北京涮羊肉火锅店之一,历史悠久,以其独特的涮羊肉和秘制的调料而闻名。\n\n2. **南门涮肉**:南门涮肉也是一家老字号的火锅店,以其传统的涮羊肉和地道的北京风味而受到欢迎。\n\n3. **老北京涮肉馆**:这是一家专注于传统老北京涮肉的火锅店,以其正宗的口味和优质的服务而受到食客的喜爱。\n\n4. **聚宝源**:聚宝源是一家以涮羊肉为主的火锅店,以其新鲜的食材和独特的调料而受到欢迎。\n\n这些火锅店都有各自的特色和忠实的顾客群体,您可以根据自己的口味和需求选择合适的火锅店。
print("--- 第二次交互 :根据上一步推荐的火锅店查询酒店---")
async for chunk in supervisor.astream(
{"messages": [("user", "那第一个推荐的火锅店附近有哪些酒店呀")]},
config
):
# 打印每个数据块的内容
for key, value in chunk.items():
print(f"Node: '{key}'")
if value:
print(" value:")
print(value)
print("----")
输出:
东来顺火锅店位于北京市东城区东华门大街。以下是东来顺火锅店附近的一些酒店推荐:\n\n1. **北京王府井希尔顿酒店**:这是一家豪华酒店,距离东来顺火锅店步行约10分钟,提供高品质的住宿和服务。\n\n2. **北京东方君悦大酒店**:这家五星级酒店位于王府井大街,距离东来顺火锅店步行约15分钟,设施齐全,服务优质。\n\n3. **北京华尔道夫酒店**:这是一家高端酒店,距离东来顺火锅店步行约10分钟,提供豪华的住宿体验和优质的服务。\n\n4. **北京诺富特和平宾馆**:这家四星级酒店位于王府井大街,距离东来顺火锅店步行约15分钟,性价比较高,适合商务和休闲旅行。\n\n5. **北京天伦王朝酒店**:这家四星级酒店距离东来顺火锅店步行约10分钟,提供舒适的住宿环境和便捷的交通。\n\n这些酒店都位于东来顺火锅店附近,您可以根据自己的需求和预算选择合适的酒店。
可以看到,supervisor记得上文中提到过的第一个推荐的火锅店,这是短期记忆的典型应用。
print("--- 第三次交互 :预定酒店---")
async for chunk in supervisor.astream(
{"messages": [("user", "帮我预定北京王府井希尔顿酒店酒店,预定日期:2025-11-13到2025-11-14,入住人数1")]},
config
):
for key, value in chunk.items():
print(f"Node: '{key}'")
if value:
print(" value:")
print(value)
print("----")
输出:
已成功为您预订了北京王府井希尔顿酒店,入住日期为2025年11月13日至2025年11月14日,入住人数为1人。祝您旅途愉快!
成功调用预定酒店助手,并完成酒店预定。
print("--- 第四次交互 :管理员查询预定信息---")
config = {"configurable": {"thread_id": "2", "user_id": "user_123"}} # 更换管理员操作线程
interrupt_data = None
interrupt_input = None
print("--- 第一次运行,将会触发中断 ---")
async for chunk in supervisor.astream(
{"messages": [("user", "查询用户预定酒店信息")]},
config,
):
for key, value in chunk.items():
print(f"Node: '{key}'")
if value:
print(" value:")
print(value)
if key == "__interrupt__":
print("\n======= 图已成功中断!=======")
interrupt_data = value[0]
print(f"中断信息: {interrupt_data.value}")
break
if interrupt_data:
break
print("----")
if interrupt_data:
# 模拟管理员输入正确的密码
interrupt_input = Command(resume="admin_123")
# 如果想测试错误的密码,可以使用下面这行
#interrupt_input = Command(resume="wrong_password")
print(f"\n--- 接收到中断输入 '{interrupt_input}',继续执行图 ---")
# 恢复图的执行
async for chunk in supervisor.astream(
interrupt_input,
config,
):
for key, value in chunk.items():
print(f"Node: '{key}'")
if value:
print(" value:")
print(value)
print("----")
第一次运行,触发中断并输出:
请输入管理员id,如需退出查询,请输入exit
接着,程序模拟管理员输入正确的ID后,继续执行图,booking_info_assistant查询到长期记忆,并输出:
已找到预订信息:[{'user_id': 'user_123', 'hotel_name': '北京王府井希尔顿酒店', 'date': '2025-11-13到2025-11-14', 'num_guests': 1}]
最终,supervisor返回最终的结果:
用户预定的酒店信息如下:\n- 用户ID: user_123\n- 酒店名称: 北京王府井希尔顿酒店\n- 预定日期: 2025-11-13到2025-11-14\n- 客人数: 1
在上述的示例(预定酒店)中,我们仅将用户预定信息直接进行存储、检索,但对于supervisor来说,记住和不同用户的过往交互是非常重要的,在后续的交互中,才能针对不同的用户,给出更合适的回应。让Agent能自主决定记忆的存储、遗忘、更新和检索优先级,才能真正模拟人类的记忆过程。因此,未来需要改进设计更智能的记忆管理策略。
在上述示例(预定酒店)中,我们选择了Supervisor架构进行实现,但这显然存在缺陷,管理员系统不应该和用户系统使用同一个中央智能体,当系统功能越来越完善时,这样的设计会使得supervisor非常繁杂、且难以维护,Supervisor架构更适合需要明确控制流程和集中决策的场景。融合记忆功能的Multi-Agent系统可以根据应用场景选择更合适的架构,例如Hierarchical架构,可以用于不同层级的记忆服务于不同目的(个体、团队、全局)的场景;Custom架构,预先定义好各个Agent的记忆走向,构建更灵活的系统。
腾讯小Q介绍
小Q是官方QQ助手,集成了腾讯混元、DeepSeek等前沿大模型技术,具备强大的多模态交互与复杂任务处理能力。
[1] Chhikara P, Khant D, Aryan S, et al. Mem0: Building production-ready ai agents with scalable long-term memory[J]. arXiv preprint arXiv:2504.19413, 2025.
[2] Langchain Docs
福利时刻
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-12-04
Agentic RAG这样用LangChain解决复杂问题
2025-12-01
Deep Agent 进化论:基于文件系统的 Context Engineering 深度解析
2025-11-27
langgraph 1.0.4 最新发布:功能优化与修复详解
2025-11-25
LangChain 最新agent框架deepagents测评:长任务友好,高可控
2025-11-25
被 LangChain 全家桶搞晕了?LangGraph、LangSmith、LangFlow 一文读懂
2025-11-21
如何用 LangGraph 构建高效的 Agentic 系统
2025-11-19
LangChain v1.0 模型选型:静态还是动态?一文看懂 Agent 的正确打开方式
2025-11-18
LangChain 1.0 变革
2025-09-13
2025-09-21
2025-11-03
2025-10-23
2025-10-19
2025-10-31
2025-11-06
2025-09-12
2025-11-05
2025-09-19
2025-11-03
2025-10-29
2025-07-14
2025-07-13
2025-07-05
2025-06-26
2025-06-13
2025-05-21