支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


AI Agent&MCP的工程化实践-系列4-模型的可观测工程

发布日期:2025-07-16 06:28:58 浏览次数: 1530
作者:进击的大数据

微信搜一搜,关注“进击的大数据”

推荐语

深入解析AI Agent轨迹记录的核心价值与实现细节,助你打造更健壮的智能系统。

核心内容:
1. Trajectory在Agent调试、审计和优化中的关键作用
2. 基于LangGraph的Agent实现方案剖析
3. 开源项目Trae agent的源码解析与改进实践

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

写在前面

👀
在Trajectory的实现中,参照了一些开源的实现,主要是字节的trae Agent和langgraph的源码。这里在看字节的Trae agent的过程中,也顺带改了一个小bug,下一篇会讲下trae agent的源码解析,主要聚焦在几个方面:
  • prompt是怎么写的?
  • benchmark是如何做的?
  • 有一个奇怪的多轮对话的报错,是为什么?
另外下一步会看下PromptX的实现,据说是提出一个挑选工具的协议,看看PromptX这个项目是怎么实现的。
感兴趣的话,先关注吧,说实话,网上也很少有博主像我写的这么详细的。

背景

📌
Trajectory 是什么?它的核心价值是什么?
在 Agent 的语境下,Trajectory(轨迹)指的是 Agent 在处理一个任务或一次对话交互过程中的完整执行记录(轨迹)指的是 Agent 在处理一个任务或一次对话交互过程中的完整执行记录。它就像飞机的“黑匣子”,详细记录了从接收用户输入开始,到最终给出响应的每一步思考、决策和行动。

一个典型的 Trajectory 包含以下关键信息:
  • 用户输入 (User Message):对话的起点。
  • AI 的思考过程 (Thought):Agent 的中间推理步骤。
  • 工具调用 (Tool Call):Agent 决定使用哪个工具,以及传入的参数。
  • 工具执行结果 (Tool Output):工具返回给 Agent 的信息。
  • AI 的最终响应 (AI Message):Agent 回复给用户的内容。

其核心价值主要体现在以下几个方面:
  • 调试与可观测性 (Debugging & Observability):当 Agent 行为不符合预期时,Trajectory 是定位问题的最直接、最有效的工具。开发者可以清晰地看到每一步的输入输出,快速诊断是模型幻觉、工具错误还是逻辑流问题。
  • 审计与归档 (Audit & Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,计与归档 (Audit & Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,用于后续的分析和复盘。
  • 评估与优化 (Evaluation & Optimization):通过分析大量的 Trajectory 数据,我们可以评估 Agent 在不同任务上的表现,发现其能力的边界和常见的失败模式,为后续的模型微调(Fine-tuning)或 Prompt Engineering 提供数据支持。
因此想基于 LangGraph 的 Agent 设计并实现一个健壮、可扩展的 Trajectory(轨迹)记录系统。该系统不仅能满足基本的调试、审计和归-档需求,还引入了分布式追踪理念,为 Agent 的行为提供了深度可观测性,最终实现了类似 LangSmith 的轨迹分组与追踪效果。

实现目标

核心目标是捕获 Agent 在执行任务过程中的每一步关键信息,并将其结构化地记录下来。希望做到的功能:
  • 调试与审计:开发者可以清晰地回溯 Agent 的思考链、工具调用和模型响应,快速定位问题。
  • 归档与分析:将 Agent 的完整交互历史永久化存储,用于后续的行为分析和模型优化。
  • 可观测性与分布式追踪:借鉴 OpenTelemetry 等分布式追踪系统的理念,为每一次用户交互(Trace)和其中的每一个步骤(Span)分配唯一 ID,实现跨组件、跨服务的行为链路追踪。
  • 多轮对话分组:能准确地将一次完整的端到端对话(从用户输入到最终回复)划分为一个独立的 Trace,便于分组查看和分析。
  • 高可扩展性:系统设计应与具体存储后端解耦,支持从本地文件轻松扩展到 Kafka、数据库或专业日志系统。
  • 灵活集成:既能无缝集成到 LangGraph 的 ReAct Agent 中,也能作为一个独立的节点(Node)在任何 LangGraph 图中即插即用。

设计与实现

设计

在实现 Trajectory 记录功能的过程中,我评估了多种技术方案,最终选择了一种基于“扫描 state['messages']”的模式Trajectory 记录功能的过程中,我评估了多种技术方案,最终选择了一种基于“扫描 state['messages']”的模式。这个决策是权衡了多种方案的利弊后做出的。

曾经考虑过的方案:

  1. 节点装饰器 (Node Decorator):最初,想为 LangGraph 中的每一个 Node(节点)都包裹一个装饰器。这个装饰器会在节点执行前后自动记录日志。
  • 代码类似
  • 优点:逻辑和业务分离,看起来很优雅。
  • 缺点:实现起来非常复杂。LangGraph 的节点功能各异,有的调用 LLM,有的执行工具,有的只是简单的逻辑判断。为这些异构的节点设计一个通用的、能提取所有关键信息的装饰器,成本很高,且容易与 LangGraph 的内部机制产生冲突。
    class TrajectoryHook:    """Hook for automatically recording LangGraph execution."""
        def __init__(self, recorder: TrajectoryRecorder):        self.recorder = recorder        self._session_id: Optional[str] = None
        def wrap_node(self, node_name: str, node_func: Callable) -> Callable:        """Wrap a node function to record its execution."""        @wraps(node_func)        async def wrapped_node(state: Dict[str, Any]) -> Any:            if not self._session_id:                return await node_func(state)
                # Record node start            await self.recorder.record_event(                self._session_id,                node_name=node_name,                event_type="node_start",                data={"state_keys": list(state.keys()) if isinstance(state, dict) else None}            )
                try:                # Execute node                if asyncio.iscoroutinefunction(node_func):                    result = await node_func(state)                else:                    result = node_func(state)
                    # Record node end                await self.recorder.record_event(                    self._session_id,                    node_name=node_name,                    event_type="node_end",                    data={"has_result": result is not None}                )
                    # Record messages if present                if isinstance(result, dict) and "messages" in result:                    messages = result["messages"]                    if isinstance(messages, list):                        for msg in messages:                            if hasattr(msg, "content"):  # 确保是消息对象                                await self.recorder.record_message(self._session_id, msg)
                    # Record node output                await self.recorder.record_node_output(                    self._session_id,                    node_name,                    result                )
                    return result
                except Exception as e:                # Record error                await self.recorder.record_error(                    self._session_id,                    error_type=type(e).__name__,                    error_message=str(e),                    node_name=node_name                )                raise
            return wrapped_node
        async def __aenter__(self):        """Start recording session."""        self._session_id = await self.recorder.start_session()        return self
        async def __aexit__(self, exc_type, exc_val, exc_tb):        """End recording session."""        if self._session_id:            success = exc_type is None            await self.recorder.end_session(self._session_id, success=success)            self._session_id = None

  • 重写检查点 (RecordingCheckpoint):另一个思路是包装 LangGraph 的 Checkpoint(检查点)机制。因为 Checkpoint 本身就是用来持久化 Agent 状态的,我们可以在它保存状态的同时,将轨迹信息也一并记录下来。
    • 优点:这是一个非常可行的方案。Checkpoint 是状态变更的关键枢纽,在这里做文章能确保捕捉到所有重要的变化。
    • 缺点:它将轨迹记录与状态持久化逻辑强耦合。如果我们未来想更换 Checkpoint 的后端(比如从内存换到 Redis),或者在某些场景下禁用 Checkpoint,可能会影响到轨迹记录功能。当然也可以做一个组合的checkpoint,也是一个可行的方案!

    最终选择的方案:扫描 state['messages']
    我最终实现的方案,其核心依据是:
    👀
    无论 Agent 内部的逻辑图(Graph)多么复杂,其所有关键的外部交互(用户输入、AI 回复、工具调用与结果)最终都会以消息(Message)的形式被追加到 state['messages'] 列表中。

    基于这个洞察,我的实现变得非常纯粹和解耦:
    • 我们设计了一个 MessageProcessor,它的唯一职责就是对比上一次的状态和当前状态,找出 messages 列表中的**新消息**。
    • 一旦发现新消息,就将其标准化为一条或多条轨迹事件(Event),并赋予它们正确的 Trace ID 和 Span ID。
    • 这种方式不关心这些消息是哪个 Node 产生的,也不关心 Checkpoint 是否启用。它只关心最终的结果,从而实现了与 LangGraph 内部执行逻辑的解耦。

    实现

    为实现上述目标,系统被设计为由多个松散耦合的组件构成,每个组件承担独立的职责。
    关键组件
    1. TraceContext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id和 parent_sntext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id 和 parent_span_id。
    2. TrajectoryRecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存ecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存储中。
    3. TrajectoryBackend: 存储后端,定义了数据写入的接口。LocalFileBackendend: 存储后端,定义了数据写入的接口。LocalFileBackend 是其默认实现,将轨迹数据以 JSONL 格式写入本地文件,每一行代表一个事件。这种设计使得更换后端(如 Kafka、Redis、PostgreSQL)变得非常简单。
    4. MessageProcessor: 消息处理器,负责将 LangGraph state['messages'] 中的原始消息转换为结构化的、可记录的 Trajectory 事件。它能识别消息类型(human, ai, tool)并生成相应的事件 payload。
    5. ReactTrajectoryHook **TrajectoryNode**: 集成层,负责将轨迹记录功能接入 LangGraph。
    • ReactTrajectoryHook 作为 ReAct Agent 的 post_model_hook,在每次模型调用后触发,自动管理 Trace 的生命周期(开始与结束)。
    • TrajectoryNode 作为一个独立的 LangGraph 节点,可以在图的任意位置被调用,手动记录轨迹。
  • TrajectoryViewer: 一个简单的命令行工具,用于读取本地存储的轨迹文件,并按照 trace_id 进行分组和格式化展示,模拟了 LangSmith 的可视化效果。

  • 分布式追踪机制

    为了实现类似 LangSmith 那样的可视化效果,我们需要将一次完整的“请求-响应”流程中的所有事件(多次工具调用、AI 思考等)聚合到同一个 Trace 下。这就引出了一个关键问题:如何判断一轮对话的开始和结束?
    • 当前实现:我们的策略相对直接——**通过新的用户输入来开启一个新的 Trace**。当一个 HumanMessage(用户消息)在上一轮对话结束后出现时,系统会生成一个新的 trace_id。本轮中所有的后续事件(AIMessageToolCallToolMessage)都会沿用这个 trace_id,但会拥有各自独立的 span_id,并通过 parent_span_id 建立父子关系。
    通过这三个 ID,我们可以清晰地重构出 Agent 的完整执行树。

    • langgraph_hook.py
      • 提供了 TrajectoryNode,一个可以被添加到任何 LangGraph 图中的独立节点。它提供了与 ReactTrajectoryHook 类似的功能,但给予开发者更大的控制权,可以在图的任意位置手动记录状态。

    • trajectory_viewer.py
      • TrajectoryViewer 是一个离线分析工具。它读取 JSONL 文件,使用 itertools.groupby **按 ****trace_id**用 itertools.groupby **按 ****trace_id**** 对所有事件进行分组**,然后格式化输出每一次完整的交互,清晰地展示了思考链和工具调用过程。

    调用过程

    效果

    1. 实现了完整的轨迹记录与追踪:成功构建了一个从数据捕获、处理、存储到展示的端到端轨迹系统。
    2. 高度解耦和可扩展:通过 TrajectoryBackend 抽象,系统可以轻松适配不同的生产环境存储方案。
    3. 优雅的 Trace 划分机制:通过监控用户输入来判定新 Trace 的开始,符合直觉且实现简洁,准确地将多轮对话划分为独立追踪单元。
    4. 双重集成模式:提供了 Hook 和 Node 两种集成方式,兼顾了自动化和灵活性,适用于不同类型的 LangGraph 应用。
    5. 简化的实现逻辑:从最初复杂的节点装饰器和状态快照方案,演进到最终“扫描消息增量”的模式,代码更简洁、鲁棒性更强。
    6. 实用的可视化工具TrajectoryViewer 提供了类似 LangSmith 的分组展示功能,极大地提升了调试和分析的效率。

    未来展望


    虽然当前系统已功能完备,但距离真实的线上系统仍有许多可以扩展和优化的方向,
    • 丰富后端支持:实现更多 TrajectoryBackend,如 Kafka(用于实时数据流)、Redis(用于快速缓存)或 ClickHouse/PostgreSQL(用于持久化存储和复杂查询)。
    • 增强可视化界面:将 TrajectoryViewer 从命令行工具升级为交互式 Web 应用,提供更丰富的过滤、搜索和可视化功能。
    • 与日志/数据管道集成:在生产环境中,将 TrajectoryBackend 对接到公司统一的日志系统和数据管道(如 Flink、Spark),实现企业级的监控和分析
    • 优化 Trace 结束判定:探索更复杂的 Trace 结束逻辑,例如基于特定事件或超时机制,以适应更复杂的业务场景。
    👀
    在实际的大模型工程的工作中,每一个小组件都经过多轮的评审和打磨,拿这个数据举例,可以做的事情还非常多,比如:
    • 要满足数据合规的要求,有的不可以存明文,可能就需要大模型改写信息来脱敏
    • 关联用户的点赞,不点赞的行为,分析上下文,来改写模型
    • 将数据进行冷热分离,持久归档冷数据到便宜的存储,省钱
    等等,但这些很多都是传统的后端和大数据的技术栈。有句话叫:所有的产品都值得被大模型重做一遍。同样的:所有的技术都会在大模型上找到影子。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询