免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线

发布日期:2025-10-23 22:13:57 浏览次数: 1516
作者:阿里云安全

微信搜一搜,关注“阿里云安全”

推荐语

阿里云AI安全护栏为Dify平台带来全方位AI安全防护,从输入到输出构建端到端安全闭环,助力开发者轻松应对大模型应用中的各类风险挑战。

核心内容:
1. 阿里云AI安全护栏的核心优势与功能特性
2. 专为Dify优化的两种集成模式详解
3. 多模态防护与弹性性能配置方案

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

随着大模型在企业前台业务中的广泛应用,从智能客服、AI搜索到虚拟助手,每一次用户交互都潜藏着不可忽视的安全风险。一旦模型输出违规内容、泄露敏感信息或被恶意操控,不仅影响用户体验,更可能引发法律合规、品牌声誉乃至系统级安全危机。


在此背景下,阿里云AI安全护栏(AI Guardrails)正式登陆Dify Marketplace,为使用Dify构建AI应用的开发者提供原生集成的一站式AI安全解决方案。通过插件化部署或API扩展方式,开发者可在工作流(Workflow)、Agent和Chatflow中快速启用输入输出双向防护机制,实现从用户输入与模型输出的双向检测


阿里云AI安全护栏产品架构图



 阿里云AI安全护栏 


核心优势




全链路防护

从输入到输出,构建端到端安全闭环,全面应对大模型在真实业务中面临的内容安全、外部攻击、隐私泄露与输出失控等关键挑战。



智能双引擎

深度集成Qwen3-Guard与基于Qwen系列SFT的审核大模型,融合对抗检测与语义理解能力,可精准识别变体、谐音、隐喻表达及意识形态渗透等高隐蔽性风险。



流式审核

支持全链路流式审核,在模型逐段生成内容的过程中实时送检,显著降低从token生成到风险发现的延迟,保障高并发场景下的交互流畅性与安全性。



上下文感知

支持单轮问答、多轮问答等场景下的风险检测,通过融合历史对话信息,识别跨轮次诱导、语义漂移与越狱行为,实现对完整对话意图的准确理解,避免因上下文割裂导致的误判。



多模态防护

支持文本、图片、文件等模态的混合检测,有效识别跨模态隐藏指令与复合型攻击,实现多模态风险覆盖。



灵活快速接入

支持All-in-One API接入,一次调用即可完成全模态检测,按需启用防护能力,集成简单高效,适配主流AI应用架构,助力客户敏捷上线。目前,阿里云AI安全护栏为客户提供包括API接入、阿里云百炼接入、阿里云WAF接入等多种一键接入模式。



弹性性能配置

通过算法编排动态平衡精度、时延与成本;对于高并发、低延迟场景,可在保障检测效果的前提下提供高性能服务,满足严苛生产需求。



可视化与自定义

提供可视化控制台,支持风险策略配置、黑白名单、阈值调节与效果验证;更支持用户创建自定义检测Agent——可定义专属标签与提示词,精准识别金融、医疗、教育等行业或特定场景下的业务风险,实现安全能力的灵活扩展与深度定制。



 两种集成模式详解 


专为Dify用户优化


针对当前大量开发者使用Dify构建AI应用的现状,AI安全护栏已上架Dify官方插件市场,提供两种集成路径:



模式一
工作流插件集成(零代码,快速上线)

◆  适用场景标准Workflow、非流式输出应用;

◆  操作方式


  • 在Dify插件市场下载并安装“AI安全护栏”插件。

  • 在LLM节点前插入“输入检测”节点,拦截提示词注入、DDoS攻击等。
  • 在LLM节点后插入“输出检测”节点,防止敏感信息泄露或不当内容输出。
  • 效果示例:


模式二
扩展内容审查API(支持流式,适合高阶场景)

  适用场景Agent、Chatflow、需支持流式输出的应用;

◆  操作方式

  • 部署转发服务
    • AI安全护栏产品的API最大支持单次2000字符输入,因此输入长度大于2000字符的情况下,需要进行适配,处理方法如下:

    • 输入审查:将输入切分为多段,每段不超过2000个字符,并发调用安全护栏API。

    • 输出审查:Dify每隔300字符左右发起一次内容审核API调用,处理上截取最近2000字符进行调用。

    • 以下分别为处理逻辑和启动脚本的示例代码:

from fastapi import FastAPI, Body, HTTPException, Headerfrom pydantic import BaseModelimport base64from collections.abc import Generatorfrom typing import Anyimport hmacimport hashlibfrom urllib.parse import quoteimport requestsfrom datetime import datetimefrom datetime import timezoneimport uuidimport jsonimport reimport concurrent.futures# 可以根据需要调用不同区域的服务,支持上海(cn-shanghai)、北京(cn-beijing)、杭州(cn-hangzhou)、深圳(cn-shenzhen)SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"# 超过这个长度时对文本进行切分MAX_LENGTH = 2000# 调用安全护栏的输入检测和输出检测的ServiceCodeSERVICE_INPUT = "query_security_check"SERVICE_OUTPUT = "response_security_check"ENCODING = "UTF-8"ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"ALGORITHM = "HmacSHA1"def format_iso8601_date():    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)def percent_encode(value):    if value is None:        return ""    return (        quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")    )def create_signature(string_to_sign, secret):    secret = secret + "&"    signature = hmac.new(        secret.encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1    ).digest()    return base64.b64encode(signature).decode(ENCODING)def create_string_to_sign(http_method, parameters):    sorted_keys = sorted(parameters.keys())    canonicalized_query_string = ""    for key in sorted_keys:        canonicalized_query_string += (            "&" + percent_encode(key) + "=" + percent_encode(parameters[key])        )    string_to_sign = (        http_method        + "&"        + percent_encode("/")        + "&"        + percent_encode(canonicalized_query_string[1:])    )    return string_to_signdef split_text(text: str, max_length: int = 1950) -> list[str]:    """将文本按 max_length 分段,尽量保留完整句子(识别多种标点)"""    segments = []    while len(text) > max_length:        # 提取当前最大长度范围内的子串        chunk = text[:max_length]        # 使用正则查找最后一个句号、感叹号、问号等断句符号的位置        match = None        for pattern in [r"[。!?;:\.?!]+"]:  # 匹配多种结束符号            matches = list(re.finditer(pattern, chunk))            if matches:                match = matches[-1]  # 取最后一个匹配项        if match:            cut_point = match.end()  # 包含标点符号        else:            cut_point = max_length  # 找不到就强制截断        segments.append(text[:cut_point])        text = text[cut_point:]    if text:        segments.append(text)    return segmentsdef request(content_segment, type, aliyun_access_key, aliyun_access_secret):    print(datetime.now(), f" [{type} request content]-> {content_segment}")    # 3.1 构造请求参数    parameters = {        "Action": "MultiModalGuard",        "Version": "2022-03-02",        "AccessKeyId": aliyun_access_key,        "Timestamp": format_iso8601_date(),        "SignatureMethod": "HMAC-SHA1",        "SignatureVersion": "1.0",        "SignatureNonce": str(uuid.uuid4()),        "Format": "JSON",        "Service": (            SERVICE_INPUT if type == "input" else SERVICE_OUTPUT        ),        "ServiceParameters": json.dumps(            {"content": content_segment}, ensure_ascii=False        ),    }    string_to_sign = create_string_to_sign("POST", parameters)    signature = create_signature(string_to_sign, aliyun_access_secret)    parameters["Signature"] = signature    # 3.2 发送请求    response = requests.post(SERVICE_URL, data=parameters)    body = response.json()    print(datetime.now(), " [response body]-> ", body)    if response.status_code != 200:        raise Exception(            f"response http status_code not 200. status_code: {response.status_code}, body: {body}"        )    if body.get("Code") != 200:        raise Exception(            f"response code not 200. code: {body.get('Code')}, body: {body}"        )    return bodyapp = FastAPI()class InputData(BaseModel):    point: str    params: dict = {}@app.post("/api/dify/receive")async def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):    """    Receive API query data from Dify.    """    #print(data)    auth_scheme, _, api_key = authorization.partition(" ")    if auth_scheme.lower() != "bearer":        raise HTTPException(status_code=401, detail="Unauthorized")    # api_key decode    try:        decoded_bytes = base64.b64decode(api_key)        decoded_str = decoded_bytes.decode("utf-8")        ak, sk = decoded_str.split(":", 1)    except Exception as e:        # 如果调用失败,抛出异常        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")    point = data.point    if point == "ping":        return {"result": "pong"}    if point == "app.moderation.input":        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)    elif point == "app.moderation.output":        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)    raise HTTPException(status_code=400, detail="Not implemented")def handle_app_moderation_input(params: dict, ak: str, sk: str):    app_id = params.get("app_id")    inputs = params.get("inputs", {})    query = params.get("query")    contents = (        [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)    )    # 并发执行    bodys = []    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:        futures = [executor.submit(request, seg, "input", ak, sk) for seg in contents]        for future in concurrent.futures.as_completed(futures):            bodys.append(future.result())    contentModerationSuggestion=""    sensitiveDataSuggestion=""    promptAttackSuggestion=""    maliciousUrlSuggestion=""    _finalSuggestion="pass"    desensitization=""    # 遍历bodys解析出各个检测项的建议    for body in bodys:        finalSuggestion = body.get("Data", {}).get("Suggestion", "")        detailList = body.get("Data", {}).get("Detail", [])        if finalSuggestion and _finalSuggestion!="block" :            _finalSuggestion = finalSuggestion        for detail in detailList:            suggestion = detail.get("Suggestion", "")            type = detail.get("Type", "")            if type == "contentModeration":                if suggestion and contentModerationSuggestion!="block" :                    contentModerationSuggestion = suggestion            elif type == "sensitiveData":                desensitization = detail.get("Result",[])[0].get("Ext",{}).get("Desensitization","")                if suggestion and sensitiveDataSuggestion!="block" :                    sensitiveDataSuggestion = suggestion            elif type == "promptAttack":                if suggestion and promptAttackSuggestion!="block" :                    promptAttackSuggestion = suggestion            elif type == "maliciousUrl":                if suggestion and maliciousUrlSuggestion!="block" :                    maliciousUrlSuggestion = suggestion    # 可以根据不同的场景返回不同的回答内容    output_response = "Your content violates our usage policy."    if contentModerationSuggestion=="block":        output_response = "Your content involves content security."    elif sensitiveDataSuggestion=="block" or sensitiveDataSuggestion=="mask":        output_response = "Your content involves sensitive data."    elif promptAttackSuggestion=="block":        output_response = "Your content involves prompt attack."    elif maliciousUrlSuggestion=="block":        output_response = "Your content involves malicious url."    flagged = False    action = "direct_output"    if _finalSuggestion == "block" :        flagged = True    elif sensitiveDataSuggestion=="mask":        flagged = True        action = "overridden"        query = desensitization    response = {"flagged": flagged, "action": action}    if flagged:        if action == "direct_output":            response["preset_response"] = output_response        elif action == "overridden":            response["inputs"] = inputs            response["query"] = query    print(response)    return responsedef handle_app_moderation_output(params: dict, ak: str, sk: str):    app_id = params.get("app_id")    text = params.get("text", "")    print(f"handle_app_moderation_output length:{len(text)}")    # 获取最近的2000字符,大小根据需要调整,建议大于dify的窗口大小    if len(text) > MAX_LENGTH:        content = text[-MAX_LENGTH:]    else:        content = text    # 执行检测    body = request(content, "output", ak, sk)    contentModerationSuggestion=""    sensitiveDataSuggestion=""    promptAttackSuggestion=""    maliciousUrlSuggestion=""    desensitization=""    _finalSuggestion=body.get("Data", {}).get("Suggestion", "")    detailList = body.get("Data", {}).get("Detail", [])    for detail in detailList:        suggestion = detail.get("Suggestion", "")        type = detail.get("Type", "")        if type == "contentModeration":            contentModerationSuggestion = suggestion        elif type == "sensitiveData":            desensitization = detail.get("Result",[])[0].get("Ext",{}).get("Desensitization","")            sensitiveDataSuggestion = suggestion        elif type == "promptAttack":            promptAttackSuggestion = suggestion        elif type == "maliciousUrl":            maliciousUrlSuggestion = suggestion    # 可以根据不同的场景返回不同的回答内容    output_response = "Your content violates our usage policy."    if contentModerationSuggestion=="block":        output_response = "Your content involves content security."    elif sensitiveDataSuggestion=="block":        output_response = "Your content involves sensitive data."    elif promptAttackSuggestion=="block":        output_response = "Your content involves prompt attack."    elif maliciousUrlSuggestion=="block":        output_response = "Your content involves malicious url."    flagged = False    action = "direct_output"    if _finalSuggestion == "block":        flagged = True    elif sensitiveDataSuggestion=="mask":        flagged = True        action = "overridden"    response = {"flagged": flagged, "action": action}    if flagged:        if action == "direct_output":            response["preset_response"] = output_response        elif action == "overridden":            response["text"] = desensitization    print(response)    return responseif __name__ == "__main__":    import uvicorn    # 开放端口可以根据自定义选择    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)

  • 将上述Python代码保存到main.py中,使用如下命令启动:


# 启动脚本示例pip install fastapi uvicornuvicorn main:app --reload --host 0.0.0.0
  • 以上输出内容审核的示例代码中,默认为直接回复拒答内容,您可以通过调整返回的action字段,更换为内容替换模式,即将命中的关键词或敏感内容数据替换为*(星号)。


  • 新增API扩展:通过设置-API扩展页面,来实现新增API扩展。

  • API Endpoint:填写部署转发服务脚本后的可访问地址。

  • API-Key:填写的是阿里云AK/SK通过:拼接后的Base64字符串,伪代码参考base64({aliyun_accessKey_id}:{aliyun_accessKey_secret})


2-3.png
import base64# AccessKeyId 和 AccessKeySecretaccess_key_id = ""access_key_secret = ""# 拼接并编码auth_str = f"{access_key_id}:{access_key_secret}"encoded_auth = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')print(encoded_auth)

  • 在Agent中配置API扩展:需要在Agent中完成API扩展的配置,已实现成功集成。

  • 在Agent页面右下角选择管理,去配置内容审查。

2-6.png
  • 选择API扩展。

  • 选中已创建完成的AI安全护栏的API扩展。

  • 根据业务需求选择是否打开输入和输出内容的开关。

2-7.png
  • 在输出时Dify会累计约300个字符做一次内容审查。

2-8.png
  • 效果示例:



AI安全能力的建设,不应成为应用开发的额外负担。阿里云AI安全护栏的加入,为Dify社区提供了一种新的可能性:将专业的安全检测能力,以标准化、可配置的方式,无缝融入生成式AI的构建流程中。这不仅是工具链的延伸,更是对“可信赖AI”实践路径的一次探索。


我们相信,只有当安全能力足够轻量、灵活且易于集成,开发者才能在创新与风险控制之间,真正掌握主动权。Dify始终致力于打造开放、透明、可持续的插件生态。也期待每一位开发者在实际场景中验证、反馈并完善这些能力。让AI更好用,也要更安心。


  阿里云安全  

国际领先的云安全解决方案提供方,零信任SASE、数据安全、流量安全等8大安全域百余项核心能力,助力百行百业在云上构建生于云架构,具备高度一体化、智能化、自我进化特征的原生安全保护体系。

阿里云安全能力获权威机构认可:在2023年Forrester《基础设施即服务平台原生安全Wave™》报告中,阿里云荣升强劲表现者象限,容器安全等标准中获得最高分;在IDC《中国公有云网络安全即服务市场份额,2022》报告中,阿里云市场份额位居第一;在Gartner®发布的网络防火墙魔力象限《Magic Quadrant™ for Network Firewalls,2022》中,阿里云连续2年进入“挑战者”象限。

云原生安全技术的引领探索和实践者,通过安全能力与云紧耦合,实现双向技术的变革式突破,安全能效数倍提升,高弹高可用、稳定与协同;云服务内置天然免疫基因,与用户一起共同守护云上数字原生世界安全。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询