支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


Dify 深度拆解(二):后端架构设计与启动流程全景图

发布日期:2025-06-09 13:52:41 浏览次数: 1561 作者:5ycode
推荐语

深入理解Dify后端架构设计,掌握Python Flask框架项目结构。

核心内容:
1. Dify后端架构概览及技术栈
2. 项目目录结构与接口入口解析
3. 模块化分层与代码规范性探讨

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家


在上一篇dify项目结构说明与win11本地部署之后,本文将带你剖析Dify的后端架构设计思路。

Dify 的后端使用 Python 编写,使用 Flask 作为web框架。使用 SQLAlchemy 作为 ORM,使用 Celery 作为任务队列。授权逻辑通过 Flask-login 进行处理。

一、项目结构

第一次打开Dify的代码目录,随便翻了翻,就把dify的目录功能推测出来了.

api/
  ├── app.py              # 主入口
  ├── app_factory.py      # 初始化逻辑
  ├── controllers/        # API路由
  ├── core/               # 核心业务
  ├── extensions/         # 各种插件
  └── services/           # 业务逻辑
  ├── migrations          # 数据库迁移脚本
  ├── models              # 数据库模型和表结构定义 

这种布局让新上手项目的开发者能在不看文档的情况下快速找到需要修改的地方,大大降低了熟悉代码的难度。

接口入口

在controllers目录下,是api的视线

  • • console 主要是后端管理平台的api服务
  • • web 对应c端用户的api服务
  • • service_api 开放服务的api
  • • inner_api 内部api服务

我们看下console内的__init__.py文件

bp = Blueprint("console", __name__, url_prefix="/console/api")  
api = ExternalApi(bp)

我们可以看到在这个模块下所有的接口都是console/api 开头,在任意一个功能文件的下面都有类似下面的路由注册。手动注册,很累。

api.add_resource(LoginApi, "/login")  
api.add_resource(LogoutApi, "/logout")  
api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")  
api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")  
api.add_resource(ResetPasswordSendEmailApi, "/reset-password")  
api.add_resource(RefreshTokenApi, "/refresh-token")
而且对应的视线都是接口后面的单词+api,比如login接口,对应的是loginApi,规范性很强。

我们在后台页面登录的时候,注意模块为console,直接在对应的目录下找对应的文件即可。

flask 写起来是真麻烦。

二、整体架构:模块化分层的艺术

Dify的其核心设计理念可概括为:
1. 分层设计:严格遵循表现层-应用层-领域层-基础设施层的分层原则(这是为啥我看dify的代码比较亲切,和java比较像,哈哈)
2. 模块化解耦:功能模块高内聚低耦合,像积木一样可自由组合
3. 扩展优先:所有非核心功能都设计为可插拔的扩展

我把所有的模块归纳以后,就是下面的示意图。

基础设施数据库缓存消息队列Celery Worker领域层核心模型业务规则应用层业务服务流程编排表现层API接口参数校验

三、启动分析

api启动分析

我们通过启动命令来分析下

uv run flask run --host 0.0.0.0 --port=5001 --debug
  • • 在这里,我们看到flask run 这就涉及到flask命令的机制了
    • • 查找环境变量 FLASK_APP 指定的 Python 应用模块(默认依次查找wsgi.pyapp.pyapplication.py
    • • 启动开发服务器(默认使用 werkzeug
    • • 加载 .flaskenv 或 .env 文件(如果存在)
是否非调试模式调试模式启动命令是否为数据库命令?创建迁移专用应用创建完整应用仅初始化数据库相关扩展返回迁移应用检查调试模式启用gevent补丁初始化gRPC gevent补丁psycopg2跳过gevent补丁创建基础应用create_app加载配置初始化24个扩展启动服务器
  • • 在启动的时候,先做了一些判断,如果只是迁移数据库,就直接终止
  • • 然后核心走到了app_factory类中的create_app
  • • 加载配置那通过DifyConfig类把.env文件加载到了内存,并创建了一个DifyApp 对象app
    • • DifyApp继承了Flask,可以认为app是一个Flask对象
  • • 初始化扩展,就是在flask上添加各种各样的功能

扩展功能

以下是整理的extensions目录下的扩展功能

1. 核心功能扩展:
- ext_database: 数据库支持
- ext_redis: Redis缓存
- ext_celery: 异步任务
- ext_migrate: 数据库迁移

2. 安全相关扩展:
- ext_login: 用户认证
- ext_set_secretkey: 密钥管理
- ext_sentry: 错误监控

3. 性能相关扩展:
- ext_compress: 响应压缩
- ext_proxy_fix: 代理支持
- ext_app_metrics: 应用指标

4. 功能模块扩展:
- ext_blueprints: 路由注册
- ext_commands: CLI命令
- ext_storage: 文件存储

5. 运维相关扩展:
- ext_logging: 日志配置
- ext_otel: OpenTelemetry
- ext_request_logging: 请求日志

我拿路由注册来说

def init_app(app: DifyApp):  
    # register blueprint routers  

    from flask_cors import CORS  # type: ignore  

    from controllers.console import bp as console_app_bp  
    from controllers.files import bp as files_bp  
    from controllers.inner_api import bp as inner_api_bp  
    from controllers.service_api import bp as service_api_bp  
    from controllers.web import bp as web_bp  

    CORS(  
        service_api_bp,  
        allow_headers=["Content-Type""Authorization""X-App-Code"],  
        methods=["GET""PUT""POST""DELETE""OPTIONS""PATCH"],  
    )  
    app.register_blueprint(service_api_bp)
  • • 首先 通过把controllers目录下的包都导入了进来
  • • 然后通过 CORS,给对应的包的路径增加跨域设置
  • • 最后通过app.register_blueprint 将这个目录对应的接口都注册到flask中

任务处理启动

uv run celery -A app.celery worker -P solo --without-gossip --without-mingle -Q dataset,generation,mail,ops_trace --loglevel INFO

我们来让ai分析下这个启动命令都是什么意思。

  • • celery: Celery命令行工具
  • • -A app.celery: 指定Celery应用模块,表示入口模块,具体指什么下面再说
  • • worker: 启动worker进程
  • • -P solo: 使用solo池(单进程)
  • • --without-gossip: 禁用worker之间的gossip通信
  • • --without-mingle: 禁用worker启动时的mingle同步
  • • -Q dataset,generation,mail,ops_trace: 只监听这4个队列
  • • --loglevel INFO: 设置日志级别为INFO
ExtensionsFlask实例(app变量)app模块(app.py)Celery CLIExtensionsFlask实例(app变量)app模块(app.py)Celery CLI导入app模块访问celery属性查找extensions["celery"]返回Celery实例返回celery属性值提供Celery应用

这边就有点绕,我在想这个是app.py?还是app实例属性?通过上面的流程图可以看到

  • • app.celery 最终指向的是通过celery = app.extensions["celery"]获取到的的celery

我们看下celery里是什么

def init_app(app: DifyApp) -> Celery:  
    classFlaskTask(Task):  
        def__call__(self, *args: object, **kwargs: object) -> object:  
            with app.app_context():  
                returnself.run(*args, **kwargs)  
    celery_app = Celery(  
        app.name,  
        task_cls=FlaskTask,  
        broker=dify_config.CELERY_BROKER_URL,  
        backend=dify_config.CELERY_BACKEND,  
        task_ignore_result=True,  
    )
    celery_app.set_default()  
    app.extensions["celery"] = celery_app

ext_celery.app中知识创建了一个celery_app实例,然后绑定到了flask上。然后我们要好好的了解下celery。

celery

Celery 是一个强大的分布式任务队列系统,用于处理异步任务和定时任务。

基本组件

  • • 任务 (Task): 需要异步执行的函数
  • • Worker: 执行任务的进程
  • • Broker: 消息中间件,负责传递任务 (常用 Redis/RabbitMQ)
  • • Backend: 存储任务结果 (可选,常用 Redis/RabbitMQ/数据库)

回到dify中,我们根据CELERY_BROKER_URL可以找到.env中找到

CELERY_BROKER_URL=redis://:difyai123456@localhost:${REDIS_PORT}/1

说明dify的中的任务传递是靠redis来的。

根据CELERY_BACKEND 没有找到配置,但是看代码,默认是database

class CeleryConfig(DatabaseConfig):  
    CELERY_BACKEND: str = Field(  
        description="Backend for Celery task results. Options: 'database', 'redis'.",  
        default="database",  
    )  
  
    CELERY_BROKER_URL: Optional[str] = Field(  
        description="URL of the message broker for Celery tasks.",  
        default=None,  
    )

然后我就在dify中找任务是怎么和celery关联上的,没找到,然后问了下大模型。celery在启动的时候自动加载tasks目录下的任务。

任务注册表tasks模块Celery AppCelery Worker启动任务注册表tasks模块Celery AppCelery Worker启动加载Celery应用自动发现任务模块(通过include/自动扫描)@shared_task装饰器执行时注册任务注册完成Worker开始监听队列
然后看下启动日志。

看日志输出,确实是。然后随便找一个代码
@shared_task(queue="dataset")  
def add_document_to_index_task(dataset_document_id: str):
  • • queue是 在@shared_task中定义的,整个就4个queue
  • • 从启动日志上看 有个concurrency: 28 (solo) 28个线程

然后我们通过add_document_to_index_task反向查找下

我们可以看到在操作文档状态的时候,会添加到任务队列里。

我让DeepSeek给我画了下操作流程

Database BackendCelery WorkerRedis Broker客户端/生产者Database BackendCelery WorkerRedis Broker客户端/生产者1. 发布任务到队列 (LPUSH)2. 轮询队列 (BRPOP)3. 执行任务逻辑4. 存储任务结果 (INSERT/UPDATE)5. 可选: 发送完成事件

当看到这个流程,我想到的是一致性如何保障?

接着看到入队和消费的代码

数据库Celery WorkerRedis主流程数据库Celery WorkerRedis主流程alt[文档存在且状态为complete-d][异常情况]setex(indexing_cache_key, 600, 1)add_document_to_index_task.delay()查询DatasetDocument查询Segments更新Segment状态删除AutoDisableLogdel(indexing_cache_key)标记文档为error状态del(indexing_cache_key)
  • • 在客户端,任务入队,设置了10分钟的分布式锁
  • • 然后通过redis的队列推到了dataset 队列中
  • • celery worker 从dataset队列中消费add_document_to_index_task
  • • 在add_document_to_index_task中主要是业务逻辑的处理

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询