微信扫码
添加专属顾问
我要投稿
深入剖析RAGFlow源码架构,揭开硬核技术细节。 核心内容: 1. RAGFlow项目结构概览及其主要目录功能 2. 关键目录如api、conf、deepdoc等详细解析 3. RAGFlow API服务启动流程及核心代码解读
后续我会对ragflow的源码进行深入的拆解,深入源码拆解之前我们先对ragflow的项目结构有一个大概的了解。
E:\ai\code\ragflow/
├── agent/
├── agentic_reasoning/
├── api/
├── conf/
├── deepdoc/
├── docker/
├── docs/
├── example/
├── graphrag/
├── helm/
├── intergrations/
├── nltk_data
├── rag/
├── sdk/
└── web/
└── uv.lock
agent
目录主要是 智能体相关的实现agentic_reasoning
目录是0.17.2更新的推理能力的实现api
目录主要是后端apiconf
是源码启动的时候的配置文件deepdoc
目录是 deepdoc的实现,解析文档(偷偷的告诉你们后续会用视觉模型处理文档)docker
目录 快速验证部署的docker配置docs
目录是 ragflow的文档库example
目录主要是接口和sdk操作dataset的示例graphrag
目录是 知识图谱实现helm
k8s配置intergrations
目录中ragflow 与 ChatGPT-on-WeChat 对接nltk_data
目录为下载的nltk数据,docker构建的时候创建rag
目录是rag实现sdk
目录是ragflow封装的python sdkweb
主要是ragflow的前端项目uv.lock
python项目的依赖组件接下来我详细介绍下使用的几个目录。
apps
目录主要是 ragflow对外暴露的接口sdk
目录是对外提供的api,主要是操作知识库和一些扩展*_app.py
的文件都是对ragflow的web提供的接口db
目录是操作数据库的封装utils
是封装的一些工具ragflow_server.py
是ragflow api服务的启动入口文件。核心代码如下:
settings.init_settings()
# init db
init_web_db()
init_web_data()
# start http server
try:
logging.info("RAGFlow HTTP server start...")
run_simple(
hostname=settings.HOST_IP,
port=settings.HOST_PORT,
application=app,
threaded=True,
use_reloader=RuntimeConfig.DEBUG,
use_debugger=RuntimeConfig.DEBUG,
)
except Exception:
run_simple
启动Flask应用(app
对象来自api.apps
模块)threaded=True
)use_reloader
)和调试器(use_debugger
)SIGINT
和SIGTERM
信号实现优雅关闭在settings.py
中做了很多初始化的工作
def init_settings():
global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievaler
#文档引擎默认使用elasticsearch
DOC_ENGINE = os.environ.get('DOC_ENGINE', "elasticsearch")
lower_case_doc_engine = DOC_ENGINE.lower()
if lower_case_doc_engine == "elasticsearch":
docStoreConn = rag.utils.es_conn.ESConnection()
elif lower_case_doc_engine == "infinity":
docStoreConn = rag.utils.infinity_conn.InfinityConnection()
else:
raise Exception(f"Not supported doc engine: {DOC_ENGINE}")
retrievaler = search.Dealer(docStoreConn)
kg_retrievaler = kg_search.KGSearch(docStoreConn)
我们接着看下app
目录下的__init__.py
文件
def search_pages_path(pages_dir):
app_path_list = [
path for path in pages_dir.glob("*_app.py") ifnot path.name.startswith(".")
]
api_path_list = [
path for path in pages_dir.glob("*sdk/*.py") ifnot path.name.startswith(".")
]
app_path_list.extend(api_path_list)
return app_path_list
defregister_page(page_path):
path = f"{page_path}"
page_name = page_path.stem.rstrip("_app")
module_name = ".".join(
page_path.parts[page_path.parts.index("api"): -1] + (page_name,)
)
spec = spec_from_file_location(module_name, page_path)
page = module_from_spec(spec)
page.app = app
page.manager = Blueprint(page_name, module_name)
sys.modules[module_name] = page
spec.loader.exec_module(page)
page_name = getattr(page, "page_name", page_name)
sdk_path = "\\sdk\\"if sys.platform.startswith("win") else"/sdk/"
url_prefix = (
f"/api/{API_VERSION}"if sdk_path in path elsef"/{API_VERSION}/{page_name}"
)
app.register_blueprint(page.manager, url_prefix=url_prefix)
return url_prefix
pages_dir = [
Path(__file__).parent,
Path(__file__).parent.parent / "api" / "apps",
Path(__file__).parent.parent / "api" / "apps" / "sdk",
]
client_urls_prefix = [
register_page(path) fordirin pages_dir for path in search_pages_path(dir)
这里主要做的是通过扫描 api/apps/
和 api/apps/sdk/
包下的 *_app.py
和sdk
包下文件,通过register_page()
动态注册到_blueprint。
/{API_VERSION}/{page_name}
page_name
是document_app.py 移除_app.py
后缀后的名称/api/{API_VERSION}/...
http://localhost:8002/v1/document/upload
,拆解下,就是去document
+_app.py
对应的文件,也就是document_app.py
文件中找upload
,这样我们从前端找接口就好找了。ragflow的接口的代码逻辑基本上在*_app.py
中。
API_VERSION
就是上面介绍的api的版本SERVICE_CONF
是加载配置文件REQUEST_WAIT_SEC
和REQUEST_MAX_WAIT_SEC
两个参数没啥用DATASET_NAME_LIMIT
限制数据集的名称大小核心文件是db_models.py
和service目录中的common_service.py
。
db_models.py
封装了表模型,common_service.py
通过Peewee ORM
封装了常用数据库的CURD
。基础CRUD基础CRUD基础CRUD基础CRUD高级功能高级功能CommonService查询模块写入模块更新模块删除模块批量操作条件过滤queryget/get_allinsert/saveupdate_by_iddelete_by_idinsert_manyupdate_many_by_idfilter_scope_listfilter_delete
这样在其他的service中能CommonService
后,就具备了所有的基础功能
class DocumentService(CommonService):
model = Document
@classmethod
@DB.connection_context()
defget_list(cls, kb_id, page_number, items_per_page,
orderby, desc, keywords, id, name):
docs = cls.model.select().where(cls.model.kb_id == kb_id)
ifid:
docs = docs.where(
cls.model.id == id)
if name:
docs = docs.where(
cls.model.name == name
)
if keywords:
docs = docs.where(
fn.LOWER(cls.model.name).contains(keywords.lower())
)
if desc:
docs = docs.order_by(cls.model.getter_by(orderby).desc())
else:
docs = docs.order_by(cls.model.getter_by(orderby).asc())
count = docs.count()
docs = docs.paginate(page_number, items_per_page)
returnlist(docs.dicts()), count
比如DocumentService
通过model=Document 指定操作哪个数据模型(也就是哪张表)能快速实现增删改查。通过自定义扩展其他方法达到自己想要的效果。
封装的一些工具,就是字面意思。
service_conf.yaml
,基本上ragflow支持什么,在这个文件中都能体现出来。这个是知识图谱的视线。关注几个文件。
general/index.py
抽取入口entity_resolution.py
search.py
该方法主要是暴露给task_executor
用于抽取知识图谱。主要流程如下
CommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientCommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientrun_graphrag(doc_data)生成子图(实体&关系)NetworkX子图合并到全局图消歧新增实体生成社区报告存储结果完成通知
async def run_graphrag(row: dict, ...):
# 生成子图后存储到ES
subgraph = await generate_subgraph(...)
# 合并到全局图谱
new_graph = await merge_subgraph(...)
# 实体消歧和社区发现后,最终存储社区报告到ES
await extract_community(...)
generate_subgraph代码中的核心片段
subgraph.graph["source_id"] = [doc_id]
chunk = {
"content_with_weight": json.dumps(nx.node_link_data(subgraph)), # 将NetworkX图序列化为JSON
"knowledge_graph_kwd": "subgraph", # ES索引标识
"kb_id": kb_id,
"source_id": [doc_id],
"available_int": 0,
"removed_kwd": "N",
}
# 写入ES的核心操作
await trio.to_thread.run_sync(
lambda: settings.docStoreConn.insert(
[{"id": chunk_id(chunk), **chunk}],
search.index_name(tenant_id),
kb_id
)
)
knowledge_graph_kwd="subgraph"
: 标识该数据为子图类型docStoreConn
: RAGFlow封装的文档客户端,这个在api模块中说了search.index_name(tenant_id)
: 动态生成索引名(格式如ragflow_{tenant_id}
解决知识图谱中 同一实体多表述 的问题(如"苹果公司" vs "Apple Inc."),确保图谱中每个真实世界实体仅对应唯一节点。
GraphLLMERGraphLLMERalt[存在候选节点][无候选节点]loop[每个待消歧节点]获取新增子图节点(subgraph_nodes)查找相似候选节点(相似度>阈值)发送实体对比请求返回是否同一实体的判断合并节点或保留新节点保留为新节点更新全局图谱
该模块是构建高质量知识图谱的核心组件,通过语义理解与图结构分析的结合,显著提升图谱的准确性和一致性。
提供给内部的直接查询知识图谱的方法,核心概流程如下
ElasticSearchLLMKGSearchUserElasticSearchLLMKGSearchUserretrieval(question, kb_ids)query_rewrite(question)实体类型&关键词并行发起三类检索原始结果结果融合与评分结构化检索结果
package.json
可以快速了解前端使用了哪些组件,以及常用的脚本.env
定义了前端端口.umirc.ts
文件主要是源码启动,调用后端的路由配置public
中的logo.svg 可以修改项目的logoconf.json
配置前端的中文名称routes.ts
静态路由配置,所有的页面都在pages中,大家可以从路由中快速定位。前端使用的是react写的,真不想再折腾了,学的太多,太杂了。能改改就行了。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-04-27
RAGFlow 集成 Milvus向量库操作指南
2025-04-27
Docker 迁移RAGFlow镜像后出现问题解析与如何修复
2025-04-26
ragflow v0.18.0:VLM模型支持、知识库共享、Langfuse集成,企业级AI新选择
2025-04-26
Ragflow(v0.18.0)更新内容概览与同步计划
2025-04-11
Windows WSL 安装 RAGFlow 详细教程
2025-03-31
喂饭教程-Dify如何集成RAGFlow知识库
2025-03-19
一文读懂 RAGFlow 知识库接入 Dify 的全流程
2025-03-16
MacOS 安装 RagFlow 全踩坑指南
2025-02-07
2025-03-11
2024-11-25
2024-09-30
2025-03-19
2024-12-24
2025-04-09
2025-03-16
2025-03-31
2025-04-11