微信扫码
添加专属顾问
我要投稿
轻松将Dify应用接入NextChat,实现强大的AI对话功能扩展。 核心内容: 1. 配置OpenAI Compatible Dify App插件的详细步骤 2. 源码运行NextChat并进行相关配置的方法 3. OpenAI Compatible Dify App的实现原理与工作机制
NextChat是一个流行的 ChatGPT 网页应用,除了小程序端外,基本上其它的平台都支持,并且对于Markdown、LaTex、mermaid、代码等有着很好的渲染能力。因为dify接口不是OpenAI兼容的,所以可通过插件方式来实现Dify应用的 OpenAI 兼容模式。OpenAI Compatible Dify App扩展可将 Dify 应用 API 转换为 OpenAI 兼容的 API。
本文使用OpenAI Compatible Dify App v0.0.6版本。目前支持 last_user_message
(默认)和all_messages
记忆模式,并且仅适用于 chat(聊天)、Agent(智能体) 和 chatflow(对话流) 类型的应用。
添加一个API端点,如下所示:
说明:记忆模式包括使用最后一条用户消息和使用所有消息。
from openai import OpenAI
client = OpenAI(api_key="app-vEtiEYF28li8ODpLCkO3cB7B", base_url="http://localhost:5002/e/ltjm77cyt7iheipo")
response = client.chat.completions.create(
model="dify",
messages=[{
"role": "user",
"content": "编写Python异步爬虫教程,包含代码示例和注意事项"
}],
temperature=0.7,
max_tokens=4096
)
print(response.choices[0].message.content)
yarn install
yarn dev
需要说明接口地址的配置,OpenAI Compatible Dify App中的完整地址是http://localhost:5002/e/ltjm77cyt7iheipo/chat/completions
,NextChat接口地址填写http://localhost:3002/api/proxy/dify/e/ltjm77cyt7iheipo
,这样配置主要是解决CORS问题。
OpenAI Compatible Dify App本质是一个Extension 类型的插件。
整体流程实现了一个 OpenAI 兼容的聊天接口,支持流式和阻塞两种响应模式,并提供了完善的错误处理机制。
def_invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""
Invokes the endpoint with the given request.
"""
ifnot self.verify(r, settings):
return Response(
json.dumps({"message": "Unauthorized"}),
status=401,
content_type="application/json",
)
app_id: str = settings.get("app_id", {}).get("app_id", "")
ifnot app_id:
raise ValueError("App ID is required")
ifnot isinstance(app_id, str):
raise ValueError("App ID must be a string")
memory_mode: str = settings.get("memory_mode", "last_user_message")
try:
data = r.get_json()
messages = data.get("messages", [])
stream = data.get("stream", False)
conversation_id, query = self._get_memory(memory_mode, messages)
inputs = data.get("inputs", {})
inputs["messages"] = json.dumps(messages)
if stream:
defgenerator():
response = self.session.app.chat.invoke(
app_id=app_id,
inputs=inputs,
query=query,
response_mode="streaming",
conversation_id=conversation_id,
)
return self._handle_chat_stream_message(app_id, response)
return Response(
generator(),
status=200,
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Transfer-Encoding": "chunked",
},
)
else:
response = self.session.app.chat.invoke(
app_id=app_id,
inputs=inputs,
query=query,
response_mode="blocking",
conversation_id=conversation_id,
)
return Response(
self._handle_chat_blocking_message(app_id, response),
status=200,
content_type="text/html",
)
except ValueError as e:
return Response(f"Error: {e}", status=400, content_type="text/plain")
except Exception as e:
return Response(f"Error: {e}", status=500, content_type="text/plain")
该方法是 OpenAI 兼容接口的核心处理函数,完整流程:
(1)认证验证
通过 self.verify(r, settings)
检查请求是否已授权
如果验证失败,返回 401 未授权错误响应
(2)参数提取与验证
从设置中提取 app_id
,这是必需参数
验证 app_id
不为空且必须是字符串类型
获取 memory_mode
设置,默认值为 "last_user_message"
(3)请求处理
解析请求的 JSON 数据
提取消息列表 messages
和流式处理标志 stream
调用 self._get_memory()
根据记忆模式和消息获取会话 ID 和查询内容
获取输入参数并将消息添加到 inputs
中
(4)分支处理
根据 stream
参数值走不同的处理流程:
流式响应 (stream=True)
定义生成器函数用于流式处理
调用 self.session.app.chat.invoke
发起流式聊天请求
处理流式响应并返回 text/event-stream
类型的数据流
设置适当的 HTTP 头部:无缓存和分块传输编码
阻塞响应 (stream=False)
调用 self.session.app.chat.invoke
发起阻塞式聊天请求
处理响应并返回 HTML 内容类型的响应
(5)错误处理
捕获 ValueError
异常,返回 400 错误响应
捕获其他所有异常,返回 500 服务器错误响应
该方法用于将OpenAI格式的消息列表转换为格式化的文本块。messages
是OpenAI格式的消息列表,每个消息是包含role
和content
键的字典。
defmessages_to_text(self, messages: list[dict[str, Any]]) -> str:
"""
Convert a list of messages to a formatted text block.
:param messages: A list of dictionaries formatted as OpenAI messages.
:return: A string containing the formatted conversation.
"""
text_block = []
for message in messages:
role = message.get("role")
content = message.get("content")
if role and content:
text_block.append(f"{role.upper()}:\n{content}")
return"\n".join(text_block)
(1)初始化空列表text_block
,用于存储格式化后的消息文本
(2)遍历消息列表中的每条消息:
(3)最终使用换行符\n
连接所有格式化后的消息,返回完整的文本块
这种格式便于将结构化的消息转换为可读性强的文本,适用于日志记录或在不支持结构化消息的场景中展示对话内容。输出格式示例,如下所示:
USER:
用户的消息内容
ASSISTANT:
助手的回复内容
SYSTEM:
系统提示内容
这个方法实现了从自定义文本格式到 OpenAI API 所需 JSON 格式的转换,使应用能够适配 OpenAI 的接口规范。
假设有以下原始文本字符串:
SYSTEM: 你是一个有用的AI助手。
USER: 你好,能帮我解释一下Python的列表推导式吗?
ASSISTANT: 当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。
USER: 谢谢,那字典推导式呢?
方法处理后会生成以下格式的列表:
[
{
"role": "system",
"content": "你是一个有用的AI助手。"
},
{
"role": "user",
"content": "你好,能帮我解释一下Python的列表推导式吗?"
},
{
"role": "assistant",
"content": "当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。"
},
{
"role": "user",
"content": "谢谢,那字典推导式呢?"
}
]
这个格式完全符合 OpenAI API 所需的消息格式,可以直接用于发送请求。
defconvert_to_openai_messages(self, raw_message: str) -> list[dict]:
"""
Convert a raw message string to a list of messages suitable for OpenAI API.
:param raw_message: A string containing the conversation messages.
:return: A list of dictionaries formatted as OpenAI messages.
"""
messages = []
lines = raw_message.strip().split('\n')
for line in lines:
if line.startswith("SYSTEM:"):
messages.append({"role": "system", "content": line[len("SYSTEM:"):].strip()})
elif line.startswith("USER:"):
messages.append({"role": "user", "content": line[len("USER:"):].strip()})
elif line.startswith("ASSISTANT:"):
messages.append({"role": "assistant", "content": line[len("ASSISTANT:"):].strip()})
return messages
(1)参数接收
接收一个 raw_message
字符串参数,包含格式化的对话内容。
(2)初始化
创建空列表 messages
用于存储转换后的消息
使用 strip()
去除字符串首尾空白,并用 split('\n')
按行分割
(3)消息解析
逐行遍历文本,根据前缀识别不同角色的消息:
"SYSTEM:" 开头 → 系统角色消息
"USER:" 开头 → 用户角色消息
"ASSISTANT:" 开头 → 助手角色消息
(4)消息格式转换
对每行消息,使用 line[len("PREFIX:"):].strip()
提取实际内容
创建符合 OpenAI 格式的字典 {"role": xxx, "content": xxx}
将字典添加到 messages
列表
(5)返回结果
返回包含所有格式化消息的列表。
该方法在处理OpenAI兼容API请求时,根据配置的记忆模式提取适当的查询内容,以便传递给后续的聊天调用。不同的记忆模式决定了AI模型接收到的上下文信息量。
def_get_memory(
self, memory_mode: str, messages: list[dict[str, Any]]
) -> tuple[str, str]:
"""
Get the memory from the messages
returns:
- conversation_id: str
- query: str
"""
if memory_mode == "last_user_message":
user_message = ""
for message in reversed(messages):
if message.get("role") == "user":
user_message = message.get("content")
break
ifnot user_message:
raise ValueError("No user message found")
return"", user_message
elif memory_mode == "all_messages":
return"", self.messages_to_text(messages)
else:
raise ValueError(
f"Invalid memory mode: {memory_mode}, only support last_user_message for now"
)
(1)参数
memory_mode
: 字符串类型,指定记忆模式
messages
: 消息列表,每条消息是包含角色和内容的字典
(2)返回值
返回一个元组 (conversation_id, query)
,包含:
conversation_id
: 会话ID(目前实现中总是返回空字符串)
query
: 提取出的查询内容
(3)memory_mode
为"last_user_message"
从后向前遍历消息列表
查找第一条角色为 "user"
的消息
提取其内容作为 user_message
如果未找到用户消息,抛出异常
返回 ("", user_message)
(4)memory_mode
为"all_messages"
调用 messages_to_text()
方法将所有消息转换为格式化文本
返回 ("", formatted_text)
(5)其它模式
抛出异常,提示只支持 "last_user_message"
模式。
这个方法负责将Dify AI的流式响应转换为OpenAI兼容的格式,采用Server-Sent Events(SSE)方式向客户端推送数据。
注解:Server-Sent Events(SSE)简介
Server-Sent Events(SSE)是一种服务器推送技术,允许服务器通过HTTP连接向客户端实时发送消息流。SSE使用特定格式(
data: {json数据}\n\n
)进行数据传输,每条消息以"data:"开头并以两个换行符结束。这种技术特别适用于流式聊天应用,服务器可以逐步发送消息内容、结束标志和文件链接等不同类型的事件,而客户端只需建立一次连接即可持续接收数据,最后以"[DONE]"信号表示流结束。与WebSocket相比,SSE更轻量,但仅支持单向通信(服务器到客户端)。EventSource
是浏览器原生支持的SSE客户端实现,通过简单的JavaScript API创建持久连接,自动接收服务器推送的事件。
def_handle_chat_stream_message(
self, app_id: str, generator: Generator[dict[str, Any], None, None]
) -> Generator[str, None, None]:
"""
Handle the chat stream
"""
message_id = ""
for data in generator:
if data.get("event") == "agent_message"or data.get("event") == "message":
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": data.get("answer", ""),
},
"finish_reason": None,
}
],
}
message_id = message.get("id", "none")
yieldf"data: {json.dumps(message)}\n\n"
elif data.get("event") == "message_end":
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop",
}
],
"usage": {
"completion_tokens": data.get("metadata", {})
.get("usage", {})
.get("completion_tokens", 0),
"prompt_tokens": data.get("metadata", {})
.get("usage", {})
.get("prompt_tokens", 0),
"total_tokens": data.get("metadata", {})
.get("usage", {})
.get("total_tokens", 0),
},
}
yieldf"data: {json.dumps(message)}\n\n"
elif data.get("event") == "message_file":
url = data.get("url", "")
message = {
"id": "chatcmpl-" + message_id,
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": f"[{data.get('id', 'none')}]({url})",
},
}
],
}
yieldf"data: {json.dumps(message)}\n\n"
yield"data: [DONE]\n\n"
def_handle_chat_stream_message(
self, app_id: str, generator: Generator[dict[str, Any], None, None]
) -> Generator[str, None, None]:
参数:
app_id
: 应用的唯一标识符
generator
: 包含响应数据的生成器对象
返回值: 产生SSE格式字符串的生成器
初始化一个空的消息ID变量,用于后续跟踪消息。
message_id = ""
代码根据事件类型(event
字段)将响应分为三种情况处理:
(1)消息事件 - agent_message
或 message
当收到常规消息时:
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
# 其它字段...
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": data.get("answer", ""),
},
"finish_reason": None,
}
],
}
这部分构建了一个OpenAI兼容的响应格式,包含消息内容和元数据,并记录当前消息ID。
(2)消息结束事件 - message_end
当一条消息完成时:
message = {
# 消息头部信息...
"choices": [
{
"index": 0,
"delta": {}, # 空delta表示内容结束
"finish_reason": "stop",
}
],
"usage": {
# 包含token使用统计信息
"completion_tokens": data.get("metadata", {}).get("usage", {}).get("completion_tokens", 0),
# 其它使用统计...
},
}
这部分表示消息已结束,并提供令牌使用统计信息。
(3)文件消息事件 - message_file
当响应包含文件时:
url = data.get("url", "")
message = {
# 消息头部信息...
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": f"[{data.get('id', 'none')}]({url})", # Markdown格式链接
},
}
],
}
这部分处理文件URL,并以Markdown格式[id](url)
呈现文件链接。
yield"data: [DONE]\n\n"
所有数据处理完后,发送[DONE]
标记表示流式响应结束。
每次产生的数据都按SSE格式输出:
data: {JSON格式的消息}\n\n
这个方法巧妙地将Dify AI的响应格式转换为与OpenAI API完全兼容的格式,使客户端能够无缝集成。
这个函数负责将Dify API的非流式(阻塞式)响应转换为OpenAI API兼容的格式。它是OpenAI兼容层的关键部分,确保Dify API的响应能够以OpenAI格式返回,使得原本为OpenAI设计的客户端可以无缝使用Dify的服务。
def_handle_chat_blocking_message(
self, app_id: str, response: dict[str, Any]
) -> str:
"""
Handle the chat blocking message
"""
message = {
"id": "chatcmpl-" + response.get("id", "none"),
"object": "chat.completion",
"created": int(response.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response.get("answer", ""),
},
"finish_reason": "stop",
}
],
"usage": {
"completion_tokens": response.get("metadata", {})
.get("usage", {})
.get("completion_tokens", 0),
"prompt_tokens": response.get("metadata", {})
.get("usage", {})
.get("prompt_tokens", 0),
},
}
return json.dumps(message)
接收两个参数:app_id
(应用ID)和response
(Dify API的原始响应)
返回转换后的JSON字符串
创建一个符合OpenAI API格式的message
字典
设置基本信息:
id
: 添加"chatcmpl-"前缀到原始响应ID
object
: 设为"chat.completion"(OpenAI标准类型)
created
: 转换时间戳为整数
model
: 设为"gpt-3.5-turbo"
system_fingerprint
: 设为"difyai"
设置响应内容:
在choices
数组中添加标准结构
包含助手角色和回答内容
设置finish_reason
为"stop"
添加使用统计:
completion_tokens
: 完成部分的token数
prompt_tokens
: 提示部分的token数
从原始响应的metadata.usage
中提取数据
使用json.dumps
将字典转换为JSON字符串并返回
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-06-25
2025-06-04
2025-04-18
2025-04-28
2025-05-08
2025-06-03
2025-05-08
2025-04-14
2025-04-15
2025-04-14
2025-06-26
2025-06-17
2025-05-29
2025-05-28
2025-05-22
2025-04-27
2025-04-15
2025-03-20