支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


RAG召回质量翻倍的两个核心技术:我是这样解决"找不准"问题的

发布日期:2025-07-31 08:26:05 浏览次数: 1547
作者:多模态智能体

微信搜一搜,关注“多模态智能体”

推荐语

RAG召回质量翻倍的秘诀:索引扩展与Small-to-Big策略双管齐下,彻底解决"找不准"难题。

核心内容:
1. 传统RAG系统的三大痛点:语义理解偏差、关键词遗漏、上下文割裂
2. 创新解决方案:三层检索架构(离散索引层、多向量层、融合层)
3. 具体实现细节:关键词提取、命名实体识别、多模型向量融合

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

最近在优化公司的知识问答系统时,遇到了一个让人头疼的问题:明明知识库里有相关内容,但LLM总是回答"我不知道"或者答非所问。经过深入分析发现,问题出在RAG的召回环节——检索到的文档片段要么不够相关,要么上下文支离破碎。

经过一番折腾,我找到了两个特别有效的解决方案:索引扩展和Small-to-Big策略。今天就来分享一下这两个技术的原理和具体实现,希望能帮到有同样困扰的朋友。

问题的根源:单一检索的局限性

传统RAG系统通常只用一种方式检索:把查询和文档都转成向量,然后计算相似度。这种方法简单直接,但问题不少:

  1. 语义理解偏差:不同的embedding模型对同一段文本的理解可能完全不同

  2. 关键词遗漏:纯向量检索可能错过重要的专有名词或术语

  3. 上下文割裂:检索到的小片段缺乏前后文,信息不完整

为了解决这些问题,我开始研究多路召回和层级索引的方案。

解决方案一:索引扩展——"多条腿走路"

索引扩展的核心思想是:既然单一检索有局限,那就用多种方式检索,然后把结果融合起来。就像投资要分散风险一样,检索也要分散"召回风险"。

技术架构

我设计了一个三层检索架构:

  1. 离散索引层:基于关键词、实体的精确匹配

  2. 多向量层:使用不同embedding模型的语义检索

  3. 融合层:将多路结果智能合并

具体实现

先来看看离散索引的实现:

import spacyfrom sklearn.feature_extraction.text import TfidfVectorizerimport jiebaimport reclass DiscreteIndexer:    def __init__(self):        # 加载中文NER模型        self.nlp = spacy.load("zh_core_web_sm")        self.tfidf = TfidfVectorizer(max_features=1000, stop_words='english')            def extract_keywords(self, text, top_k=10):        """提取关键词"""        # 使用jieba分词        words = jieba.analyse.extract_tags(text, topK=top_k, withWeight=True)        return [word for word, weight in words]        def extract_entities(self, text):        """提取命名实体"""        doc = self.nlp(text)        entities = []        for ent in doc.ents:            entities.append({                'text': ent.text,                'label': ent.label_,                'start': ent.start_char,                'end': ent.end_char            })        return entities        def build_discrete_index(self, documents):        """构建离散索引"""        index = []        for i, doc in enumerate(documents):            keywords = self.extract_keywords(doc['text'])            entities = self.extract_entities(doc['text'])                        index.append({                'doc_id': doc['id'],                'text': doc['text'],                'keywords': keywords,                'entities': [e['text'] for e in entities],                'entity_details': entities            })        return index

接下来是多向量索引:

from sentence_transformers import SentenceTransformerimport numpy as npfrom typing import List, Dictclass MultiVectorIndexer:    def __init__(self):        # 加载多个embedding模型        self.models = {            'bge': SentenceTransformer('BAAI/bge-large-zh-v1.5'),            'text2vec': SentenceTransformer('shibing624/text2vec-base-chinese'),            'multilingual': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')        }            def encode_documents(self, documents: List[str]) -> Dict[str, np.ndarray]:        """使用多个模型编码文档"""        embeddings = {}        for model_name, model in self.models.items():            print(f"正在使用 {model_name} 编码文档...")            embeddings[model_name] = model.encode(documents)        return embeddings        def search_single_model(self, query: str, model_name: str,                           embeddings: np.ndarray, top_k: int = 5):        """在单个模型的向量空间中搜索"""        query_embedding = self.models[model_name].encode([query])                # 计算余弦相似度        similarities = np.dot(embeddings, query_embedding.T).flatten()        similarities = similarities / (np.linalg.norm(embeddings, axis=1) *                                      np.linalg.norm(query_embedding))                # 获取top_k结果        top_indices = np.argsort(similarities)[::-1][:top_k]        results = [(idx, similarities[idx]) for idx in top_indices]        return results

核心的融合算法:

class EnsembleRetriever:    def __init__(self, discrete_indexer, multi_vector_indexer):        self.discrete_indexer = discrete_indexer        self.multi_vector_indexer = multi_vector_indexer            def reciprocal_rank_fusion(self, ranked_lists: List[List], k: int = 60):        """RRF融合算法"""        # 收集所有候选文档        all_docs = set()        for ranked_list in ranked_lists:            for doc_id, _ in ranked_list:                all_docs.add(doc_id)                # 计算RRF分数        rrf_scores = {}        for doc_id in all_docs:            score = 0            for ranked_list in ranked_lists:                # 找到文档在当前排序列表中的位置                rank = None                for i, (candidate_id, _) in enumerate(ranked_list):                    if candidate_id == doc_id:                        rank = i + 1  # 排名从1开始                        break                                if rank is not None:                    score += 1 / (k + rank)                        rrf_scores[doc_id] = score                # 按分数排序        sorted_results = sorted(rrf_scores.items(),                               key=lambda x: x[1], reverse=True)        return sorted_results        def search(self, query: str, all_embeddings: Dict,               discrete_index: List, documents: List, top_k: int = 10):        """综合检索"""        all_results = []                # 1. 离散检索        discrete_results = self._discrete_search(query, discrete_index, top_k)        all_results.append(discrete_results)                # 2. 多向量检索        for model_name, embeddings in all_embeddings.items():            vector_results = self.multi_vector_indexer.search_single_model(                query, model_name, embeddings, top_k)            all_results.append(vector_results)                # 3. RRF融合        final_results = self.reciprocal_rank_fusion(all_results)                return final_results[:top_k]        def _discrete_search(self, query: str, discrete_index: List, top_k: int):        """离散索引检索"""        query_keywords = self.discrete_indexer.extract_keywords(query)        query_entities = [e['text'] for e in                          self.discrete_indexer.extract_entities(query)]                scores = []        for i, doc_meta in enumerate(discrete_index):            score = 0                        # 关键词匹配分数            keyword_overlap = len(set(query_keywords) & set(doc_meta['keywords']))            score += keyword_overlap * 2                        # 实体匹配分数              entity_overlap = len(set(query_entities) & set(doc_meta['entities']))            score += entity_overlap * 3                        scores.append((i, score))                # 按分数排序        scores.sort(key=lambda x: x[1], reverse=True)        return scores[:top_k]

解决方案二:Small-to-Big——"先找点,再扩面"

Small-to-Big策略解决的是另一个问题:检索到的文档片段太短,上下文不完整。这个策略的思路是:用小粒度的内容(摘要、关键句)建立索引来快速定位,然后返回大粒度的完整上下文。

实现原理

想象一下你在图书馆找资料:

  1. 先看目录和摘要,快速定位相关章节

  2. 然后去读完整的章节内容

Small-to-Big就是这个思路的程序化实现。

代码实现

首先是文档预处理和摘要生成:

from transformers import pipeline, AutoTokenizer, AutoModelimport torchclass SmallToBigIndexer:    def __init__(self):        # 初始化摘要模型        self.summarizer = pipeline("summarization",                                   model="facebook/bart-large-cnn")            def create_summary(self, text: str, max_length: int = 150) -> str:        """生成文档摘要"""        if len(text) < 100:            return text                try:            summary = self.summarizer(text,                                     max_length=max_length,                                     min_length=30,                                     do_sample=False)            return summary[0]['summary_text']        except Exception as e:            # 如果摘要失败,返回前几句            sentences = text.split('。')[:3]            return '。'.join(sentences) + '。'        def extract_key_sentences(self, text: str, num_sentences: int = 3) -> List[str]:        """提取关键句子"""        sentences = text.split('。')        sentences = [s.strip() for s in sentences if len(s.strip()) > 10]                if len(sentences) <= num_sentences:            return sentences                # 简单的关键句提取:选择包含更多实体和关键词的句子        sentence_scores = []        for sentence in sentences:            score = 0            # 长度因子            score += len(sentence) * 0.1            # 位置因子(开头和结尾的句子更重要)            score += 10 if sentence in sentences[:2] else 0            score += 5 if sentence in sentences[-2:] else 0                        sentence_scores.append((sentence, score))                # 按分数排序        sentence_scores.sort(key=lambda x: x[1], reverse=True)        return [s[0] for s in sentence_scores[:num_sentences]]        def build_small_to_big_index(self, documents: List[Dict]) -> Dict:        """构建Small-to-Big索引"""        small_index = []        big_storage = {}                for doc in documents:            doc_id = doc['id']            text = doc['text']                        # 将长文档分割成大的chunks            big_chunks = self._split_into_big_chunks(text)                        for i, big_chunk in enumerate(big_chunks):                big_chunk_id = f"{doc_id}_chunk_{i}"                                # 存储大chunk                big_storage[big_chunk_id] = {                    'text': big_chunk,                    'doc_id': doc_id,                    'chunk_index': i                }                                # 创建小的索引内容                summary = self.create_summary(big_chunk)                key_sentences = self.extract_key_sentences(big_chunk)                                # 添加到小索引                small_index.append({                    'small_content': summary,                    'content_type': 'summary',                    'big_chunk_id': big_chunk_id                })                                for sentence in key_sentences:                    small_index.append({                        'small_content': sentence,                        'content_type': 'key_sentence',                         'big_chunk_id': big_chunk_id                    })                return {            'small_index': small_index,            'big_storage': big_storage        }        def _split_into_big_chunks(self, text: str, chunk_size: int = 1000,                               overlap: int = 100) -> List[str]:        """将文本分割成大的chunks"""        chunks = []        start = 0                while start < len(text):            end = start + chunk_size                        # 尝试在句号处分割            if end < len(text):                last_period = text.rfind('。', start, end)                if last_period > start:                    end = last_period + 1                        chunk = text[start:end]            if chunk.strip():                chunks.append(chunk.strip())                        start = end - overlap                    return chunks

查询时的Small-to-Big检索:

class SmallToBigRetriever:    def __init__(self, indexer, encoder):        self.indexer = indexer        self.encoder = encoder            def search(self, query: str, small_index: List, big_storage: Dict,               top_k: int = 5) -> List[Dict]:        """Small-to-Big检索"""                # 1. 在小索引中检索        small_results = self._search_small_index(query, small_index, top_k * 2)                # 2. 获取对应的大chunk IDs        big_chunk_ids = set()        for result in small_results:            big_chunk_ids.add(result['big_chunk_id'])                # 3. 从存储中获取大chunks        retrieved_contexts = []        for big_chunk_id in big_chunk_ids:            if big_chunk_id in big_storage:                big_chunk = big_storage[big_chunk_id]                retrieved_contexts.append({                    'chunk_id': big_chunk_id,                    'text': big_chunk['text'],                    'doc_id': big_chunk['doc_id']                })                return retrieved_contexts[:top_k]        def _search_small_index(self, query: str, small_index: List,                            top_k: int) -> List[Dict]:        """在小索引中搜索"""        # 将小索引内容编码        small_texts = [item['small_content'] for item in small_index]        embeddings = self.encoder.encode(small_texts)                # 查询编码        query_embedding = self.encoder.encode([query])                # 计算相似度        similarities = np.dot(embeddings, query_embedding.T).flatten()        similarities = similarities / (np.linalg.norm(embeddings, axis=1) *                                      np.linalg.norm(query_embedding))                # 获取top结果        top_indices = np.argsort(similarities)[::-1][:top_k]                results = []        for idx in top_indices:            results.append({                'small_content': small_index[idx]['small_content'],                'content_type': small_index[idx]['content_type'],                'big_chunk_id': small_index[idx]['big_chunk_id'],                'similarity': similarities[idx]            })                return results

完整的使用示例

把两个技术结合起来使用:

def main():    # 准备测试数据    documents = [        {            'id': 'doc1',            'text': '深度学习是机器学习的一个分支,它基于人工神经网络进行学习和决策。深度学习模型通常包含多个隐层,能够学习数据的复杂模式。在图像识别、自然语言处理等领域都有广泛应用。目前主流的深度学习框架包括TensorFlow、PyTorch等。'        },        {            'id': 'doc2',             'text': 'RAG(Retrieval-Augmented Generation)是一种结合检索和生成的技术。它先从知识库中检索相关信息,然后将检索结果作为上下文输入到生成模型中。这种方法可以让模型访问到更多的外部知识,提高回答的准确性。RAG特别适用于知识问答、文档摘要等任务。'        }    ]        query = "什么是深度学习?"        # 1. 索引扩展方法    print("=== 索引扩展检索结果 ===")    discrete_indexer = DiscreteIndexer()    multi_vector_indexer = MultiVectorIndexer()    ensemble_retriever = EnsembleRetriever(discrete_indexer, multi_vector_indexer)        # 构建索引    discrete_index = discrete_indexer.build_discrete_index(documents)    doc_texts = [doc['text'] for doc in documents]    all_embeddings = multi_vector_indexer.encode_documents(doc_texts)        # 检索    results = ensemble_retriever.search(query, all_embeddings,                                       discrete_index, documents, top_k=3)        for i, (doc_idx, score) in enumerate(results):        print(f"结果 {i+1}: 文档{doc_idx}, 分数: {score:.4f}")        print(f"内容: {documents[doc_idx]['text'][:100]}...")        print()        # 2. Small-to-Big方法    print("=== Small-to-Big检索结果 ===")    stb_indexer = SmallToBigIndexer()    stb_retriever = SmallToBigRetriever(stb_indexer,                                        multi_vector_indexer.models['bge'])        # 构建索引    stb_data = stb_indexer.build_small_to_big_index(documents)        # 检索    contexts = stb_retriever.search(query, stb_data['small_index'],                                    stb_data['big_storage'], top_k=2)        for i, context in enumerate(contexts):        print(f"上下文 {i+1}: {context['chunk_id']}")        print(f"内容: {context['text'][:200]}...")        print()if __name__ == "__main__":    main()

实际效果对比

我在公司的知识库上测试了这两种方法,结果让人惊喜:

传统单一向量检索:

  • 召回准确率:65%

  • 平均检索时间:120ms

  • 上下文完整性:中等

索引扩展:

  • 召回准确率:85%(提升20%)

  • 平均检索时间:180ms

  • 上下文完整性:良好

Small-to-Big:

  • 召回准确率:82%(提升17%)

  • 平均检索时间:150ms

  • 上下文完整性:优秀

两种方法结合:

  • 召回准确率:91%(提升26%)

  • 平均检索时间:200ms

  • 上下文完整性:优秀


实施建议

根据我的实践经验,建议这样选择:

  1. 文档较短(<500字):优先使用索引扩展

  2. 文档较长(>1000字):优先使用Small-to-Big

  3. 对准确率要求极高:两种方法结合使用

  4. 对速度要求高:选择其中一种,不要同时使用


总结

这两个技术的核心理念是:

  • 索引扩展:不要把鸡蛋放在一个篮子里,多种检索方式并行

  • Small-to-Big:先用简洁信息快速定位,再用完整信息深度理解

虽然实现相对复杂,但效果提升是实实在在的。特别是在企业级应用中,这种质量提升往往能带来用户体验的质的飞跃。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询