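Alibaba Cloud AI Guardrails brings end-to-end AI security protection to the Dify platform, closing the loop from input to output and helping developers handle the risks that come with large-model applications. Key points: 1. Core strengths and features of Alibaba Cloud AI Guardrails 2. Two integration modes tailored for Dify 3. Multimodal protection and elastic performance configuration
As large models spread across customer-facing business, from intelligent customer service and AI search to virtual assistants, every user interaction carries security risks that cannot be ignored. If a model outputs non-compliant content, leaks sensitive information, or is maliciously manipulated, the damage goes beyond user experience: it can trigger legal and compliance problems, harm brand reputation, and even cause a system-level security crisis.
Against this backdrop, Alibaba Cloud AI Guardrails (阿里云AI安全护栏) is now available on the Dify Marketplace, giving developers who build AI applications on Dify a natively integrated, one-stop AI security solution. Through plugin deployment or an API extension, developers can quickly enable protection in Workflow, Agent, and Chatflow applications so that both user input and model output are checked.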
Figure: Alibaba Cloud AI Guardrails product architecture
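From input to output, it builds an end-to-end security loop that addresses the key challenges large models face in real business: content safety, external attacks, privacy leakage, and uncontrolled output.
It deeply integrates Qwen3-Guard and moderation models fine-tuned (SFT) from the Qwen series, combining adversarial detection with semantic understanding to accurately identify highly concealed risks such as variant spellings, homophones, metaphorical expressions, and ideological infiltration.
It supports streaming moderation across the full pipeline: content is submitted for review segment by segment while the model is still generating, which significantly shortens the delay between token generation and risk discovery and keeps interaction smooth and safe under high concurrency.
It supports risk detection in single-turn and multi-turn Q&A scenarios. By incorporating conversation history, it identifies cross-turn inducement, semantic drift, and jailbreak attempts, understands the intent of the whole conversation, and avoids misjudgments caused by fragmented context.
It supports mixed detection across text, images, files, and other modalities, effectively identifying hidden cross-modal instructions and compound attacks for multimodal risk coverage.
It supports All-in-One API access: a single call completes detection across all modalities, protection capabilities are enabled on demand, and integration is simple and fits mainstream AI application architectures, helping customers go live quickly. Alibaba Cloud AI Guardrails currently offers several one-click access modes, including API access, Alibaba Cloud Model Studio (百炼) access, and Alibaba Cloud WAF access.
Algorithm orchestration dynamically balances accuracy, latency, and cost. For high-concurrency, low-latency scenarios, it delivers high-performance service without sacrificing detection quality, meeting demanding production requirements.
A visual console supports risk policy configuration, blocklists and allowlists, threshold tuning, and effect verification. Users can also create custom detection Agents with their own labels and prompts to pinpoint business risks in industries such as finance, healthcare, and education or in specific scenarios, so security capabilities can be flexibly extended and deeply customized.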
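Because many developers now build AI applications on Dify, AI Guardrails has been listed on the official Dify plugin marketplace and offers two integration paths:
◆ Applicable scenarios: standard Workflow and applications without streaming output;
◆ Applicable scenarios: Agent, Chatflow, and applications that need streaming output;
◆ How to set it up: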
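The AI Guardrails API accepts at most 2,000 characters of input per call, so inputs longer than 2,000 characters need to be adapted as follows:
Input moderation: split the input into segments of at most 2,000 characters each and call the Guardrails API concurrently.
Output moderation: Dify issues a content moderation API call roughly every 300 characters; the handler sends the most recent 2,000 characters on each call.
Sample code for the processing logic and the startup script follows: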
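The two rules above can be tried out without calling the real service. The snippet below is a minimal sketch, assuming Dify triggers a check roughly every 300 characters as described; `input_segments`, `output_windows`, `CHECK_EVERY`, and the fake stream are hypothetical names used only for illustration and are not part of Dify or the Guardrails API.

```python
MAX_LENGTH = 2000   # per-call input limit stated in the text
CHECK_EVERY = 300   # approximate moderation interval (assumption taken from the text)


def input_segments(query: str, max_length: int = MAX_LENGTH) -> list[str]:
    """Naive fixed-size split; the article's script prefers sentence boundaries."""
    return [query[i:i + max_length] for i in range(0, len(query), max_length)]


def output_windows(chunks, check_every: int = CHECK_EVERY, max_length: int = MAX_LENGTH):
    """Yield the text that would be sent for review at each checkpoint."""
    buffer = ""
    checked = 0  # buffer length at the previous checkpoint
    for chunk in chunks:
        buffer += chunk
        if len(buffer) - checked >= check_every:
            checked = len(buffer)
            yield buffer[-max_length:]  # only the most recent max_length characters
    if len(buffer) > checked:
        yield buffer[-max_length:]  # final check for the remaining tail


if __name__ == "__main__":
    print([len(s) for s in input_segments("a" * 4500)])
    # 25 streamed chunks of 100 characters each
    print([len(w) for w in output_windows(["x" * 100] * 25)])
```

Because each window overlaps the previous one, text near a checkpoint boundary is reviewed more than once, which is why a window larger than the 300-character interval is useful.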
from fastapi import FastAPI, Body, HTTPException, Header
from pydantic import BaseModel
import base64
import hmac
import hashlib
from urllib.parse import quote
import requests
from datetime import datetime, timezone
import uuid
import json
import re
import concurrent.futures

# Pick the service region as needed: Shanghai (cn-shanghai), Beijing (cn-beijing),
# Hangzhou (cn-hangzhou), or Shenzhen (cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# Text longer than this is split into segments
MAX_LENGTH = 2000
# ServiceCodes for the Guardrails input check and output check
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"
ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
ALGORITHM = "HmacSHA1"

def format_iso8601_date():
    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)


def percent_encode(value):
    if value is None:
        return ""
    return (
        quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")
    )


def create_signature(string_to_sign, secret):
    secret = secret + "&"
    signature = hmac.new(
        secret.encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1
    ).digest()
    return base64.b64encode(signature).decode(ENCODING)


def create_string_to_sign(http_method, parameters):
    sorted_keys = sorted(parameters.keys())
    canonicalized_query_string = ""
    for key in sorted_keys:
        canonicalized_query_string += (
            "&" + percent_encode(key) + "=" + percent_encode(parameters[key])
        )
    string_to_sign = (
        http_method
        + "&"
        + percent_encode("/")
        + "&"
        + percent_encode(canonicalized_query_string[1:])
    )
    return string_to_sign

def split_text(text: str, max_length: int = 1950) -> list[str]:
    """Split text into segments of at most max_length, keeping whole sentences where possible."""
    segments = []
    while len(text) > max_length:
        # Take the longest allowed prefix
        chunk = text[:max_length]
        # Find the last run of sentence-ending punctuation, full-width or ASCII
        matches = list(re.finditer(r"[。!?;:.!?;:]+", chunk))
        if matches:
            cut_point = matches[-1].end()  # keep the punctuation with its sentence
        else:
            cut_point = max_length  # no boundary found, so cut at the limit
        segments.append(text[:cut_point])
        text = text[cut_point:]
    if text:
        segments.append(text)
    return segments

def request(content_segment, check_type, aliyun_access_key, aliyun_access_secret):
    print(datetime.now(), f" [{check_type} request content]-> {content_segment}")
    # Build the request parameters
    parameters = {
        "Action": "MultiModalGuard",
        "Version": "2022-03-02",
        "AccessKeyId": aliyun_access_key,
        "Timestamp": format_iso8601_date(),
        "SignatureMethod": "HMAC-SHA1",
        "SignatureVersion": "1.0",
        "SignatureNonce": str(uuid.uuid4()),
        "Format": "JSON",
        "Service": (
            SERVICE_INPUT if check_type == "input" else SERVICE_OUTPUT
        ),
        "ServiceParameters": json.dumps(
            {"content": content_segment}, ensure_ascii=False
        ),
    }
    string_to_sign = create_string_to_sign("POST", parameters)
    signature = create_signature(string_to_sign, aliyun_access_secret)
    parameters["Signature"] = signature
    # Send the request; the timeout keeps a stalled call from blocking moderation
    response = requests.post(SERVICE_URL, data=parameters, timeout=10)
    # Check the HTTP status before parsing, since an error body may not be JSON
    if response.status_code != 200:
        raise Exception(
            f"response http status_code not 200. status_code: {response.status_code}, body: {response.text}"
        )
    body = response.json()
    print(datetime.now(), " [response body]-> ", body)
    if body.get("Code") != 200:
        raise Exception(
            f"response code not 200. code: {body.get('Code')}, body: {body}"
        )
    return body

app = FastAPI()


class InputData(BaseModel):
    point: str
    params: dict = {}


# A plain def lets FastAPI run the blocking HTTP calls in its thread pool
@app.post("/api/dify/receive")
def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
    """
    Receive API query data from Dify.
    """
    # A missing header is treated as unauthorized instead of raising an error
    auth_scheme, _, api_key = (authorization or "").partition(" ")
    if auth_scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Unauthorized")
    # The API key is Base64("AccessKeyId:AccessKeySecret")
    try:
        decoded_bytes = base64.b64decode(api_key)
        decoded_str = decoded_bytes.decode("utf-8")
        ak, sk = decoded_str.split(":", 1)
    except Exception as e:
        # Decoding failed, so reject the request
        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")
    point = data.point
    if point == "ping":
        return {"result": "pong"}
    if point == "app.moderation.input":
        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
    elif point == "app.moderation.output":
        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
    raise HTTPException(status_code=400, detail="Not implemented")

def stricter(current: str, new: str) -> str:
    """Return the more severe of two suggestions (block > mask > anything else)."""
    rank = {"block": 2, "mask": 1}
    if not new:
        return current
    return new if rank.get(new, 0) >= rank.get(current, 0) else current


def handle_app_moderation_input(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    inputs = params.get("inputs", {})
    query = params.get("query") or ""
    contents = (
        [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
    )
    # Check the segments concurrently; reading the futures in submission order
    # keeps each result aligned with its segment
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(request, seg, "input", ak, sk) for seg in contents]
        bodys = [future.result() for future in futures]
    contentModerationSuggestion = ""
    sensitiveDataSuggestion = ""
    promptAttackSuggestion = ""
    maliciousUrlSuggestion = ""
    _finalSuggestion = "pass"
    # Masked text per segment; a segment without masking keeps its original text
    masked_segments = list(contents)
    # Merge the suggestions of all segments, keeping the most severe one per check
    for index, body in enumerate(bodys):
        data = body.get("Data") or {}
        _finalSuggestion = stricter(_finalSuggestion, data.get("Suggestion", ""))
        for detail in data.get("Detail") or []:
            suggestion = detail.get("Suggestion", "")
            detail_type = detail.get("Type", "")
            if detail_type == "contentModeration":
                contentModerationSuggestion = stricter(contentModerationSuggestion, suggestion)
            elif detail_type == "sensitiveData":
                results = detail.get("Result") or [{}]
                desensitization = (results[0].get("Ext") or {}).get("Desensitization", "")
                if desensitization:
                    masked_segments[index] = desensitization
                sensitiveDataSuggestion = stricter(sensitiveDataSuggestion, suggestion)
            elif detail_type == "promptAttack":
                promptAttackSuggestion = stricter(promptAttackSuggestion, suggestion)
            elif detail_type == "maliciousUrl":
                maliciousUrlSuggestion = stricter(maliciousUrlSuggestion, suggestion)
    # The reply can be tailored to each scenario
    output_response = "Your content violates our usage policy."
    if contentModerationSuggestion == "block":
        output_response = "Your content involves content security."
    elif sensitiveDataSuggestion == "block" or sensitiveDataSuggestion == "mask":
        output_response = "Your content involves sensitive data."
    elif promptAttackSuggestion == "block":
        output_response = "Your content involves prompt attack."
    elif maliciousUrlSuggestion == "block":
        output_response = "Your content involves malicious url."
    flagged = False
    action = "direct_output"
    if _finalSuggestion == "block":
        flagged = True
    elif sensitiveDataSuggestion == "mask":
        flagged = True
        action = "overridden"
        # Rebuild the full query from the masked segments
        query = "".join(masked_segments)
    response = {"flagged": flagged, "action": action}
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            response["inputs"] = inputs
            response["query"] = query
    print(response)
    return response

def handle_app_moderation_output(params: dict, ak: str, sk: str):
    app_id = params.get("app_id")
    text = params.get("text", "")
    print(f"handle_app_moderation_output length:{len(text)}")
    # Check the most recent 2,000 characters; adjust the size as needed,
    # preferably keeping it larger than Dify's moderation window
    if len(text) > MAX_LENGTH:
        content = text[-MAX_LENGTH:]
    else:
        content = text
    # Run the check
    body = request(content, "output", ak, sk)
    contentModerationSuggestion = ""
    sensitiveDataSuggestion = ""
    promptAttackSuggestion = ""
    maliciousUrlSuggestion = ""
    desensitization = ""
    data = body.get("Data") or {}
    _finalSuggestion = data.get("Suggestion", "")
    for detail in data.get("Detail") or []:
        suggestion = detail.get("Suggestion", "")
        detail_type = detail.get("Type", "")
        if detail_type == "contentModeration":
            contentModerationSuggestion = suggestion
        elif detail_type == "sensitiveData":
            results = detail.get("Result") or [{}]
            desensitization = (results[0].get("Ext") or {}).get("Desensitization", "")
            sensitiveDataSuggestion = suggestion
        elif detail_type == "promptAttack":
            promptAttackSuggestion = suggestion
        elif detail_type == "maliciousUrl":
            maliciousUrlSuggestion = suggestion
    # The reply can be tailored to each scenario
    output_response = "Your content violates our usage policy."
    if contentModerationSuggestion == "block":
        output_response = "Your content involves content security."
    elif sensitiveDataSuggestion == "block":
        output_response = "Your content involves sensitive data."
    elif promptAttackSuggestion == "block":
        output_response = "Your content involves prompt attack."
    elif maliciousUrlSuggestion == "block":
        output_response = "Your content involves malicious url."
    flagged = False
    action = "direct_output"
    if _finalSuggestion == "block":
        flagged = True
    elif sensitiveDataSuggestion == "mask":
        flagged = True
        action = "overridden"
    response = {"flagged": flagged, "action": action}
    if flagged:
        if action == "direct_output":
            response["preset_response"] = output_response
        elif action == "overridden":
            # Keep the text that precedes the checked window so the returned
            # text is not cut down to the last 2,000 characters
            response["text"] = text[: len(text) - len(content)] + (desensitization or content)
    print(response)
    return response

if __name__ == "__main__":
    import uvicorn

    # Choose any open port; reload=True needs the app passed as an import string
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
Save the Python code above as main.py and start it with the following commands:
# Startup script example
pip install fastapi uvicorn requests
uvicorn main:app --reload --host 0.0.0.0
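Once the service is running, it can be checked before it is wired into Dify. The sketch below builds the kind of `ping` request that the script's `/api/dify/receive` handler answers with `{"result": "pong"}`. The helper name `build_ping_request`, the local URL (uvicorn's default port 8000), and the placeholder credentials are assumptions for illustration only.

```python
import base64
import json
import urllib.request


def build_ping_request(endpoint: str, access_key_id: str, access_key_secret: str):
    """Build a POST request carrying {"point": "ping"} and a Bearer API key."""
    # The script expects the key as Base64("AccessKeyId:AccessKeySecret")
    raw_key = f"{access_key_id}:{access_key_secret}".encode("utf-8")
    api_key = base64.b64encode(raw_key).decode("utf-8")
    payload = json.dumps({"point": "ping"}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=payload,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder values; replace them with your own endpoint and credentials
    req = build_ping_request(
        "http://127.0.0.1:8000/api/dify/receive", "demo-ak", "demo-sk"
    )
    print(req.get_method(), req.full_url)
    # Uncomment once the service is running:
    # with urllib.request.urlopen(req, timeout=5) as resp:
    #     print(json.load(resp))
```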
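The output moderation sample above replies with a refusal message by default. By changing the returned action field you can switch to content replacement mode, in which matched keywords or sensitive data are replaced with * (asterisks).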
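The two modes differ only in the JSON returned to Dify. The following is a minimal sketch that uses the field names appearing in the sample script (`flagged`, `action`, `preset_response`, `text`); the helper `build_output_response` and its parameters are hypothetical, and the masked text is assumed to come from the Guardrails response.

```python
def build_output_response(
    suggestion: str,
    masked_text: str = "",
    refusal: str = "Your content violates our usage policy.",
) -> dict:
    """Map a Guardrails suggestion to the response shape returned to Dify."""
    if suggestion == "block":
        # Refusal mode: Dify shows the preset reply instead of the model output
        return {"flagged": True, "action": "direct_output", "preset_response": refusal}
    if suggestion == "mask" and masked_text:
        # Replacement mode: Dify shows the masked text instead of the original
        return {"flagged": True, "action": "overridden", "text": masked_text}
    # Nothing to act on: the output passes through unchanged
    return {"flagged": False, "action": "direct_output"}


if __name__ == "__main__":
    print(build_output_response("block"))
    print(build_output_response("mask", masked_text="My phone number is ***"))
    print(build_output_response("pass"))
```

Refusal mode is the safer default when a partial answer would still be harmful; replacement mode keeps the answer usable when only specific values need to be hidden.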
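Add the API extension: create a new API extension on the Settings > API Extensions page.
API Endpoint: the reachable address of the deployed forwarding service script.
API-Key: the Base64 string of the Alibaba Cloud AccessKey ID and AccessKey Secret joined with a colon (:); in pseudocode, base64({aliyun_accessKey_id}:{aliyun_accessKey_secret})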
import base64
# AccessKeyId and AccessKeySecret
access_key_id = ""
access_key_secret = ""
# Join the two values and encode the result
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
print(encoded_auth)
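On the receiving side, the forwarding script reverses this encoding and splits on the first colon. The sketch below shows the round trip with somewhat stricter validation than the sample script performs; `encode_api_key` and `decode_api_key` are hypothetical helper names, and the credentials are placeholders.

```python
import base64
import binascii


def encode_api_key(access_key_id: str, access_key_secret: str) -> str:
    """Join the two values with a colon and Base64-encode the result."""
    raw = f"{access_key_id}:{access_key_secret}"
    return base64.b64encode(raw.encode("utf-8")).decode("utf-8")


def decode_api_key(api_key: str) -> tuple[str, str]:
    """Recover (AccessKeyId, AccessKeySecret), raising ValueError on bad input."""
    try:
        decoded = base64.b64decode(api_key, validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError) as exc:
        raise ValueError("API key is not valid Base64 text") from exc
    # Split on the first colon only, so a secret containing ':' survives intact
    access_key_id, sep, access_key_secret = decoded.partition(":")
    if not sep or not access_key_id or not access_key_secret:
        raise ValueError("API key must decode to 'AccessKeyId:AccessKeySecret'")
    return access_key_id, access_key_secret


if __name__ == "__main__":
    key = encode_api_key("demo-id", "demo-secret")
    print(decode_api_key(key))
```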
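Configure the API extension in the Agent: the API extension must be configured in the Agent to complete the integration.
In the lower-right corner of the Agent page, choose Manage and open the content moderation settings.
Select API Extension.
Select the AI Guardrails API extension you created.
Turn the input and output moderation switches on or off according to business needs.
For output, Dify runs a content moderation check after accumulating roughly 300 characters.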
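Building AI security capabilities should not become an extra burden on application development. The arrival of Alibaba Cloud AI Guardrails gives the Dify community a new option: professional security detection that fits into the generative AI build process in a standardized, configurable way. This extends the toolchain and is also an exploration of how "trustworthy AI" can be put into practice.
We believe developers can only take the initiative between innovation and risk control when security capabilities are lightweight, flexible, and easy to integrate. Dify remains committed to an open, transparent, and sustainable plugin ecosystem, and we look forward to every developer validating these capabilities in real scenarios, giving feedback, and helping improve them. AI should be easier to use, and safer to rely on.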