免费POC,零成本试错

AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


【万字长文】Dify 知识库全链路图解:7 个关键节点,彻底拆解 RAG 黑盒

发布日期:2025-08-19 19:57:31 浏览次数: 1518
作者:5ycode

微信搜一搜,关注“5ycode”

推荐语

深入解析Dify知识库的7个关键节点,带你彻底拆解RAG黑盒的实现原理。

核心内容:
1. 知识库创建与文档上传的完整流程
2. 文档解析规则设置与处理机制
3. 后端API调用与索引任务的触发过程

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

 

在上一篇刨开源码看门道:dify 数据集的那些事(一),我们了解了dify的数据集架构,在这篇文章中,我们了解下dify内部知识库的整个流转实现。通过该篇,我们能详细的了解到dify数据是如何解析的。


整个流程如下:

  • • 知识库(数据集)的创建
  • • 知识库的设置
  • • 文档的上传
  • • 文档解析规则的设置
  • • 文档解析

知识库的解析流程

知识库创建


知识库的创建,调用的是console/api/datasets接口

创建完知识库以后,有三个选项
  • • 文档:列出所有文档,或上传文档
  • • 召回测试:检索
  • • 设置:设置知识库的基本信息

文档上传


选择文件,以后,已经通过console/api/files/upload进行了上传。

上传以后,在设置的docker存储目录指定的租户下面。

分片设置


上传完以后,点击下一步进入分片设置页面。在这里
  • • 通过接口console/api/workspaces/current/default-model?model_type=text-embedding 不同的model_type 加载了默认embedding模型和rerank 模型
  • • 通过接口console/api/datasets/process-rule加载到了默认处理规则
  • • 同时也会把所有的embedding模型和rerank 模型加载到用于选择

保存并处理

点击保存并处理的时候,调用的是/console/api/datasets/5acd7076-8fb4-46ba-833d-89b0196ef918/documents接口,

在这里把数据集和文档都传递到了后端。

最终定位到DatasetDocumentListApi的post方法中。具体代码位置如下:
controllers/console/datasets/datasets_document.py  中的DatasetDocumentListApi.post

services/dataset_service.py 中的DocumentService.save_document_with_dataset_id

代码流程如下:

上传文件

Notion导入

网站爬取

DatasetDocumentListApi.post
验证用户权限
解析请求参数为KnowledgeConfig
调用DocumentService.save_document_with_dataset_id
检查文档上传配额
是否更新现有文档?
更新文档并触发索引任务
创建新文档处理
检查批量上传限制
保存处理规则
处理不同类型数据源
检查文件是否存在
验证Notion连接
准备爬取任务
创建文档记录
触发异步索引任务
返回文档和批次信息
  • • 先对当前用户是否具有资源权限进行判断
  • • 计费体系开启,校验配额
  • • 创建或更新文档参数,并触发索引任务

关于配额

  • • 配额是按租户获取的,一些参数是从配置文件中获取,一些是从配置文件中BILLING_API_URL中配置的系统中获取的,在这里只限制在企业版中实现
  • • 当计费体系开启的时候,会计算上传的额度(数量)已经上传的数量和当前上传的数量做对比

索引方式

  • • 经济索引是每个数据库使用10个关键词进行检索
  • • 高质量索引,使用向量模型+关键词

    在代码里,对高质量索引进行了必要参数的补充。

更新或保持处理规则

这里有近两百行代码来处理不同类型数据源,确保文件和连接的可用。

在这里如果是upload_file,如果开启了重复文件校验,会拿到当前知识库中第一个文件,进行特殊处理。

异步任务推送

当所有的规则处理完成以后,会把任务通过document_indexing_task.delay(dataset.id, document_ids)推到队列中。

异步解析


在tasks目录下,有一个document_indexing_task.py进行异步解析。整体流程梳理如下:

开始
获取数据集信息
数据集存在?
检查资源限制
记录日志并退出
通过限制检查?
更新文档状态为parsing
标记文档为错误状态
创建IndexingRunner
执行索引处理
处理成功?
记录处理时间
捕获异常并记录
记录错误原因
结束
  • • 首先进行文件是否存在的校验(异步任务,可能中断,也可能用户取消)
  • • 开启计费模式以后,对额度进行再次校验(可能并发的问题)
  • • 更新状态为解析中
  • • 核心逻辑在IndexingRunner.run

解析

indexing_runner.py

这里的流程也很清晰,整个用了模板方法+工厂+策略模式实现
  1. 1. 首先对数据集进行校验,不存在,直接终止
  2. 2. 获取处理规则,没有处理规则也直接终止
  3. 3. 根据索引类型IndexProcessorFactory,获取对应的索引处理器: ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段)
  4. 4. 获取文档内容_extract,解决了不同文档的差异性
  • • 支持三种数据源类型
  • • 统一返回Document对象列表
  • • 自动更新处理进度
  • 5. 转换阶段 _transform
    • • 执行文本清洗(根据处理规则)
    • • 分块处理(自定义或自动分块)
    • • 语言处理(国际化支持)
    • • 向量化准备(高质量索引模式)
  • 6. 分段处理 _load_segments
    • • 使用DatasetDocumentStore管理分段
    • • 支持父子文档结构
    • • 原子化状态更新
  • 7. 索引构建 _load
    • • 高质量模式:使用嵌入模型并行处理
    • • 经济模式:构建关键词倒排索引
    • • 状态一致性管理

    重点看下5、6、7 这三个阶段。

    转换阶段 _transform

    转换阶段的核心对应的是ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段) 这三个处理器的transform 方法。

    重点分析下普通分段和父子分段。

    ParagraphIndexProcessor

    • • 在通用分段中,如果不勾选使用Q&A分段走的是这里

    整体逻辑如下:

    自动

    自定义

    有效

    无效

    所有文档处理完成

    开始
    验证处理规则
    规则模式
    加载自动规则
    加载自定义规则
    初始化分块器
    遍历文档
    文本清洗
    执行分块
    分块后处理
    生成文档ID和哈希
    去除引导符号
    有效内容检查
    添加到结果集
    返回结果
    • • 在验证处理规则这里,其实是一个兜底策略,正常情况下,如果没有设置,直接使用默认的分段策略
    splitter = self._get_splitter(  
        processing_rule_mode=process_rule.get("mode"),  
        max_tokens=rules.segmentation.max_tokens,  
        chunk_overlap=rules.segmentation.chunk_overlap,  
        separator=rules.segmentation.separator,  
        embedding_model_instance=kwargs.get("embedding_model_instance"),  
    )
    • • 根据规则参数创建分块器
    • • 在_get_splitter内支持两种分块器类型:
      • • 固定分块器(FixedRecursiveCharacterTextSplitter)
      • • 增强型分块器(EnhanceRecursiveCharacterTextSplitter)

    创建好分块器以后,接下来就是文档处理了

    for document in documents:
        # 清洗
        document_text = CleanProcessor.clean(document.page_content, kwargs.get("process_rule", {}))
        document.page_content = document_text
        # 分块
        document_nodes = splitter.split_documents([document])
        # 分块后处理
        for document_node in document_nodes:
            if document_node.page_content.strip():
                # 生成ID和哈希
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document_node.page_content) 
                # 去符号处理
                page_content = remove_leading_symbols(document_node.page_content).strip()
                iflen(page_content) > 0:
                    document_node.page_content = page_content
                    all_documents.append(document_node)
    • • 遍历所有的文档,处理步骤如下:
      • • 文本清洗(根据页面上的文本预处理规则进行清洗)
      • • 按规则分块
      • • 为每个分块生成唯一标识
      • • 内容标准化处理,把一些标题进行移除
    # TextSplitter.split_documents内部实现的典型处理:
    defsplit_documents(self, documents: Iterable[Document]) -> list[Document]:  
        """Split documents."""
        texts, metadatas = [], []  
        for doc in documents:  
            texts.append(doc.page_content)  
            metadatas.append(doc.metadata or {})  
        returnself.create_documents(texts, metadatas=metadatas)

    defcreate_documents(self, texts: list[str], metadatas: Optional[list[dict]] = None) -> list[Document]:  
        """Create documents from a list of texts."""
        _metadatas = metadatas or [{}] * len(texts)  
        documents = []  
        for i, text inenumerate(texts):  
            index = -1
            for chunk inself.split_text(text):  
                metadata = copy.deepcopy(_metadatas[i])  
                ifself._add_start_index:  
                    index = text.find(chunk, index + 1)  
                    metadata["start_index"] = index  
                new_doc = Document(page_content=chunk, metadata=metadata)  
                documents.append(new_doc)  
        return documents


    # FixedRecursiveCharacterTextSplitter.split_text

    defsplit_text(self, text: str) -> list[str]:  
        """Split incoming text and return chunks."""
        ifself._fixed_separator:  
            chunks = text.split(self._fixed_separator)  
        else:  
            chunks = [text]  

        final_chunks = []  
        chunks_lengths = self._length_function(chunks)  
        for chunk, chunk_length inzip(chunks, chunks_lengths):  
            if chunk_length > self._chunk_size:  
                final_chunks.extend(self.recursive_split_text(chunk))  
            else:  
                final_chunks.append(chunk)  

        return final_chunks

    这里的逻辑稍微有点绕,通过继承+模板抽象方法。通过子类实现对应的文本切分。主要在_get_splitter的时候创建的分块器

    ParentChildIndexProcessor


    在父子分段中,支持两种父文档模式:段落模式(PARAGRAPH)和全文模式(FULL_DOC),主要处理流程分支:
    • • PARAGRAPH模式:先分块父文档再生成子文档
    • • FULL_DOC模式:直接处理全文生成子文档

    整体流程如下:

    PARAGRAPH

    FULL_DOC

    所有文档处理完成

    开始
    验证处理规则
    父文档模式
    遍历原始文档
    合并全文内容
    文本清洗
    父文档分块
    生成子文档
    构建层次化文档
    添加到结果集
    生成子文档
    构建层次化文档
    添加到结果集
    返回结果

    相比于通用分段,多了一层,整体逻辑上差不多,只不过父子分段先用父分段规则,分出来的父分段,又用了子分段规则。

    分段处理 _load_segments

    继续回到index_runner.py中的_load_segments方法中,这块的逻辑比较简单。 将处理后的文档分段通过dataset_docstore.py的 add_documents保存到数据库,并且更新文档和分段的状态为indexing

    数据库的操作细节如下:存储到pg里的

    # DocumentStore.add_documents内部逻辑:
    defadd_documents(self, docs, save_child=False):
        segments = []
        for doc in docs:
            # 创建父分段
            segment = DocumentSegment(
                dataset_id=self.dataset.id,
                document_id=self.document_id,
                content=doc.page_content,
                index_node_id=doc.metadata["doc_id"],
                index_node_hash=doc.metadata["doc_hash"]
            )
            segments.append(segment)
            
            if save_child andhasattr(doc, 'children'):
                for child in doc.children:
                    # 创建子文档记录
                    child_chunk = ChildChunk(
                        segment_id=segment.id,
                        content=child.page_content,
                        index_node_id=child.metadata["doc_id"]
                    )
                    db.session.add(child_chunk)
        
        db.session.bulk_save_objects(segments)

    如果文档多了,这块是一个性能瓶颈。

    索引构建 _load

    分段完成,并保存完成以后,就开始做向量化了。在这里

    • • 根据索引技术类型(高质量/经济型)选择不同处理方式
    • • 更新文档和分段的状态为"completed"

    整体逻辑如下:

    高质量

    经济型

    开始
    索引技术类型
    初始化嵌入模型
    启动关键词索引线程
    初始化线程池
    文档分块分组
    提交并行任务
    等待任务完成
    等待线程完成
    统计总tokens
    更新文档状态
    结束

    高质量的时候,必须使用向量模型。高质量处理的关键逻辑

    with ThreadPoolExecutor(max_workers=10as executor:
        futures = []
        # 文档分块逻辑(避免哈希冲突)
        for chunk in document_groups:
            futures.append(executor.submit(
                self._process_chunk,
                flask_app,
                index_processor,
                chunk,
                dataset,
                dataset_document,
                embedding_model_instance
            ))
        tokens = sum(future.result() for future in futures)

    # _process_chunk 的核心逻辑如下:
    # 先检测任务是否暂停
    self._check_document_paused_status(dataset_document.id)
    # Token计算
    tokens = embedding_model_instance.get_text_embedding_num_tokens(texts)

    # 索引构建
    index_processor.load(dataset, chunk_documents, with_keywords=False)

    # 状态更新
    db.session.query(DocumentSegment).filter(
        DocumentSegment.index_node_id.in_(document_ids)
    ).update({
        "status""completed",
        "completed_at": datetime.now()
    })

    将分段chunk,通过线程池,批量的向量化。在索引构建的时候,又跑到了index_processor这里,也就是对应的ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段)

    看到这里,我就有个疑问,在父子分段中,高质量索引的时候,关键词处理直接为false,在paragraph_index_processor.py中,只是进行了向量化存储,没有提取关键词。在parent_child_index_processor.py只是对子分段进行了向量化并存储,这个可以理解

    向量完以后,更新为完成。

    后记

    • • 通过深入dify的解析过程,了解了整体的分段过程
    • • 检索效果的提升,还需要对检索这块进行代码分析,方便后续优化dify的rag效果

    系列文章

    uv配置环境

    coze相关

    AI开发新选择:扣子平台功能详解与智能体拆解
    AI开发新选择:扣子平台工作流基础节点介绍
    AI Agent 新选择:Coze Studio 开源上手实录,能替代 Dify 吗?
    Dify 之外的新尝试:Coze Studio 知识库实战指南:部署、解析、接入全流程
    Coze Studio 又升了!本地 Ollama 向量 + 插件外挂知识库,RAG 体验翻倍提升

    dify应用

    DeepSeek+dify 本地知识库:真的太香了
    Deepseek+Dify本地知识库相关问题汇总
    dify的sandbox机制,安全隔离限制
    DeepSeek+dify 本地知识库:高级应用Agent+工作流
    DeepSeek+dify知识库,查询数据库的两种方式(api+直连)
    DeepSeek+dify 工作流应用,自然语言查询数据库信息并展示
    聊聊dify权限验证的三种方案及实现
    dify1.0.0版本升级及新功能预览
    Dify 1.1.0史诗级更新!新增"灵魂功能"元数据,实测竟藏致命Bug?手把手教你避坑
    【避坑血泪史】80次调试!我用Dify爬虫搭建个人知识库全记录
    手撕Dify1.x插件报错!从配置到网络到Pip镜像,一条龙排雷实录
    dify1.2.0升级,全新循环节点优化,长文写作案例
    dify1.x无网环境安装插件
    dify应用:另类的关键词检索
    Dify 1.5.0 上线:这次调试功能,真的省了我一半时间
    Dify × MCP 实战(一):用插件一分钟搞定MCP Server(含时间踩坑实践)
    Dify × MCP 实战(二):发布工作流为 AI 工具服务,全流程配置 + Cherry 调用实战
    Dify × MCP 实战(三):结果别再堆字了!用 AntV 插件打造图表可视化工具
    Dify 1.6.0 重磅上线:原生MCP 双向集成、结构化输出升级
    Dify结构化输出全攻略:从中医案例说起,到真实落地分析
    懒人必备!用 Dify 自动监控 GitHub 项目更新,并推送到钉钉/飞书

    dify源码

    dify项目结构说明与win11本地部署
    Dify 深度拆解(二):后端架构设计与启动流程全景图
    10分钟搞定企业级登录!Dify无缝集成LDAP实战指南
    一文吃透Dify账户系统:多租户 + 多登录方式 + 权限模型全揭底
    Dify插件实战
    刨开源码看门道:Dify 数据集的那些事(一)

    ragflow相关

    DeepSeek+ragflow构建企业知识库:突然觉的dify不香了(1)
    DeepSeek+ragflow构建企业知识库之工作流,突然觉的dify又香了
    DeepSeek+ragflow构建企业知识库:高级应用篇,越折腾越觉得ragflow好玩
    RAGFlow爬虫组件使用及ragflow vs dify 组件设计对比
    从8550秒到608秒!RAGFlow最新版本让知识图谱生成效率狂飙,终于不用通宵等结果了
    以为发现的ragflow的宝藏接口,其实是一个天坑、Chrome/Selenium版本地狱
    NLTK三重降噪内幕!RAGFlow检索强悍竟是靠这三板斧
    从代码逆向RAGFlow架构:藏在18张表里的AI知识库设计哲学
    解剖RAGFlow!全网最硬核源码架构解析
    深度拆解RAGFlow分片引擎!3大阶段+视觉增强,全网最硬核架构解析
    深度拆解RAGFlow分片引擎之切片实现
    RAGFlow核心引擎DeepDoc之PDF解析大起底:黑客级PDF解析术与致命漏洞
    RAGFlow 0.18.0 实战解读:从 MCP 支持到插件配置的全流程揭秘
    ragflow 0.19.0 图文混排功能支持

    mcp

    上线3周:告警减少70%!AI巡检分级报告实战(一)
    MCP不像想象的那么简单,MCP+数据库,rag之外的另一种解决方案
    上线3周:告警减少85%!纯AI驱动巡检通知实战(二)无硬编码方案曝光

     

    53AI,企业落地大模型首选服务商

    产品:场景落地咨询+大模型应用平台+行业解决方案

    承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

    联系我们

    售前咨询
    186 6662 7370
    预约演示
    185 8882 0121

    微信扫码

    添加专属顾问

    回到顶部

    加载中...

    扫码咨询