Build an advanced RAG agent with LangGraph featuring intelligent query rewriting, topic validation, and adaptive optimization, so it can handle complex conversational scenarios with ease. Core topics: 1. Limitations of traditional RAG systems 2. Multi-stage workflow design for an advanced RAG agent 3. Concrete implementation steps with LangGraph, including environment setup
In this article, we will walk through how to implement a sophisticated RAG agent with LangGraph. The agent can rewrite user questions, classify them, validate document relevance, and even retry with optimized queries before gracefully giving up.
Before diving into building the advanced RAG agent, it helps to revisit how RAG can be used as a tool inside a LangGraph agent.
Traditional RAG (Retrieval-Augmented Generation) systems work well for simple questions but struggle with complex conversational scenarios. What happens when a user asks a follow-up question such as "What about the pricing?" These context-dependent queries often fail because the system lacks conversation memory and intelligent query processing.
Today, we will build an advanced RAG agent that addresses these challenges through context-aware query rewriting, topic validation, document relevance grading, and adaptive query optimization.
Let's build this system step by step using a realistic scenario: a technical support knowledge base.
Our advanced RAG agent implements a sophisticated multi-stage workflow. This yields a robust system that can handle complex conversations while maintaining quality and relevance.
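At a high level, the pipeline flows as sketched below (an informal outline; the node names match the graph we build later in this article):

# Informal sketch of the workflow (node names match the graph built below)
#
#   enhance_query -> validate_topic --(off-topic)--> handle_off_topic -> END
#                         |
#                    (on-topic)
#                         v
#   fetch_content -> assess_relevance --(relevant docs)--> generate_response -> END
#                         |
#              (no relevant docs, attempts < 2)
#                         v
#                   optimize_query -> fetch_content        # retry loop
#                         |
#                (attempts exhausted)
#                         v
#                  handle_no_results -> END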
We use uv, a fast Python package manager, to set up the environment quickly. Create a virtual environment named rag-env and activate it:
uv venv rag-env
source rag-env/bin/activate
Now, install the core dependencies for this RAG agent (langchain-huggingface, chromadb, and sentence-transformers are included here because the embedding and vector-store code below depends on them):
uv pip install \
    langchain \
    langgraph \
    langchain-google-genai \
    langchain-community \
    langchain-huggingface \
    chromadb \
    sentence-transformers \
    python-dotenv \
    jupyterlab \
    ipykernel
To register rag-env as a Jupyter kernel:
python -m ipykernel install --user --name=rag-env --display-name "RAG Agent (uv)"
You can now select "RAG Agent (uv)" as the kernel in Jupyter Notebook or JupyterLab.
Next, create a .env file in the project root and add your Gemini API key:
GOOGLE_API_KEY=your_google_gemini_api_key_here
from dotenv import load_dotenv
load_dotenv()
# Core LangChain components
from langchain.schema import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
# Graph and state management
from typing import TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
Technology stack rationale: Gemini (via langchain-google-genai) handles generation and structured output, HuggingFace sentence embeddings power retrieval, Chroma serves as a lightweight local vector store, and LangGraph provides stateful workflow orchestration with checkpointed memory.
We'll create a comprehensive technical-support knowledge base for "TechFlow Solutions":
# Initialize our embedding model
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
# Create comprehensive technology support knowledge base
knowledge_documents = [
Document(
page_content="TechFlow Solutions offers three main service tiers: Basic Support ($29/month) includes email support and basic troubleshooting, Professional Support ($79/month) includes priority phone support and advanced diagnostics, Enterprise Support ($199/month) includes 24/7 dedicated support and custom integrations.",
metadata={"source": "pricing_guide.pdf", "category": "pricing"},
),
Document(
page_content="Our cloud infrastructure services include: Virtual Private Servers starting at $15/month, Managed Databases from $45/month, Content Delivery Network at $0.08/GB, and Load Balancing services at $25/month. All services include 99.9% uptime guarantee.",
metadata={"source": "infrastructure_pricing.pdf", "category": "services"},
),
Document(
page_content="TechFlow Solutions was founded in 2018 by Maria Rodriguez, a former Google engineer with 15 years of experience in cloud architecture. The company has grown from 3 employees to over 150 team members across 12 countries, specializing in enterprise cloud solutions.",
metadata={"source": "company_history.pdf", "category": "company"},
),
Document(
page_content="Our technical support team operates 24/7 for Enterprise customers, business hours (9 AM - 6 PM EST) for Professional customers, and email-only support for Basic customers. Average response times: Enterprise (15 minutes), Professional (2 hours), Basic (24 hours).",
metadata={"source": "support_procedures.pdf", "category": "support"},
)
]
# Build vector database
vector_store = Chroma.from_documents(knowledge_documents, embedding_model)
document_retriever = vector_store.as_retriever(search_kwargs={"k": 2})
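Before wiring the retriever into the graph, a quick sanity check (illustrative; the query string is an arbitrary example) confirms that retrieval works:
# Illustrative sanity check of the retriever
sample_docs = document_retriever.invoke("How much does Enterprise Support cost?")
for doc in sample_docs:
    print(doc.metadata["source"], "->", doc.page_content[:60])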
Knowledge base design: each document carries source and category metadata (pricing, services, company, support), and the retriever is configured to return the top two matches (k=2).
Our agent uses sophisticated state management to track the conversation flow:
class ConversationState(TypedDict):
    conversation_history: List[BaseMessage]  # Full conversation thread
    retrieved_documents: List[Document]      # Current retrieved documents
    topic_relevance: str                     # On-topic or off-topic classification
    enhanced_query: str                      # Reformulated question
    should_generate: bool                    # Whether to proceed with answer generation
    optimization_attempts: int               # Number of query refinement attempts
    current_query: HumanMessage              # User's current question
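When the compiled graph is invoked later, only current_query needs to be supplied; the enhancement node initializes the remaining fields (a minimal illustration):
# Minimal graph input; the other fields are initialized by enhance_user_query
initial_state = {"current_query": HumanMessage(content="What are your pricing tiers?")}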
Benefits of this state schema: every node reads and writes a single typed structure, conversation history survives across turns via the checkpointer, and optimization_attempts makes the retry loop explicit and bounded.
def enhance_user_query(state: ConversationState):
"""
Reformulates user questions based on conversation history to create
standalone queries optimized for vector search.
"""
print(f"Enhancing query: {state['current_query'].content}")
# Initialize state for new query processing
state["retrieved_documents"] = []
state["topic_relevance"] = ""
state["enhanced_query"] = ""
state["should_generate"] = False
state["optimization_attempts"] = 0
# Ensure conversation history exists
if "conversation_history" not in state or state["conversation_history"] is None:
state["conversation_history"] = []
# Add current query to history if not already present
if state["current_query"] not in state["conversation_history"]:
state["conversation_history"].append(state["current_query"])
# Check if we have conversation context
if len(state["conversation_history"]) > 1:
# Extract context and current question
previous_messages = state["conversation_history"][:-1]
current_question = state["current_query"].content
# Build context-aware prompt
context_messages = [
SystemMessage(
content="""You are an expert query reformulator. Transform the user's question into a standalone,
search-optimized query that incorporates relevant context from the conversation history.
Guidelines:
- Make the question self-contained and clear
- Preserve the user's intent while adding necessary context
- Optimize for vector database retrieval
- Keep the reformulated query concise but comprehensive"""
)
]
context_messages.extend(previous_messages)
context_messages.append(HumanMessage(content=f"Current question: {current_question}"))
# Generate enhanced query
enhancement_prompt = ChatPromptTemplate.from_messages(context_messages)
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.1)
formatted_prompt = enhancement_prompt.format()
response = llm.invoke(formatted_prompt)
enhanced_question = response.content.strip()
print(f"Enhanced query: {enhanced_question}")
state["enhanced_query"] = enhanced_question
else:
# First question in conversation - use as-is
state["enhanced_query"] = state["current_query"].content
print(f"First query - using original: {state['enhanced_query']}")
return state
Enhancement strategy: the first question in a conversation is used as-is, while follow-ups are rewritten into standalone, search-optimized queries that fold in the conversation history.
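To exercise the rewriting in isolation, the node can be called directly on a hand-built state (an illustrative sketch; the sample messages are hypothetical):
# Illustrative standalone run of the query-enhancement node
demo_state = {
    "conversation_history": [
        HumanMessage(content="Tell me about your Enterprise Support tier."),
        AIMessage(content="Enterprise Support is $199/month with 24/7 dedicated support."),
    ],
    "current_query": HumanMessage(content="What about response times?"),
}
demo_state = enhance_user_query(demo_state)
# Expect something like: "What are the response times for Enterprise Support?"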
class TopicRelevance(BaseModel):
"""Structured output for topic classification"""
classification: str = Field(
description="Is the question about TechFlow Solutions services/pricing/company? Answer 'RELEVANT' or 'IRRELEVANT'"
)
confidence: str = Field(
description="Confidence level: 'HIGH', 'MEDIUM', or 'LOW'"
)
def validate_topic_relevance(state: ConversationState):
"""
Determines if the user's question is within our knowledge domain.
Uses the enhanced query for better classification accuracy.
"""
print("Validating topic relevance...")
classification_prompt = SystemMessage(
content="""You are a topic classifier for TechFlow Solutions support system.
RELEVANT topics include:
- TechFlow Solutions services (cloud infrastructure, migration, DevOps)
- Pricing for any TechFlow Solutions products or services
- Company information (history, team, locations)
- Support procedures and response times
- Security and compliance features
- Technical specifications and capabilities
IRRELEVANT topics include:
- General technology questions not specific to TechFlow
- Other companies' products or services
- Personal questions unrelated to business
- Weather, news, or general knowledge queries
Classify based on the enhanced query which incorporates conversation context."""
)
user_question = HumanMessage(
content=f"Enhanced query to classify: {state['enhanced_query']}"
)
# Create classification chain
classification_chain = ChatPromptTemplate.from_messages([classification_prompt, user_question])
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
structured_llm = llm.with_structured_output(TopicRelevance)
classifier = classification_chain | structured_llm
result = classifier.invoke({})
state["topic_relevance"] = result.classification.strip()
print(f"Topic classification: {state['topic_relevance']} (Confidence: {result.confidence})")
return state
Classification advantages: structured output via with_structured_output yields a machine-readable verdict plus a confidence level, and off-topic questions are filtered out before any retrieval work is done.
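The validator can likewise be tested on its own (hypothetical input; the actual classification depends on the live model):
# Illustrative standalone run of the topic validator
test_state = {"enhanced_query": "What is the average response time for Professional support?"}
test_state = validate_topic_relevance(test_state)
# Expected classification: RELEVANT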
def fetch_relevant_content(state: ConversationState):
"""
Retrieves documents from the knowledge base using the enhanced query.
"""
print("Fetching relevant documents...")
# Use enhanced query for better retrieval
retrieved_docs = document_retriever.invoke(state["enhanced_query"])
print(f"Retrieved {len(retrieved_docs)} documents")
for i, doc in enumerate(retrieved_docs):
print(f" Document {i+1}: {doc.page_content[:50]}...")
state["retrieved_documents"] = retrieved_docs
return state
class DocumentRelevance(BaseModel):
"""Structured output for document relevance assessment"""
relevance: str = Field(
description="Is this document relevant to answering the question? Answer 'RELEVANT' or 'IRRELEVANT'"
)
reasoning: str = Field(
description="Brief explanation of why the document is relevant or irrelevant"
)
def assess_document_relevance(state: ConversationState):
"""
Evaluates each retrieved document to determine if it's relevant
for answering the user's question.
"""
print("Assessing document relevance...")
assessment_prompt = SystemMessage(
content="""You are a document relevance assessor. Evaluate whether each document
contains information that can help answer the user's question.
A document is RELEVANT if it contains:
- Direct answers to the question
- Supporting information that contributes to a complete answer
- Context that helps understand the topic
A document is IRRELEVANT if it:
- Contains no information related to the question
- Discusses completely different topics
- Provides no value for answering the question
Be strict but fair in your assessment."""
)
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
structured_llm = llm.with_structured_output(DocumentRelevance)
relevant_documents = []
for i, doc in enumerate(state["retrieved_documents"]):
assessment_query = HumanMessage(
content=f"""Question: {state['enhanced_query']}
Document to assess:
{doc.page_content}
Is this document relevant for answering the question?"""
)
assessment_chain = ChatPromptTemplate.from_messages([assessment_prompt, assessment_query])
assessor = assessment_chain | structured_llm
result = assessor.invoke({})
print(f"Document {i+1}: {result.relevance} - {result.reasoning}")
if result.relevance.strip().upper() == "RELEVANT":
relevant_documents.append(doc)
# Update state with filtered documents
state["retrieved_documents"] = relevant_documents
state["should_generate"] = len(relevant_documents) > 0
print(f"Final relevant documents: {len(relevant_documents)}")
return state
Quality-control advantages: each retrieved document is graded individually with an explicit reasoning string, irrelevant documents are filtered out before generation, and should_generate drives the next routing decision.
def generate_contextual_response(state: ConversationState):
"""
Generates final response using conversation history and relevant documents.
"""
print("Generating contextual response...")
if "conversation_history" not in state or state["conversation_history"] is None:
raise ValueError("Conversation history is required for response generation")
# Extract components for response generation
conversation_context = state["conversation_history"]
relevant_docs = state["retrieved_documents"]
enhanced_question = state["enhanced_query"]
# Create comprehensive response template
response_template = """You are a knowledgeable TechFlow Solutions support agent. Generate a helpful,
accurate response based on the conversation history and retrieved documents.
Guidelines:
- Use information from the provided documents to answer the question
- Maintain conversation context and refer to previous exchanges when relevant
- Be conversational and helpful in tone
- If the documents don't fully answer the question, acknowledge limitations
- Provide specific details when available (prices, timeframes, etc.)
Conversation History:
{conversation_history}
Retrieved Knowledge:
{document_context}
Current Question: {current_question}
Generate a helpful response:"""
response_prompt = ChatPromptTemplate.from_template(response_template)
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.3)
# Create response generation chain
response_chain = response_prompt | llm
# Generate response
response = response_chain.invoke({
"conversation_history": conversation_context,
"document_context": relevant_docs,
"current_question": enhanced_question
})
generated_response = response.content.strip()
# Add response to conversation history
state["conversation_history"].append(AIMessage(content=generated_response))
print(f"Generated response: {generated_response[:100]}...")
return state
def optimize_search_query(state: ConversationState):
"""
Refines the search query when initial retrieval doesn't yield relevant results.
Includes loop prevention to avoid infinite optimization cycles.
"""
print("Optimizing search query...")
current_attempts = state.get("optimization_attempts", 0)
# Prevent infinite optimization loops
if current_attempts >= 2:
print("⚠Maximum optimization attempts reached")
return state
current_query = state["enhanced_query"]
optimization_prompt = SystemMessage(
content="""You are a search query optimizer. The current query didn't retrieve relevant documents.
Create an improved version that:
- Uses different keywords or synonyms
- Adjusts the query structure for better matching
- Maintains the original intent while improving searchability
- Considers alternative ways to express the same concept
Provide only the optimized query without explanations."""
)
optimization_request = HumanMessage(
content=f"Current query that needs optimization: {current_query}"
)
optimization_chain = ChatPromptTemplate.from_messages([optimization_prompt, optimization_request])
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.2)
formatted_prompt = optimization_chain.format()
response = llm.invoke(formatted_prompt)
optimized_query = response.content.strip()
# Update state
state["enhanced_query"] = optimized_query
state["optimization_attempts"] = current_attempts + 1
print(f"Optimized query (attempt {current_attempts + 1}): {optimized_query}")
return state
def route_by_topic(state: ConversationState):
"""Routes based on topic relevance classification"""
print("Routing based on topic relevance...")
relevance = state.get("topic_relevance", "").strip().upper()
if relevance == "RELEVANT":
print(" → Proceeding to content retrieval")
return "fetch_content"
else:
print(" → Routing to off-topic handler")
return "handle_off_topic"
def route_by_document_quality(state: ConversationState):
"""Routes based on document relevance assessment"""
print("Routing based on document quality...")
optimization_attempts = state.get("optimization_attempts", 0)
if state.get("should_generate", False):
print(" → Generating response with relevant documents")
return "generate_response"
elif optimization_attempts >= 2:
print(" → Maximum optimization attempts reached")
return "handle_no_results"
else:
print(" → Optimizing query for better results")
return "optimize_query"
# Helper functions for edge cases
def handle_off_topic_queries(state: ConversationState):
"""Handles queries outside our knowledge domain"""
print("Handling off-topic query...")
if "conversation_history" not in state or state["conversation_history"] is None:
state["conversation_history"] = []
off_topic_response = """I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
Your question seems to be outside my area of expertise.
I can help you with:
- Our cloud infrastructure services and pricing
- Support procedures and response times
- Company information and team details
- Security and compliance features
Is there something specific about TechFlow Solutions I can help you with?"""
state["conversation_history"].append(AIMessage(content=off_topic_response))
return state
def handle_no_relevant_results(state: ConversationState):
"""Handles cases where no relevant documents are found after optimization"""
print("No relevant results found after optimization...")
if "conversation_history" not in state or state["conversation_history"] is None:
state["conversation_history"] = []
no_results_response = """I apologize, but I couldn't find specific information to answer your question in our current knowledge base.
This might be because:
- The information isn't available in our documentation
- Your question might need clarification
- You might need to contact our support team directly
For immediate assistance, you can reach our support team at support@techflow.com or call 1-800-TECHFLOW."""
state["conversation_history"].append(AIMessage(content=no_results_response))
return state
# Initialize conversation memory
conversation_memory = MemorySaver()
# Create workflow graph
workflow = StateGraph(ConversationState)
# Add all processing nodes
workflow.add_node("enhance_query", enhance_user_query)
workflow.add_node("validate_topic", validate_topic_relevance)
workflow.add_node("handle_off_topic", handle_off_topic_queries)
workflow.add_node("fetch_content", fetch_relevant_content)
workflow.add_node("assess_relevance", assess_document_relevance)
workflow.add_node("generate_response", generate_contextual_response)
workflow.add_node("optimize_query", optimize_search_query)
workflow.add_node("handle_no_results", handle_no_relevant_results)
# Define workflow connections
workflow.add_edge("enhance_query", "validate_topic")
# Conditional routing based on topic relevance
workflow.add_conditional_edges(
"validate_topic",
route_by_topic,
{
"fetch_content": "fetch_content",
"handle_off_topic": "handle_off_topic",
},
)
# Content processing pipeline
workflow.add_edge("fetch_content", "assess_relevance")
# Conditional routing based on document quality
workflow.add_conditional_edges(
"assess_relevance",
route_by_document_quality,
{
"generate_response": "generate_response",
"optimize_query": "optimize_query",
"handle_no_results": "handle_no_results",
},
)
# Optimization loop
workflow.add_edge("optimize_query", "fetch_content")
# Terminal nodes
workflow.add_edge("generate_response", END)
workflow.add_edge("handle_no_results", END)
workflow.add_edge("handle_off_topic", END)
# Set entry point
workflow.set_entry_point("enhance_query")
# Compile the workflow
advanced_rag_agent = workflow.compile(checkpointer=conversation_memory)
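Optionally, you can render the compiled graph to double-check the wiring (draw_mermaid is available in recent LangGraph releases; treat this as an optional extra):
# Optional: print a Mermaid diagram of the compiled workflow
print(advanced_rag_agent.get_graph().draw_mermaid())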
Let's test our system with a variety of scenarios:
Test #1: an off-topic query.
print("Testing Advanced RAG Agent\n")
# Test 1: Off-topic query
print("=== Test 1: Off-Topic Query ===")
test_input = {"current_query": HumanMessage(content="What's the weather like today?")}
result = advanced_rag_agent.invoke(
input=test_input,
config={"configurable": {"thread_id": "test_session_1"}}
)
print(f"Response: {result['conversation_history'][-1].content}\n")
Testing Advanced RAG Agent
=== Test 1: Off-Topic Query ===
Enhancing query: What's the weather like today?
First query - using original: What's the weather like today?
Validating topic relevance...
Topic classification: IRRELEVANT (Confidence: HIGH)
Routing based on topic relevance...
→ Routing to off-topic handler
Handling off-topic query...
Response: I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
Your question seems to be outside my area of expertise.
I can help you with:
- Our cloud infrastructure services and pricing
- Support procedures and response times
- Company information and team details
- Security and compliance features
Is there something specific about TechFlow Solutions I can help you with?
# Test 2: On-topic query about pricing
print("=== Test 2: Service Pricing Query ===")
test_input = {"current_query": HumanMessage(content="What are your support service pricing options?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_2"}}
)
print(f"Response: {result['conversation_history'][-1].content}\n")
=== Test 2: Service Pricing Query ===
Enhancing query: What are your support service pricing options?
📝 First query - using original: What are your support service pricing options?
🎯 Validating topic relevance...
🏷️ Topic classification: RELEVANT (Confidence: HIGH)
🚦 Routing based on topic relevance...
 → Proceeding to content retrieval
📚 Fetching relevant documents...
📄 Retrieved 2 documents
 Document 1: TechFlow Solutions offers three main service tiers...
 Document 2: Our cloud infrastructure services include: Virtual...
🔍 Assessing document relevance...
📋 Document 1: RELEVANT - The document directly answers the question by listing the names, features, and prices of the support service tiers offered by TechFlow Solutions.
📋 Document 2: IRRELEVANT - The document describes pricing options for cloud infrastructure services, not support services. Therefore, it's not relevant to the question about support service pricing.
✅ Final relevant documents: 1
🚦 Routing based on document quality...
 → Generating response with relevant documents
💬 Generating contextual response...
📝 Generated response: We have three support service tiers available. Basic Support is $29 per month and includes email sup...
Response: We have three support service tiers available. Basic Support is $29 per month and includes email support and basic troubleshooting. Professional Support is $79 per month, providing priority phone support and advanced diagnostics. Finally, Enterprise Support, at $199 per month, includes 24/7 dedicated support and custom integrations.
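To exercise the context-aware rewriting end to end, a natural next step (illustrative; not part of the original test run) is a follow-up question on the same thread, where the vague "What about the Enterprise tier?" should be rewritten into a standalone query:
# Test 3 (illustrative): follow-up question reusing the Test 2 thread
print("=== Test 3: Follow-Up Query ===")
test_input = {"current_query": HumanMessage(content="What about the Enterprise tier?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_2"}}  # same thread as Test 2
)
print(f"Response: {result['conversation_history'][-1].content}\n")
Because the thread_id matches Test 2, the MemorySaver checkpointer restores the earlier exchange before enhance_user_query runs.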
We have built a sophisticated RAG agent that goes far beyond simple question answering. The system shows how multiple AI techniques can work together to create conversational AI that is smarter, more context-aware, and more reliable.
Key innovations include:
Context-aware question rewriting for natural conversation
Multi-layer quality control through classification and grading
Iterative refinement to improve retrieval success rates
Robust workflow management with proper error handling
This architecture provides a solid foundation for building production-ready RAG applications that can handle complex multi-turn conversations while keeping responses relevant and high-quality.
There are many directions in which this system could be enhanced further.