免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

一个销售数据分析机器人的诞生:看 Dify 如何在 DMS 助力下实现自动化闭环

发布日期:2025-11-20 19:55:01 浏览次数: 1532
作者:阿里云开发者

微信搜一搜,关注“阿里云开发者”

推荐语

Dify 与 DMS 的强强联合,打造企业级 AI 应用开发新范式,实现从开发到调度的全链路闭环。

核心内容:
1. Dify 原生能力的局限与挑战
2. DMS Notebook 和 DMS Airflow 如何增强 Dify 功能
3. 销售数据分析机器人的完整开发案例

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

背景与挑战

dify 作为一款低代码 AI 应用开发平台,凭借其直观的可视化工作流编排能力,极大降低了大模型应用的开发门槛。然而,在实际企业级落地过程中,我们发现其原生能力仍存在两个关键瓶颈:

  • 代码执行能力受限:Dify 内置的 Sandbox 节点虽支持基础 Python 代码执行,但无法安装自定义 Python 包,难以支撑复杂的业务逻辑、数据处理或算法调用;

  • 缺乏自动化调度机制:Dify 原生架构不支持 Agent 或 Agentic Workflow 的定时触发、周期性运行与依赖编排,导致其难以融入企业级自动化运维体系。

这两个问题严重制约了 Dify 在生产环境中的深度应用——尤其当我们希望构建一个具备“感知-决策-执行-反馈”闭环能力的智能 Agent 时,仅靠 Dify 自身往往力不从心。

为突破这些限制,我们在实践中探索出一套 “Dify + DMS Notebook + DMS Airflow”三位一体的一站式增强开发架构,有效补齐了 Dify 在执行能力与调度能力上的短板:

  • ✅ DMS Notebook:提供完整、可定制的 Python 运行环境,支持第三方库安装、交互式开发与复杂逻辑实现,作为 Dify Sandbox 的强力补充;
  • ✅ DMS Airflow:作为统一调度引擎,实现对 Dify 工作流、Notebook 脚本或 Agent 任务的定时触发、依赖管理与可靠执行;
  • ✅ DMS 平台集成:实现从开发、调试、部署到调度、监控的全链路闭环管理,显著提升 Dify 在企业场景下的工程化落地能力。
本文将以一个销售数据分析机器人的完整开发案例,详细介绍如何基于 DMS 平台,构建一个可调度、可扩展、可运维的 Agent 系统。

使用 DMS Notebook 扩展 Dify 的代码执行能力

为什么需要 Notebook?

在 Dify 中,若需调用 Pandas 进行数据清洗、使用 Prophet 做时间序列预测,或集成企业内部 SDK,Sandbox 节点往往无能为力。而 DMS Notebook 提供了完整的 Python 环境,支持:

  • 自定义 pip 包安装;
  • 环境变量配置(如 AK/SK、API Key);
  • 异步服务开发(如 FastAPI);
  • 与 VPC 内其他服务安全互通。
这使其成为 Dify 外部能力扩展的理想“执行单元”。

步骤详解:构建一个销售数据分析 API 服务

1.创建 DMS Notebook 会话

  • 进入 DMS 控制台 > Notebook 会话 > 创建会话;
1.png
  • 各参数的定义如下:
  • 选择合适的 Python 镜像版本;

2.png

  • 在 配置 > 编辑设置 中:

3.png

  • PyPI 包管理:按 requirements.txt 格式填入依赖(如 pandas, fastapi, uvicorn, nest-asyncio);
  • 环境变量:设置 ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET、大模型 API Key 等;
  • 关键配置:
  • fastapi、uvicorn、nest-asyncio库是必须要安装的;
  • 设置 资源释放时间 = 0(防止服务被自动释放);
  • 设置环境变量 DMS_KERNEL_IDLE_TIMEOUT=0(避免 Jupyter Kernel 因空闲被 kill)。
💡 踩坑提示:若未设置 DMS_KERNEL_IDLE_TIMEOUT=0,长时间运行的 API 服务可能在空闲数分钟后被系统回收,导致后续调用失败。
  • 创建完成后,在notebook会话窗口中点击启动即可

4.png

2.编写并启动 FastAPI 服务

点击文件夹图标,右键点击default(默认库),再点击新建Notebook文件

5.png

  • 在代码块中编写相关的python代码,可以参考以下模板构建你的API服务,更多FastAPI相关的使用方法请查看官方文档:https://fastapi.tiangolo.com/
import osfrom fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Headerfrom fastapi.staticfiles import StaticFilesfrom typing import Optionalimport nest_asyncioimport asyncioimport httpximport io'''注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。'''nest_asyncio.apply()app = FastAPI(title="Your Service Name", description="Description of Your Service")# 创建static目录(如果不存在)static_dir = "static"ifnot os.path.exists(static_dir):    os.makedirs(static_dir)# 挂载静态文件服务app.mount("/static", StaticFiles(directory=static_dir), name="static")@app.get("/")async def root():    """    根节点,返回服务基本信息    # --- 如何使用 curl 调用 ---    curl -X GET "http://127.0.0.1:8000/"        """    return {        "message""Service is running",        "documentation""/docs",        "note""..."    }@app.post("/process-data/{item_id}")async def process_data(    request: Request, # 使用 Request 对象来接收原始请求        # 路径参数    item_id: int = Path(..., title="物品ID", ge=1),        # 查询参数    is_premium: bool = Query(False, description="是否为高级物品"),        # 请求头参数    x_token: Optional[str] = Header(None, description="自定义的认证Token")):    """    接收 JSON 请求体、路径参数、查询参数和请求头。    # --- 如何使用 curl 调用 ---    # -X POST: 指定请求方法    # URL: 包含路径参数 {item_id} 和查询参数 ?is_premium=true    # -H: 添加请求头 (Header)    # -d: 发送请求体 (Body),这里是 JSON 字符串    curl -X POST "http://127.0.0.1:8000/process-data/101?is_premium=true" \         -H "Content-Type: application/json" \         -H "X-Token: my-secret-token" \         -d '{"name": "笔记本电脑", "price": 7999.9, "tags": ["electronics", "office"]}'    """    if x_token != "my-secret-token":        raise HTTPException(status_code=401, detail="X-Token 无效")        try:        # 手动解析 JSON 请求体        json_body = await request.json()                name = json_body.get("name")        price = json_body.get("price")                # 你的业务逻辑代码        if not name or not price:            raise HTTPException(status_code=400, detail="请求体中缺少 'name' 或 'price'")        return {            "message""数据处理成功",            "received_data": {                "item_id": item_id,                "is_premium": is_premium,                "x_token": x_token,                "body": json_body            }        }    except Exception as e:        raise HTTPException(status_code=500, detail=f"服务执行错误: {str(e)}")@app.post("/upload-file")async def upload_file(    # 表单数据    token: str = Form(...),    # 上传文件    file: UploadFile = File(...)):    """    通过表单(form-data)上传文件和附带的文本信息。    # --- 如何使用 curl 调用 ---    # -F: 用于发送 multipart/form-data    # -F "file=@/path/to/your/file.txt": @符号表示后面是文件路径,curl会读取该文件内容作为上传数据    # -F "token=user123": 发送一个名为 token 的表单字段    # 注意: 请将 /path/to/your/file.txt 替换为你的本地文件真实路径    curl -X POST "http://127.0.0.1:8000/upload-file" \         -F "file=@./test_upload.txt" \         -F "token=my-form-token"    """    # 为了让curl示例能工作,我们先创建一个示例文件    if not os.path.exists("test_upload.txt"):        with open("test_upload.txt""w"as f:            f.write("This is a test file for curl upload.")    try:        contents = await file.read()        file_location = os.path.join(static_dir, file.filename)        with open(file_location, "wb"as f:            f.write(contents)                    return {            "message""文件上传成功!",            "token": token,            "filename": file.filename,            "file_size"len(contents),            "file_url"f"/static/{file.filename}"        }    except Exception as e:        raise HTTPException(status_code=500, detail=f"文件处理错误: {str(e)}")@app.get("/status")async def get_server_status():    """    获取服务器状态。    # --- 如何使用 curl 调用 ---    curl -X GET "http://127.0.0.1:8000/status"    """    return {"status""running"}async def run_server(host="127.0.0.1", port=8000):    """在后台运行uvicorn服务器"""    import uvicorn    config = uvicorn.Config(app, host=host, port=port, log_level="info")    server = uvicorn.Server(config)    # uvicorn.run() 是一个阻塞调用,所以我们用更底层的Server.serve()    await server.serve()task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))# 等待服务启动await asyncio.sleep(2)# 创建一个异步HTTP客户端async with httpx.AsyncClient() as client:    print("正在向 http://127.0.0.1:8000/status/ 发送请求...")        # 发送POST请求    response = await client.get("http://127.0.0.1:8000/status")        # 打印结果    if response.status_code == 200:        print("服务启动成功")    else:        print("服务启动失败,请检查报错信息")
  • 接下来我们以构建一个单日销售数据分析的API为例,代码内容如下:

import osimport pandas as pdfrom fastapi import FastAPI, HTTPException, Request, File, UploadFilefrom fastapi.staticfiles import StaticFilesimport nest_asyncioimport asyncioimport httpximport io
'''注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。'''nest_asyncio.apply()
app = FastAPI(title="Sales Data Analysis Service", description="Provides data analysis and chart generation capabilities for Dify")
# 创建static目录(如果不存在)static_dir = "static"ifnot os.path.exists(static_dir):    os.makedirs(static_dir)
# 挂载静态文件服务app.mount("/static", StaticFiles(directory=static_dir), name="static")
def load_sales_data_from_file(file_content: bytes):    """从上传的文件内容加载销售数据"""    try:        # 将字节内容转换为StringIO对象        csv_string = file_content.decode('utf-8')        df = pd.read_csv(io.StringIO(csv_string))                # 验证必要的列是否存在        required_columns = ['Date''Product''Price''Amount''Region']        ifnot all(col in df.columns for col in required_columns):            raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}")                # 转换数据类型        df['Date'] = pd.to_datetime(df['Date'])        df['Price'] = pd.to_numeric(df['Price'], errors='coerce')        df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce')                # 计算销售额 (Sales = Price × Amount)        df['Sales'] = df['Price'] * df['Amount']                return df    except Exception as e:        raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}")
@app.get("/")async def root():    """Root endpoint, returns service information"""    return {        "message""Sales Data Analysis Service is running",        "documentation""/docs",        "endpoints": [            "POST /analysis/daily_sale_analysis"        ],        "note""Require a CSV file upload with columns: Date, Product, Price, Amount, Region"    }
@app.post("/analysis/daily_sale_analysis")async def daily_sale_analysis(    file: UploadFile = File(...)):    """当日销售数据分析 - 分析上传文件中的销售数据"""    try:        # 验证文件类型        ifnot file.filename.endswith('.csv'):            raise HTTPException(status_code=400, detail="文件必须是CSV格式")                # 读取上传的文件        file_content = await file.read()        df = load_sales_data_from_file(file_content)                # 获取数据中的日期范围        df['Date'] = pd.to_datetime(df['Date']).dt.date        unique_dates = sorted(df['Date'].unique())                if len(unique_dates) == 0:            raise HTTPException(status_code=400, detail="数据文件中没有有效的日期数据")                # 如果有多个日期,取最新的日期作为分析目标        target_date = unique_dates[-1if len(unique_dates) > 1else unique_dates[0]                # 筛选目标日期的数据        daily_data = df[df['Date'] == target_date].copy()                if daily_data.empty:            raise HTTPException(status_code=400, detail=f"没有找到日期 {target_date} 的销售数据")                # 基础统计        total_sales = daily_data['Sales'].sum()        total_orders = len(daily_data)        total_quantity = daily_data['Amount'].sum()        avg_order_value = total_sales / total_orders if total_orders > 0else0                # 产品分析        product_analysis = daily_data.groupby('Product').agg({            'Sales''sum',            'Amount''sum',            'Price''mean'        }).round(2)                # 按销售额排序,取前5名产品        top_products = product_analysis.sort_values('Sales', ascending=False).head(5)        top_products_list = []        for product, row in top_products.iterrows():            top_products_list.append({                "product": product,                "sales"float(row['Sales']),                "quantity"int(row['Amount']),                "avg_price"float(row['Price'])            })                # 地区分析        region_analysis = daily_data.groupby('Region').agg({            'Sales''sum',            'Amount''sum'        }).round(2)                # 按销售额排序        top_regions = region_analysis.sort_values('Sales', ascending=False)        region_list = []        for region, row in top_regions.iterrows():            region_list.append({                "region": region,                "sales"float(row['Sales']),                "quantity"int(row['Amount']),                "percentage"round(float(row['Sales']) / total_sales * 1002)            })                # 价格区间分析        daily_data['price_range'] = pd.cut(daily_data['Price'],                                          bins=[010050010005000float('inf')],                                          labels=['0-100''100-500''500-1000''1000-5000''5000+'])                price_range_analysis = daily_data.groupby('price_range').agg({            'Sales''sum',            'Amount''sum'        }).round(2)                price_ranges = []        for price_range, row in price_range_analysis.iterrows():            ifnot pd.isna(row['Sales']) and row['Sales'] > 0:                price_ranges.append({                    "range"str(price_range),                    "sales"float(row['Sales']),                    "quantity"int(row['Amount'])                })                # 生成洞察分析        insights = []                # 销售额洞察        if total_sales > 100000:            insights.append(f"当日销售表现优秀,总销售额达到 {total_sales:,.2f} 元")        elif total_sales > 50000:            insights.append(f"当日销售表现良好,总销售额为 {total_sales:,.2f} 元")        else:            insights.append(f"当日销售额为 {total_sales:,.2f} 元,可能需要关注销售策略")                # 产品洞察        if len(top_products_list) > 0:            best_product = top_products_list[0]            insights.append(f"最佳销售产品是 {best_product['product']},销售额 {best_product['sales']:,.2f} 元")                # 地区洞察        if len(region_list) > 0:            best_region = region_list[0]            insights.append(f"销售表现最佳的地区是 {best_region['region']},占总销售额的 {best_region['percentage']}%")                # 订单洞察        if avg_order_value > 1000:            insights.append(f"平均订单价值较高,为 {avg_order_value:,.2f} 元,显示客户购买力强")                return {            "analysis_date"str(target_date),            "summary": {                "total_sales"round(float(total_sales), 2),                "total_orders"int(total_orders),                "total_quantity"int(total_quantity),                "average_order_value"round(float(avg_order_value), 2)            },            "top_products": top_products_list,            "region_analysis": region_list,            "price_range_analysis": price_ranges,            "insights": insights,            "data_info": {                "date_range"f"{unique_dates[0]} 到 {unique_dates[-1]}"if len(unique_dates) > 1else str(unique_dates[0]),                "total_records"len(daily_data),                "unique_products"len(daily_data['Product'].unique()),                "unique_regions"len(daily_data['Region'].unique())            }        }            except HTTPException:        raise    except Exception as e:        raise HTTPException(status_code=500, detail=f"当日销售数据分析错误: {str(e)}")
@app.get("/status")async def get_server_status():    """获取服务器状态"""    try:        return {            "status""running"        }    except Exception as e:        raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}")
async def run_server(host="127.0.0.1", port=8000):    """在后台运行uvicorn服务器"""    import uvicorn    config = uvicorn.Config(app, host=host, port=port, log_level="info")    server = uvicorn.Server(config)    # uvicorn.run() 是一个阻塞调用,所以我们用更底层的Server.serve()    await server.serve()
task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))
# 等待服务启动await asyncio.sleep(2)
# 创建一个异步HTTP客户端async with httpx.AsyncClient() as client:    print("正在向 http://127.0.0.1:8000/status/ 发送请求...")        # 发送POST请求    response = await client.get("http://127.0.0.1:8000/status")        # 打印结果    if response.status_code == 200:        print("服务启动成功")    else:        print("服务启动失败,请检查报错信息")

✅ 异步支持:Jupyter 内置 asyncio 事件循环,可直接使用 async/await

  • 运行相关代码块,可以在输出部分看到API服务启动成功;

6.png

7.png

3. 查看IP地址

在notebook代码块中,可以使用英文感叹号+终端命令的形式来执行命令,你也可以使用!pip install xxx来安装额外需要的python包。接下来新建一个代码块,在代码块中输入!ifconfig并点击运行查看该 Notebook 会话在VPC中的IP地址,图中172.16.0.252即为所需的IP地址,API服务地址即为:http://172.16.0.252:8080/analyze_sales

8.png

在Dify on DMS实例中访问服务

  • 现在我们使用这份模拟销售数据文件来访问API服务:

  • 在工作流中添加HTTP请求节点:

9.png

  • 通过http://:<端口>/xxx 访问创建的API的服务,并在BODY中传入相应的参数;

10.png

  • 测试运行可以看到该请求成功返回了响应的输出;

11.png

  • 你也可以在Notebook会话中看到相应的服务被调用;

12.png

  • 接下来以该Dify工作流为例子进行完整的服务调用;

13.png

  • 在钉钉群组中添加一个自定义的机器人,并参考https://open.dingtalk.com/document/orgapp/robot-overview 了解如何获取钉钉机器人的access_token和sign_secret

14.png

  • 在填写完你的钉钉机器人参数之后,点击右上角的运行-》从本地上传上传示例的销售数据,并点击开始运行;

15.png

  • 钉钉群中的机器人成功发送了对该销售数据的分析报告;

16.png

  • 点击右上角的发布-》发布更新发布工作流用于后面的定时调用;

17.png

使用 DMS Airflow 实现定时调度

创建DMS Airflow实例

  • 参照以下链接在DMS中创建Airflow实例

  • https://help.aliyun.com/zh/dms/purchase-airflow-resources

  • https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance

  • 更多关于Airflow的操作,参照https://airflow.apache.org/docs/apache-airflow/stable/index.html

编写 DAG:每日自动触发销售分析

以下是示例的python代码,用于定时调用Dify工作流的API
import pendulumimport requestsimport jsonfrom airflow.models.dag import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.models import Variable
CSV_FILE_PATH = "/yourpath/daily_sale_data.csv"DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1" # 替换成你的 Dify Workflow API URL# 从 Airflow Variable 中安全地获取 API KeyDIFY_API_KEY = Variable.get("dify_api_key")APP_API_KEY= Variable.get("app_api_key")
def call_dify_workflow_with_csv(**kwargs):    """    读取 CSV 文件内容,并将其作为文件上传调用 Dify 工作流。    """
    print(f"准备从 '{CSV_FILE_PATH}' 读取文件...")
    try:        with open(CSV_FILE_PATH, 'rb'as f:            files_to_upload = {                'file': ('daily_sale_data.csv', f, 'document/csv'            }
            # 准备 API 请求的 headers 和 body            headers = {                'Authorization'f'Bearer {APP_API_KEY}',                'DifyApiKey'f'{DIFY_API_KEY}',            }
                                    file_upload_response=requests.post(                DIFY_API_URL+'/files/upload',                headers=headers,                 data={'user''airflow-user-demo'},                 files=files_to_upload,            )

            print(file_upload_response.json())                            file_id=file_upload_response.json().get('id')
            headers.update({'Content-Type''application/json'})                        # 'inputs' 通常是json字符串            # 'user' 是必须的,代表最终用户的标识符            input_data = {                'sales_data':  {                        "type""document",                        "transfer_method""local_file",                        "upload_file_id": file_id                    }            }
            data = {                'inputs': input_data,                  'user''airflow-user-demo',                'response_mode''blocking',            }
            print("开始调用 Dify API...")            print(f"URL: {DIFY_API_URL}")                        response = requests.post(                DIFY_API_URL+'/workflows/run'                headers=headers,                 json=data,            )                        # 检查请求是否成功            response.raise_for_status()                         print(f"API 调用成功!状态码: {response.status_code}")                        # 打印响应内容            print("--- Dify API Response ---")                        print(response.json()["data"]["outputs"]["answer"])                                 print("\n--- End of Response ---")                        # 你也可以将完整的响应推送到 XComs,以便下游任务使用            # ti = kwargs['ti']            # ti.xcom_push(key='dify_response', value=full_response)
    except FileNotFoundError:        print(f"错误:文件未找到于 '{CSV_FILE_PATH}'")        raise    except requests.exceptions.RequestException as e:        print(f"API 调用失败: {e}")        raise

with DAG(    dag_id="dify_workflow",    start_date=pendulum.datetime(20231027, tz="Asia/Shanghai"),    # '0 8 * * *' 代表每天早上8:00 (UTC+8)    # Airflow 默认使用 UTC,但 Cron 表达式本身不带时区,调度器会根据 DAG 的时区设置来解释    schedule="0 8 * * *"    catchup=False,    tags=["dify""api""example"],    doc_md="""    ### Dify 工作流调用 DAG    此 DAG 每天早上8点执行,它会:    1. 从本地文件系统读取一个 CSV 文件。    2. 将该 CSV 文件作为附件,调用一个 Dify 工作流。    3. 打印出 Dify API 的响应。    """as dag:    run_dify_workflow = PythonOperator(        task_id="call_dify",        python_callable=call_dify_workflow_with_csv,    )

📌 注意:通过API调用Dify工作流中想要上传本地文件,需要先通过/files/upload接口传入相应的文件获取文件id,再将文件id传入工作流中。

  • 创建完成后打开Airflow实例,可以看到创建的定时任务;

18.png

  • 每日 8:00,系统自动调用 Dify 工作流,最终由钉钉机器人推送分析报告;

19.png

20.png

总结与思考

  • 通过 DMS Notebook + DMS Airflow 对 Dify 的能力扩展,我们成功构建了一个具备以下特性的企业级 Agent 开发范式:

  • 这套方案不仅解决了 Dify 当前的局限性,更重要的是,它保留了 Dify 的低代码优势,同时通过与成熟数据基础设施(Notebook + Airflow)的深度集成,实现了 “敏捷开发”与“工程可靠”的平衡。

🌟 核心理念:Agent 的价值不在于“全自动”,而在于“可扩展、可调度、可运维”。真正的生产级智能系统,一定是平台能力与工程实践的结合体。

了解更多

钉钉搜索:96015019923,加入Dify on DMS用户群!

Dify开通申请:https://page.aliyun.com/form/act1621739703/index.htm 

Qwen-Image,生图告别文字乱码


针对AI绘画文字生成不准确的普遍痛点,本方案搭载业界领先的Qwen-Image系列模型,提供精准的图文生成和图像编辑能力,助您轻松创作清晰美观的中英文海报、Logo与创意图。此外,本方案还支持一键图生视频,为内容创作全面赋能。


点击阅读原文查看详情。


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询