微信扫码
添加专属顾问
我要投稿
备注:这是春节期间研究的一个笔记,先当个存稿发出来,为这一些系列加热一下。
相信大家对langchain都很熟悉了,看起来有些复杂,实则很简单的一个LLM应用开发框架。我从去年初,刚开始做RAG项目时开始接触这个框架,那时的langchain还做的相对较轻,框架的设计思想和编程技巧也用的比较友好,当时我们的项目为了跟langchain接口兼容,对一些接口做了重载实现,以适应我们项目的一些额外需求。最近又重新学习了langchain的一些资料,发现变化非常大,其中LangGraph借鉴了图的思路,可以用于构建一些带流程的LLM应用,如multi agents,也可以用于构建rag应用,实现更灵活的业务逻辑。因此,我打算写几篇系列来学习总结一下这个LangGraph。
LangGraph是一套在langchain框架之上的一套开发组件,可以使用LCEL(Langchain Expression Language)轻松的开发带有状态的,可控循环流程的LLM应用,例如多agent应用。它借鉴了NetworkX框架的设计思路,把一个应用流程定义成一个图,其中节点(node)可以代表一个agent tool,或者一次function call,亦或者一次大模型调用。而边(edge),则代表节点的执行顺序(数据流向),并且可以通过设置conditional edge,来控制流程分支。
我们要实现一个能够判断用户意图是闲聊还是知识库问答(QA)场景的RAG应用。我们可以通过以下的流程来实现。
State 将用于Graph图中,用来保存整个流程图中的消息,可以理解为节点之间通信的消息list。每个节点在被执行的时候,会传入这个消息列表参数,节点可以取出里面的消息然后进行处理,处理完成之后,只需按 {"messages": [response]}这种格式作为函数返回值,就会自动的append进这个消息列表中,从而继续被下一次执行的节点当作参数传入。这个State也可以存放其他的变量,不仅仅是messsage
import operatorfrom typing import Annotated, Sequence, TypedDictfrom langchain_core.messages import BaseMessageclass AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], operator.add]
这里把知识库检索当作一个工具调用,意图识别节点根据用户请求判断是否需要调用工具,如果需要则通过function call的方式来实现知识库检索。为了简化测试,我找了一篇《中华人民共和国道路交通安全法实施条例》网页作为知识内容加载到Chroma向量数据库中,使用最基本的文本分片建索引。
from langchain.text_splitter import RecursiveCharacterTextSplitterfrom langchain_community.document_loaders import WebBaseLoaderfrom langchain_community.vectorstores import Chromafrom langchain_openai import OpenAIEmbeddingsfrom langchain.tools.retriever import create_retriever_toolfrom langgraph.prebuilt import ToolExecutor## 入库的知识内容urls = ["https://www.gov.cn/gongbao/content/2019/content_5468932.htm",]docs = [WebBaseLoader(url).load() for url in urls]docs_list = [item for sublist in docs for item in sublist]text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=500, chunk_overlap=50)doc_splits = text_splitter.split_documents(docs_list)## 文档加入到向量数据库中vectorstore = Chroma.from_documents(documents=doc_splits,collection_name="rag-chroma",embedding=OpenAIEmbeddings(),)##创建retriever对象retriever = vectorstore.as_retriever()## 创建tooltool = create_retriever_tool(retriever,"retrieve_transportation_rules","搜索并返回中国道路交通安全管理条例相关的问题",)tools = [tool]tool_executor = ToolExecutor(tools)
节点代表着一个执行步骤,可以是一次大模型调用,也可以是一次函数调用。
先导包
import jsonimport operatorfrom typing import Annotated, Sequence, TypedDictfrom langchain import hubfrom langchain.output_parsers import PydanticOutputParserfrom langchain.prompts import PromptTemplatefrom langchain.tools.render import format_tool_to_openai_functionfrom langchain_core.utils.function_calling import convert_to_openai_toolfrom langchain_core.messages import BaseMessage, FunctionMessagefrom langchain.output_parsers.openai_tools import PydanticToolsParserfrom langchain_core.pydantic_v1 import BaseModel, Fieldfrom langchain_openai import ChatOpenAIfrom langgraph.prebuilt import ToolInvocationfrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.utils.function_calling import convert_to_openai_functionfrom langchain_core.prompts import ChatPromptTemplate
定义一个意图识别agent节点,这个agent会调用大模型判断是否需要调用知识检索工具retriever。
def intention_agent(state):"""Args:state (messages): The current stateReturns:dict: The updated state with the agent response apended to messages"""print("---CALL intention detecting---")messages = state["messages"]functions = [convert_to_openai_function(t) for t in tools]model = ChatOpenAI(temperature=0, streaming=True)model = model.bind_functions(functions)response = model.invoke(messages)# We return a list, because this will get added to the existing listreturn {"messages": [response]}
定义一个知识检索节点,这个节点会调用调用知识检索工具
def retrieve(state):"""Args:state (messages): The current stateReturns:dict: The updated state with retrieved docs"""print("---EXECUTE RETRIEVAL---")messages = state["messages"]# 最后一条消息,一定来自于上一个节点的输出消息,包含function call所需要的参数last_message = messages[-1]# 从消息里取出function call的函数名和参数,构建一个ToolInvocationaction = ToolInvocation(tool=last_message.additional_kwargs["function_call"]["name"],tool_input=json.loads(last_message.additional_kwargs["function_call"]["arguments"]),)# 执行调用结果,并最终返回FunctionMessage对象response = tool_executor.invoke(action)function_message = FunctionMessage(content=str(response), name=action.tool)return {"messages": [function_message]}
定义一个QA 知识问答生成节点,用于将query,召回的知识context,组合成prompt扔给大模型回答。
# 定义一个QA repsonse生成节点def generate(state):print("---GENERATE---")messages = state["messages"]question = messages[0].contentlast_message = messages[-1]question = messages[0].contentdocs = last_message.content# Promptprompt = hub.pull("rlm/rag-prompt")model = ChatOpenAI(temperature=0, streaming=True)# Chainrag_chain = prompt | model | StrOutputParser()# Runresponse = rag_chain.invoke({"context": docs, "question": question})return {"messages": [response]}
再定义一个闲聊问答生成节点,用于将query直接扔给大模型回答。
# 定义一个chat response生成节点def chat_generate(state):print("---CHAT---")messages = state["messages"]question = messages[0].contentlast_message = messages[-1]question = messages[0].content# Promptprompt = ChatPromptTemplate.from_messages([("system", "You are helpful assistant."),("user", "{question}")])# Chainrag_chain = prompt | model | StrOutputParser()# Runresponse = rag_chain.invoke({"question": question})return {"messages": [response]}
边edge相当于节点之间的执行顺序。有两种,一种是普通edge,还有一种条件edge。前者就是用于组织两个节点的上下游关系,后者是加上分支条件,可以根据自定义的条件判断函数的返回结果连接不同的节点,类似于switch的功能。
我们先加一个条件判断函数,用于conditional edge的判断
# 判断是否是走知识问答def should_qa(state):print("---INTENTION DETECTION---")messages = state["messages"]last_message = messages[-1]# If there is no function call, then we finishif "function_call" not in last_message.additional_kwargs:print("---DECISION: chat---")return "chat"# Otherwise there is a function call, so we continueelse:print("---DECISION: qa---")return "qa"
图的定义逻辑是:
先创建一个状态图对象,构造参数是第一步创建的状态类。
from langgraph.graph import END, StateGraph# 定义一个图对象workflow = StateGraph(AgentState)
使用add_node 方法往加入节点
# 往图里加入节点workflow.add_node("intention_agent", intention_agent) # intention_agentworkflow.add_node("retrieve", retrieve) # retrievalworkflow.add_node("generate", generate) # generateworkflow.add_node("chat_generate", chat_generate) # generate
使用 set_entry_point 方法设置流程的起点
# 设置初始起点workflow.set_entry_point("intention_agent")
使用 add_conditional_edges 方法来设置条件判断的分支
# 加入条件边,通过should_qa的返回值来判断下一个节点workflow.add_conditional_edges("intention_agent", ##上游节点should_qa, ## 判断函数{"qa": "retrieve", ##根据判断函数的返回值,调用不同的下游节点"chat": "chat_generate",},)
使用 add_edge 方法来设置上下游节点的关系,END节点代表流程的结束
# retrieve -> generate 节点的流向workflow.add_edge("retrieve", "generate")# generate ->end 节点的流向workflow.add_edge("generate", END)# chat_generate ->end 节点的流向workflow.add_edge("chat_generate", END)
最后compile 一下
# Compileapp = workflow.compile()
from langchain_core.messages import HumanMessageinputs = {"messages": [HumanMessage(content="⼩型微型⾮营运载客汽车的年检规则是?")]}for output in app.stream(inputs):for key, value in output.items():if key in ['generate','chat_generate']:print(f"Output from node '{key}':")print(value['messages'][0])
通过打印输出,看到整个调用步骤,最终使用QA问答,从“generate”节点生成了答复。
我们换一个问题“电动汽车有什么优势?”,可以看到,这时走的时chat,从“chat_generate”生成的答复
本文对LangGraph的使用有一个基本介绍,并且用它来实现一个带意图判断的RAG应用。跟传统的方式比起来,使用LangGraph来开发业务,可以用工作流的模式来形象思考,先设计DAG图,定义好节点和边,然后再写代码实现节点和边就行,从业务流程到转换为代码实现的过程非常直观,代码结构也相当更清晰。特别是如果我们构建一个流程相对比较复杂的,Multi-Agent应用,里面需要用到顺序,循环,分叉等各种复杂的流程,这时候用图的构建方式来做就会体现出很大的优势。都说一张图胜过千言万语,所以个人还是很看好这种框架,近期准备采用LangGraph做一个相对复杂的Multi-Agent应用,后续还会研究几个更复杂的场景,出几篇文章总结,感谢各位看官点赞?
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-21
2025-08-20
2025-09-07
2025-08-21
2025-08-19
2025-08-05
2025-09-16
2025-10-02
2025-08-20
2025-09-08