微信扫码
添加专属顾问
我要投稿
RAG检索结果相似度高但不可用?CRAG通过评估和纠正机制解决这一痛点,让检索结果真正精准可用。核心内容: 1. 传统RAG的三个检索问题:检索偏差、时效性缺失、记忆污染 2. CRAG的四步闭环机制:检索→评估→纠正→生成 3. 如何用langchain+Milvus搭建CRAG系统
Partition Key 实现零成本多租户隔离
原生混合检索,搞定边界场景的检索失效
JSON 字段,支持记忆结构灵活演化
# Collection schema: Partition Key for multi-tenant isolation, dense + sparse
# vectors for hybrid retrieval, JSON metadata for a flexible memory schema.
fields = [
    # fix: a pymilvus collection schema requires exactly one primary-key field,
    # and a VARCHAR field must declare max_length — the original snippet had neither.
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="agent_id", dtype=DataType.VARCHAR, max_length=128,
                is_partition_key=True),                                    # multi-tenant
    FieldSchema(name="dense_embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),  # semantic retrieval
    FieldSchema(name="sparse_embedding", dtype=DataType.SPARSE_FLOAT_VECTOR),    # BM25
    FieldSchema(name="metadata", dtype=DataType.JSON),                     # dynamic schema
]

# Hybrid search + metadata filtering.
# NOTE(review): in current pymilvus, the filter expression is usually supplied
# per-AnnSearchRequest (expr=...) rather than as a top-level hybrid_search
# argument — confirm against the installed pymilvus version.
results = collection.hybrid_search(
    reqs=[
        AnnSearchRequest(data=[dense_vec], anns_field="dense_embedding", limit=20),
        AnnSearchRequest(data=[sparse_vec], anns_field="sparse_embedding", limit=20),
    ],
    rerank=RRFRanker(),
    output_fields=["metadata"],
    expr='metadata["confidence"] > 0.9',  # CRAG confidence filter
    limit=5,
)
# Credentials required by crag_agent.py: OpenAI (LLM + embeddings) and Tavily (web search).
export OPENAI_API_KEY="your-api-key"
export TAVILY_API_KEY="your-tavily-key"
# filename: crag_agent.py
# ============ Imports ============
from typing import Literal, List
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, before_model, dynamic_prompt
from langchain.chat_models import init_chat_model
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools.tavily_search import TavilySearchResults


# ============ CRAG Middleware (minimal-change version) ============
class CRAGMiddleware(AgentMiddleware):
    """CRAG evaluate-and-correct middleware.

    Pipeline, run before every model call: retrieve from Milvus ->
    grade relevance (relevant / ambiguous / incorrect) -> correct
    (refine documents and/or fall back to web search) -> inject the
    merged context into the prompt of the current call only, so the
    persistent message history is never polluted.
    """

    def __init__(self, vector_store: Milvus, agent_id: str):
        super().__init__()
        self.vector_store = vector_store
        self.agent_id = agent_id  # multi-tenant isolation (Milvus partition key)
        # Lightweight grader for the relevance verdict (replaceable with a
        # structured-output version).
        self.evaluator = init_chat_model("openai:gpt-4o-mini", temperature=0)
        # Web search safety net for failed/ambiguous retrieval.
        self.web_search = TavilySearchResults(max_results=3)

    # NOTE(review): @before_model / @dynamic_prompt are applied here to *bound
    # methods* of an AgentMiddleware subclass — confirm the installed
    # langchain.agents.middleware API supports this (vs. overriding hook
    # methods or decorating free functions).
    @before_model
    def run_crag(self, state):
        """Run retrieve -> evaluate -> correct before the model call and
        stash the resulting context in the agent state."""
        # Pull the query text from the last message; supports both message
        # objects (``.content`` attribute) and plain dicts.
        last_msg = state["messages"][-1]
        query = getattr(last_msg, "content", "") if hasattr(last_msg, "content") else last_msg.get("content", "")

        # 1. Retrieve documents from Milvus (partition key + confidence filter).
        docs = self._retrieve_from_milvus(query)

        # 2. Evaluate: ternary verdict.
        verdict = self._evaluate_relevance(query, docs)

        # 3. Correct: choose a strategy based on the verdict.
        if verdict == "incorrect":
            # Retrieval failed entirely -> rely on web search alone.
            web_results = self._web_search_fallback(query)
            final_context = self._format_web_results(web_results)
        elif verdict == "ambiguous":
            # Partially relevant -> refine documents and supplement with web search.
            refined_docs = self._refine_documents(docs, query)
            web_results = self._web_search_fallback(query)
            final_context = self._merge_context(refined_docs, web_results)
        else:
            # Good retrieval -> refine documents only.
            refined_docs = self._refine_documents(docs, query)
            final_context = self._format_internal_docs(refined_docs)

        # 4. Store the context under a temporary key; it is consumed only by
        # the dynamic prompt of the *current* model call.
        state["_crag_context"] = final_context
        return state

    @dynamic_prompt
    def attach_context(self, state, prompt_messages: List):
        """Prepend the CRAG context as a SystemMessage for this call only."""
        final_context = state.get("_crag_context")
        if final_context:
            sys_msg = SystemMessage(content=f"以下是相关背景信息,请基于这些信息回答用户问题:\n\n{final_context}")
            # Effective for the current call only; never written back into
            # state["messages"].
            prompt_messages = [sys_msg] + prompt_messages
        return prompt_messages

    # ======== Internals: retrieve / evaluate / refine / format ========

    def _retrieve_from_milvus(self, query: str) -> list:
        """Retrieve documents from Milvus (partition key + confidence filter).

        Fail-soft: returns [] on any retrieval error."""
        try:
            # NOTE: different adapter versions accept the filter expression in
            # different places; ``expr`` is passed via search_kwargs here.
            docs = self.vector_store.similarity_search(
                query,
                k=3,
                search_kwargs={"expr": f'agent_id == "{self.agent_id}"'}
            )
            # Confidence filter: keep only memories with confidence > 0.7 to
            # avoid low-quality memory pollution.
            filtered_docs = [
                doc for doc in docs
                if (doc.metadata or {}).get("confidence", 0.0) > 0.7
            ]
            # If nothing clears the bar, fall back to the raw results so the
            # evaluator can still render a verdict.
            return filtered_docs or docs
        except Exception as e:
            print(f"[CRAG] 检索失败: {e}")
            return []

    def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant", "ambiguous", "incorrect"]:
        """Grade document relevance (ternary verdict), simplified version:
        the LLM returns the verdict directly; any unexpected answer or
        exception degrades to "ambiguous"."""
        if not docs:
            return "incorrect"

        # Grade only the top-3 documents, first 500 characters each.
        doc_content = "\n\n".join([
            f"[文档{i+1}] {(doc.page_content or '')[:500]}..."
            for i, doc in enumerate(docs[:3])
        ])

        # NOTE(review): the newline layout of this prompt is reconstructed
        # from a whitespace-mangled paste; the wording is original.
        prompt = f"""你是文档相关性评估专家。评估以下文档是否能回答查询。

查询:{query}

文档内容:
{doc_content}

评估标准:
- relevant:文档直接包含答案,高度相关
- ambiguous:文档部分相关,需要补充外部知识
- incorrect:文档不相关,无法回答查询

只返回一个词:relevant 或 ambiguous 或 incorrect"""
        try:
            result = self.evaluator.invoke(prompt)
            verdict = (getattr(result, "content", "") or "").strip().lower()
            # Anything outside the closed verdict set degrades to "ambiguous".
            if verdict not in {"relevant", "ambiguous", "incorrect"}:
                verdict = "ambiguous"
            return verdict
        except Exception as e:
            print(f"[CRAG] 评估失败: {e}")
            return "ambiguous"

    def _refine_documents(self, docs: list, query: str) -> list:
        """Refine documents (simplified strip: keyword-based sentence picking).

        NOTE(review): ``query.split()`` splits on whitespace, so a pure
        Chinese query yields the whole query as one "keyword" — confirm
        this is acceptable for the target workload."""
        refined = []
        # Coarse sentence split: Chinese full stop plus English . / ! / ? breaks.
        keywords = [kw.strip() for kw in query.split() if kw.strip()]
        for doc in docs:
            text = doc.page_content or ""
            sentences = (
                text.replace("。", "。\n")
                    .replace(". ", ".\n")
                    .replace("! ", "!\n")
                    .replace("? ", "?\n")
                    .split("\n")
            )
            sentences = [s.strip() for s in sentences if s.strip()]
            # Keep sentences that contain any keyword.
            relevant_sentences = [
                s for s in sentences
                if any(keyword in s for keyword in keywords)
            ]
            if relevant_sentences:
                refined_text = "。".join(relevant_sentences[:3])
                refined.append(Document(page_content=refined_text, metadata=doc.metadata or {}))
        # If nothing was extracted, fall back to the original documents.
        return refined if refined else docs

    def _web_search_fallback(self, query: str) -> list:
        """Web search safety net; fail-soft, returns [] on error."""
        try:
            return self.web_search.invoke(query) or []
        except Exception as e:
            print(f"[CRAG] Web 搜索失败: {e}")
            return []

    def _merge_context(self, internal_docs: list, web_results: list) -> str:
        """Merge internal memory and external knowledge into one context string."""
        parts = []
        if internal_docs:
            parts.append("【内部记忆】")
            for i, doc in enumerate(internal_docs, 1):
                parts.append(f"{i}. {doc.page_content}")
        if web_results:
            parts.append("【外部知识】")
            for i, result in enumerate(web_results, 1):
                # Assumes Tavily returns dicts with "content" and "url" keys.
                content = (result or {}).get("content", "")
                url = (result or {}).get("url", "")
                parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts) if parts else "未找到相关信息"

    def _format_internal_docs(self, docs: list) -> str:
        """Format internal documents as a numbered list."""
        if not docs:
            return "未找到相关信息"
        parts = ["【内部记忆】"]
        for i, doc in enumerate(docs, 1):
            parts.append(f"{i}. {doc.page_content}")
        return "\n\n".join(parts)

    def _format_web_results(self, results: list) -> str:
        """Format web search results (content + source URL)."""
        if not results:
            return "未找到相关信息"
        parts = ["【外部知识】"]
        for i, result in enumerate(results, 1):
            content = (result or {}).get("content", "")
            url = (result or {}).get("url", "")
            parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts)


# ============ Initialize the Milvus vector store ============
vector_store = Milvus(
    embedding_function=OpenAIEmbeddings(),
    connection_args={"host": "localhost", "port": "19530"},
    collection_name="agent_memory"
)

# ============ Create the agent ============
agent = create_agent(
    model="openai:gpt-4o",
    tools=[TavilySearchResults(max_results=3)],  # web search tool
    middleware=[
        CRAGMiddleware(
            vector_store=vector_store,
            agent_id="user_123_session_456"  # multi-tenant isolation: one independent ID per agent instance
        )
    ]
)

# ============ Run example ============
if __name__ == "__main__":
    # Example query; HumanMessage is used for compatibility.
    response = agent.invoke({
        "messages": [HumanMessage(content="Nike 最新季度财报中的运营成本是多少?")]
    })
    print(response["messages"][-1].content)
# filename: crag_agent.py
# ============ Imports ============
from typing import Literal, List
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, before_model, dynamic_prompt
from langchain.chat_models import init_chat_model
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools.tavily_search import TavilySearchResults


# ============ CRAG Middleware (minimal-change version) ============
class CRAGMiddleware(AgentMiddleware):
    """CRAG evaluate-and-correct middleware.

    Pipeline, run before every model call: retrieve from Milvus ->
    grade relevance (relevant / ambiguous / incorrect) -> correct
    (refine documents and/or fall back to web search) -> inject the
    merged context into the prompt of the current call only, so the
    persistent message history is never polluted.
    """

    def __init__(self, vector_store: Milvus, agent_id: str):
        super().__init__()
        self.vector_store = vector_store
        self.agent_id = agent_id  # multi-tenant isolation (Milvus partition key)
        # Lightweight grader for the relevance verdict (replaceable with a
        # structured-output version).
        self.evaluator = init_chat_model("openai:gpt-4o-mini", temperature=0)
        # Web search safety net for failed/ambiguous retrieval.
        self.web_search = TavilySearchResults(max_results=3)

    # NOTE(review): @before_model / @dynamic_prompt are applied here to *bound
    # methods* of an AgentMiddleware subclass — confirm the installed
    # langchain.agents.middleware API supports this (vs. overriding hook
    # methods or decorating free functions).
    @before_model
    def run_crag(self, state):
        """Run retrieve -> evaluate -> correct before the model call and
        stash the resulting context in the agent state."""
        # Pull the query text from the last message; supports both message
        # objects (``.content`` attribute) and plain dicts.
        last_msg = state["messages"][-1]
        query = getattr(last_msg, "content", "") if hasattr(last_msg, "content") else last_msg.get("content", "")

        # 1. Retrieve documents from Milvus (partition key + confidence filter).
        docs = self._retrieve_from_milvus(query)

        # 2. Evaluate: ternary verdict.
        verdict = self._evaluate_relevance(query, docs)

        # 3. Correct: choose a strategy based on the verdict.
        if verdict == "incorrect":
            # Retrieval failed entirely -> rely on web search alone.
            web_results = self._web_search_fallback(query)
            final_context = self._format_web_results(web_results)
        elif verdict == "ambiguous":
            # Partially relevant -> refine documents and supplement with web search.
            refined_docs = self._refine_documents(docs, query)
            web_results = self._web_search_fallback(query)
            final_context = self._merge_context(refined_docs, web_results)
        else:
            # Good retrieval -> refine documents only.
            refined_docs = self._refine_documents(docs, query)
            final_context = self._format_internal_docs(refined_docs)

        # 4. Store the context under a temporary key; it is consumed only by
        # the dynamic prompt of the *current* model call.
        state["_crag_context"] = final_context
        return state

    @dynamic_prompt
    def attach_context(self, state, prompt_messages: List):
        """Prepend the CRAG context as a SystemMessage for this call only."""
        final_context = state.get("_crag_context")
        if final_context:
            sys_msg = SystemMessage(content=f"以下是相关背景信息,请基于这些信息回答用户问题:\n\n{final_context}")
            # Effective for the current call only; never written back into
            # state["messages"].
            prompt_messages = [sys_msg] + prompt_messages
        return prompt_messages

    # ======== Internals: retrieve / evaluate / refine / format ========

    def _retrieve_from_milvus(self, query: str) -> list:
        """Retrieve documents from Milvus (partition key + confidence filter).

        Fail-soft: returns [] on any retrieval error."""
        try:
            # NOTE: different adapter versions accept the filter expression in
            # different places; ``expr`` is passed via search_kwargs here.
            docs = self.vector_store.similarity_search(
                query,
                k=3,
                search_kwargs={"expr": f'agent_id == "{self.agent_id}"'}
            )
            # Confidence filter: keep only memories with confidence > 0.7 to
            # avoid low-quality memory pollution.
            filtered_docs = [
                doc for doc in docs
                if (doc.metadata or {}).get("confidence", 0.0) > 0.7
            ]
            # If nothing clears the bar, fall back to the raw results so the
            # evaluator can still render a verdict.
            return filtered_docs or docs
        except Exception as e:
            print(f"[CRAG] 检索失败: {e}")
            return []

    def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant", "ambiguous", "incorrect"]:
        """Grade document relevance (ternary verdict), simplified version:
        the LLM returns the verdict directly; any unexpected answer or
        exception degrades to "ambiguous"."""
        if not docs:
            return "incorrect"

        # Grade only the top-3 documents, first 500 characters each.
        doc_content = "\n\n".join([
            f"[文档{i+1}] {(doc.page_content or '')[:500]}..."
            for i, doc in enumerate(docs[:3])
        ])

        # NOTE(review): the newline layout of this prompt is reconstructed
        # from a whitespace-mangled paste; the wording is original.
        prompt = f"""你是文档相关性评估专家。评估以下文档是否能回答查询。

查询:{query}

文档内容:
{doc_content}

评估标准:
- relevant:文档直接包含答案,高度相关
- ambiguous:文档部分相关,需要补充外部知识
- incorrect:文档不相关,无法回答查询

只返回一个词:relevant 或 ambiguous 或 incorrect"""
        try:
            result = self.evaluator.invoke(prompt)
            verdict = (getattr(result, "content", "") or "").strip().lower()
            # Anything outside the closed verdict set degrades to "ambiguous".
            if verdict not in {"relevant", "ambiguous", "incorrect"}:
                verdict = "ambiguous"
            return verdict
        except Exception as e:
            print(f"[CRAG] 评估失败: {e}")
            return "ambiguous"

    def _refine_documents(self, docs: list, query: str) -> list:
        """Refine documents (simplified strip: keyword-based sentence picking).

        NOTE(review): ``query.split()`` splits on whitespace, so a pure
        Chinese query yields the whole query as one "keyword" — confirm
        this is acceptable for the target workload."""
        refined = []
        # Coarse sentence split: Chinese full stop plus English . / ! / ? breaks.
        keywords = [kw.strip() for kw in query.split() if kw.strip()]
        for doc in docs:
            text = doc.page_content or ""
            sentences = (
                text.replace("。", "。\n")
                    .replace(". ", ".\n")
                    .replace("! ", "!\n")
                    .replace("? ", "?\n")
                    .split("\n")
            )
            sentences = [s.strip() for s in sentences if s.strip()]
            # Keep sentences that contain any keyword.
            relevant_sentences = [
                s for s in sentences
                if any(keyword in s for keyword in keywords)
            ]
            if relevant_sentences:
                refined_text = "。".join(relevant_sentences[:3])
                refined.append(Document(page_content=refined_text, metadata=doc.metadata or {}))
        # If nothing was extracted, fall back to the original documents.
        return refined if refined else docs

    def _web_search_fallback(self, query: str) -> list:
        """Web search safety net; fail-soft, returns [] on error."""
        try:
            return self.web_search.invoke(query) or []
        except Exception as e:
            print(f"[CRAG] Web 搜索失败: {e}")
            return []

    def _merge_context(self, internal_docs: list, web_results: list) -> str:
        """Merge internal memory and external knowledge into one context string."""
        parts = []
        if internal_docs:
            parts.append("【内部记忆】")
            for i, doc in enumerate(internal_docs, 1):
                parts.append(f"{i}. {doc.page_content}")
        if web_results:
            parts.append("【外部知识】")
            for i, result in enumerate(web_results, 1):
                # Assumes Tavily returns dicts with "content" and "url" keys.
                content = (result or {}).get("content", "")
                url = (result or {}).get("url", "")
                parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts) if parts else "未找到相关信息"

    def _format_internal_docs(self, docs: list) -> str:
        """Format internal documents as a numbered list."""
        if not docs:
            return "未找到相关信息"
        parts = ["【内部记忆】"]
        for i, doc in enumerate(docs, 1):
            parts.append(f"{i}. {doc.page_content}")
        return "\n\n".join(parts)

    def _format_web_results(self, results: list) -> str:
        """Format web search results (content + source URL)."""
        if not results:
            return "未找到相关信息"
        parts = ["【外部知识】"]
        for i, result in enumerate(results, 1):
            content = (result or {}).get("content", "")
            url = (result or {}).get("url", "")
            parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts)


# ============ Initialize the Milvus vector store ============
vector_store = Milvus(
    embedding_function=OpenAIEmbeddings(),
    connection_args={"host": "localhost", "port": "19530"},
    collection_name="agent_memory"
)

# ============ Create the agent ============
agent = create_agent(
    model="openai:gpt-4o",
    tools=[TavilySearchResults(max_results=3)],  # web search tool
    middleware=[
        CRAGMiddleware(
            vector_store=vector_store,
            agent_id="user_123_session_456"  # multi-tenant isolation: one independent ID per agent instance
        )
    ]
)

# ============ Run example ============
if __name__ == "__main__":
    # Example query; HumanMessage is used for compatibility.
    response = agent.invoke({
        "messages": [HumanMessage(content="Nike 最新季度财报中的运营成本是多少?")]
    })
    print(response["messages"][-1].content)
from typing import Literal  # fix: Literal was used but never imported in this snippet

from pydantic import BaseModel
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI  # fix: ChatOpenAI was used but never imported


class RelevanceVerdict(BaseModel):
    """Structured output of the relevance evaluation."""
    verdict: Literal["relevant", "ambiguous", "incorrect"]
    confidence: float  # confidence score (for memory-quality monitoring)
    reasoning: str  # rationale (for debugging and audit)


# NOTE: the CRAG paper uses a fine-tuned T5-Large evaluator (10-20 ms latency).
# gpt-4o-mini is the engineering compromise here (easier to deploy, higher latency).
grader_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

grader_prompt = PromptTemplate(
    template="""你是文档相关性评估专家。评估以下文档是否能回答查询。

查询:{query}

文档内容:
{document}

评估标准:
- relevant:文档直接包含答案,置信度 > 0.9
- ambiguous:文档部分相关,置信度 0.5-0.9
- incorrect:文档不相关,置信度 < 0.5

返回 JSON 格式:{{"verdict": "...", "confidence": 0.xx, "reasoning": "..."}}""",
    input_variables=["query", "document"]
)

grader_chain = grader_prompt | grader_llm.with_structured_output(RelevanceVerdict)


# Drop-in replacement for CRAGMiddleware._evaluate_relevance()
def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant", "ambiguous", "incorrect"]:
    """Grade document relevance and return a structured verdict.

    Fail-soft: any grader error degrades to "ambiguous", matching the
    behavior of the base implementation."""
    if not docs:
        return "incorrect"

    # Grade only the top-3 documents, first 500 characters each.
    # fix: guard None page_content, consistent with the base implementation.
    doc_content = "\n\n".join([
        f"[文档{i+1}] {(doc.page_content or '')[:500]}..."
        for i, doc in enumerate(docs[:3])
    ])

    # fix: the original called grader_chain.invoke with no error handling,
    # so a single grader failure would crash the whole CRAG pipeline.
    try:
        result = grader_chain.invoke({
            "query": query,
            "document": doc_content
        })
    except Exception as e:
        print(f"[CRAG] 评估失败: {e}")
        return "ambiguous"

    # Emit the confidence to logs / monitoring.
    print(f"[CRAG 评估] verdict={result.verdict}, confidence={result.confidence:.2f}")
    print(f"[CRAG 推理] {result.reasoning}")

    # Optional: persist the evaluation result to Milvus for memory-quality analysis.
    self._store_evaluation_metrics(query, result)

    return result.verdict


def _store_evaluation_metrics(self, query: str, verdict_result: RelevanceVerdict):
    """Store evaluation metrics in Milvus (memory-quality monitoring).

    Example hook: write to a dedicated evaluation_metrics collection —
    that collection must be created before enabling this."""
    pass
第二,可观测性:把监控体系搭起来
第三,长期治理:严防记忆污染
作者介绍
Zilliz黄金写手:尹珉
阅读推荐 80%的 Multi-Agent都是伪需求!如何判断是否需要Multi-Agent,以及如何搭? 养虾实战教程:我用OpenClaw做了个能盯盘,也能深度复盘的投资agent Qwen3.5-397B+Milvus+ColQwen2,如何做基于PDF的多模态RAG知识库 开源|Milvus2.6又有功能上新啦!Embedding Function、N-gram、decay ranker、field-level boosting、Highlighting解读 AI互撕后code review表现会更好?Claude、Gemini、Codex、Qwen、MiniMax 最新模型测评
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-03-11
上下文腐烂:拖垮企业AI与LLM表现的隐患与对策
2026-03-10
从向量里逆向出原始文本和模型来源
2026-02-27
如何用 AI 做业务级 Code Review
2026-02-22
不用向量数据库的 RAG,居然跑得更准了?
2026-02-22
AIOps探索:做运维领域的RAG,如何做数据清洗
2026-02-21
Claude Code 每次都要重新探索代码?这个工具直接省下30%成本
2026-02-18
函数计算 AgentRun 重磅上线知识库功能,赋能智能体更“懂”你
2026-02-15
当RAG遇上Agent记忆:为什么相似度检索会"塌方"?
2026-01-15
2026-01-02
2025-12-23
2026-02-13
2025-12-18
2026-02-03
2026-02-03
2025-12-31
2026-01-06
2025-12-29
2026-03-11
2026-02-22
2026-02-15
2026-02-04
2026-02-03
2026-01-19
2026-01-12
2026-01-08