微信扫码
添加专属顾问
我要投稿
Dify 与 DMS 的强强联合,打造企业级 AI 应用开发新范式,实现从开发到调度的全链路闭环。核心内容: 1. Dify 原生能力的局限与挑战 2. DMS Notebook 和 DMS Airflow 如何增强 Dify 功能 3. 销售数据分析机器人的完整开发案例
背景与挑战
dify 作为一款低代码 AI 应用开发平台,凭借其直观的可视化工作流编排能力,极大降低了大模型应用的开发门槛。然而,在实际企业级落地过程中,我们发现其原生能力仍存在两个关键瓶颈:
代码执行能力受限:Dify 内置的 Sandbox 节点虽支持基础 Python 代码执行,但无法安装自定义 Python 包,难以支撑复杂的业务逻辑、数据处理或算法调用;
缺乏自动化调度机制:Dify 原生架构不支持 Agent 或 Agentic Workflow 的定时触发、周期性运行与依赖编排,导致其难以融入企业级自动化运维体系。
为突破这些限制,我们在实践中探索出一套 “Dify + DMS Notebook + DMS Airflow”三位一体的一站式增强开发架构,有效补齐了 Dify 在执行能力与调度能力上的短板:
使用 DMS Notebook 扩展 Dify 的代码执行能力
为什么需要 Notebook?
在 Dify 中,若需调用 Pandas 进行数据清洗、使用 Prophet 做时间序列预测,或集成企业内部 SDK,Sandbox 节点往往无能为力。而 DMS Notebook 提供了完整的 Python 环境,支持:
步骤详解:构建一个销售数据分析 API 服务
import osfrom fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Headerfrom fastapi.staticfiles import StaticFilesfrom typing import Optionalimport nest_asyncioimport asyncioimport httpximport io'''注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。'''nest_asyncio.apply()app = FastAPI(title="Your Service Name", description="Description of Your Service")# 创建static目录(如果不存在)static_dir = "static"ifnot os.path.exists(static_dir):os.makedirs(static_dir)# 挂载静态文件服务app.mount("/static", StaticFiles(directory=static_dir), name="static")async def root():"""根节点,返回服务基本信息# --- 如何使用 curl 调用 ---curl -X GET "http://127.0.0.1:8000/""""return {"message": "Service is running","documentation": "/docs","note": "..."}async def process_data(request: Request, # 使用 Request 对象来接收原始请求# 路径参数item_id: int = Path(..., title="物品ID", ge=1),# 查询参数is_premium: bool = Query(False, description="是否为高级物品"),# 请求头参数x_token: Optional[str] = Header(None, description="自定义的认证Token")):"""接收 JSON 请求体、路径参数、查询参数和请求头。# --- 如何使用 curl 调用 ---# -X POST: 指定请求方法# URL: 包含路径参数 {item_id} 和查询参数 ?is_premium=true# -H: 添加请求头 (Header)# -d: 发送请求体 (Body),这里是 JSON 字符串curl -X POST "http://127.0.0.1:8000/process-data/101?is_premium=true" \-H "Content-Type: application/json" \-H "X-Token: my-secret-token" \-d '{"name": "笔记本电脑", "price": 7999.9, "tags": ["electronics", "office"]}'"""if x_token != "my-secret-token":raise HTTPException(status_code=401, detail="X-Token 无效")try:# 手动解析 JSON 请求体json_body = await request.json()name = json_body.get("name")price = json_body.get("price")# 你的业务逻辑代码if not name or not price:raise HTTPException(status_code=400, detail="请求体中缺少 'name' 或 'price'")return {"message": "数据处理成功","received_data": {"item_id": item_id,"is_premium": is_premium,"x_token": x_token,"body": json_body}}except Exception as e:raise HTTPException(status_code=500, detail=f"服务执行错误: {str(e)}")async def upload_file(# 表单数据token: str = Form(...),# 上传文件file: UploadFile = File(...)):"""通过表单(form-data)上传文件和附带的文本信息。# --- 如何使用 curl 调用 ---# -F: 用于发送 multipart/form-data# -F "file=@/path/to/your/file.txt": @符号表示后面是文件路径,curl会读取该文件内容作为上传数据# -F "token=user123": 发送一个名为 token 的表单字段# 注意: 请将 /path/to/your/file.txt 替换为你的本地文件真实路径curl -X POST "http://127.0.0.1:8000/upload-file" \-F "file=@./test_upload.txt" \-F "token=my-form-token""""# 为了让curl示例能工作,我们先创建一个示例文件if not os.path.exists("test_upload.txt"):with open("test_upload.txt", "w") as f:f.write("This is a test file for curl upload.")try:contents = await file.read()file_location = os.path.join(static_dir, file.filename)with open(file_location, "wb") as f:f.write(contents)return {"message": "文件上传成功!","token": token,"filename": file.filename,"file_size": len(contents),"file_url": f"/static/{file.filename}"}except Exception as e:raise HTTPException(status_code=500, detail=f"文件处理错误: {str(e)}")async def get_server_status():"""获取服务器状态。# --- 如何使用 curl 调用 ---curl -X GET "http://127.0.0.1:8000/status""""return {"status": "running"}async def run_server(host="127.0.0.1", port=8000):"""在后台运行uvicorn服务器"""import uvicornconfig = uvicorn.Config(app, host=host, port=port, log_level="info")server = uvicorn.Server(config)# uvicorn.run() 是一个阻塞调用,所以我们用更底层的Server.serve()await server.serve()task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))# 等待服务启动await asyncio.sleep(2)# 创建一个异步HTTP客户端async with httpx.AsyncClient() as client:print("正在向 http://127.0.0.1:8000/status/ 发送请求...")# 发送POST请求response = await client.get("http://127.0.0.1:8000/status")# 打印结果if response.status_code == 200:print("服务启动成功")else:print("服务启动失败,请检查报错信息")
接下来我们以构建一个单日销售数据分析的API为例,代码内容如下:
import osimport pandas as pdfrom fastapi import FastAPI, HTTPException, Request, File, UploadFilefrom fastapi.staticfiles import StaticFilesimport nest_asyncioimport asyncioimport httpximport io'''注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。'''nest_asyncio.apply()app = FastAPI(title="Sales Data Analysis Service", description="Provides data analysis and chart generation capabilities for Dify")# 创建static目录(如果不存在)static_dir = "static"ifnot os.path.exists(static_dir):os.makedirs(static_dir)# 挂载静态文件服务app.mount("/static", StaticFiles(directory=static_dir), name="static")def load_sales_data_from_file(file_content: bytes):"""从上传的文件内容加载销售数据"""try:# 将字节内容转换为StringIO对象csv_string = file_content.decode('utf-8')df = pd.read_csv(io.StringIO(csv_string))# 验证必要的列是否存在required_columns = ['Date', 'Product', 'Price', 'Amount', 'Region']ifnot all(col in df.columns for col in required_columns):raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}")# 转换数据类型df['Date'] = pd.to_datetime(df['Date'])df['Price'] = pd.to_numeric(df['Price'], errors='coerce')df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce')# 计算销售额 (Sales = Price × Amount)df['Sales'] = df['Price'] * df['Amount']return dfexcept Exception as e:raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}")async def root():"""Root endpoint, returns service information"""return {"message": "Sales Data Analysis Service is running","documentation": "/docs","endpoints": ["POST /analysis/daily_sale_analysis"],"note": "Require a CSV file upload with columns: Date, Product, Price, Amount, Region"}async def daily_sale_analysis(file: UploadFile = File(...)):"""当日销售数据分析 - 分析上传文件中的销售数据"""try:# 验证文件类型ifnot file.filename.endswith('.csv'):raise HTTPException(status_code=400, detail="文件必须是CSV格式")# 读取上传的文件file_content = await file.read()df = load_sales_data_from_file(file_content)# 获取数据中的日期范围df['Date'] = pd.to_datetime(df['Date']).dt.dateunique_dates = sorted(df['Date'].unique())if len(unique_dates) == 0:raise HTTPException(status_code=400, detail="数据文件中没有有效的日期数据")# 如果有多个日期,取最新的日期作为分析目标target_date = unique_dates[-1] if len(unique_dates) > 1else unique_dates[0]# 筛选目标日期的数据daily_data = df[df['Date'] == target_date].copy()if daily_data.empty:raise HTTPException(status_code=400, detail=f"没有找到日期 {target_date} 的销售数据")# 基础统计total_sales = daily_data['Sales'].sum()total_orders = len(daily_data)total_quantity = daily_data['Amount'].sum()avg_order_value = total_sales / total_orders if total_orders > 0else0# 产品分析product_analysis = daily_data.groupby('Product').agg({'Sales': 'sum','Amount': 'sum','Price': 'mean'}).round(2)# 按销售额排序,取前5名产品top_products = product_analysis.sort_values('Sales', ascending=False).head(5)top_products_list = []for product, row in top_products.iterrows():top_products_list.append({"product": product,"sales": float(row['Sales']),"quantity": int(row['Amount']),"avg_price": float(row['Price'])})# 地区分析region_analysis = daily_data.groupby('Region').agg({'Sales': 'sum','Amount': 'sum'}).round(2)# 按销售额排序top_regions = region_analysis.sort_values('Sales', ascending=False)region_list = []for region, row in top_regions.iterrows():region_list.append({"region": region,"sales": float(row['Sales']),"quantity": int(row['Amount']),"percentage": round(float(row['Sales']) / total_sales * 100, 2)})# 价格区间分析daily_data['price_range'] = pd.cut(daily_data['Price'],bins=[0, 100, 500, 1000, 5000, float('inf')],labels=['0-100', '100-500', '500-1000', '1000-5000', '5000+'])price_range_analysis = daily_data.groupby('price_range').agg({'Sales': 'sum','Amount': 'sum'}).round(2)price_ranges = []for price_range, row in price_range_analysis.iterrows():ifnot pd.isna(row['Sales']) and row['Sales'] > 0:price_ranges.append({"range": str(price_range),"sales": float(row['Sales']),"quantity": int(row['Amount'])})# 生成洞察分析insights = []# 销售额洞察if total_sales > 100000:insights.append(f"当日销售表现优秀,总销售额达到 {total_sales:,.2f} 元")elif total_sales > 50000:insights.append(f"当日销售表现良好,总销售额为 {total_sales:,.2f} 元")else:insights.append(f"当日销售额为 {total_sales:,.2f} 元,可能需要关注销售策略")# 产品洞察if len(top_products_list) > 0:best_product = top_products_list[0]insights.append(f"最佳销售产品是 {best_product['product']},销售额 {best_product['sales']:,.2f} 元")# 地区洞察if len(region_list) > 0:best_region = region_list[0]insights.append(f"销售表现最佳的地区是 {best_region['region']},占总销售额的 {best_region['percentage']}%")# 订单洞察if avg_order_value > 1000:insights.append(f"平均订单价值较高,为 {avg_order_value:,.2f} 元,显示客户购买力强")return {"analysis_date": str(target_date),"summary": {"total_sales": round(float(total_sales), 2),"total_orders": int(total_orders),"total_quantity": int(total_quantity),"average_order_value": round(float(avg_order_value), 2)},"top_products": top_products_list,"region_analysis": region_list,"price_range_analysis": price_ranges,"insights": insights,"data_info": {"date_range": f"{unique_dates[0]} 到 {unique_dates[-1]}"if len(unique_dates) > 1else str(unique_dates[0]),"total_records": len(daily_data),"unique_products": len(daily_data['Product'].unique()),"unique_regions": len(daily_data['Region'].unique())}}except HTTPException:raiseexcept Exception as e:raise HTTPException(status_code=500, detail=f"当日销售数据分析错误: {str(e)}")async def get_server_status():"""获取服务器状态"""try:return {"status": "running"}except Exception as e:raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}")async def run_server(host="127.0.0.1", port=8000):"""在后台运行uvicorn服务器"""import uvicornconfig = uvicorn.Config(app, host=host, port=port, log_level="info")server = uvicorn.Server(config)# uvicorn.run() 是一个阻塞调用,所以我们用更底层的Server.serve()await server.serve()task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))# 等待服务启动await asyncio.sleep(2)# 创建一个异步HTTP客户端async with httpx.AsyncClient() as client:print("正在向 http://127.0.0.1:8000/status/ 发送请求...")# 发送POST请求response = await client.get("http://127.0.0.1:8000/status")# 打印结果if response.status_code == 200:print("服务启动成功")else:print("服务启动失败,请检查报错信息")
✅ 异步支持:Jupyter 内置 asyncio 事件循环,可直接使用 async/await。
在Dify on DMS实例中访问服务
现在我们使用这份模拟销售数据文件来访问API服务:
接下来以该Dify工作流为例子进行完整的服务调用;
在钉钉群组中添加一个自定义的机器人,并参考https://open.dingtalk.com/document/orgapp/robot-overview 了解如何获取钉钉机器人的access_token和sign_secret
在填写完你的钉钉机器人参数之后,点击右上角的运行-》从本地上传上传示例的销售数据,并点击开始运行;
钉钉群中的机器人成功发送了对该销售数据的分析报告;
点击右上角的发布-》发布更新发布工作流用于后面的定时调用;
使用 DMS Airflow 实现定时调度
创建DMS Airflow实例
参照以下链接在DMS中创建Airflow实例
https://help.aliyun.com/zh/dms/purchase-airflow-resources
https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance
更多关于Airflow的操作,参照https://airflow.apache.org/docs/apache-airflow/stable/index.html
编写 DAG:每日自动触发销售分析
import pendulumimport requestsimport jsonfrom airflow.models.dag import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.models import VariableCSV_FILE_PATH = "/yourpath/daily_sale_data.csv"DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1" # 替换成你的 Dify Workflow API URL# 从 Airflow Variable 中安全地获取 API KeyDIFY_API_KEY = Variable.get("dify_api_key")APP_API_KEY= Variable.get("app_api_key")def call_dify_workflow_with_csv(**kwargs):"""读取 CSV 文件内容,并将其作为文件上传调用 Dify 工作流。"""print(f"准备从 '{CSV_FILE_PATH}' 读取文件...")try:with open(CSV_FILE_PATH, 'rb') as f:files_to_upload = {'file': ('daily_sale_data.csv', f, 'document/csv')}# 准备 API 请求的 headers 和 bodyheaders = {'Authorization': f'Bearer {APP_API_KEY}','DifyApiKey': f'{DIFY_API_KEY}',}file_upload_response=requests.post(DIFY_API_URL+'/files/upload',headers=headers,data={'user': 'airflow-user-demo'},files=files_to_upload,)print(file_upload_response.json())file_id=file_upload_response.json().get('id')headers.update({'Content-Type': 'application/json'})# 'inputs' 通常是json字符串# 'user' 是必须的,代表最终用户的标识符input_data = {'sales_data': {"type": "document","transfer_method": "local_file","upload_file_id": file_id}}data = {'inputs': input_data,'user': 'airflow-user-demo','response_mode': 'blocking',}print("开始调用 Dify API...")print(f"URL: {DIFY_API_URL}")response = requests.post(DIFY_API_URL+'/workflows/run',headers=headers,json=data,)# 检查请求是否成功response.raise_for_status()print(f"API 调用成功!状态码: {response.status_code}")# 打印响应内容print("--- Dify API Response ---")print(response.json()["data"]["outputs"]["answer"])print("\n--- End of Response ---")# 你也可以将完整的响应推送到 XComs,以便下游任务使用# ti = kwargs['ti']# ti.xcom_push(key='dify_response', value=full_response)except FileNotFoundError:print(f"错误:文件未找到于 '{CSV_FILE_PATH}'")raiseexcept requests.exceptions.RequestException as e:print(f"API 调用失败: {e}")raisewith DAG(dag_id="dify_workflow",start_date=pendulum.datetime(2023, 10, 27, tz="Asia/Shanghai"),# '0 8 * * *' 代表每天早上8:00 (UTC+8)# Airflow 默认使用 UTC,但 Cron 表达式本身不带时区,调度器会根据 DAG 的时区设置来解释schedule="0 8 * * *",catchup=False,tags=["dify", "api", "example"],doc_md="""### Dify 工作流调用 DAG此 DAG 每天早上8点执行,它会:1. 从本地文件系统读取一个 CSV 文件。2. 将该 CSV 文件作为附件,调用一个 Dify 工作流。3. 打印出 Dify API 的响应。""") as dag:run_dify_workflow = PythonOperator(task_id="call_dify",python_callable=call_dify_workflow_with_csv,)
📌 注意:通过API调用Dify工作流中想要上传本地文件,需要先通过/files/upload接口传入相应的文件获取文件id,再将文件id传入工作流中。
每日 8:00,系统自动调用 Dify 工作流,最终由钉钉机器人推送分析报告;
总结与思考
通过 DMS Notebook + DMS Airflow 对 Dify 的能力扩展,我们成功构建了一个具备以下特性的企业级 Agent 开发范式:
这套方案不仅解决了 Dify 当前的局限性,更重要的是,它保留了 Dify 的低代码优势,同时通过与成熟数据基础设施(Notebook + Airflow)的深度集成,实现了 “敏捷开发”与“工程可靠”的平衡。
🌟 核心理念:Agent 的价值不在于“全自动”,而在于“可扩展、可调度、可运维”。真正的生产级智能系统,一定是平台能力与工程实践的结合体。
钉钉搜索:96015019923,加入Dify on DMS用户群!
Dify开通申请:https://page.aliyun.com/form/act1621739703/index.htm
Qwen-Image,生图告别文字乱码
针对AI绘画文字生成不准确的普遍痛点,本方案搭载业界领先的Qwen-Image系列模型,提供精准的图文生成和图像编辑能力,助您轻松创作清晰美观的中英文海报、Logo与创意图。此外,本方案还支持一键图生视频,为内容创作全面赋能。
点击阅读原文查看详情。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-28
2025-09-12
2025-09-15
2025-10-19
2025-08-29
2025-09-30
2025-08-29
2025-10-07
2025-09-19
2025-11-06
2025-09-30
2025-06-26
2025-06-15
2025-06-03
2025-05-29
2025-05-26
2025-05-22
2025-05-21