微信扫码
添加专属顾问
我要投稿
深入解析RAGFlow开源RAG引擎的检索召回机制,了解其如何结合LLM实现深度文档理解与问答能力。 核心内容: 1. RAGFlow整体架构与核心特点 2. 检索召回机制的源码分析 3. 查询处理、向量化及检索流程详细解读
RAGFlow 是一个基于深度文档理解的开源 RAG(检索增强生成)引擎。接上文:关于 RAGFlow 项目中 RAG 技术的实现分析。本文将基于对 RAGFlow 源代码的分析,详细解析 RAGFlow 在检索召回(Retrieval)环节的实现机制。
根据 GitHub 仓库首页的介绍,RAGFlow 是一个"基于深度文档理解的开源 RAG 引擎",它提供了一个流线型的 RAG 工作流,结合大型语言模型(LLM)提供真实的问答能力,并通过各种复杂格式数据的引用支持回答。
RAGFlow 的核心特点包括:
通过分析 RAGFlow 的源代码 rag/nlp/search.py
、rag/nlp/query.py
等核心文件,来详细了解其检索召回机制的实现。
RAGFlow 的检索流程主要包括以下几个步骤:
在rag/nlp/query.py
中,FulltextQueryer
类负责处理用户查询,将其转换为可用于检索的形式:
def question(self, txt, tbl="qa", min_match: float = 0.6):
txt = FulltextQueryer.add_space_between_eng_zh(txt)
txt = re.sub(
r"[ :|\\r\\n\\t,,。??/`!!&^%%()\\[\\]{}<>]+", " ",
rag_tokenizer.tradi2simp(rag_tokenizer.strQ2B(txt.lower())),
).strip()
otxt = txt
txt = FulltextQueryer.rmWWW(txt)
ifnotself.isChinese(txt):
txt = FulltextQueryer.rmWWW(txt)
tks = rag_tokenizer.tokenize(txt).split()
keywords = [t for t in tks if t]
tks_w = self.tw.weights(tks, preprocess=False)
# ...处理权重和同义词
return MatchTextExpr(self.query_fields, query, 100, {"minimum_should_match": min_match}), keywords
这段代码展示了 RAGFlow 如何处理用户查询:
在rag/nlp/search.py
中,Dealer
类的get_vector
方法负责将查询文本转换为向量:
def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
qv, _ = emb_mdl.encode_queries(txt)
shape = np.array(qv).shape
if len(shape) > 1:
raise Exception(
f"Dealer.get_vector returned array's shape {shape} doesn't match expectation(exact one dimension).")
embedding_data = [get_float(v) for v in qv]
vector_column_name = f"q_{len(embedding_data)}_vec"
return MatchDenseExpr(vector_column_name, embedding_data, 'float', 'cosine', topk, {"similarity": similarity})
这段代码展示了如何使用嵌入模型将查询文本转换为向量,并创建用于向量匹配的表达式。
RAGFlow 采用了混合检索策略,结合了向量检索和全文检索。在rag/nlp/search.py
的search
方法中可以看到这一实现:
def search(self, req, idx_names: str | list[str], kb_ids: list[str], emb_mdl=None, highlight=False, rank_feature: dict | None = None ):
# ...前置处理
ifnot qst:
# 处理无查询情况
# ...
else:
highlightFields = ["content_ltks", "title_tks"] if highlight else []
matchText, keywords = self.qryr.question(qst, min_match=0.3)
if emb_mdl isNone:
# 仅使用全文检索
matchExprs = [matchText]
res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
else:
# 混合检索:结合向量检索和全文检索
matchDense = self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))
q_vec = matchDense.embedding_data
src.append(f"q_{len(q_vec)}_vec")
fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})
matchExprs = [matchText, matchDense, fusionExpr]
res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
这段代码揭示了 RAGFlow 的混合检索策略:
matchText
matchDense
FusionExpr
融合两种检索结果,权重分别为 0.05 和 0.95,表明向量检索在混合检索中占主导地位RAGFlow 的重排序机制是其检索找回流程中的关键环节,主要通过rerank
函数实现。在rag/nlp/search.py
中,rerank
函数负责对初步检索结果进行多维度重排序:
def rerank(self, sres, query, tkweight=0.3,
vtweight=0.7, rank_feature: dict | None = None):
"""
对检索结果进行重排序
参数:
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
_, keywords = self.qryr.question(query)
vector_size = len(sres.query_vector)
vector_column = f"q_{vector_size}_vec"
zero_vector = [0.0] * vector_size
ins_embd = []
# 提取文档向量
for chunk_id in sres.ids:
vector = sres.field[chunk_id].get(vector_column, zero_vector)
ifisinstance(vector, str):
vector = [get_float(v) for v in vector.split("\t")]
ins_embd.append(vector)
ifnot ins_embd:
return [], [], []
# 处理重要关键词
for i in sres.ids:
ifisinstance(sres.field[i].get("important_kwd", []), str):
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
# 提取文本特征
ins_tw = []
for i in sres.ids:
content_ltks = list(OrderedDict.fromkeys(sres.field[i]["field"].split()))
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t]
important_kwd = sres.field[i].get("important_kwd", [])
# 计算加权分数
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
ins_tw.append(tks)
# 计算排序特征分数
rank_fea = self._rank_feature_scores(rank_feature, sres)
# 计算混合相似度
sim, tksim, vtsim = self.qryr.hybrid_similarity(
sres.query_vector,
ins_embd,
keywords,
ins_tw, tkweight, vtweight)
# 返回综合排序分数
return sim + rank_fea, tksim, vtsim
这个函数展示了 RAGFlow 的重排序机制如何工作:
_rank_feature_scores
函数计算额外的排序特征分数tkweight
和vtweight
参数控制词元相似度和向量相似度的权重比例hybrid_similarity
函数计算查询与文档间的混合相似度_rank_feature_scores
函数负责计算基于标签和 PageRank 的排序特征分数:
def _rank_feature_scores(self, query_rfea, search_res):
"""计算排序特征分数"""
## For rank feature(tag_fea) scores.
rank_fea = []
pageranks = []
# 提取PageRank分数
for chunk_id in search_res.ids:
pageranks.append(search_res.field[chunk_id].get(PAGERANK_FLD, 0))
pageranks = np.array(pageranks, dtype=float)
# 如果没有查询特征,直接返回PageRank分数
ifnot query_rfea:
return np.array([0for _ inrange(len(search_res.ids))]) + pageranks
# 计算查询特征与文档标签的相似度
q_denor = np.sqrt(np.sum([s*s for s,t in query_rfea.items() if t != PAGERANK_FLD]))
for i in search_res.ids:
nor, denor = 0, 0
# 如果文档没有标签,添加0分
ifnot search_res.field[i].get(TAG_FLD):
rank_fea.append(0)
continue
# 计算标签相似度分数
for t, sc ineval(search_res.field[i].get(TAG_FLD, "{}"))).items():
if t in query_rfea:
nor += query_rfea[t] * sc
denor += sc * sc
# 归一化分数
if denor == 0:
rank_fea.append(0)
else:
rank_fea.append((nor/np.sqrt(denor)/q_denor))
# 返回标签相似度分数与PageRank分数的加权和
return np.array(rank_fea)*10. + pageranks
此外,RAGFlow 还提供了基于模型的重排序功能,通过rerank_by_model
函数实现:
def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3,
vtweight=0.7, cfield="content_ltks",
rank_feature: dict | None = None):
"""
使用外部模型进行重排序
参数:
- rerank_mdl: 重排序模型
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- cfield: 内容字段名
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
# 提取文档内容
contents = []
for i in sres.ids:
contents.append(sres.field[i].get(cfield, ""))
# 使用模型计算相似度分数
scores = rerank_mdl.compute_score(query, contents)
# 计算基本重排序分数
sim, tksim, vtsim = self.rerank(sres, query, tkweight, vtweight, rank_feature)
# 返回模型分数与基本分数的加权和
return scores * 0.7 + sim * 0.3, tksim, vtsim
这个函数展示了 RAGFlow 如何结合外部重排序模型与内部重排序机制,进一步提高检索结果的相关性。
RAGFlow 的一个重要特性是能够在生成的回答中插入引用,追踪信息来源。这在insert_citations
方法中实现:
def insert_citations(self, answer, chunks, chunk_v, embd_mdl, tkweight=0.1, vtweight=0.9):
assertlen(chunks) == len(chunk_v)
ifnot chunks:
return answer, set([])
# 将回答分割成片段
pieces = re.split(r"(```)", answer)
# ...处理代码块和句子分割
# 计算每个片段与知识库块的相似度
ans_v, _ = embd_mdl.encode(pieces_)
# 计算混合相似度并插入引用
for i, a inenumerate(pieces_):
sim, tksim, vtsim = self.qryr.hybrid_similarity(
ans_v[i], chunk_v,
rag_tokenizer.tokenize(self.qryr.rmWWW(pieces_[i])).split(),
chunks_tks, tkweight, vtweight
)
# ...根据相似度插入引用
这段代码展示了 RAGFlow 如何为生成的回答添加引用:
RAGFlow 使用DocStoreConnection
类作为文档存储和检索的接口,支持多种检索表达式:
from rag.utils.doc_store_conn import DocStoreConnection, MatchDenseExpr, FusionExpr, OrderByExpr
主要的检索表达式包括:
MatchTextExpr
:用于全文检索MatchDenseExpr
:用于向量检索FusionExpr
:用于融合多种检索结果这些表达式被传递给dataStore.search
方法执行实际的检索操作。
检索结果通过SearchResult
数据类进行封装:
@dataclass
class SearchResult:
total: int
ids: list[str]
query_vector: list[float] | None = None
field: dict | None = None
highlight: dict | None = None
aggregation: list | dict | None = None
keywords: list[str] | None = None
group_docs: list[list] | None = None
这个数据结构包含了检索结果的各种信息,包括总数、文档 ID、查询向量、高亮显示、聚合结果等。
RAGFlow 提供了"基于模板的分块"和"减少幻觉的引用"等特性。通过源码分析,我们可以看到这些特性在检索召回机制中的具体实现:
insert_citations
方法,RAGFlow 能够为生成的回答添加引用,追踪信息来源,从而减少幻觉。通过对 RAGFlow 源码的分析,我们可以看到其检索找回机制的核心实现包括:
rerank
函数实现多维度特征融合的重排序机制,包括:RAGFlow 的检索找回机制设计全面而灵活,能够有效地从知识库中检索相关信息,并通过多维度重排序提高检索结果的相关性,通过引用机制减少幻觉,提高回答的可靠性。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-10-27
2024-09-04
2024-05-05
2024-07-18
2024-06-20
2024-06-13
2024-07-09
2024-07-09
2024-05-19
2024-07-07
2025-06-09
2025-06-06
2025-05-30
2025-05-29
2025-05-29
2025-05-23
2025-05-16
2025-05-15