支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


解剖RAGFlow!全网最硬核源码架构解析

发布日期:2025-04-09 08:22:13 浏览次数: 2285 作者:5ycode
推荐语

深入剖析RAGFlow源码架构,揭开硬核技术细节。

核心内容:
1. RAGFlow项目结构概览及其主要目录功能
2. 关键目录如api、conf、deepdoc等详细解析
3. RAGFlow API服务启动流程及核心代码解读

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

 

后续我会对ragflow的源码进行深入的拆解,深入源码拆解之前我们先对ragflow的项目结构有一个大概的了解。

ragflow目录解说

E:\ai\code\ragflow/
├── agent/  
├── agentic_reasoning/ 
├── api/ 
├── conf/ 
├── deepdoc/ 
├── docker/ 
├── docs/  
├── example/
├── graphrag/ 
├── helm/ 
├── intergrations/
├── nltk_data
├── rag/ 
├── sdk/
└── web/
└── uv.lock
  • • agent 目录主要是 智能体相关的实现
  • • agentic_reasoning 目录是0.17.2更新的推理能力的实现
  • • api 目录主要是后端api
  • • conf 是源码启动的时候的配置文件
  • • deepdoc目录是 deepdoc的实现,解析文档(偷偷的告诉你们后续会用视觉模型处理文档)
  • • docker目录 快速验证部署的docker配置
  • • docs 目录是 ragflow的文档库
  • • example 目录主要是接口和sdk操作dataset的示例
  • • graphrag 目录是 知识图谱实现
  • • helm k8s配置
  • • intergrations 目录中ragflow 与 ChatGPT-on-WeChat 对接
  • • nltk_data 目录为下载的nltk数据,docker构建的时候创建
  • • rag 目录是rag实现
  • • sdk 目录是ragflow封装的python sdk
  • • web 主要是ragflow的前端项目
  • • uv.lockpython项目的依赖组件

接下来我详细介绍下使用的几个目录。

api 相关介绍

api可能是二开最常用的模块了。
  • • apps目录主要是 ragflow对外暴露的接口
    • • sdk目录是对外提供的api,主要是操作知识库和一些扩展
    • • *_app.py的文件都是对ragflow的web提供的接口
  • • db目录是操作数据库的封装
  • • utils是封装的一些工具

启动文件与apps包

ragflow_server.py是ragflow api服务的启动入口文件。核心代码如下:

settings.init_settings()
# init db  
init_web_db()  
init_web_data()
# start http server  
try:  
    logging.info("RAGFlow HTTP server start...")  
    run_simple(  
        hostname=settings.HOST_IP,  
        port=settings.HOST_PORT,  
        application=app,  
        threaded=True,  
        use_reloader=RuntimeConfig.DEBUG,  
        use_debugger=RuntimeConfig.DEBUG,  
    )  
except Exception:
  • • 基于Werkzeug:使用run_simple启动Flask应用(app对象来自api.apps模块)
    • • 支持多线程模式(threaded=True
    • • 开发模式下支持热重载(use_reloader)和调试器(use_debugger
  • • 信号处理:捕获SIGINTSIGTERM信号实现优雅关闭

settings.py中做了很多初始化的工作

def init_settings():
    global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievaler  
    #文档引擎默认使用elasticsearch
    DOC_ENGINE = os.environ.get('DOC_ENGINE'"elasticsearch")  
    lower_case_doc_engine = DOC_ENGINE.lower()  
    if lower_case_doc_engine == "elasticsearch":  
        docStoreConn = rag.utils.es_conn.ESConnection()  
    elif lower_case_doc_engine == "infinity":  
        docStoreConn = rag.utils.infinity_conn.InfinityConnection()  
    else:  
        raise Exception(f"Not supported doc engine: {DOC_ENGINE}")  
      
    retrievaler = search.Dealer(docStoreConn)  
    kg_retrievaler = kg_search.KGSearch(docStoreConn)
  • • 从代码可以看出文档引擎默认使用elasticsearch
  • • kg_search 为知识图谱的检索

我们接着看下app目录下的__init__.py文件

def search_pages_path(pages_dir):  
    app_path_list = [  
        path for path in pages_dir.glob("*_app.py"ifnot path.name.startswith(".")  
    ]  
    api_path_list = [  
        path for path in pages_dir.glob("*sdk/*.py"ifnot path.name.startswith(".")  
    ]  
    app_path_list.extend(api_path_list)  
    return app_path_list  


defregister_page(page_path):  
    path = f"{page_path}"

    page_name = page_path.stem.rstrip("_app")  
    module_name = ".".join(  
        page_path.parts[page_path.parts.index("api"): -1] + (page_name,)  
    )  

    spec = spec_from_file_location(module_name, page_path)  
    page = module_from_spec(spec)  
    page.app = app  
    page.manager = Blueprint(page_name, module_name)  
    sys.modules[module_name] = page  
    spec.loader.exec_module(page)  
    page_name = getattr(page, "page_name", page_name)  
    sdk_path = "\\sdk\\"if sys.platform.startswith("win"else"/sdk/"
    url_prefix = (  
        f"/api/{API_VERSION}"if sdk_path in path elsef"/{API_VERSION}/{page_name}"
    )  

    app.register_blueprint(page.manager, url_prefix=url_prefix)  
    return url_prefix  


pages_dir = [  
    Path(__file__).parent,  
    Path(__file__).parent.parent / "api" / "apps",  
    Path(__file__).parent.parent / "api" / "apps" / "sdk",  
]  

client_urls_prefix = [  
    register_page(path) fordirin pages_dir for path in search_pages_path(dir)

这里主要做的是通过扫描 api/apps/ 和 api/apps/sdk/ 包下的 *_app.py 和sdk包下文件,通过register_page()动态注册到_blueprint。

  • • 路由规则
    • • 普通路由:/{API_VERSION}/{page_name}
      • • 这里的page_name是document_app.py 移除_app.py后缀后的名称
    • • SDK 路由:/api/{API_VERSION}/...
比如ragflow中的上传文件接口http://localhost:8002/v1/document/upload ,拆解下,就是去document+_app.py 对应的文件,也就是document_app.py文件中找upload,这样我们从前端找接口就好找了。

ragflow的接口的代码逻辑基本上在*_app.py中。

常量文件


在常量文件中:
  • • API_VERSION就是上面介绍的api的版本
  • • SERVICE_CONF是加载配置文件
  • • REQUEST_WAIT_SECREQUEST_MAX_WAIT_SEC两个参数没啥用
  • • DATASET_NAME_LIMIT 限制数据集的名称大小

db包

核心文件是db_models.py和service目录中的common_service.py

  • • db_models.py封装了表模型,
  • • common_service.py 通过Peewee ORM封装了常用数据库的CURD
基础CRUD基础CRUD基础CRUD基础CRUD高级功能高级功能CommonService查询模块写入模块更新模块删除模块批量操作条件过滤queryget/get_allinsert/saveupdate_by_iddelete_by_idinsert_manyupdate_many_by_idfilter_scope_listfilter_delete

这样在其他的service中能CommonService后,就具备了所有的基础功能

class DocumentService(CommonService):  
    model = Document  

    @classmethod  
    @DB.connection_context()  
    defget_list(cls, kb_id, page_number, items_per_page,  
                 orderby, desc, keywords, id, name
):  
        docs = cls.model.select().where(cls.model.kb_id == kb_id)  
        ifid:  
            docs = docs.where(  
                cls.model.id == id)  
        if name:  
            docs = docs.where(  
                cls.model.name == name  
            )  
        if keywords:  
            docs = docs.where(  
                fn.LOWER(cls.model.name).contains(keywords.lower())  
            )  
        if desc:  
            docs = docs.order_by(cls.model.getter_by(orderby).desc())  
        else:  
            docs = docs.order_by(cls.model.getter_by(orderby).asc())  

        count = docs.count()  
        docs = docs.paginate(page_number, items_per_page)  
        returnlist(docs.dicts()), count

比如DocumentService通过model=Document 指定操作哪个数据模型(也就是哪张表)能快速实现增删改查。通过自定义扩展其他方法达到自己想要的效果。

utils包

封装的一些工具,就是字面意思。

conf目录

在conf中,我们重点关注service_conf.yaml,基本上ragflow支持什么,在这个文件中都能体现出来。

大家根据字面意思去添加配置即可。这个文件知识源码启动的时候使用,大家如果使用docker修改docker目录中的文件。

graphrag

这个是知识图谱的视线。关注几个文件。

  • • general/index.py 抽取入口
  • • entity_resolution.py
  • • search.py

index.py

该方法主要是暴露给task_executor用于抽取知识图谱。主要流程如下

CommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientCommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientrun_graphrag(doc_data)生成子图(实体&关系)NetworkX子图合并到全局图消歧新增实体生成社区报告存储结果完成通知
async def run_graphrag(row: dict, ...):
    # 生成子图后存储到ES
    subgraph = await generate_subgraph(...)
    # 合并到全局图谱
    new_graph = await merge_subgraph(...)
    # 实体消歧和社区发现后,最终存储社区报告到ES
    await extract_community(...)

generate_subgraph代码中的核心片段

subgraph.graph["source_id"] = [doc_id]
chunk = {
    "content_with_weight": json.dumps(nx.node_link_data(subgraph)),  # 将NetworkX图序列化为JSON
    "knowledge_graph_kwd""subgraph",  # ES索引标识
    "kb_id": kb_id,
    "source_id": [doc_id],
    "available_int"0,
    "removed_kwd""N",
}
# 写入ES的核心操作
await trio.to_thread.run_sync(
    lambda: settings.docStoreConn.insert(
        [{"id": chunk_id(chunk), **chunk}], 
        search.index_name(tenant_id), 
        kb_id
    )
)
  • • knowledge_graph_kwd="subgraph": 标识该数据为子图类型
  • • docStoreConn: RAGFlow封装的文档客户端,这个在api模块中说了
  • • search.index_name(tenant_id): 动态生成索引名(格式如ragflow_{tenant_id}

entity_resolution.py

解决知识图谱中 同一实体多表述 的问题(如"苹果公司" vs "Apple Inc."),确保图谱中每个真实世界实体仅对应唯一节点。

GraphLLMERGraphLLMERalt[存在候选节点][无候选节点]loop[每个待消歧节点]获取新增子图节点(subgraph_nodes)查找相似候选节点(相似度>阈值)发送实体对比请求返回是否同一实体的判断合并节点或保留新节点保留为新节点更新全局图谱

该模块是构建高质量知识图谱的核心组件,通过语义理解与图结构分析的结合,显著提升图谱的准确性和一致性。

search.py

提供给内部的直接查询知识图谱的方法,核心概流程如下

ElasticSearchLLMKGSearchUserElasticSearchLLMKGSearchUserretrieval(question, kb_ids)query_rewrite(question)实体类型&关键词并行发起三类检索原始结果结果融合与评分结构化检索结果

web工程

  • • 通过package.json可以快速了解前端使用了哪些组件,以及常用的脚本
  • • .env定义了前端端口
  • • .umirc.ts文件主要是源码启动,调用后端的路由配置
  • • public中的logo.svg 可以修改项目的logo
  • • conf.json配置前端的中文名称
  • • routes.ts静态路由配置,所有的页面都在pages中,大家可以从路由中快速定位。

前端使用的是react写的,真不想再折腾了,学的太多,太杂了。能改改就行了。

后记

  • • 正常二开,建议只动api工程,然后前端自己套一层,权限做一个前置应用去处理。维护人员是少数,用的最多的还是流程的使用人员
  • • rag这个目录,我得好好的整理下,这个是它的和核心。

 

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询