微信扫码
添加专属顾问
更多AI技术,关注我的github,欢迎点亮星星:
https://github.com/xinyuwei-david/david-share.git
本demo代码使用Chainlit框架。
Chainlit是一个开源的Python框架,用于构建生产级的对话式人工智能(Conversational AI)应用。它允许开发者快速地创建和部署对话式AI或代理应用,支持多平台和数据持久化。
Chainlit的一些关键特点包括:
快速构建:可以轻松集成到现有代码库,或者从头开始构建
多平台支持:一次编写逻辑,可以在多个平台上使用2
数据持久化:收集、监控和分析用户数据2
可视化多步推理:理解生成输出的中间步骤2
与流行库和框架的集成:支持OpenAI、Mistral AI、Llama Index、LangChain、Autogen、Haystack等。
我们分析代码的主程序:app.py。
代码中,Chainlit 框架封装了所有的接口,使得您需要直接处理底层的网络通信、事件分发或接口实现。
代码中使用了许多由 Chainlit 提供的装饰器,这些装饰器封装了与前端交互的接口,简化了编程过程。例如:
@cl.on_chat_start:当聊天会话开始时触发。
@cl.on_message:当收到用户文本消息时触发。
@cl.on_audio_start:当用户开始音频输入时触发。
@cl.on_audio_chunk:当收到音频数据块时触发。
@cl.on_audio_end、@cl.on_chat_end、@cl.on_stop:当音频输入结束、聊天结束或会话停止时触发。
这些装饰器使您能够专注于业务逻辑的实现,而不必关心底层的事件监听和处理机制。
在传统的应用程序中,您可能需要自己编写代码来处理前端和后端之间的通信,例如:
监听前端的请求或事件。
解析前端发送的数据。
将处理结果返回给前端。
然而,Chainlit 为您处理了这些细节,它封装了前端与后端之间的通信,使得后端代码可以直接处理事件。
通过 cl.user_session,您可以在用户会话中存储和获取数据。这使您能够:
为每个用户维护独立的会话状态。
存储用户相关的信息,如 track_id、openai_realtime 实例等。
在不同的事件处理函数之间共享数据。
在您的代码中,您使用了 AsyncAzureOpenAI 客户端与 Azure OpenAI 服务进行交互。Chainlit 可以与这些服务无缝集成,简化了与大型语言模型的交互过程。
错误处理:通过提供 @cl.on_error 等装饰器,Chainlit 允许您处理在应用程序中发生的异常和错误。
日志记录:通过 chainlit.logger 模块,您可以方便地记录日志信息,帮助调试和监控应用程序。
简化开发:您不需要处理复杂的事件循环、网络通信或异步 I/O 操作,只需专注于实现业务逻辑。
提高效率:使用 Chainlit 的封装,可以大大减少样板代码(boilerplate code)的数量,使代码更加简洁明了。
代码可读性:通过装饰器和上下文管理,代码的结构更加清晰,逻辑更加直观。
可扩展性:Chainlit 提供了丰富的功能和插件,方便您在未来扩展应用程序的功能。
Chainlit 封装了应用程序与前端的交互接口,包括事件监听、消息传递、音频数据处理等。
您的代码主要使用 Chainlit 提供的装饰器和上下文管理来处理各种事件和会话数据。
这种封装使得您可以专注于业务逻辑的实现,而不必关心底层的通信和接口细节。
import os
import asyncio
from openai import AsyncAzureOpenAI
import chainlit as cl
from uuid import uuid4
from chainlit.logger import logger
from realtime import RealtimeClient
from realtime.tools import tools
解释:
os:用于与操作系统交互,主要用来获取环境变量。
asyncio:Python 标准库,用于编写异步代码,支持异步 I/O 操作。
AsyncAzureOpenAI:从 openai 包中导入,异步的 Azure OpenAI 客户端,用于与 Azure OpenAI 服务进行异步交互。
chainlit:一个用于创建聊天机器人的框架,简化了聊天机器人的开发流程。
uuid4:用于生成随机的 UUID(通用唯一标识符)。
logger:从 chainlit.logger 导入,用于日志记录,方便调试和监控。
RealtimeClient 和 tools:从自定义的 realtime 模块中导入,用于实时通信和处理。
client = AsyncAzureOpenAI(
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
api_version="2024-10-01-preview"
)
解释:
这里创建了一个异步的 Azure OpenAI 客户端 client,需要从环境变量中获取 API 密钥、终端、部署名称。这些信息通常在 Azure 上注册服务时获得。
api_version 指定了使用的 API 版本。
async def setup_openai_realtime(system_prompt: str):
"""Instantiate and configure the OpenAI Realtime Client"""
openai_realtime = RealtimeClient(system_prompt=system_prompt)
cl.user_session.set("track_id", str(uuid4()))
# 以下省略事件处理函数的定义和配置
解释:
定义了一个异步函数 setup_openai_realtime,用于实例化和配置 OpenAI 实时客户端。
system_prompt:系统提示,用于指导模型的行为(我们不需要详细分析这个部分)。
openai_realtime:创建了一个实时客户端实例,传入了系统提示。
cl.user_session.set("track_id", str(uuid4())):为用户会话设置一个唯一的 track_id,用于跟踪音频流或其他会话相关的信息。
在 setup_openai_realtime 函数内,定义了一系列异步的事件处理函数,用于处理不同的事件。这些事件处理函数会在特定的事件发生时被调用。
handle_conversation_updated
async def handle_conversation_updated(event):
item = event.get("item")
delta = event.get("delta")
"""Currently used to stream audio back to the client."""
if delta:
# Only one of the following will be populated for any given event
if 'audio' in delta:
audio = delta['audio'] # Int16Array, audio added
await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
if 'transcript' in delta:
transcript = delta['transcript'] # string, transcript added
print(transcript)
if 'arguments' in delta:
arguments = delta['arguments'] # string, function arguments added
pass
解释:
当对话更新时调用此函数。
event:包含事件的数据。
delta:表示与上一次事件的差异,可能包含 audio、transcript 或 arguments。
处理逻辑:
如果 delta 包含 audio:
获取音频数据 audio。
使用 cl.context.emitter.send_audio_chunk 将音频数据发送回客户端,用于音频流的实时传输。
如果 delta 包含 transcript:
获取转录文本并打印出来。这可能是语音识别后的文本。
如果 delta 包含 arguments:
获取函数参数。这可能与功能调用或命令有关。
handle_item_completed
async def handle_item_completed(item):
"""Used to populate the chat context with transcription once an item is completed."""
# print(item) # TODO
pass
解释:
当某个项目(可能是一次对话或消息)完成时调用。
当前函数内容为空,仅包含一个 pass,表示未来可能会添加处理逻辑。
handle_conversation_interrupt
async def handle_conversation_interrupt(event):
"""Used to cancel the client previous audio playback."""
cl.user_session.set("track_id", str(uuid4()))
await cl.context.emitter.send_audio_interrupt()
解释:
当对话被中断时调用,例如用户停止了音频播放或有新的音频需要播放。
更新用户会话的 track_id,以标记新的音频流。
调用 send_audio_interrupt 通知客户端停止当前的音频播放。
handle_error
async def handle_error(event): logger.error(event)
解释:
当发生错误时调用。
使用日志记录器 logger 记录错误信息,方便调试。
openai_realtime.on('conversation.updated', handle_conversation_updated)
openai_realtime.on('conversation.item.completed', handle_item_completed)
openai_realtime.on('conversation.interrupted', handle_conversation_interrupt)
openai_realtime.on('error', handle_error)
解释:
为 openai_realtime 客户端注册事件处理函数,当特定事件发生时,调用相应的处理函数。
cl.user_session.set("openai_realtime", openai_realtime)
coros = [openai_realtime.add_tool(tool_def, tool_handler) for tool_def, tool_handler in tools]
await asyncio.gather(*coros)
解释:
将 openai_realtime 客户端实例保存到用户会话中,方便在其他地方访问。
使用列表推导式,为每个工具定义和处理器添加到 openai_realtime 中。
await asyncio.gather(*coros):异步地运行所有添加工具的协程。
接下来,代码使用了 @cl.on_* 的装饰器,定义了各种事件处理函数,用于响应聊天和音频相关的事件。
@cl.on_chat_start
@cl.on_chat_start
async def start():
await cl.Message(
content="Hi, Welcome to Xinyu Company. How can I help you?. Press `P` to talk!"
).send()
await setup_openai_realtime(system_prompt=system_prompt + "\n\n Customer ID: 333")
解释:
当聊天会话开始时调用。
发送一条欢迎消息给用户,提示可以按 P 说话。
调用 setup_openai_realtime 函数,传入 system_prompt(系统提示,加上了客户 ID)。
@cl.on_message
@cl.on_message
async def on_message(message: cl.Message):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.send_user_message_content([{ "type": 'input_text', "text": message.content}])
else:
await cl.Message(content="Please activate voice mode before sending messages!").send()
解释:
当收到用户的文本消息时调用。
从用户会话中获取 openai_realtime 客户端实例。
如果实时客户端存在且已连接:
调用 send_user_message_content 将用户的文本消息发送给实时客户端,类型为 input_text。
否则:
提示用户需要激活语音模式才能发送消息。
@cl.on_audio_start
@cl.on_audio_start
async def on_audio_start():
try:
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
# TODO: might want to recreate items to restore context
# openai_realtime.create_conversation_item(item)
await openai_realtime.connect()
logger.info("Connected to OpenAI realtime")
return True
except Exception as e:
await cl.ErrorMessage(content=f"Failed to connect to OpenAI realtime: {e}").send()
return False
解释:
当音频输入开始时调用,即用户开始说话或录音。
尝试连接到 openai_realtime 客户端。
如果成功,记录连接成功的信息,并返回 True。
如果发生异常,发送错误消息给用户,并返回 False。
@cl.on_audio_chunk
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime:
if openai_realtime.is_connected():
await openai_realtime.append_input_audio(chunk.data)
else:
logger.info("RealtimeClient is not connected")
解释:
当收到音频数据块时调用。
从用户会话中获取 openai_realtime 客户端实例。
如果客户端存在且已连接:
将音频数据块添加到实时客户端的输入中,可能用于语音识别或处理。
否则:
记录客户端未连接的信息。
@cl.on_audio_end, @cl.on_chat_end, @cl.on_stop
@cl.on_audio_end
@cl.on_chat_end
@cl.on_stop
async def on_end():
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.disconnect()
解释:
当音频输入结束、聊天结束或会话停止时调用。
从用户会话中获取 openai_realtime 客户端实例。
如果客户端存在且已连接:
调用 disconnect 方法,与实时服务断开连接。
会话开始:
用户进入聊天,会触发 @cl.on_chat_start,发送欢迎消息,并调用 setup_openai_realtime 初始化实时客户端。
用户发送消息:
如果用户发送文本消息,触发 @cl.on_message。
检查实时客户端是否连接,如果已连接,将消息发送给实时客户端处理。
语音交互:
用户按下 P 开始语音输入,触发 @cl.on_audio_start。
尝试连接实时客户端,准备接收音频数据。
用户的语音被分割成音频数据块,每个数据块触发 @cl.on_audio_chunk,将音频数据发送给实时客户端。
用户结束语音输入,触发 @cl.on_audio_end,断开与实时客户端的连接。
实时处理:
实时客户端将音频数据进行处理,可能是语音识别、生成回复等。
处理结果通过事件(如 conversation.updated)回调,调用相应的处理函数 handle_conversation_updated。
将生成的音频或文本回复发送回用户。
会话结束:
用户退出或会话结束,触发 @cl.on_chat_end 或 @cl.on_stop,确保断开与实时客户端的连接,释放资源。
这段代码实现了一个基于 Chainlit 框架的聊天机器人,支持文本和语音交互。它使用了 Azure OpenAI 的服务,通过 RealtimeClient 与 OpenAI 的实时接口交互,处理用户的输入,并将生成的回复(文本或音频)发送给用户。
主要的逻辑包括:
初始化和配置实时客户端,设置事件处理函数。
处理用户的文本消息和语音输入,将其发送给实时客户端。
接收并处理实时客户端的回调事件,将回复发送给用户。
管理会话的开始和结束,确保资源的正确分配和释放。
源码地址:
https://github.com/monuminu/AOAI_Samples/tree/main/realtime-assistant-support
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-06-27
大模型问答流程节点设计:从一轮到多轮对话的意图识别
2026-06-21
AI智能体怎么接入你现有的ERP和CRM——系统集成的实操指南
2026-06-03
你的公司需要一个"养Agent的人",但没人知道这个岗位叫什么
2026-05-18
你的下一位同事,可能是“管 AI 的 AI”
2026-04-29
拦截率从15%到55%:快手智能Oncall系统演进与落地实践
2026-02-11
别再让语音机器人“答非所问”:AI Force任务型语音对话技术总结
2026-01-20
用AI做物业管理与租赁助手,A16Z领投2.5亿美元,这家AI团队年营收超1亿
2026-01-19
熠智AI+Milvus:从Embedding 到数据处理、问题重写,电商AI客服架构怎么搭?
2026-01-19
2025-11-22
2025-11-17
2025-11-10
2025-11-02
2025-08-27
2025-08-25
2025-08-08
欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。
在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。
一、 定义
本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。
会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。
知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。
二、 账号注册与登录
登录方式:本网站支持以下登录方式,您可根据实际情况选择:
微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。
手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。
账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。
实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。
未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。
三、 服务内容与规范
知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。
服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。
禁止行为:您在使用服务时不得实施以下行为:
利用技术手段批量爬取、下载、转存知识库内容;
将知识库内容用于商业目的或未经授权地向第三方传播;
干扰本网站正常运行或侵犯其他用户合法权益;
发布违法违规信息或从事违反公序良俗的活动。
四、 知识产权声明
权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。
有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。
侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。
五、 个人信息保护
我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。
您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。
您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。
六、 免责声明
内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。
不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。
第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。
七、 违约责任
如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。
如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。
八、 法律适用与争议解决
本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。
因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。
九、 其他
本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。
本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。
我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。