Building an End-to-End Advanced RAG Agent

Published: 2025-08-11
Author: 机器之魂

Highlights

Build an advanced RAG agent with LangGraph that performs intelligent query rewriting, topic validation, and adaptive refinement, handling complex conversational scenarios with ease.

Core topics:
1. Limitations of traditional RAG systems
2. Designing the multi-stage workflow of an advanced RAG agent
3. Step-by-step implementation with LangGraph, including environment setup


In this article, we'll walk through how to implement a sophisticated RAG agent with LangGraph. The agent can rewrite user questions, classify them, validate document relevance, and even retry with optimized queries before gracefully giving up.

Before we dive into building the advanced RAG agent, it is worth revisiting how RAG can be used as a tool inside a LangGraph agent.

Introduction

Traditional RAG (Retrieval-Augmented Generation) systems work well for simple questions but struggle with complex conversational scenarios. What happens when a user asks a follow-up question such as "What about the pricing?" These context-dependent queries often fail because the system lacks conversation memory and intelligent query processing.

Today we'll build an advanced RAG agent that addresses these challenges through:

  • Intelligent query reformulation: converts follow-up questions into standalone queries
  • Smart topic detection: ensures queries stay within our knowledge domain
  • Document quality assessment: validates retrieved content before generating a response
  • Adaptive query enhancement: iteratively improves the search when the initial attempt fails
  • Persistent conversation memory: maintains context across multiple interactions

Let's build this system step by step around a realistic scenario: a technical-support knowledge base.

System Architecture

Our advanced RAG agent implements a sophisticated multi-stage workflow:

  1. Query Enhancer → reformulates the question using conversation history
  2. Topic Validator → determines whether the query matches our knowledge domain
  3. Content Retriever → fetches relevant documents from our knowledge base
  4. Relevance Assessor → evaluates document quality and relevance
  5. Response Generator → creates a contextual answer
  6. Query Optimizer → refines the search when needed (with loop protection)

Together these stages create a robust system that can handle complex conversations while maintaining quality and relevance.

Implementation

Step 1: Environment Setup with uv

We use uv, a fast Python package manager, to set up the environment quickly.

1.1 Create and activate a virtual environment

uv venv rag-env
source rag-env/bin/activate

This creates a virtual environment named rag-env and activates it.

1.2 Install the required packages

Now install the core dependencies for this RAG agent (langchain-huggingface, sentence-transformers, and chromadb are included because the imports in section 1.5 and the Chroma vector store in Step 2 require them):

uv pip install \
    langchain \
    langgraph \
    langchain-google-genai \
    langchain-community \
    langchain-huggingface \
    sentence-transformers \
    chromadb \
    python-dotenv \
    jupyterlab \
    ipykernel

1.3 Register the virtual environment with Jupyter

To expose rag-env as a Jupyter kernel:

python -m ipykernel install --user --name=rag-env --display-name "RAG Agent (uv)"

You can now select "RAG Agent (uv)" as the kernel in Jupyter Notebook or JupyterLab.

1.4 Add your LLM API key

Next, create a .env file in the project root and add your Gemini API key:

GOOGLE_API_KEY=your_google_gemini_api_key_here
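
A quick sanity check (a minimal sketch) helps catch a misplaced .env file early:

import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
assert os.getenv("GOOGLE_API_KEY"), "GOOGLE_API_KEY not found - check your .env file"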

1.5 Dependencies

from dotenv import load_dotenv
load_dotenv()

# Core LangChain components
from langchain.schema import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate

# Graph and state management
from typing import TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

Technology stack rationale:

  • Google Gemini: advanced reasoning for understanding complex queries
  • HuggingFace Embeddings: high-quality, cost-effective embedding generation
  • Chroma Vector DB: lightweight, fast vector storage for development
  • LangGraph: complex workflow orchestration with state management
  • Pydantic: structured, validated outputs from LLM operations

Step 2: Building the Knowledge Base

We create a comprehensive technical-support knowledge base for "TechFlow Solutions":

# Initialize our embedding model
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# Create comprehensive technology support knowledge base
knowledge_documents = [
    Document(
        page_content="TechFlow Solutions offers three main service tiers: Basic Support ($29/month) includes email support and basic troubleshooting, Professional Support ($79/month) includes priority phone support and advanced diagnostics, Enterprise Support ($199/month) includes 24/7 dedicated support and custom integrations.",
        metadata={"source": "pricing_guide.pdf", "category": "pricing"},
    ),
    Document(
        page_content="Our cloud infrastructure services include: Virtual Private Servers starting at $15/month, Managed Databases from $45/month, Content Delivery Network at $0.08/GB, and Load Balancing services at $25/month. All services include 99.9% uptime guarantee.",
        metadata={"source": "infrastructure_pricing.pdf", "category": "services"},
    ),
    Document(
        page_content="TechFlow Solutions was founded in 2018 by Maria Rodriguez, a former Google engineer with 15 years of experience in cloud architecture. The company has grown from 3 employees to over 150 team members across 12 countries, specializing in enterprise cloud solutions.",
        metadata={"source": "company_history.pdf", "category": "company"},
    ),
    Document(
        page_content="Our technical support team operates 24/7 for Enterprise customers, business hours (9 AM - 6 PM EST) for Professional customers, and email-only support for Basic customers. Average response times: Enterprise (15 minutes), Professional (2 hours), Basic (24 hours).",
        metadata={"source": "support_procedures.pdf", "category": "support"},
    ),
]

# Build vector database
vector_store = Chroma.from_documents(knowledge_documents, embedding_model)
document_retriever = vector_store.as_retriever(search_kwargs={"k": 2})

Knowledge base design:

  • Diverse content types: covers pricing, services, company information, and support procedures
  • Rich metadata: enables better document organization and filtering (see the sketch below)
  • Realistic scope: comprehensive enough to demonstrate complex scenarios
  • Business context: reflects real-world enterprise knowledge-management needs
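
As a side note, the category metadata supports filtered retrieval. A minimal sketch, assuming the vector_store built above (Chroma accepts a filter entry in search_kwargs):

# Restrict retrieval to documents tagged "pricing"
pricing_retriever = vector_store.as_retriever(
    search_kwargs={"k": 2, "filter": {"category": "pricing"}}
)
docs = pricing_retriever.invoke("How much does Professional Support cost?")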

Step 3: State Management

Our agent uses a structured state object to track the conversation flow:

class ConversationState(TypedDict):
    conversation_history: List[BaseMessage]  # Full conversation thread
    retrieved_documents: List[Document]      # Current retrieved documents
    topic_relevance: str                     # On-topic or off-topic classification
    enhanced_query: str                      # Reformulated question
    should_generate: bool                    # Whether to proceed with answer generation
    optimization_attempts: int               # Number of query refinement attempts
    current_query: HumanMessage              # User's current question

Benefits of this state schema:

  • Conversation continuity: maintains context across multiple turns
  • Quality control: tracks document relevance and generation decisions
  • Loop prevention: monitors refinement attempts to avoid infinite cycles
  • Debugging support: comprehensive state tracking for troubleshooting (see the inspection sketch below)
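
Because the graph is compiled with a MemorySaver checkpointer in Step 6, this state persists per conversation thread and can be inspected between turns. A minimal sketch, assuming the compiled advanced_rag_agent from Step 6:

# Inspect the persisted state for a given conversation thread
config = {"configurable": {"thread_id": "test_session_1"}}
snapshot = advanced_rag_agent.get_state(config)
print(snapshot.values["conversation_history"])  # full message history so far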

Step 4: Core Agent Components

4.1 Query Enhancer: Intelligent Question Reformulation

def enhance_user_query(state: ConversationState):
    """
    Reformulates user questions based on conversation history to create
    standalone queries optimized for vector search.
    """
    print(f"Enhancing query: {state['current_query'].content}")

    # Initialize state for new query processing
    state["retrieved_documents"] = []
    state["topic_relevance"] = ""
    state["enhanced_query"] = ""
    state["should_generate"] = False
    state["optimization_attempts"] = 0

    # Ensure conversation history exists
    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    # Add current query to history if not already present
    if state["current_query"] not in state["conversation_history"]:
        state["conversation_history"].append(state["current_query"])

    # Check if we have conversation context
    if len(state["conversation_history"]) > 1:
        # Extract context and current question
        previous_messages = state["conversation_history"][:-1]
        current_question = state["current_query"].content

        # Build context-aware prompt
        context_messages = [
            SystemMessage(
                content="""You are an expert query reformulator. Transform the user's question into a standalone,
                search-optimized query that incorporates relevant context from the conversation history.

                Guidelines:
                - Make the question self-contained and clear
                - Preserve the user's intent while adding necessary context
                - Optimize for vector database retrieval
                - Keep the reformulated query concise but comprehensive"""
            )
        ]
        context_messages.extend(previous_messages)
        context_messages.append(HumanMessage(content=f"Current question: {current_question}"))

        # Generate enhanced query
        enhancement_prompt = ChatPromptTemplate.from_messages(context_messages)
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.1)

        formatted_prompt = enhancement_prompt.format()
        response = llm.invoke(formatted_prompt)
        enhanced_question = response.content.strip()

        print(f"Enhanced query: {enhanced_question}")
        state["enhanced_query"] = enhanced_question
    else:
        # First question in conversation - use as-is
        state["enhanced_query"] = state["current_query"].content
        print(f"First query - using original: {state['enhanced_query']}")

    return state

Enhancement strategy:

  • Context integration: combines conversation history with the current question
  • Search optimization: produces queries that work well against the vector database
  • Intent preservation: keeps the user's original intent while adding clarity
  • Efficiency: skips enhancement for the first question to reduce latency (a standalone usage sketch follows)
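
To see the reformulation in action before wiring up the full graph, the node can be called directly. A minimal sketch; the conversation below is invented for illustration:

# Simulate a follow-up question with prior context (hypothetical inputs)
state: ConversationState = {
    "conversation_history": [
        HumanMessage(content="Tell me about your support tiers."),
        AIMessage(content="We offer Basic, Professional, and Enterprise support tiers."),
    ],
    "current_query": HumanMessage(content="What about pricing?"),
    "retrieved_documents": [],
    "topic_relevance": "",
    "enhanced_query": "",
    "should_generate": False,
    "optimization_attempts": 0,
}
state = enhance_user_query(state)
print(state["enhanced_query"])  # e.g. a standalone query about support-tier pricing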

4.2 Topic Validator: Intelligent Domain Classification

class TopicRelevance(BaseModel):
    """Structured output for topic classification"""
    classification: str = Field(
        description="Is the question about TechFlow Solutions services/pricing/company? Answer 'RELEVANT' or 'IRRELEVANT'"
    )
    confidence: str = Field(
        description="Confidence level: 'HIGH', 'MEDIUM', or 'LOW'"
    )


def validate_topic_relevance(state: ConversationState):
    """
    Determines if the user's question is within our knowledge domain.
    Uses the enhanced query for better classification accuracy.
    """
    print("Validating topic relevance...")

    classification_prompt = SystemMessage(
        content="""You are a topic classifier for TechFlow Solutions support system.

        RELEVANT topics include:
        - TechFlow Solutions services (cloud infrastructure, migration, DevOps)
        - Pricing for any TechFlow Solutions products or services
        - Company information (history, team, locations)
        - Support procedures and response times
        - Security and compliance features
        - Technical specifications and capabilities

        IRRELEVANT topics include:
        - General technology questions not specific to TechFlow
        - Other companies' products or services
        - Personal questions unrelated to business
        - Weather, news, or general knowledge queries

        Classify based on the enhanced query which incorporates conversation context."""
    )

    user_question = HumanMessage(
        content=f"Enhanced query to classify: {state['enhanced_query']}"
    )

    # Create classification chain
    classification_chain = ChatPromptTemplate.from_messages([classification_prompt, user_question])
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)

    structured_llm = llm.with_structured_output(TopicRelevance)
    classifier = classification_chain | structured_llm

    result = classifier.invoke({})
    state["topic_relevance"] = result.classification.strip()

    print(f"Topic classification: {state['topic_relevance']} (Confidence: {result.confidence})")
    return state

Classification advantages:

  • Domain specificity: clearly defines which questions the system can handle
  • Confidence scoring: provides transparency about classification certainty
  • Context awareness: uses the enhanced query for better accuracy
  • Structured output: guarantees consistent, parseable responses (a quick smoke test follows)
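
The validator can be smoke-tested on its own in the same way. A quick sketch with a hypothetical query:

# Classify a single query outside the graph
state = {"enhanced_query": "How much does Enterprise Support cost at TechFlow Solutions?"}
state = validate_topic_relevance(state)
print(state["topic_relevance"])  # expected: RELEVANT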

4.3 Content Retriever: Intelligent Document Fetching

def fetch_relevant_content(state: ConversationState):
    """
    Retrieves documents from the knowledge base using the enhanced query.
    """
    print("Fetching relevant documents...")

    # Use enhanced query for better retrieval
    retrieved_docs = document_retriever.invoke(state["enhanced_query"])

    print(f"Retrieved {len(retrieved_docs)} documents")
    for i, doc in enumerate(retrieved_docs):
        print(f"   Document {i+1}: {doc.page_content[:50]}...")

    state["retrieved_documents"] = retrieved_docs
    return state

4.4 Relevance Assessor: Document Quality Control

class DocumentRelevance(BaseModel):
    """Structured output for document relevance assessment"""
    relevance: str = Field(
        description="Is this document relevant to answering the question? Answer 'RELEVANT' or 'IRRELEVANT'"
    )
    reasoning: str = Field(
        description="Brief explanation of why the document is relevant or irrelevant"
    )


def assess_document_relevance(state: ConversationState):
    """
    Evaluates each retrieved document to determine if it's relevant
    for answering the user's question.
    """
    print("Assessing document relevance...")

    assessment_prompt = SystemMessage(
        content="""You are a document relevance assessor. Evaluate whether each document
        contains information that can help answer the user's question.

        A document is RELEVANT if it contains:
        - Direct answers to the question
        - Supporting information that contributes to a complete answer
        - Context that helps understand the topic

        A document is IRRELEVANT if it:
        - Contains no information related to the question
        - Discusses completely different topics
        - Provides no value for answering the question

        Be strict but fair in your assessment."""
    )

    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
    structured_llm = llm.with_structured_output(DocumentRelevance)

    relevant_documents = []

    for i, doc in enumerate(state["retrieved_documents"]):
        assessment_query = HumanMessage(
            content=f"""Question: {state['enhanced_query']}

            Document to assess:
            {doc.page_content}

            Is this document relevant for answering the question?"""
        )

        assessment_chain = ChatPromptTemplate.from_messages([assessment_prompt, assessment_query])
        assessor = assessment_chain | structured_llm

        result = assessor.invoke({})

        print(f"Document {i+1}: {result.relevance} - {result.reasoning}")

        if result.relevance.strip().upper() == "RELEVANT":
            relevant_documents.append(doc)

    # Update state with filtered documents
    state["retrieved_documents"] = relevant_documents
    state["should_generate"] = len(relevant_documents) > 0

    print(f"Final relevant documents: {len(relevant_documents)}")
    return state

Quality control advantages:

  • Precision improvement: filters out irrelevant documents before generation
  • Hallucination prevention: ensures answers are grounded in relevant information
  • Transparency: provides reasoning for every relevance decision
  • Quality assurance: maintains a high bar for response generation

4.5 Response Generator: Context-Aware Answer Creation

def generate_contextual_response(state: ConversationState):
    """
    Generates final response using conversation history and relevant documents.
    """
    print("Generating contextual response...")

    if "conversation_history" not in state or state["conversation_history"] is None:
        raise ValueError("Conversation history is required for response generation")

    # Extract components for response generation
    conversation_context = state["conversation_history"]
    relevant_docs = state["retrieved_documents"]
    enhanced_question = state["enhanced_query"]

    # Create comprehensive response template
    response_template = """You are a knowledgeable TechFlow Solutions support agent. Generate a helpful,
    accurate response based on the conversation history and retrieved documents.

    Guidelines:
    - Use information from the provided documents to answer the question
    - Maintain conversation context and refer to previous exchanges when relevant
    - Be conversational and helpful in tone
    - If the documents don't fully answer the question, acknowledge limitations
    - Provide specific details when available (prices, timeframes, etc.)

    Conversation History:
    {conversation_history}

    Retrieved Knowledge:
    {document_context}

    Current Question: {current_question}

    Generate a helpful response:"""

    response_prompt = ChatPromptTemplate.from_template(response_template)
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.3)

    # Create response generation chain
    response_chain = response_prompt | llm

    # Generate response
    response = response_chain.invoke({
        "conversation_history": conversation_context,
        "document_context": relevant_docs,
        "current_question": enhanced_question,
    })

    generated_response = response.content.strip()

    # Add response to conversation history
    state["conversation_history"].append(AIMessage(content=generated_response))

    print(f"Generated response: {generated_response[:100]}...")
    return state

4.6 Query Optimizer: Adaptive Search Refinement

def optimize_search_query(state: ConversationState):
    """
    Refines the search query when initial retrieval doesn't yield relevant results.
    Includes loop prevention to avoid infinite optimization cycles.
    """
    print("Optimizing search query...")

    current_attempts = state.get("optimization_attempts", 0)

    # Prevent infinite optimization loops
    if current_attempts >= 2:
        print("⚠ Maximum optimization attempts reached")
        return state

    current_query = state["enhanced_query"]

    optimization_prompt = SystemMessage(
        content="""You are a search query optimizer. The current query didn't retrieve relevant documents.

        Create an improved version that:
        - Uses different keywords or synonyms
        - Adjusts the query structure for better matching
        - Maintains the original intent while improving searchability
        - Considers alternative ways to express the same concept

        Provide only the optimized query without explanations."""
    )

    optimization_request = HumanMessage(
        content=f"Current query that needs optimization: {current_query}"
    )

    optimization_chain = ChatPromptTemplate.from_messages([optimization_prompt, optimization_request])
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.2)

    formatted_prompt = optimization_chain.format()
    response = llm.invoke(formatted_prompt)
    optimized_query = response.content.strip()

    # Update state
    state["enhanced_query"] = optimized_query
    state["optimization_attempts"] = current_attempts + 1

    print(f"Optimized query (attempt {current_attempts + 1}): {optimized_query}")
    return state

Step 5: Workflow Orchestration with Intelligent Routing

def route_by_topic(state: ConversationState):
    """Routes based on topic relevance classification"""
    print("Routing based on topic relevance...")

    relevance = state.get("topic_relevance", "").strip().upper()

    if relevance == "RELEVANT":
        print("   → Proceeding to content retrieval")
        return "fetch_content"
    else:
        print("   → Routing to off-topic handler")
        return "handle_off_topic"


def route_by_document_quality(state: ConversationState):
    """Routes based on document relevance assessment"""
    print("Routing based on document quality...")

    optimization_attempts = state.get("optimization_attempts", 0)

    if state.get("should_generate", False):
        print("   → Generating response with relevant documents")
        return "generate_response"
    elif optimization_attempts >= 2:
        print("   → Maximum optimization attempts reached")
        return "handle_no_results"
    else:
        print("   → Optimizing query for better results")
        return "optimize_query"


# Helper functions for edge cases
def handle_off_topic_queries(state: ConversationState):
    """Handles queries outside our knowledge domain"""
    print("Handling off-topic query...")

    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    off_topic_response = """I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
    Your question seems to be outside my area of expertise.

    I can help you with:
    - Our cloud infrastructure services and pricing
    - Support procedures and response times
    - Company information and team details
    - Security and compliance features

    Is there something specific about TechFlow Solutions I can help you with?"""

    state["conversation_history"].append(AIMessage(content=off_topic_response))
    return state


def handle_no_relevant_results(state: ConversationState):
    """Handles cases where no relevant documents are found after optimization"""
    print("No relevant results found after optimization...")

    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    no_results_response = """I apologize, but I couldn't find specific information to answer your question in our current knowledge base.

    This might be because:
    - The information isn't available in our documentation
    - Your question might need clarification
    - You might need to contact our support team directly

    For immediate assistance, you can reach our support team at support@techflow.com or call 1-800-TECHFLOW."""

    state["conversation_history"].append(AIMessage(content=no_results_response))
    return state

Step 6: Complete Workflow Assembly

# Initialize conversation memory
conversation_memory = MemorySaver()

# Create workflow graph
workflow = StateGraph(ConversationState)

# Add all processing nodes
workflow.add_node("enhance_query", enhance_user_query)
workflow.add_node("validate_topic", validate_topic_relevance)
workflow.add_node("handle_off_topic", handle_off_topic_queries)
workflow.add_node("fetch_content", fetch_relevant_content)
workflow.add_node("assess_relevance", assess_document_relevance)
workflow.add_node("generate_response", generate_contextual_response)
workflow.add_node("optimize_query", optimize_search_query)
workflow.add_node("handle_no_results", handle_no_relevant_results)

# Define workflow connections
workflow.add_edge("enhance_query", "validate_topic")

# Conditional routing based on topic relevance
workflow.add_conditional_edges(
    "validate_topic",
    route_by_topic,
    {
        "fetch_content": "fetch_content",
        "handle_off_topic": "handle_off_topic",
    },
)

# Content processing pipeline
workflow.add_edge("fetch_content", "assess_relevance")

# Conditional routing based on document quality
workflow.add_conditional_edges(
    "assess_relevance",
    route_by_document_quality,
    {
        "generate_response": "generate_response",
        "optimize_query": "optimize_query",
        "handle_no_results": "handle_no_results",
    },
)

# Optimization loop
workflow.add_edge("optimize_query", "fetch_content")

# Terminal nodes
workflow.add_edge("generate_response", END)
workflow.add_edge("handle_no_results", END)
workflow.add_edge("handle_off_topic", END)

# Set entry point
workflow.set_entry_point("enhance_query")

# Compile the workflow
advanced_rag_agent = workflow.compile(checkpointer=conversation_memory)
Visualizing the workflow:
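
LangGraph can render the compiled graph directly. A minimal sketch (in Jupyter, draw_mermaid_png() returns PNG bytes that display inline):

# Render the compiled workflow as a Mermaid diagram
from IPython.display import Image, display

display(Image(advanced_rag_agent.get_graph().draw_mermaid_png()))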

Step 7: Testing the Advanced RAG Agent

Let's test the system against a variety of scenarios:

Test #1: Off-topic query

print("Testing Advanced RAG Agent\n")
# Test 1: Off-topic queryprint("=== Test 1: Off-Topic Query ===")test_input = {"current_query": HumanMessage(content="What's the weather like today?")}result = advanced_rag_agent.invoke(    input=test_input,     config={"configurable": {"thread_id""test_session_1"}})print(f"Response: {result['conversation_history'][-1].content}\n")
Output:
Testing Advanced RAG Agent

=== Test 1: Off-Topic Query ===
Enhancing query: What's the weather like today?
First query - using original: What's the weather like today?
Validating topic relevance...
Topic classification: IRRELEVANT (Confidence: HIGH)
Routing based on topic relevance...
   → Routing to off-topic handler
Handling off-topic query...
Response: I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
    Your question seems to be outside my area of expertise.

    I can help you with:
    - Our cloud infrastructure services and pricing
    - Support procedures and response times
    - Company information and team details
    - Security and compliance features

    Is there something specific about TechFlow Solutions I can help you with?
Test #2: On-topic pricing query
# Test 2: On-topic query about pricing
print("=== Test 2: Service Pricing Query ===")
test_input = {"current_query": HumanMessage(content="What are your support service pricing options?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_2"}},
)
print(f"Response: {result['conversation_history'][-1].content}\n")
Output:
=== Test 2: Service Pricing Query ===
Enhancing query: What are your support service pricing options?
📝 First query - using original: What are your support service pricing options?
🎯 Validating topic relevance...
🏷️ Topic classification: RELEVANT (Confidence: HIGH)
🚦 Routing based on topic relevance...
   → Proceeding to content retrieval
📚 Fetching relevant documents...
📄 Retrieved 2 documents
   Document 1: TechFlow Solutions offers three main service tiers...
   Document 2: Our cloud infrastructure services include: Virtual...
🔍 Assessing document relevance...
📋 Document 1: RELEVANT - The document directly answers the question by listing the names, features, and prices of the support service tiers offered by TechFlow Solutions.
📋 Document 2: IRRELEVANT - The document describes pricing options for cloud infrastructure services, not support services. Therefore, it's not relevant to the question about support service pricing.
✅ Final relevant documents: 1
🚦 Routing based on document quality...
   → Generating response with relevant documents
💬 Generating contextual response...
📝 Generated response: We have three support service tiers available. Basic Support is $29 per month and includes email sup...

Response: We have three support service tiers available. Basic Support is $29 per month and includes email support and basic troubleshooting. Professional Support is $79 per month, providing priority phone support and advanced diagnostics. Finally, Enterprise Support, at $199 per month, includes 24/7 dedicated support and custom integrations.
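
A natural third test is the follow-up scenario from the introduction: reuse the thread_id from Test 2 so the MemorySaver checkpointer restores the earlier turns, then ask a context-dependent question. A hedged sketch (the question and expected behavior are illustrative):

# Test 3 (sketch): follow-up question on the same thread as Test 2
print("=== Test 3: Follow-Up Query ===")
test_input = {"current_query": HumanMessage(content="What does the Enterprise tier include?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_2"}},  # same thread as Test 2
)
print(f"Response: {result['conversation_history'][-1].content}\n")
# The query enhancer should rewrite the question into a standalone query such as
# "What does TechFlow Solutions' Enterprise Support tier include?" before retrieval.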

Conclusion

We've built a sophisticated RAG agent that goes far beyond simple question answering. The system demonstrates how multiple AI techniques can work together to create a smarter, context-aware, and more reliable conversational AI.

Key innovations include:

  • Context-aware question rewriting for natural conversations
  • Multi-layer quality control through classification and grading
  • Iterative refinement to improve retrieval success rates
  • Robust workflow management with proper error handling

This architecture provides a solid foundation for building production-ready RAG applications that handle complex multi-turn conversations while keeping responses high-quality and relevant.

Next Steps

To enhance this system further, consider:

  • Adding a more sophisticated embedding model for better retrieval (see the sketch below)
  • Implementing feedback loops for continuous improvement
  • Adding evaluation metrics to measure performance
  • Extending to multiple knowledge domains with domain-specific classifiers
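
Swapping the embedding model is a small change. A hedged sketch; "BAAI/bge-small-en-v1.5" is one commonly used sentence-transformers model, and any compatible model name works here:

# Upgrade the embedding model (hypothetical choice), then rebuild the vector store
from langchain_huggingface import HuggingFaceEmbeddings

embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
vector_store = Chroma.from_documents(knowledge_documents, embedding_model)
document_retriever = vector_store.as_retriever(search_kwargs={"k": 2})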

Full code repository: https://github.com/piyushagni5/langgraph-ai/blob/main/rag/Building%20an%20Advanced%20RAG%20Agent.ipynb
