微信扫码
添加专属顾问
我要投稿
深入理解Dify后端架构设计,掌握Python Flask框架项目结构。 核心内容: 1. Dify后端架构概览及技术栈 2. 项目目录结构与接口入口解析 3. 模块化分层与代码规范性探讨
在上一篇dify项目结构说明与win11本地部署之后,本文将带你剖析Dify的后端架构设计思路。
Dify 的后端使用 Python 编写,使用 Flask 作为web框架。使用 SQLAlchemy 作为 ORM,使用 Celery 作为任务队列。授权逻辑通过 Flask-login 进行处理。
第一次打开Dify的代码目录,随便翻了翻,就把dify的目录功能推测出来了.
api/
├── app.py # 主入口
├── app_factory.py # 初始化逻辑
├── controllers/ # API路由
├── core/ # 核心业务
├── extensions/ # 各种插件
└── services/ # 业务逻辑
├── migrations # 数据库迁移脚本
├── models # 数据库模型和表结构定义
这种布局让新上手项目的开发者能在不看文档的情况下快速找到需要修改的地方,大大降低了熟悉代码的难度。
在controllers目录下,是api的视线
我们看下console内的__init__.py
文件
bp = Blueprint("console", __name__, url_prefix="/console/api")
api = ExternalApi(bp)
我们可以看到在这个模块下所有的接口都是console/api
开头,在任意一个功能文件的下面都有类似下面的路由注册。手动注册,很累。
api.add_resource(LoginApi, "/login")
api.add_resource(LogoutApi, "/logout")
api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")
api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")
api.add_resource(ResetPasswordSendEmailApi, "/reset-password")
api.add_resource(RefreshTokenApi, "/refresh-token")
login
接口,对应的是loginApi
,规范性很强。console
,直接在对应的目录下找对应的文件即可。flask 写起来是真麻烦。
Dify的其核心设计理念可概括为:
1. 分层设计:严格遵循表现层-应用层-领域层-基础设施层的分层原则(这是为啥我看dify的代码比较亲切,和java比较像,哈哈)
2. 模块化解耦:功能模块高内聚低耦合,像积木一样可自由组合
3. 扩展优先:所有非核心功能都设计为可插拔的扩展
我把所有的模块归纳以后,就是下面的示意图。
基础设施数据库缓存消息队列Celery Worker领域层核心模型业务规则应用层业务服务流程编排表现层API接口参数校验
我们通过启动命令来分析下
uv run flask run --host 0.0.0.0 --port=5001 --debug
flask run
这就涉及到flask命令的机制了FLASK_APP
指定的 Python 应用模块(默认依次查找wsgi.py
、app.py
、application.py
)werkzeug
).flaskenv
或 .env
文件(如果存在)是否非调试模式调试模式启动命令是否为数据库命令?创建迁移专用应用创建完整应用仅初始化数据库相关扩展返回迁移应用检查调试模式启用gevent补丁初始化gRPC gevent补丁psycopg2跳过gevent补丁创建基础应用create_app加载配置初始化24个扩展启动服务器
app_factory类中的create_app
DifyConfig
类把.env
文件加载到了内存,并创建了一个DifyApp
对象appDifyApp
继承了Flask,可以认为app是一个Flask对象flask
上添加各种各样的功能以下是整理的extensions
目录下的扩展功能
1. 核心功能扩展:
- ext_database: 数据库支持
- ext_redis: Redis缓存
- ext_celery: 异步任务
- ext_migrate: 数据库迁移
2. 安全相关扩展:
- ext_login: 用户认证
- ext_set_secretkey: 密钥管理
- ext_sentry: 错误监控
3. 性能相关扩展:
- ext_compress: 响应压缩
- ext_proxy_fix: 代理支持
- ext_app_metrics: 应用指标
4. 功能模块扩展:
- ext_blueprints: 路由注册
- ext_commands: CLI命令
- ext_storage: 文件存储
5. 运维相关扩展:
- ext_logging: 日志配置
- ext_otel: OpenTelemetry
- ext_request_logging: 请求日志
我拿路由注册来说
def init_app(app: DifyApp):
# register blueprint routers
from flask_cors import CORS # type: ignore
from controllers.console import bp as console_app_bp
from controllers.files import bp as files_bp
from controllers.inner_api import bp as inner_api_bp
from controllers.service_api import bp as service_api_bp
from controllers.web import bp as web_bp
CORS(
service_api_bp,
allow_headers=["Content-Type", "Authorization", "X-App-Code"],
methods=["GET", "PUT", "POST", "DELETE", "OPTIONS", "PATCH"],
)
app.register_blueprint(service_api_bp)
app.register_blueprint
将这个目录对应的接口都注册到flask中uv run celery -A app.celery worker -P solo --without-gossip --without-mingle -Q dataset,generation,mail,ops_trace --loglevel INFO
我们来让ai分析下这个启动命令都是什么意思。
celery
: Celery命令行工具-A app.celery
: 指定Celery应用模块,表示入口模块,具体指什么下面再说worker
: 启动worker进程-P solo
: 使用solo池(单进程)--without-gossip
: 禁用worker之间的gossip通信--without-mingle
: 禁用worker启动时的mingle同步-Q dataset,generation,mail,ops_trace
: 只监听这4个队列--loglevel INFO
: 设置日志级别为INFOExtensionsFlask实例(app变量)app模块(app.py)Celery CLIExtensionsFlask实例(app变量)app模块(app.py)Celery CLI导入app模块访问celery属性查找extensions["celery"]返回Celery实例返回celery属性值提供Celery应用
这边就有点绕,我在想这个是app.py?还是app实例属性?通过上面的流程图可以看到
app.celery
最终指向的是通过celery = app.extensions["celery"]
获取到的的celery
我们看下celery里是什么
def init_app(app: DifyApp) -> Celery:
classFlaskTask(Task):
def__call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
returnself.run(*args, **kwargs)
celery_app = Celery(
app.name,
task_cls=FlaskTask,
broker=dify_config.CELERY_BROKER_URL,
backend=dify_config.CELERY_BACKEND,
task_ignore_result=True,
)
celery_app.set_default()
app.extensions["celery"] = celery_app
在ext_celery.app
中知识创建了一个celery_app实例,然后绑定到了flask上。然后我们要好好的了解下celery。
Celery 是一个强大的分布式任务队列系统,用于处理异步任务和定时任务。
回到dify中,我们根据CELERY_BROKER_URL
可以找到.env
中找到
CELERY_BROKER_URL=redis://:difyai123456@localhost:${REDIS_PORT}/1
说明dify的中的任务传递是靠redis来的。
根据CELERY_BACKEND
没有找到配置,但是看代码,默认是database
class CeleryConfig(DatabaseConfig):
CELERY_BACKEND: str = Field(
description="Backend for Celery task results. Options: 'database', 'redis'.",
default="database",
)
CELERY_BROKER_URL: Optional[str] = Field(
description="URL of the message broker for Celery tasks.",
default=None,
)
然后我就在dify中找任务是怎么和celery关联上的,没找到,然后问了下大模型。celery在启动的时候自动加载tasks目录下的任务。
任务注册表tasks模块Celery AppCelery Worker启动任务注册表tasks模块Celery AppCelery Worker启动加载Celery应用自动发现任务模块(通过include/自动扫描)@shared_task装饰器执行时注册任务注册完成Worker开始监听队列
@shared_task(queue="dataset")
def add_document_to_index_task(dataset_document_id: str):
concurrency: 28 (solo)
28个线程然后我们通过add_document_to_index_task
反向查找下
我们可以看到在操作文档状态的时候,会添加到任务队列里。
我让DeepSeek给我画了下操作流程
Database BackendCelery WorkerRedis Broker客户端/生产者Database BackendCelery WorkerRedis Broker客户端/生产者1. 发布任务到队列 (LPUSH)2. 轮询队列 (BRPOP)3. 执行任务逻辑4. 存储任务结果 (INSERT/UPDATE)5. 可选: 发送完成事件
当看到这个流程,我想到的是一致性如何保障?
接着看到入队和消费的代码
数据库Celery WorkerRedis主流程数据库Celery WorkerRedis主流程alt[文档存在且状态为complete-d][异常情况]setex(indexing_cache_key, 600, 1)add_document_to_index_task.delay()查询DatasetDocument查询Segments更新Segment状态删除AutoDisableLogdel(indexing_cache_key)标记文档为error状态del(indexing_cache_key)
dataset
队列中dataset
队列中消费add_document_to_index_task53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-12-24
2024-04-25
2024-07-16
2024-07-20
2024-04-24
2024-06-21
2024-11-15
2024-05-08
2024-08-06
2025-03-09
2025-05-29
2025-05-28
2025-05-22
2025-04-27
2025-04-15
2025-03-20
2024-12-19
2024-09-13