微信扫码
添加专属顾问
我要投稿
Dify的RAG系统如何高效处理海量文档?揭秘其三大核心模块如何协同工作,提升大模型回答准确性。 核心内容: 1. Dify RAG系统的三模块架构设计 2. 文档处理模块的多格式支持与智能分段 3. 向量化索引与混合检索的优化机制
dify 的 RAG(检索增强生成)架构是一个完整的文档处理、索引和检索系统,旨在提高大语言模型生成内容的准确性和相关性。该架构由三个主要模块组成:文档处理模块、向量化与索引模块、检索与重排模块。
整个系统的数据流如下:
Dify RAG 系统的核心数据模型包括:
这些实体之间的关系是:一个 Dataset 包含多个 Document,一个 Document 包含多个 DocumentSegment。Dataset 通过index_struct 字段存储向量数据库的配置信息。
文档处理模块负责将各种格式的文档转换为可被索引的文本段落。该模块采用 ETL(提取-转换-加载)模式设计,具有高度的可扩展性和灵活性。
主要功能包括:
ExtractProcessor
是文档处理的入口,负责根据文档类型选择合适的提取器进行文本提取。它支持多种文档格式,包括:
提取器采用策略模式设计,可以轻松扩展以支持新的文档格式。
ExtractProcessor
类实现class ExtractProcessor:
@classmethod
def extract(cls, extract_setting: ExtractSetting, is_automatic: bool = False,
file_path: str = None) -> list[Document]:
# 根据数据源类型和文件类型选择合适的提取器
if extract_setting.datasource_type == DatasourceType.FILE.value:
# 根据文件扩展名选择不同的提取器
if file_extension == '.pdf':
extractor = PdfExtractor(file_path)
elif file_extension in ['.md', '.markdown']:
extractor = UnstructuredMarkdownExtractor(file_path) \
if is_automatic else MarkdownExtractor(file_path)
# ... 其他文件类型的处理
return extractor.extract()
TextSplitter
负责将长文本分割成适当大小的段落,是 RAG 系统性能的关键组件。它提供以下功能:
分割策略可以通过处理规则进行配置,允许用户根据不同类型的文档调整分割参数。
TextSplitter
实现类class TextSplitter(BaseDocument, ABC):
def __init__(
self,
chunk_size: int = 4000,
chunk_overlap: int = 200,
length_function: Callable[[str], int] = len,
keep_: bool = False,
add_start_index: bool = False,
) -> None:
# 初始化分割器参数
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function
self._keep_separator = keep_separator
self._add_start_index = add_start_index
@abstractmethod
def split_text(self, text: str) -> list[str]:
"""Split text into multiple components."""
def create_documents(
self, texts: list[str], metadatas: Optional[list[dict]] = None
) -> list[Document]:
# 从分割后的文本创建文档对象
文档切块是RAG中的关键步骤,Dify 使用TextSplitter
类及其子类来实现不同的切块策略:
FixedRecursiveCharacterTextSplitter
:固定大小的递归字符分割EnhanceRecursiveCharacterTextSplitter
:增强的递归字符分割切块后的文档片段会保留原始文档的元数据,并添加唯一标识符(doc_id)和内容哈希值(doc_hash)。
TextCleaner
负责文本清洗,去除不必要的格式和噪声,提高索引和检索质量。清洗操作包括:
向量化与索引模块负责将文本段落转换为向量表示并存储在向量数据库中,同时创建关键词索引以支持混合搜索。该模块采用工厂模式和适配器模式设计,支持多种向量数据库和嵌入模型。
主要功能包括:
Dify使用CacheEmbedding
类来管理文本嵌入过程,它具有缓存功能,可以避免重复计算嵌入向量:
class CacheEmbedding(Embeddings):
def __init__(self, model_instance: ModelInstance, user: Optional[str] = None) -> None:
self._model_instance = model_instance
self._user = user
def embed_documents(self, texts: list[str]) -> list[list[]]:
# 使用文档嵌入缓存或存储(如果不存在)
text_embeddings = [Nonefor _ in range(len(texts))]
embedding_queue_indices = []
# 检查缓存中是否存在嵌入
for i, text in enumerate(texts):
hash = helper.generate_text_hash(text)
embedding = db.session.query(Embedding).filter_by(
model_name=self._model_instance.model,
hash=hash,
provider_name=self._model_instance.provider
).first()
if embedding:
text_embeddings[i] = embedding.get_embedding()
else:
embedding_queue_indices.append(i)
# 处理未缓存的嵌入
if embedding_queue_indices:
# 生成嵌入并缓存
Dify 支持多种向量数据库,通过工厂模式实现:
class Vector:
def __init__(self, dataset: Dataset, attributes: list = None):
if attributes isNone:
attributes = ['doc_id', 'dataset_id', 'document_id', 'doc_hash']
self._dataset = dataset
self._embeddings = self._get_embeddings()
self._attributes = attributes
self._vector_processor = self._init_vector()
def _init_vector(self) -> BaseVector:
vector_type = dify_config.VECTOR_STORE
if self._dataset.index_struct_dict:
vector_type = self._dataset.index_struct_dict['type']
vector_factory_cls = self.get_vector_factory(vector_type)
return vector_factory_cls().init_vector(self._dataset, self._attributes, self._embeddings)
@staticmethod
def get_vector_factory(vector_type: str) -> type[AbstractVectorFactory]:
match vector_type:
case VectorType.CHROMA:
core.rag.datasource.vdb.chroma.chroma_vector import ChromaVectorFactory
return ChromaVectorFactory
# ... 其他向量数据库的支持
支持的向量数据库包括:
IndexingRunner
负责协调整个索引过程,包括文档提取、转换和加载。它实现了完整的 ETL 流程:
class IndexingRunner:
def __init__(self, dataset, document_id, document_model, document_content, document_metadata, tenant_id, user_id):
# 初始化
def run(self):
# 1. 提取文本
documents = self.extract()
# 2. 转换(分割和清洗)
= self.transform(documents)
# 3. 保存段落
self.save_segments()
# 4. 加载到索引
self.load()
段落索引处理器实现了提取、转换、加载和检索方法:
class ParagraphIndexProcessor(IndexProcessorBase):
def extract(self, extract_setting: ExtractSetting) -> list[Document]:
# 提取文档
return ExtractProcessor.extract(extract_setting)
def transform(self, documents: list[Document]) -> list[Document]:
# 清洗和分割文本
cleaned_documents = self.clean(documents)
return self.split(cleaned_documents)
def load(self, dataset_id: str, : list[Document]) -> None:
# 创建向量索引和关键词索引
self.create_vector_index(dataset_id, segments)
self.create_keyword_index(dataset_id, segments)
def retrieve(self, dataset_id: str, query: str, **kwargs) -> list[Document]:
# 调用检索服务
returnService.retrieve(
retrival_method=kwargs.get('_method', 'semantic_search'),
dataset_id=dataset_id,
query=query,
top_k=kwargs.get('top_k', 2),
score_threshold=kwargs.get('score_threshold', 0.0),
reranking_model=kwargs.get('reranking_model'),
reranking_mode=kwargs.get('reranking_mode', 'reranking_model'),
weights=kwargs.get('weights')
)
Jieba
组件负责从文本中提取关键词并建立索引,支持关键词搜索和混合搜索。它提供:
检索与重排模块负责根据用户查询找到最相关的文本段落,并对结果进行优化排序。该模块支持多种检索方法和重排策略,可以根据不同场景进行配置。
主要功能包括:
RetrievalMethod
是一个枚举类,定义了系统支持的检索方法:
class Method(Enum):
SEMANTIC_SEARCH = 'semantic_search'
FULL_TEXT_SEARCH = 'full_text_search'
HYBRID_SEARCH = 'hybrid_search'
@staticmethod
def is_support_semantic_search(retrieval_method: str) -> bool:
return retrieval_method in {Method.SEMANTIC_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}
@staticmethod
def is_support_fulltext_search(_method: str) -> bool:
return retrieval_method in {RetrievalMethod.FULL_TEXT_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}
Service
是检索功能的核心实现,负责根据配置的检索方法执行搜索并返回结果。它提供以下功能:
主要方法包括:
retrieve
:主检索方法,根据检索方法执行搜索keyword_search
:关键词搜索实现embedding_search
:向量相似度搜索实现full_text_index_search
:全文索引搜索实现检索服务协调不同的检索方法并处理结果:
class Service:
@classmethod
def retrieve(cls, retrival_method: str, dataset_id: str, query: str,
top_k: int, score_threshold: Optional[] = .0,
reranking_model: Optional[dict] = None, reranking_mode: Optional[str] = 'reranking_model',
weights: Optional[dict] = None):
# 获取数据集
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
ifnot dataset or dataset._document_count == 0or dataset._segment_count == 0:
return []
all_documents = []
threads = []
exceptions = []
# 关键词搜索
if retrival_method == 'keyword_search':
keyword_thread = threading.Thread(target=Service.keyword_search, kwargs={...})
threads.append(keyword_thread)
keyword_thread.start()
# 语义搜索
ifMethod.is_support_semantic_search(retrival_method):
embedding_thread = threading.Thread(target=Service.embedding_search, kwargs={...})
threads.append(embedding_thread)
embedding_thread.start()
# 全文搜索
ifMethod.is_support_fulltext_search(retrival_method):
full_text_index_thread = threading.Thread(target=RetrievalService.full_text_index_search, kwargs={...})
threads.append(full_text_index_thread)
full_text_index_thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 混合搜索需要后处理
if retrival_method == Method.HYBRID_SEARCH.value:
data_post_processor = DataPostProcessor(str(dataset.tenant_id), reranking_mode,
reranking_model, weights, False)
all_documents = data_post_processor.invoke(
query=query,
documents=all_documents,
score_threshold=score_threshold,
top_n=top_k
)
return all_documents
WeightRerankRunner
实现了基于权重的重排序策略,可以根据不同因素对检索结果进行优化排序。它考虑以下因素:
通过调整不同因素的权重,可以优化检索结果的相关性和质量。
权重重排结合了关键词得分和向量相似度得分:
class WeightRerankRunner:
def __init__(self, tenant_id: str, weights: Weights) -> None:
self.tenant_id = tenant_id
self.weights = weights
def run(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None, user: Optional[str] = None) -> list[Document]:
# 去重
docs = []
doc_id = []
unique_documents = []
for document in documents:
if document.metadata['doc_id'] notin doc_id:
doc_id.append(document.metadata['doc_id'])
docs.append(document.page_content)
unique_documents.append(document)
documents = unique_documents
# 计算关键词得分和向量得分
rerank_documents = []
query_scores = self._calculate_keyword_score(query, documents)
query_vector_scores = self._calculate_cosine(self.tenant_id, query, documents, self.weights.vector_setting)
# 合并得分
for document, query_score, query_vector_score in zip(documents, query_scores, query_vector_scores):
score = self.weights.vector_setting.vector_weight * query_vector_score + \
self.weights.keyword_setting.keyword_weight * query_score
if score_threshold and score < score_threshold:
continue
document.metadata['score'] = score
rerank_documents.append(document)
# 排序并返回结果
rerank_documents = sorted(rerank_documents, key=lambda x: x.metadata['score'], reverse=True)
return rerank_documents[:top_n] if top_n else rerank_documents
DataPostProcessor
负责对检索结果进行后处理,包括重排序、去重和格式化。它支持多种重排序策略,可以根据不同场景进行配置。
数据后处理器负责重排序和结果处理:
class DataPostProcessor:
def __init__(self, tenant_id: str, reranking_mode: str, reranking_model: Optional[dict],
weights: Optional[dict], enable_reranking: bool = True):
# 初始化
def invoke(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None) -> list[Document]:
# 根据模式选择重排序方法
if self.enable_reranking and self.reranking_mode == RerankMode.RERANKING_MODEL.value:
# 使用模型重排序
return self.rerank_model_runner.run(query, documents, score_threshold, top_n)
elif self.enable_reranking and self.reranking_mode == RerankMode.WEIGHT.value:
# 使用权重重排序
return self.weight_rerank_runner.run(query, documents, score_threshold, top_n)
else:
# 不重排序,直接返回
return documents[:top_n] if top_n else documents
Dify 的 RAG 系统是一个功能完整、灵活可扩展的检索增强生成框架,具有以下特点:
多格式文档支持:支持 PDF、Word、Excel、Markdown、HTML、CSV 等多种文档格式。
灵活的文本分块:支持多种分块策略,包括固定大小、基于分隔符和基于语义的分块。
高效的向量化:实现了嵌入缓存机制,提高了向量化效率。
多样的存储选项:支持 Chroma、Milvus、PGVector、Qdrant 等多种向量数据库。
多种检索方法:支持语义搜索、全文搜索、关键词搜索和混合搜索。
高级重排序:支持基于模型的重排序和基于权重的重排序。
并行处理:使用多线程并行执行不同的检索方法,提高效率。
可扩展架构:采用工厂模式和抽象基类,便于扩展新的功能。
通过这些组件的协同工作,Dify 的 RAG 系统能够有效地处理和检索大量文档,为大语言模型提供准确的上下文信息,从而生成更加准确和相关的回答。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-04-19
2025-04-16
2025-05-08
2025-04-23
2025-06-06
2025-05-30
2025-05-19
2025-06-05
2025-06-05
2025-04-28
2025-07-09
2025-07-04
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-06-30
2025-06-29