免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

Embedding相似度虚高,如何用langchain+Milvus搭建CRAG解决?

发布日期:2026-03-11 18:49:09 浏览次数: 1518
作者:Zilliz

微信搜一搜,关注“Zilliz”

推荐语

RAG检索结果相似度高但不可用?CRAG通过评估和纠正机制解决这一痛点,让检索结果真正精准可用。

核心内容:
1. 传统RAG的三个检索问题:检索偏差、时效性缺失、记忆污染
2. CRAG的四步闭环机制:检索→评估→纠正→生成
3. 如何用langchain+Milvus搭建CRAG系统

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家
图片最近后台经常会收到一个提问,就是我的RAG经常打捞上来一个相似性分数极高,但是完全不可用的内容,到底要怎么办?
这背后,其实是个传统 RAG在设计最底层的想当然:相似度高 = 结果好。但这套逻辑在生产环境里根本站不住脚。因为高分文档可能早就过期了,或者讲的是完全不匹配的场景、甚至核心信息缺斤短两。
而 CRAG 的核心价值,就是在检索和生成之间引入了一道评估环节,对检索结果做三元判决(正确 / 模糊 / 错误),在错误信息进入大模型推理环节之前,就把它拦截、修正或者补充完整,从根源上解决检索不靠谱的问题。
接下来,本文将展示如何用langchain+Milvus搭建一套CRAG系统。

01

传统 RAG 的三个检索问题

这里先总结一下传统 RAG的几个问题。
第一,检索偏差
我们之前做了个运维助手,用户查 “如何在 Nginx 上配置 HTTPS 证书”,检索出来的 Top3 高分文档,分别讲的是 Apache 的配置方法、早就停更的旧版本教程、甚至是 HTTPS 的原理解析。可以看到,语义相似,和能解决问题,在这时候其实完全是两码事。
第二,时效性缺失
比如你查 “Python 异步编程最佳实践”,它可能会同时捞上来2018 年早就废弃的写法,和 2024 年官方推荐的写法。而哪一个更契合,则取决于你需要最新的方案,还是需要了解历史项目的演进逻辑。
第三,也是最无解的记忆污染。一旦检索到过时的 API 用法,生成了错误代码,这段错误内容又被存进了记忆库,接下来就是彻底的恶性循环:新旧版本内容共存,向量检索次次都能命中旧内容,越用越错,连回滚都找不到抓手。
也是因此,引入检索质量评估是RAG与Agent 系统的必选项。

02

CRAG:检索质量的评估与纠正机制

传统 RAG 只有 用 / 不用的二元处理,但绝大多数检索结果都是部分相关但不全对的模糊状态,二元处理要么丢有效信息,要么带进错误内容。
也是因此,CRAG (Corrective Retrieval-Augmented Generation)的核心改进,就是把传统 RAG的检索→生成两步走,改成 检索→评估→纠正→生成四步闭环。
具体来说,CRAG 会将检索结果分为三种:正确(直接精炼使用)、模糊(补充外部知识)、错误(丢弃检索结果)。
但只有做检索内容的判断,还远远不够。传统 RAG 还有个致命误区:检索到文档就直接全文塞给大模型。 token 浪费只是其次,一堆无关信息混进去,很容易就把大模型带偏,平白无故多出很多幻觉。
CRAG 的改进在于:绝不直接用检索全文,而是先做一轮知识精炼,把无效信息全筛掉,只留和查询最相关的核心内容。
原论文方案中里用了知识条带(Knowledge Strips)+ 启发式规则,不过,我们日常工程落地,用简化的关键词匹配方案就能搞定;如果是生产环境,也可以换成 LLM 摘要、结构化提取,进一步提升精炼质量。
整个精炼流程如下图所示,分三步:
文档分解(从 2000 tokens 提取 500 tokens 关键片段)、查询重写(将模糊查询改写为精确搜索词)、知识选择(对结果去重、排序、截断)。
做完精炼之后,就进入最核心的评估器环节。
这里先划个重点:不要用评估器做复杂推理,评估器的核心使命是快速筛选,在几毫秒里判断内容能不能用。
所以 CRAG 原论文里,用的是微调后的 T5-Large,而不是通用大模型。给大家列个直观的对比:
评估完之后,就会引入一个新问题,如果内部检索的资料不够,怎么办?
CRAG 设计了一套内部检索 + Web 搜索的协同机制,说人话就是当内部检索结果错误、或者信息不足的时候,系统会自动触发 Web 搜索补充最新信息,解决时效性问题。

03

Milvus:CRAG 的高性能存储引擎

聊完了 CRAG 的核心逻辑,很多朋友会问:这套机制跑起来,对底层的向量数据库有没有要求?
当然有。我们当时试了好几个向量数据库,最后线上落地用 Milvus,就是因为它的架构设计,刚好踩中了 CRAG 生产落地的三个核心刚需:多租户隔离(数百个 Agent 实例独立记忆)、混合检索(应对语义漂移)、动态 Schema(记忆结构随系统迭代演化)。

Partition Key 实现零成本多租户隔离

做 Agent 系统的都懂,你可能要同时跑几百个 Agent 实例,每个用户、每个场景的记忆必须完全独立,不能串数据。之前我们试过给每个租户建单独的 Collection,管理起来扩容、运维全是坑。
Milvus 的 Partition Key 功能,直接实现了零成本的多租户隔离。你只要在 Schema 定义时给 agent_id 字段加上is_partition_key=True,查询时系统会自动路由到对应分区,不用手动维护一大堆 Collection。
实测下来,在 1000 万向量、100 个租户的场景中,配合 Clustering Compaction,直接带来了 3-5 倍的 QPS 提升。

原生混合检索,搞定边界场景的检索失效

纯向量检索有个天然的短板:遇到用户查专有名词、型号编码(比如 "SKU-2024-X5")、精确版本号这类场景,很容易直接失效 。
Milvus 2.5 原生支持Dense 稠密向量(语义)+ Sparse 稀疏向量(BM25)+ 元数据过滤的混合检索,还自带 RRF 融合排序,不用搭多路检索再做融合,省了大量开发成本。
实测数据:100 万向量规模下,Milvus Sparse-BM25 检索延迟仅 6ms,完全不拖 CRAG 全流程的后腿。

JSON 字段,支持记忆结构灵活演化

系统不是一成不变的。随着 CRAG 评估机制的完善,我们要不断给记忆新增 confidence、verified、source 这类字段,要是用传统结构化数据库,改表结构就得停机维护,对线上业务太不友好。
Milvus 的 JSON 字段,可以灵活扩展元数据字段,想加什么直接加,完全不用停机改 Schema。
这是个极简的 Schema 定义示例
fields = [    FieldSchema(name="agent_id", dtype=DataType.VARCHAR, is_partition_key=True),  # 多租户    FieldSchema(name="dense_embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),   # 语义检索    FieldSchema(name="sparse_embedding", dtype=DataType.SPARSE_FLOAT_VECTOR),# BM25FieldSchema(name="metadata", dtype=DataType.JSON),# 动态 Schema]# 混合检索 + 元数据过滤results = collection.hybrid_search(    reqs=[        AnnSearchRequest(data=[dense_vec], anns_field="dense_embedding"limit=20),        AnnSearchRequest(data=[sparse_vec], anns_field="sparse_embedding"limit=20)    ],    rerank=RRFRanker(),    output_fields=["metadata"],    expr='metadata["confidence"] > 0.9',# CRAG 置信度过滤    limit=5)
另外,Milvus还有个关键优势是平滑迁移:Milvus 提供 Lite / Standalone / Distributed 三种部署模式,代码完全兼容。我们在本地 Lite 上开发,生产环境切换到 Distributed,只需修改连接字符串。

04

基于 LangGraph middleware  +Milvus的 CRAG 教程

教程开始前,多说一句选型的思路。很多人做 CRAG,喜欢用 LangGraph 画一堆节点和边,状态流转搞的特别复杂,维护起来巨麻烦。
我们踩过这个坑之后,最后选了 LangGraph 1.0 的middleware 模式,它能在模型调用前直接拦截请求,一站式完成检索、评估、纠正全流程,不用手动管理状态图的节点和边,代码量少、可读性高,出问题也好排查。
整个流程一共分四步:
检索:从 Milvus 获取 Top-3 相关文档,自带租户隔离
评估:用轻量级模型完成三元判决,判断文档质量
纠正:根据判决结果,执行精炼、补充搜索、兜底替换策略
注入:把处理好的上下文,通过动态提示词注入给大模型
环境变量配置
export OPENAI_API_KEY="your-api-key"export TAVILY_API_KEY="your-tavily-key"
Milvus Collection 创建
在运行代码前,需要先创建 Collection 并定义 Schema:
# filename: crag_agent.py# ============ 导入依赖 ============from typing import LiteralListfrom langchain.agents import create_agentfrom langchain.agents.middleware import AgentMiddleware, before_model, dynamic_promptfrom langchain.chat_models import init_chat_modelfrom langchain_milvus import Milvusfrom langchain_openai import OpenAIEmbeddingsfrom langchain_core.documents import Documentfrom langchain_core.messages import SystemMessage, HumanMessagefrom langchain_community.tools.tavily_search import TavilySearchResults# ============ CRAG Middleware(最小改动版) ============class CRAGMiddleware(AgentMiddleware):    """CRAG 评估与纠正中间件(使用官方装饰器注册钩子,避免永久污染消息栈)"""    def __init__(self, vector_store: Milvus, agent_id: str):        super().__init__()        self.vector_store = vector_store        self.agent_id = agent_id  # 多租户隔离        # 轻量评估器:用于相关性判定(可替换为你后文的结构化版本)        self.evaluator = init_chat_model("openai:gpt-4o-mini", temperature=0)        # Web 搜索托底        self.web_search = TavilySearchResults(max_results=3)    @before_model    def run_crag(self, state):        """在模型调用前执行检索→评估→纠正,准备最终上下文"""        # 获取最后一条用户消息        last_msg = state["messages"][-1]        query = getattr(last_msg, "content"""if hasattr(last_msg, "content"else last_msg.get("content""")        # 1. 检索:从 Milvus 获取文档(PartitionKey + 置信度过滤)        docs = self._retrieve_from_milvus(query)        # 2. 评估:三元判决        verdict = self._evaluate_relevance(query, docs)        # 3. 纠正:根据判决决定处理策略        if verdict == "incorrect":            # 检索失败,完全依赖 Web 搜索            web_results = self._web_search_fallback(query)            final_context = self._format_web_results(web_results)        elif verdict == "ambiguous":            # 检索模糊,精炼文档 + Web 搜索补充            refined_docs = self._refine_documents(docs, query)            web_results = self._web_search_fallback(query)            final_context = self._merge_context(refined_docs, web_results)        else:            # 检索质量良好,只精炼文档            refined_docs = self._refine_documents(docs, query)            final_context = self._format_internal_docs(refined_docs)        # 4. 将上下文放入临时键,仅用于“当前模型调用”的动态提示拼接        state["_crag_context"] = final_context        return state    @dynamic_prompt    def attach_context(self, state, prompt_messages: List):        """将 CRAG 合成上下文以 SystemMessage 注入到“本次模型调用”的提示前"""        final_context = state.get("_crag_context")        if final_context:            sys_msg = SystemMessage(                content=f"以下是相关背景信息,请基于这些信息回答用户问题:\n\n{final_context}"            )            # 仅对当前调用生效,不永久写入 state["messages"]            prompt_messages = [sys_msg] + prompt_messages        return prompt_messages    # ======== 内部方法:检索 / 评估 / 精炼 / 格式化 ========    def _retrieve_from_milvus(self, query: str) -> list:        """从 Milvus 检索文档(Partition Key + 置信度过滤)"""        try:            # 注意:不同版本的适配器对过滤参数位置不同,这里使用 search_kwargs 传递 expr            docs = self.vector_store.similarity_search(                query,                k=3,                search_kwargs={"expr"f'agent_id == "{self.agent_id}"'}            )            # 置信度过滤(避免低质量记忆污染)            filtered_docs = [                doc for doc in docs                if (doc.metadata or {}).get("confidence"0.0) > 0.7            ]            return filtered_docs or docs  # 若无高置信度,退回原结果以便 evaluator 判定        except Exception as e:            print(f"[CRAG] 检索失败: {e}")            return []    def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant""ambiguous""incorrect"]:        """评估文档相关性(三元判决),简化版:LLM 直接返回 verdict"""        if not docs:            return "incorrect"        # 只评估 Top-3 文档,每个文档取前 500 字符        doc_content = "\n\n".join([            f"[文档{i+1}{(doc.page_content or '')[:500]}..."            for i, doc in enumerate(docs[:3])        ])        prompt = f"""你是文档相关性评估专家。评估以下文档是否能回答查询。查询:{query}文档内容:{doc_content}评估标准:- relevant:文档直接包含答案,高度相关- ambiguous:文档部分相关,需要补充外部知识- incorrect:文档不相关,无法回答查询只返回一个词:relevant 或 ambiguous 或 incorrect"""        try:            result = self.evaluator.invoke(prompt)            verdict = (getattr(result, "content"""or "").strip().lower()            if verdict not in {"relevant""ambiguous""incorrect"}:                verdict = "ambiguous"            return verdict        except Exception as e:            print(f"[CRAG] 评估失败: {e}")            return "ambiguous"    def _refine_documents(self, docs: list, query: str) -> list:        """精炼文档(简化条带:基于关键词的句子筛选)"""        refined = []        # 简单中文句号替换 + 英文断句的粗切        keywords = [kw.strip() for kw in query.split() if kw.strip()]        for doc in docs:            text = doc.page_content or ""            sentences = (                text.replace("。""。\n")                    .replace(". "".\n")                    .replace("! ""!\n")                    .replace("? ""?\n")                    .split("\n")            )            sentences = [s.strip() for s in sentences if s.strip()]            # 命中任一关键词            relevant_sentences = [                s for s in sentences                if any(keyword in s for keyword in keywords)            ]            if relevant_sentences:                refined_text = "。".join(relevant_sentences[:3])                refined.append(Document(page_content=refined_text, metadata=doc.metadata or {}))        return refined if refined else docs  # 若未提取到,回退原文档    def _web_search_fallback(self, query: str) -> list:        """Web 搜索托底"""        try:            return self.web_search.invoke(query) or []        except Exception as e:            print(f"[CRAG] Web 搜索失败: {e}")            return []    def _merge_context(self, internal_docs: list, web_results: list) -> str:        """合并内部记忆与外部知识为最终上下文"""        parts = []        if internal_docs:            parts.append("【内部记忆】")            for i, doc in enumerate(internal_docs, 1):                parts.append(f"{i}{doc.page_content}")        if web_results:            parts.append("【外部知识】")            for i, result in enumerate(web_results, 1):                content = (result or {}).get("content""")                url = (result or {}).get("url""")                parts.append(f"{i}{content}\n   来源: {url}")        return "\n\n".join(parts) if parts else "未找到相关信息"    def _format_internal_docs(self, docs: list) -> str:        """格式化内部文档"""        if not docs:            return "未找到相关信息"        parts = ["【内部记忆】"]        for i, doc in enumerate(docs, 1):            parts.append(f"{i}{doc.page_content}")        return "\n\n".join(parts)    def _format_web_results(self, results: list) -> str:        """格式化 Web 搜索结果"""        if not results:            return "未找到相关信息"        parts = ["【外部知识】"]        for i, result in enumerate(results, 1):            content = (result or {}).get("content""")            url = (result or {}).get("url""")            parts.append(f"{i}{content}\n   来源: {url}")        return "\n\n".join(parts)# ============ 初始化 Milvus 向量数据库 ============vector_store = Milvus(    embedding_function=OpenAIEmbeddings(),    connection_args={"host""localhost""port""19530"},    collection_name="agent_memory")# ============ 创建 Agent ============agent = create_agent(    model="openai:gpt-4o",    tools=[TavilySearchResults(max_results=3)],  # Web 搜索工具    middleware=[        CRAGMiddleware(            vector_store=vector_store,            agent_id="user_123_session_456"  # 多租户隔离:每个 Agent 实例使用独立 ID        )    ])# ============ 运行示例 ============if __name__ == "__main__":    # 示例查询:使用 HumanMessage 以保证兼容性    response = agent.invoke({        "messages": [            HumanMessage(content="Nike 最新季度财报中的运营成本是多少?")        ]    })    print(response["messages"][-1].content)
本文代码基于 LangChain 1.0 的 middleware 特性实现。Middleware 是 LangChain 1.0 的核心特性,但具体 API 可能随版本更新而变化。
# filename: crag_agent.py# ============ 导入依赖 ============from typing import LiteralListfrom langchain.agents import create_agentfrom langchain.agents.middleware import AgentMiddleware, before_model, dynamic_promptfrom langchain.chat_models import init_chat_modelfrom langchain_milvus import Milvusfrom langchain_openai import OpenAIEmbeddingsfrom langchain_core.documents import Documentfrom langchain_core.messages import SystemMessage, HumanMessagefrom langchain_community.tools.tavily_search import TavilySearchResults# ============ CRAG Middleware(最小改动版) ============class CRAGMiddleware(AgentMiddleware):    """CRAG 评估与纠正中间件(使用官方装饰器注册钩子,避免永久污染消息栈)"""    def __init__(self, vector_store: Milvus, agent_id: str):        super().__init__()        self.vector_store = vector_store        self.agent_id = agent_id  # 多租户隔离        # 轻量评估器:用于相关性判定(可替换为你后文的结构化版本)        self.evaluator = init_chat_model("openai:gpt-4o-mini", temperature=0)        # Web 搜索托底        self.web_search = TavilySearchResults(max_results=3)    @before_model    def run_crag(self, state):        """在模型调用前执行检索→评估→纠正,准备最终上下文"""        # 获取最后一条用户消息        last_msg = state["messages"][-1]        query = getattr(last_msg, "content"""if hasattr(last_msg, "content"else last_msg.get("content""")        # 1. 检索:从 Milvus 获取文档(PartitionKey + 置信度过滤)        docs = self._retrieve_from_milvus(query)        # 2. 评估:三元判决        verdict = self._evaluate_relevance(query, docs)        # 3. 纠正:根据判决决定处理策略        if verdict == "incorrect":            # 检索失败,完全依赖 Web 搜索            web_results = self._web_search_fallback(query)            final_context = self._format_web_results(web_results)        elif verdict == "ambiguous":            # 检索模糊,精炼文档 + Web 搜索补充            refined_docs = self._refine_documents(docs, query)            web_results = self._web_search_fallback(query)            final_context = self._merge_context(refined_docs, web_results)        else:            # 检索质量良好,只精炼文档            refined_docs = self._refine_documents(docs, query)            final_context = self._format_internal_docs(refined_docs)        # 4. 将上下文放入临时键,仅用于“当前模型调用”的动态提示拼接        state["_crag_context"] = final_context        return state    @dynamic_prompt    def attach_context(self, state, prompt_messages: List):        """将 CRAG 合成上下文以 SystemMessage 注入到“本次模型调用”的提示前"""        final_context = state.get("_crag_context")        if final_context:            sys_msg = SystemMessage(                content=f"以下是相关背景信息,请基于这些信息回答用户问题:\n\n{final_context}"            )            # 仅对当前调用生效,不永久写入 state["messages"]            prompt_messages = [sys_msg] + prompt_messages        return prompt_messages    # ======== 内部方法:检索 / 评估 / 精炼 / 格式化 ========    def _retrieve_from_milvus(self, query: str) -> list:        """从 Milvus 检索文档(Partition Key + 置信度过滤)"""        try:            # 注意:不同版本的适配器对过滤参数位置不同,这里使用 search_kwargs 传递 expr            docs = self.vector_store.similarity_search(                query,                k=3,                search_kwargs={"expr"f'agent_id == "{self.agent_id}"'}            )            # 置信度过滤(避免低质量记忆污染)            filtered_docs = [                doc for doc in docs                if (doc.metadata or {}).get("confidence"0.0) > 0.7            ]            return filtered_docs or docs  # 若无高置信度,退回原结果以便 evaluator 判定        except Exception as e:            print(f"[CRAG] 检索失败: {e}")            return []    def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant""ambiguous""incorrect"]:        """评估文档相关性(三元判决),简化版:LLM 直接返回 verdict"""        if not docs:            return "incorrect"        # 只评估 Top-3 文档,每个文档取前 500 字符        doc_content = "\n\n".join([            f"[文档{i+1}{(doc.page_content or '')[:500]}..."            for i, doc in enumerate(docs[:3])        ])        prompt = f"""你是文档相关性评估专家。评估以下文档是否能回答查询。查询:{query}文档内容:{doc_content}评估标准:- relevant:文档直接包含答案,高度相关- ambiguous:文档部分相关,需要补充外部知识- incorrect:文档不相关,无法回答查询只返回一个词:relevant 或 ambiguous 或 incorrect"""        try:            result = self.evaluator.invoke(prompt)            verdict = (getattr(result, "content"""or "").strip().lower()            if verdict not in {"relevant""ambiguous""incorrect"}:                verdict = "ambiguous"            return verdict        except Exception as e:            print(f"[CRAG] 评估失败: {e}")            return "ambiguous"    def _refine_documents(self, docs: list, query: str) -> list:        """精炼文档(简化条带:基于关键词的句子筛选)"""        refined = []        # 简单中文句号替换 + 英文断句的粗切        keywords = [kw.strip() for kw in query.split() if kw.strip()]        for doc in docs:            text = doc.page_content or ""            sentences = (                text.replace("。""。\n")                    .replace(". "".\n")                    .replace("! ""!\n")                    .replace("? ""?\n")                    .split("\n")            )            sentences = [s.strip() for s in sentences if s.strip()]            # 命中任一关键词            relevant_sentences = [                s for s in sentences                if any(keyword in s for keyword in keywords)            ]            if relevant_sentences:                refined_text = "。".join(relevant_sentences[:3])                refined.append(Document(page_content=refined_text, metadata=doc.metadata or {}))        return refined if refined else docs  # 若未提取到,回退原文档    def _web_search_fallback(self, query: str) -> list:        """Web 搜索托底"""        try:            return self.web_search.invoke(query) or []        except Exception as e:            print(f"[CRAG] Web 搜索失败: {e}")            return []    def _merge_context(self, internal_docs: list, web_results: list) -> str:        """合并内部记忆与外部知识为最终上下文"""        parts = []        if internal_docs:            parts.append("【内部记忆】")            for i, doc in enumerate(internal_docs, 1):                parts.append(f"{i}{doc.page_content}")        if web_results:            parts.append("【外部知识】")            for i, result in enumerate(web_results, 1):                content = (result or {}).get("content""")                url = (result or {}).get("url""")                parts.append(f"{i}{content}\n   来源: {url}")        return "\n\n".join(parts) if parts else "未找到相关信息"    def _format_internal_docs(self, docs: list) -> str:        """格式化内部文档"""        if not docs:            return "未找到相关信息"        parts = ["【内部记忆】"]        for i, doc in enumerate(docs, 1):            parts.append(f"{i}{doc.page_content}")        return "\n\n".join(parts)    def _format_web_results(self, results: list) -> str:        """格式化 Web 搜索结果"""        if not results:            return "未找到相关信息"        parts = ["【外部知识】"]        for i, result in enumerate(results, 1):            content = (result or {}).get("content""")            url = (result or {}).get("url""")            parts.append(f"{i}{content}\n   来源: {url}")        return "\n\n".join(parts)# ============ 初始化 Milvus 向量数据库 ============vector_store = Milvus(    embedding_function=OpenAIEmbeddings(),    connection_args={"host""localhost""port""19530"},    collection_name="agent_memory")# ============ 创建 Agent ============agent = create_agent(    model="openai:gpt-4o",    tools=[TavilySearchResults(max_results=3)],  # Web 搜索工具    middleware=[        CRAGMiddleware(            vector_store=vector_store,            agent_id="user_123_session_456"  # 多租户隔离:每个 Agent 实例使用独立 ID        )    ])# ============ 运行示例 ============if __name__ == "__main__":    # 示例查询:使用 HumanMessage 以保证兼容性    response = agent.invoke({        "messages": [            HumanMessage(content="Nike 最新季度财报中的运营成本是多少?")        ]    })    print(response["messages"][-1].content)
plaintext 评估器的进阶优化
上述代码中的_evaluate_relevance() 方法采用了简化实现,适合快速验证。如果需要更完善的评估器(包含置信度和可解释性),可以采用以下实现:
from pydantic import BaseModelfrom langchain.prompts import PromptTemplateclass RelevanceVerdict(BaseModel):    """评估结果的结构化输出"""    verdict: Literal["relevant""ambiguous""incorrect"]    confidence: float  # 置信度分数(用于记忆质量监控)    reasoning: str     # 判断理由(用于调试和审核)# 注意:CRAG 论文使用微调的 T5-Large 评估器(10-20ms 延迟)# 这里使用 gpt-4o-mini 作为工程实现方案(更易部署,但延迟略高)grader_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)grader_prompt = PromptTemplate(    template="""你是文档相关性评估专家。评估以下文档是否能回答查询。查询:{query}文档内容:{document}评估标准:- relevant:文档直接包含答案,置信度 > 0.9- ambiguous:文档部分相关,置信度 0.5-0.9- incorrect:文档不相关,置信度 < 0.5返回 JSON 格式:{{"verdict": "...", "confidence": 0.xx, "reasoning": "..."}}""",    input_variables=["query""document"])grader_chain = grader_prompt | grader_llm.with_structured_output(RelevanceVerdict)# 替换 CRAGMiddleware 中的 _evaluate_relevance() 方法def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant""ambiguous""incorrect"]:    """评估文档相关性(返回结构化结果)"""    if not docs:        return "incorrect"
    # 只评估 Top-3 文档,每个文档取前 500 字符    doc_content = "\n\n".join([        f"[文档{i+1}{doc.page_content[:500]}..."         for i, doc in enumerate(docs[:3])    ])
    result = grader_chain.invoke({        "query": query,         "document": doc_content    })
    # 将置信度存储到日志或监控系统    print(f"[CRAG 评估] verdict={result.verdict}, confidence={result.confidence:.2f}")    print(f"[CRAG 推理] {result.reasoning}")
    # 可选:将评估结果存储到 Milvus,用于记忆质量分析    self._store_evaluation_metrics(query, result)
    return result.verdictdef _store_evaluation_metrics(self, query: str, verdict_result: RelevanceVerdict):    """存储评估指标到 Milvus(用于记忆质量监控)"""    # 示例:将评估结果存储到单独的 Collection 用于分析    # 实际使用时需要创建 evaluation_metrics Collection    pass
知识精炼和托底的实现比较简单:文档精炼提取包含查询关键词的句子,Web 搜索在检索失败时触发 Tavily 补充外部知识。关键是在 merge 节点合并内部记忆和外部知识,形成最终上下文。

05 

尾声

最后,分享一些CRAG 的生产部署建议,大家将 CRAG 部署到生产环境,可以重点关注三个问题:
第一,成本控制:一定要选对评估器
评估器是 CRAG 流程里调用频次最高的环节,选型直接决定了系统的延迟和成本。
如果是高并发的线上生产场景,优先选微调的 T5-Large 这类轻量级模型,延迟 10-20ms,成本可控;如果是快速验证、小流量场景,用 gpt-4o-mini 这类托管模型,不用自己运维,落地快,就是延迟和成本会高一些。

第二,可观测性:把监控体系搭起来

线上系统,看不见的问题才是最致命的。但好在,Milvus 原生支持 Prometheus 指标,可以重点盯这三个核心数据:milvus_query_latency_seconds(查询延迟分布)、milvus_search_qps(每秒查询数)、milvus_insert_throughput(写入吞吐量),这是检索环节的生命线。
另一方面,CRAG 的评估判决分布、Web 搜索触发率、置信度分布,也必须接入监控。不然线上出了问题,都不知道是检索崩了,还是评估器判歪了。

第三,长期治理:严防记忆污染

Agent 跑的越久,记忆库里的垃圾信息就越多,可以提前引入以下机制:
前置过滤:检索时就只返回 confidence>0.7 的高置信度记忆,从源头减少垃圾信息进入流程
时间衰减:给记忆加半衰期权重(建议 30 天,可根据业务场景调整),越旧的记忆检索权重越低,避免过期内容长期霸榜
定期清理:设置定时任务,每周删除低置信度、从未被验证过的旧记忆,给记忆库定期瘦身,从根源上避免恶性循环。

作者介绍

图片

Zilliz黄金写手:尹珉

阅读推荐
80%的 Multi-Agent都是伪需求!如何判断是否需要Multi-Agent,以及如何搭?
养虾实战教程:我用OpenClaw做了个能盯盘,也能深度复盘的投资agent
Qwen3.5-397B+Milvus+ColQwen2,如何做基于PDF的多模态RAG知识库
开源|Milvus2.6又有功能上新啦!Embedding Function、N-gram、decay ranker、field-level boosting、Highlighting解读
AI互撕后code review表现会更好?Claude、Gemini、Codex、Qwen、MiniMax 最新模型测评
图片
图片

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询