支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


将Dify对接到NextChat中的具体操作和原理

发布日期:2025-07-04 08:37:16 浏览次数: 1541
作者:NLP工程化

微信搜一搜,关注“NLP工程化”

推荐语

轻松将Dify应用接入NextChat,实现强大的AI对话功能扩展。

核心内容:
1. 配置OpenAI Compatible Dify App插件的详细步骤
2. 源码运行NextChat并进行相关配置的方法
3. OpenAI Compatible Dify App的实现原理与工作机制

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

NextChat是一个流行的 ChatGPT 网页应用,除了小程序端外,基本上其它的平台都支持,并且对于Markdown、LaTex、mermaid、代码等有着很好的渲染能力。因为dify接口不是OpenAI兼容的,所以可通过插件方式来实现Dify应用的 OpenAI 兼容模式。OpenAI Compatible Dify App扩展可将 Dify 应用 API 转换为 OpenAI 兼容的 API。

本文使用OpenAI Compatible Dify App v0.0.6版本。目前支持 last_user_message(默认)和all_messages记忆模式,并且仅适用于 chat(聊天)、Agent(智能体) 和 chatflow(对话流) 类型的应用。

一.配置OpenAI Compatible Dify App插件

1.构建简单的Chatflow示例

2.安装OpenAI Compatible Dify App插件

3.配置OpenAI Compatible Dify App插件

添加一个API端点,如下所示:

说明:记忆模式包括使用最后一条用户消息和使用所有消息。

4.OpenAI客户端测试

from openai import OpenAI

client = OpenAI(api_key="app-vEtiEYF28li8ODpLCkO3cB7B", base_url="http://localhost:5002/e/ltjm77cyt7iheipo")
response = client.chat.completions.create(
    model="dify",
    messages=[{
"role""user",
"content""编写Python异步爬虫教程,包含代码示例和注意事项"
    }],
    temperature=0.7,
    max_tokens=4096
)
print(response.choices[0].message.content)

二.源码运行NextChat

1.修改配置文件

2.安装依赖包和运行

yarn install
yarn dev

3.NextChat配置

需要说明接口地址的配置,OpenAI Compatible Dify App中的完整地址是http://localhost:5002/e/ltjm77cyt7iheipo/chat/completions,NextChat接口地址填写http://localhost:3002/api/proxy/dify/e/ltjm77cyt7iheipo,这样配置主要是解决CORS问题。

4.查看对话记录

三.OpenAI Compatible Dify App实现原理

OpenAI Compatible Dify App本质是一个Extension 类型的插件。

1._invoke()方法

整体流程实现了一个 OpenAI 兼容的聊天接口,支持流式和阻塞两种响应模式,并提供了完善的错误处理机制。

def_invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""
    Invokes the endpoint with the given request.
    """

ifnot self.verify(r, settings):
return Response(
            json.dumps({"message""Unauthorized"}),
            status=401,
            content_type="application/json",
        )
    app_id: str = settings.get("app_id", {}).get("app_id""")
ifnot app_id:
raise ValueError("App ID is required")
ifnot isinstance(app_id, str):
raise ValueError("App ID must be a string")

    memory_mode: str = settings.get("memory_mode""last_user_message")
try:
        data = r.get_json()
        messages = data.get("messages", [])
        stream = data.get("stream"False)
        conversation_id, query = self._get_memory(memory_mode, messages)
        inputs = data.get("inputs", {})
        inputs["messages"] = json.dumps(messages)

if stream:
defgenerator():
                response = self.session.app.chat.invoke(
                    app_id=app_id,
                    inputs=inputs,
                    query=query,
                    response_mode="streaming",
                    conversation_id=conversation_id,
                )
return self._handle_chat_stream_message(app_id, response)

return Response(
                generator(),
                status=200,
                content_type="text/event-stream",
                headers={
"Cache-Control""no-cache",
"Transfer-Encoding""chunked",
                },
            )
else:
            response = self.session.app.chat.invoke(
                app_id=app_id,
                inputs=inputs,
                query=query,
                response_mode="blocking",
                conversation_id=conversation_id,
            )
return Response(
                self._handle_chat_blocking_message(app_id, response),
                status=200,
                content_type="text/html",
            )
except ValueError as e:
return Response(f"Error: {e}", status=400, content_type="text/plain")
except Exception as e:
return Response(f"Error: {e}", status=500, content_type="text/plain")

该方法是 OpenAI 兼容接口的核心处理函数,完整流程:

(1)认证验证

  • 通过 self.verify(r, settings) 检查请求是否已授权

  • 如果验证失败,返回 401 未授权错误响应

(2)参数提取与验证

  • 从设置中提取 app_id,这是必需参数

  • 验证 app_id 不为空且必须是字符串类型

  • 获取 memory_mode 设置,默认值为 "last_user_message"

(3)请求处理

  • 解析请求的 JSON 数据

  • 提取消息列表 messages 和流式处理标志 stream

  • 调用 self._get_memory() 根据记忆模式和消息获取会话 ID 和查询内容

  • 获取输入参数并将消息添加到 inputs 中

(4)分支处理

根据 stream 参数值走不同的处理流程:

  • 流式响应 (stream=True)

    • 定义生成器函数用于流式处理

    • 调用 self.session.app.chat.invoke 发起流式聊天请求

    • 处理流式响应并返回 text/event-stream 类型的数据流

    • 设置适当的 HTTP 头部:无缓存和分块传输编码

  • 阻塞响应 (stream=False)

    • 调用 self.session.app.chat.invoke 发起阻塞式聊天请求

    • 处理响应并返回 HTML 内容类型的响应

(5)错误处理

  • 捕获 ValueError 异常,返回 400 错误响应

  • 捕获其他所有异常,返回 500 服务器错误响应

2.messages_to_text()方法

该方法用于将OpenAI格式的消息列表转换为格式化的文本块。messages是OpenAI格式的消息列表,每个消息是包含rolecontent键的字典。

defmessages_to_text(self, messages: list[dict[str, Any]]) -> str:
"""
    Convert a list of messages to a formatted text block.

    :param messages: A list of dictionaries formatted as OpenAI messages.
    :return: A string containing the formatted conversation.
    """

    text_block = []
for message in messages:
        role = message.get("role")
        content = message.get("content")
if role and content:
            text_block.append(f"{role.upper()}:\n{content}")
return"\n".join(text_block)

(1)初始化空列表text_block,用于存储格式化后的消息文本

(2)遍历消息列表中的每条消息:

(3)最终使用换行符\n连接所有格式化后的消息,返回完整的文本块

这种格式便于将结构化的消息转换为可读性强的文本,适用于日志记录或在不支持结构化消息的场景中展示对话内容。输出格式示例,如下所示:

USER:
用户的消息内容
ASSISTANT:
助手的回复内容
SYSTEM:
系统提示内容

3.convert_to_openai_messages()方法

这个方法实现了从自定义文本格式到 OpenAI API 所需 JSON 格式的转换,使应用能够适配 OpenAI 的接口规范。

假设有以下原始文本字符串:

SYSTEM: 你是一个有用的AI助手。
USER: 你好,能帮我解释一下Python的列表推导式吗?
ASSISTANT: 当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。
USER: 谢谢,那字典推导式呢?

方法处理后会生成以下格式的列表:

[
    {
"role""system"
"content""你是一个有用的AI助手。"
    },
    {
"role""user"
"content""你好,能帮我解释一下Python的列表推导式吗?"
    },
    {
"role""assistant"
"content""当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。"
    },
    {
"role""user"
"content""谢谢,那字典推导式呢?"
    }
]

这个格式完全符合 OpenAI API 所需的消息格式,可以直接用于发送请求。

defconvert_to_openai_messages(self, raw_message: str) -> list[dict]:
"""
    Convert a raw message string to a list of messages suitable for OpenAI API.

    :param raw_message: A string containing the conversation messages.
    :return: A list of dictionaries formatted as OpenAI messages.
    """

    messages = []
    lines = raw_message.strip().split('\n')

for line in lines:
if line.startswith("SYSTEM:"):
            messages.append({"role""system""content": line[len("SYSTEM:"):].strip()})
elif line.startswith("USER:"):
            messages.append({"role""user""content": line[len("USER:"):].strip()})
elif line.startswith("ASSISTANT:"):
            messages.append({"role""assistant""content": line[len("ASSISTANT:"):].strip()})
return messages

(1)参数接收

接收一个 raw_message 字符串参数,包含格式化的对话内容。

(2)初始化

  • 创建空列表 messages 用于存储转换后的消息

  • 使用 strip() 去除字符串首尾空白,并用 split('\n') 按行分割

(3)消息解析

逐行遍历文本,根据前缀识别不同角色的消息:

  • "SYSTEM:" 开头 → 系统角色消息

  • "USER:" 开头 → 用户角色消息

  • "ASSISTANT:" 开头 → 助手角色消息

(4)消息格式转换

  • 对每行消息,使用 line[len("PREFIX:"):].strip() 提取实际内容

  • 创建符合 OpenAI 格式的字典 {"role": xxx, "content": xxx}

  • 将字典添加到 messages 列表

(5)返回结果

返回包含所有格式化消息的列表。

4._get_memory()方法

该方法在处理OpenAI兼容API请求时,根据配置的记忆模式提取适当的查询内容,以便传递给后续的聊天调用。不同的记忆模式决定了AI模型接收到的上下文信息量。

def_get_memory(
    self, memory_mode: str, messages: list[dict[str, Any]]
)
 -> tuple[str, str]:

"""
    Get the memory from the messages

    returns:
        - conversation_id: str
        - query: str
    """

if memory_mode == "last_user_message":
        user_message = ""
for message in reversed(messages):
if message.get("role") == "user":
                user_message = message.get("content")
break

ifnot user_message:
raise ValueError("No user message found")

return"", user_message
elif memory_mode == "all_messages":
return"", self.messages_to_text(messages)
else:
raise ValueError(
f"Invalid memory mode: {memory_mode}, only support last_user_message for now"
        )

(1)参数

  • memory_mode: 字符串类型,指定记忆模式

  • messages: 消息列表,每条消息是包含角色和内容的字典

(2)返回值

返回一个元组 (conversation_id, query),包含:

  • conversation_id: 会话ID(目前实现中总是返回空字符串)

  • query: 提取出的查询内容

(3)memory_mode"last_user_message"

  • 从后向前遍历消息列表

  • 查找第一条角色为 "user" 的消息

  • 提取其内容作为 user_message

  • 如果未找到用户消息,抛出异常

  • 返回 ("", user_message)

(4)memory_mode"all_messages"

  • 调用 messages_to_text() 方法将所有消息转换为格式化文本

  • 返回 ("", formatted_text)

(5)其它模式

抛出异常,提示只支持 "last_user_message" 模式。

四.流式响应_handle_chat_stream_message()方法

这个方法负责将Dify AI的流式响应转换为OpenAI兼容的格式,采用Server-Sent Events(SSE)方式向客户端推送数据。

注解:Server-Sent Events(SSE)简介

Server-Sent Events(SSE)是一种服务器推送技术,允许服务器通过HTTP连接向客户端实时发送消息流。SSE使用特定格式(data: {json数据}\n\n)进行数据传输,每条消息以"data:"开头并以两个换行符结束。这种技术特别适用于流式聊天应用,服务器可以逐步发送消息内容、结束标志和文件链接等不同类型的事件,而客户端只需建立一次连接即可持续接收数据,最后以"[DONE]"信号表示流结束。与WebSocket相比,SSE更轻量,但仅支持单向通信(服务器到客户端)。EventSource是浏览器原生支持的SSE客户端实现,通过简单的JavaScript API创建持久连接,自动接收服务器推送的事件。

def_handle_chat_stream_message(
    self, app_id: str, generator: Generator[dict[str, Any], None, None]
)
 -> Generator[str, NoneNone]:

"""
    Handle the chat stream
    """

    message_id = ""
for data in generator:
if data.get("event") == "agent_message"or data.get("event") == "message":
            message = {
"id""chatcmpl-" + data.get("message_id""none"),
"object""chat.completion.chunk",
"created": int(data.get("created"0)),
"model""gpt-3.5-turbo",
"system_fingerprint""difyai",
"choices": [
                    {
"index"0,
"delta": {
"role""assistant",
"content": data.get("answer"""),
                        },
"finish_reason"None,
                    }
                ],
            }
            message_id = message.get("id""none")
yieldf"data: {json.dumps(message)}\n\n"
elif data.get("event") == "message_end":
            message = {
"id""chatcmpl-" + data.get("message_id""none"),
"object""chat.completion.chunk",
"created": int(data.get("created"0)),
"model""gpt-3.5-turbo",
"system_fingerprint""difyai",
"choices": [
                    {
"index"0,
"delta": {},
"finish_reason""stop",
                    }
                ],
"usage": {
"completion_tokens": data.get("metadata", {})
                    .get("usage", {})
                    .get("completion_tokens"0),
"prompt_tokens": data.get("metadata", {})
                    .get("usage", {})
                    .get("prompt_tokens"0),
"total_tokens": data.get("metadata", {})
                    .get("usage", {})
                    .get("total_tokens"0),
                },
            }
yieldf"data: {json.dumps(message)}\n\n"
elif data.get("event") == "message_file":
            url = data.get("url""")
            message = {
"id""chatcmpl-" + message_id,
"object""chat.completion.chunk",
"created": int(data.get("created"0)),
"model""gpt-3.5-turbo",
"system_fingerprint""difyai",
"choices": [
                    {
"index"0,
"delta": {
"role""assistant",
"content"f"[{data.get('id''none')}]({url})",
                        },
                    }
                ],
            }
yieldf"data: {json.dumps(message)}\n\n"

yield"data: [DONE]\n\n"

1.方法签名

def_handle_chat_stream_message(
    self, app_id: str, generator: Generator[dict[str, Any], None, None]
)
 -> Generator[str, NoneNone]:

  • 参数:

    • app_id: 应用的唯一标识符

    • generator: 包含响应数据的生成器对象

  • 返回值: 产生SSE格式字符串的生成器

2.初始化变量

初始化一个空的消息ID变量,用于后续跟踪消息。

message_id = ""

3.迭代处理响应数据

代码根据事件类型(event字段)将响应分为三种情况处理:

(1)消息事件 - agent_message 或 message

当收到常规消息时:

message = {
"id""chatcmpl-" + data.get("message_id""none"),
"object""chat.completion.chunk",
# 其它字段...
"choices": [
        {
"index"0,
"delta": {
"role""assistant",
"content": data.get("answer"""),
            },
"finish_reason"None,
        }
    ],
}

这部分构建了一个OpenAI兼容的响应格式,包含消息内容和元数据,并记录当前消息ID。

(2)消息结束事件 - message_end

当一条消息完成时:

message = {
# 消息头部信息...
"choices": [
        {
"index"0,
"delta": {},  # 空delta表示内容结束
"finish_reason""stop",
        }
    ],
"usage": {
# 包含token使用统计信息
"completion_tokens": data.get("metadata", {}).get("usage", {}).get("completion_tokens"0),
# 其它使用统计...
    },
}

这部分表示消息已结束,并提供令牌使用统计信息。

(3)文件消息事件 - message_file

当响应包含文件时:

url = data.get("url""")
message = {
# 消息头部信息...
"choices": [
        {
"index"0,
"delta": {
"role""assistant",
"content"f"[{data.get('id''none')}]({url})",  # Markdown格式链接
            },
        }
    ],
}

这部分处理文件URL,并以Markdown格式[id](url)呈现文件链接。

4.完成整个流式响应

yield"data: [DONE]\n\n"

所有数据处理完后,发送[DONE]标记表示流式响应结束。

5.输出格式

每次产生的数据都按SSE格式输出:

data: {JSON格式的消息}\n\n

这个方法巧妙地将Dify AI的响应格式转换为与OpenAI API完全兼容的格式,使客户端能够无缝集成。

五.阻塞响应_handle_chat_blocking_message()方法

这个函数负责将Dify API的非流式(阻塞式)响应转换为OpenAI API兼容的格式。它是OpenAI兼容层的关键部分,确保Dify API的响应能够以OpenAI格式返回,使得原本为OpenAI设计的客户端可以无缝使用Dify的服务。

def_handle_chat_blocking_message(
    self, app_id: str, response: dict[str, Any]
)
 -> str:

"""
    Handle the chat blocking message
    """

    message = {
"id""chatcmpl-" + response.get("id""none"),
"object""chat.completion",
"created": int(response.get("created"0)),
"model""gpt-3.5-turbo",
"system_fingerprint""difyai",
"choices": [
            {
"index"0,
"message": {
"role""assistant",
"content": response.get("answer"""),
                },
"finish_reason""stop",
            }
        ],
"usage": {
"completion_tokens": response.get("metadata", {})
            .get("usage", {})
            .get("completion_tokens"0),
"prompt_tokens": response.get("metadata", {})
            .get("usage", {})
            .get("prompt_tokens"0),
        },
    }

return json.dumps(message)

1.函数签名

  • 接收两个参数:app_id(应用ID)和response(Dify API的原始响应)

  • 返回转换后的JSON字符串

2.转换过程

  • 创建一个符合OpenAI API格式的message字典

  • 设置基本信息:

    • id: 添加"chatcmpl-"前缀到原始响应ID

    • object: 设为"chat.completion"(OpenAI标准类型)

    • created: 转换时间戳为整数

    • model: 设为"gpt-3.5-turbo"

    • system_fingerprint: 设为"difyai"

  • 设置响应内容:

    • choices数组中添加标准结构

    • 包含助手角色和回答内容

    • 设置finish_reason为"stop"

  • 添加使用统计:

    • completion_tokens: 完成部分的token数

    • prompt_tokens: 提示部分的token数

    • 从原始响应的metadata.usage中提取数据

3.结果返回

使用json.dumps将字典转换为JSON字符串并返回

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询