2026年3月27日,来腾讯会议(限30人)了解掌握如何用Openclaw构建企业AI生产力
免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

从零构建 Claude Code:揭秘 AI Coding Agent 的 12 层架构演进

发布日期:2026-03-18 14:32:59 浏览次数: 1531
作者:Coggle数据科学

微信搜一搜,关注“Coggle数据科学”

推荐语

揭秘AI Coding Agent的极简哲学:从基础循环到集群协作,12层架构带你拆解智能体核心原理。

核心内容:
1. 从黑盒到白盒:解析Agent框架的极简设计哲学
2. 四大演进阶段:基础循环、认知升级、工程化持久性、集群作战
3. 实战演示:核心循环代码解析与可视化交互平台体验

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

看来你已经准备好了项目的核心结构和路线图。这份内容非常扎实,它不仅是一个代码库,更像是一套 Agent 架构师的修炼路径

为了配合你将这些内容转化为高质量的博客,我建议将这篇博文结构化为 “理论模型 -> 12 级演进阶梯 -> 快速实操” 的模式。

https://github.com/shareAI-lab/learn-claude-code

unsetunset内容大纲unsetunset

1. 开篇:从“黑盒”到“白盒”

  • 痛点引入: 现在的 Agent 框架(如 LangChain, CrewAI)功能强大但过于臃肿。
  • 项目初衷: 模仿 Claude Code 的极简与高效,用最纯粹的 Python 代码(0 到 1)拆解智能体的每一个核心零件。
  • 核心哲学:“One Loop to Rule Them All”。无论多么复杂的 Agent,本质都是一个不断处理 tool_use 的 while 循环。

2. 四大演进阶段(Learning Path 可视化)

你可以直接引用你的阶段划分,并为每个阶段总结一个“技术灵魂”:

  • Phase 1: 基础循环 (The Loop)
    • 技术点: 处理 stop_reason,建立 dispatch map
    • 感悟: Agent 的生命力在于它能反复感知并响应环境。
  • Phase 2: 认知升级 (Planning & Knowledge)
    • 技术点: Todo 管理、上下文压缩(Context Compact)。
    • 感悟: 记忆不是无限的,聪明的 Agent 懂得如何“遗忘”和“规划”。
  • Phase 3: 工程化持久性 (Persistence)
    • 技术点: 文件系统驱动的任务图、后台守护进程(Daemon)。
    • 感悟: 能够处理长时任务(Long-running tasks)是玩具与工具的分水岭。
  • Phase 4: 集群作战 (Teams)
    • 技术点: JSONL 信箱协议、自主领任务、隔离工作区。
    • 感悟: 多智能体不是简单的并行,而是有序的协作与资源隔离。

3. 代码展示:核心循环的魅力

在博客中贴出你那段简洁的 def agent_loop(messages):。这段代码是整个项目的灵魂,展示了如何通过简单的 if response.stop_reason != "tool_use": return 控制整个智能体的生命周期。

4. 快速开始与交互体验

  • 引导读者通过 git clone 快速上手。
  • 亮点推荐: 重点提到你的 Web Platform(Next.js 打造成的可视化平台)。对于学习者来说,能看到 Agent 思考过程的“单步调试”和“拓扑图”是极具吸引力的。

unsetunsets01: The Agent Loop (智能体循环)unsetunset

[ s01 ] s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"One loop & Bash is all you need" -- 一个工具 + 一个循环 = 一个智能体。

语言模型能推理代码, 但碰不到真实世界 -- 不能读文件、跑测试、看报错。没有循环, 每次工具调用你都得手动把结果粘回去。你自己就是那个循环。

解决方案

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> |  Tool   |
| prompt |      |       |      | execute |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                    (loop until stop_reason != "tool_use")

一个退出条件控制整个流程。循环持续运行, 直到模型不再调用工具。

工作原理

  1. 用户 prompt 作为第一条消息。
messages.append({"role""user""content": query})
  1. 将消息和工具定义一起发给 LLM。
response = client.messages.create(
    model=MODEL, system=SYSTEM, messages=messages,
    tools=TOOLS, max_tokens=8000,
)
  1. 追加助手响应。检查 stop_reason -- 如果模型没有调用工具, 结束。
messages.append({"role""assistant""content": response.content})
if response.stop_reason != "tool_use":
    return
  1. 执行每个工具调用, 收集结果, 作为 user 消息追加。回到第 2 步。
results = []
for block in response.content:
    if block.type == "tool_use":
        output = run_bash(block.input["command"])
        results.append({
            "type""tool_result",
            "tool_use_id": block.id,
            "content": output,
        })
messages.append({"role""user""content": results})

组装为一个完整函数:

def agent_loop(query):
    messages = [{"role""user""content": query}]
    whileTrue:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role""assistant""content": response.content})

        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type""tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role""user""content": results})

不到 30 行, 这就是整个智能体。后面 11 个章节都在这个循环上叠加机制 -- 循环本身始终不变。

变更内容

组件
之前
之后
Agent loop
(无)
while True
 + stop_reason
Tools
(无)
bash
 (单一工具)
Messages
(无)
累积式消息列表
Control flow
(无)
stop_reason != "tool_use"

unsetunsets02: Tool Use (工具使用)unsetunset

s01 > [ s02 ] s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"加一个工具, 只加一个 handler" -- 循环不用动, 新工具注册进 dispatch map 就行。

只有 bash 时, 所有操作都走 shell。cat 截断不可预测, sed 遇到特殊字符就崩, 每次 bash 调用都是不受约束的安全面。专用工具 (read_filewrite_file) 可以在工具层面做路径沙箱。

关键洞察: 加工具不需要改循环。

解决方案

+--------+      +-------+      +------------------+
|  User  | ---> |  LLM  | ---> | Tool Dispatch    |
| prompt |      |       |      | {                |
+--------+      +---+---+      |   bash: run_bash |
                    ^           |   read: run_read |
                    |           |   write: run_wr  |
                    +-----------+   edit: run_edit |
                    tool_result | }                |
                                +------------------+

The dispatch map is a dict: {tool_name: handler_function}.
One lookup replaces any if/elif chain.

工作原理

  1. 每个工具有一个处理函数。路径沙箱防止逃逸工作区。
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_read(path: str, limit: int = None) -> str:
    text = safe_path(path).read_text()
    lines = text.splitlines()
    if limit and limit < len(lines):
        lines = lines[:limit]
    return "\n".join(lines)[:50000]
  1. dispatch map 将工具名映射到处理函数。
TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file"lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"],
                                        kw["new_text"]),
}
  1. 循环中按名称查找处理函数。循环体本身与 s01 完全一致。
for block in response.content:
    if block.type == "tool_use":
        handler = TOOL_HANDLERS.get(block.name)
        output = handler(**block.input) if handler \
            else f"Unknown tool: {block.name}"
        results.append({
            "type""tool_result",
            "tool_use_id": block.id,
            "content": output,
        })

加工具 = 加 handler + 加 schema。循环永远不变。

相对 s01 的变更

组件
之前 (s01)
之后 (s02)
Tools
1 (仅 bash)
4 (bash, read, write, edit)
Dispatch
硬编码 bash 调用
TOOL_HANDLERS
 字典
路径安全
safe_path()
 沙箱
Agent loop
不变
不变

unsetunsets03: TodoWrite (待办写入)unsetunset

s01 > s02 > [ s03 ] s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"没有计划的 agent 走哪算哪" -- 先列步骤再动手, 完成率翻倍。

多步任务中, 模型会丢失进度 -- 重复做过的事、跳步、跑偏。对话越长越严重: 工具结果不断填满上下文, 系统提示的影响力逐渐被稀释。一个 10 步重构可能做完 1-3 步就开始即兴发挥, 因为 4-10 步已经被挤出注意力了。

解决方案

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> | Tools   |
| prompt |      |       |      | + todo  |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                          |
              +-----------+-----------+
              | TodoManager state     |
              | [ ] task A            |
              | [>] task B  <- doing  |
              | [x] task C            |
              +-----------------------+
                          |
              if rounds_since_todo >= 3:
                inject <reminder> into tool_result

工作原理

  1. TodoManager 存储带状态的项目。同一时间只允许一个 in_progress
class TodoManager:
    def update(self, items: list) -> str:
        validated, in_progress_count = [], 0
        for item in items:
            status = item.get("status""pending")
            if status == "in_progress":
                in_progress_count += 1
            validated.append({"id": item["id"], "text": item["text"],
                              "status": status})
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress")
        self.items = validated
        return self.render()
  1. todo 工具和其他工具一样加入 dispatch map。
TOOL_HANDLERS = {
    # ...base tools...
    "todo"lambda **kw: TODO.update(kw["items"]),
}
  1. nag reminder: 模型连续 3 轮以上不调用 todo 时注入提醒。
if rounds_since_todo >= 3 and messages:
    last = messages[-1]
    if last["role"] == "user" and isinstance(last.get("content"), list):
        last["content"].insert(0, {
            "type""text",
            "text""<reminder>Update your todos.</reminder>",
        })

"同时只能有一个 in_progress" 强制顺序聚焦。nag reminder 制造问责压力 -- 你不更新计划, 系统就追着你问。

相对 s02 的变更

组件
之前 (s02)
之后 (s03)
Tools
4
5 (+todo)
规划
带状态的 TodoManager
Nag 注入
3 轮后注入 <reminder>
Agent loop
简单分发
+ rounds_since_todo 计数器

unsetunsets04: Subagents (子智能体)unsetunset

s01 > s02 > s03 > [ s04 ] s05 > s06 | s07 > s08 > s09 > s10 > s11 > s12

"大任务拆小, 每个小任务干净的上下文" -- 子智能体用独立 messages[], 不污染主对话。

智能体工作越久, messages 数组越胖。每次读文件、跑命令的输出都永久留在上下文里。"这个项目用什么测试框架?" 可能要读 5 个文件, 但父智能体只需要一个词: "pytest。"

unsetunset解决方案unsetunset

Parent agent                     Subagent
+------------------+             +------------------+
| messages=[...]   |             | messages=[]      | <-- fresh
|                  |  dispatch   |                  |
| tool: task       | ----------> | while tool_use:  |
|   prompt="..."   |             |   call tools     |
|                  |  summary    |   append results |
|   result = "..." | <---------- | return last text |
+------------------+             +------------------+

Parent context stays clean. Subagent context is discarded.

工作原理

  1. 父智能体有一个 task 工具。子智能体拥有除 task 外的所有基础工具 (禁止递归生成)。
PARENT_TOOLS = CHILD_TOOLS + [
    {"name""task",
     "description""Spawn a subagent with fresh context.",
     "input_schema": {
         "type""object",
         "properties": {"prompt": {"type""string"}},
         "required": ["prompt"],
     }},
]
  1. 子智能体以 messages=[] 启动, 运行自己的循环。只有最终文本返回给父智能体。
def run_subagent(prompt: str) -> str:
    sub_messages = [{"role""user""content": prompt}]
    for _ in range(30):  # safety limit
        response = client.messages.create(
            model=MODEL, system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=CHILD_TOOLS, max_tokens=8000,
        )
        sub_messages.append({"role""assistant",
                             "content": response.content})
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input)
                results.append({"type""tool_result",
                    "tool_use_id": block.id,
                    "content": str(output)[:50000]})
        sub_messages.append({"role""user""content": results})
    return"".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or"(no summary)"

子智能体可能跑了 30+ 次工具调用, 但整个消息历史直接丢弃。父智能体收到的只是一段摘要文本, 作为普通 tool_result 返回。

相对 s03 的变更

组件
之前 (s03)
之后 (s04)
Tools
5
5 (基础) + task (仅父端)
上下文
单一共享
父 + 子隔离
Subagent
run_subagent()
 函数
返回值
不适用
仅摘要文本

unsetunsets05: Skills (技能加载)unsetunset

s01 > s02 > s03 > s04 > [ s05 ] s06 | s07 > s08 > s09 > s10 > s11 > s12

"用到什么知识, 临时加载什么知识" -- 通过 tool_result 注入, 不塞 system prompt。

你希望智能体遵循特定领域的工作流: git 约定、测试模式、代码审查清单。全塞进系统提示太浪费 -- 10 个技能, 每个 2000 token, 就是 20,000 token, 大部分跟当前任务毫无关系。

解决方案

System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent.              |
| Skills available:                    |
|   - git: Git workflow helpers        |  ~100 tokens/skill
|   - test: Testing best practices     |
+--------------------------------------+

When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand):  |
| <skill name="git">                   |
|   Full git workflow instructions...  |  ~2000 tokens
|   Step 1: ...                        |
| </skill>                             |
+--------------------------------------+

第一层: 系统提示中放技能名称 (低成本)。第二层: tool_result 中按需放完整内容。

工作原理

  1. 每个技能是一个目录, 包含 SKILL.md 文件和 YAML frontmatter。
skills/
  pdf/
    SKILL.md       # ---\n name: pdf\n description: Process PDF files\n ---\n ...
  code-review/
    SKILL.md       # ---\n name: code-review\n description: Review code\n ---\n ...
  1. SkillLoader 递归扫描 SKILL.md 文件, 用目录名作为技能标识。
class SkillLoader:
    def __init__(self, skills_dir: Path):
        self.skills = {}
        for f in sorted(skills_dir.rglob("SKILL.md")):
            text = f.read_text()
            meta, body = self._parse_frontmatter(text)
            name = meta.get("name", f.parent.name)
            self.skills[name] = {"meta": meta, "body": body}

    def get_descriptions(self) -> str:
        lines = []
        for name, skill in self.skills.items():
            desc = skill["meta"].get("description""")
            lines.append(f"  - {name}{desc}")
        return"\n".join(lines)

    def get_content(self, name: str) -> str:
        skill = self.skills.get(name)
        ifnot skill:
            returnf"Error: Unknown skill '{name}'."
        returnf"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
  1. 第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""


TOOL_HANDLERS = {
    # ...base tools...
    "load_skill"lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}

模型知道有哪些技能 (便宜), 需要时再加载完整内容 (贵)。

相对 s04 的变更

组件
之前 (s04)
之后 (s05)
Tools
5 (基础 + task)
5 (基础 + load_skill)
系统提示
静态字符串
+ 技能描述列表
知识库
skills/*/SKILL.md 文件
注入方式
两层 (系统提示 + result)

unsetunsets06: Context Compact (上下文压缩)unsetunset

s01 > s02 > s03 > s04 > s05 > [ s06 ] | s07 > s08 > s09 > s10 > s11 > s12

"上下文总会满, 要有办法腾地方" -- 三层压缩策略, 换来无限会话。

上下文窗口是有限的。读一个 1000 行的文件就吃掉 ~4000 token; 读 30 个文件、跑 20 条命令, 轻松突破 100k token。不压缩, 智能体根本没法在大项目里干活。

解决方案

三层压缩, 激进程度递增:

Every turn:
+------------------+
| Tool call result |
+------------------+
        |
        v
[Layer 1: micro_compact]        (silent, every turn)
  Replace tool_result > 3 turns old
  with "[Previous: used {tool_name}]"
        |
        v
[Check: tokens > 50000?]
   |               |
   no              yes
   |               |
   v               v
continue    [Layer 2: auto_compact]
              Save transcript to .transcripts/
              LLM summarizes conversation.
              Replace all messages with [summary].
                    |
                    v
            [Layer 3: compact tool]
              Model calls compact explicitly.
              Same summarization as auto_compact.

unsetunset工作原理unsetunset

  1. 第一层 -- micro_compact: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。
def micro_compact(messages: list) -> list:
    tool_results = []
    for i, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for j, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((i, j, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    for _, _, part in tool_results[:-KEEP_RECENT]:
        if len(part.get("content""")) > 100:
            part["content"] = f"[Previous: used {tool_name}]"
    return messages
  1. 第二层 -- auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。
def auto_compact(messages: list) -> list:
    # Save transcript for recovery
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w"as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    # LLM summarizes
    response = client.messages.create(
        model=MODEL,
        messages=[{"role""user""content":
            "Summarize this conversation for continuity..."
            + json.dumps(messages, default=str)[:80000]}],
        max_tokens=2000,
    )
    return [
        {"role""user""content"f"[Compressed]\n\n{response.content[0].text}"},
        {"role""assistant""content""Understood. Continuing."},
    ]
  1. 第三层 -- manual compactcompact 工具按需触发同样的摘要机制。

  2. 循环整合三层:

def agent_loop(messages: list):
    while True:
        micro_compact(messages)                        # Layer 1
        if estimate_tokens(messages) > THRESHOLD:
            messages[:] = auto_compact(messages)       # Layer 2
        response = client.messages.create(...)
        # ... tool execution ...
        if manual_compact:
            messages[:] = auto_compact(messages)       # Layer 3

完整历史通过 transcript 保存在磁盘上。信息没有真正丢失, 只是移出了活跃上下文。

相对 s05 的变更

组件
之前 (s05)
之后 (s06)
Tools
5
5 (基础 + compact)
上下文管理
三层压缩
Micro-compact
旧结果 -> 占位符
Auto-compact
token 阈值触发
Transcripts
保存到 .transcripts/

unsetunsets07: Task System (任务系统)unsetunset

s01 > s02 > s03 > s04 > s05 > s06 | [ s07 ] s08 > s09 > s10 > s11 > s12

"大目标要拆成小任务, 排好序, 记在磁盘上" -- 文件持久化的任务图, 为多 agent 协作打基础。

s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖、状态只有做完没做完。真实目标是有结构的 -- 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。

没有显式的关系, 智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。

解决方案

把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。任务图随时回答三个问题:

  • 什么可以做? -- 状态为 pending 且 blockedBy 为空的任务。
  • 什么被卡住? -- 等待前置任务完成的任务。
  • 什么做完了? -- 状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成, 才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

这个任务图是 s07 之后所有机制的协调骨架: 后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。

工作原理

  1. TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def create(self, subject, description=""):
        task = {"id": self._next_id, "subject": subject,
                "status""pending""blockedBy": [],
                "blocks": [], "owner"""}
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)
  1. 依赖解除: 完成任务时, 自动将其 ID 从其他任务的 blockedBy 中移除, 解锁后续任务。
def _clear_dependency(self, completed_id):
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)
  1. 状态变更 + 依赖关联update 处理状态转换和依赖边。
def update(self, task_id, status=None,
           add_blocked_by=None, add_blocks=None)
:

    task = self._load(task_id)
    if status:
        task["status"] = status
        if status == "completed":
            self._clear_dependency(task_id)
    self._save(task)
  1. 四个任务工具加入 dispatch map。
TOOL_HANDLERS = {
    # ...base tools...
    "task_create"lambda **kw: TASKS.create(kw["subject"]),
    "task_update"lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

从 s07 起, 任务图是多步工作的默认选择。s03 的 Todo 仍可用于单次会话内的快速清单。

相对 s06 的变更

组件
之前 (s06)
之后 (s07)
Tools
5
8 (task_create/update/list/get)
规划模型
扁平清单 (仅内存)
带依赖关系的任务图 (磁盘)
关系
blockedBy
 + blocks 边
状态追踪
做完没做完
pending
 -> in_progress -> completed
持久化
压缩后丢失
压缩和重启后存活

unsetunsets08: Background Tasks (后台任务)unsetunset

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"慢操作丢后台, agent 继续想下一步" -- 后台线程跑命令, 完成后注入通知。

有些命令要跑好几分钟: npm installpytestdocker build。阻塞式循环下模型只能干等。用户说 "装依赖, 顺便建个配置文件", 智能体却只能一个一个来。

解决方案

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

工作原理

  1. BackgroundManager 用线程安全的通知队列追踪任务。
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() 启动守护线程, 立即返回。
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status""running""command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"
  1. 子进程完成后, 结果进入通知队列。
def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})
  1. 每次 LLM 调用前排空通知队列。
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}{n['result']}" for n in notifs)
            messages.append({"role""user",
                "content"f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role""assistant",
                "content""Noted background results."})
        response = client.messages.create(...)

循环保持单线程。只有子进程 I/O 被并行化。

相对 s07 的变更

组件
之前 (s07)
之后 (s08)
Tools
8
6 (基础 + background_run + check)
执行方式
仅阻塞
阻塞 + 后台线程
通知机制
每轮排空的队列
并发
守护线程

unsetunsets09: Agent Teams (智能体团队)unsetunset

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > [ s09 ] s10 > s11 > s12

"任务太大一个人干不完, 要能分给队友" -- 持久化队友 + JSONL 邮箱。

子智能体 (s04) 是一次性的: 生成、干活、返回摘要、消亡。没有身份, 没有跨调用的记忆。后台任务 (s08) 能跑 shell 命令, 但做不了 LLM 引导的决策。

真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久智能体, (2) 身份和生命周期管理, (3) 智能体之间的通信通道。

解决方案

Teammate lifecycle:
  spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN

Communication:
  .team/
    config.json           <- team roster + statuses
    inbox/
      alice.jsonl         <- append-only, drain-on-read
      bob.jsonl
      lead.jsonl

              +--------+    send("alice","bob","...")    +--------+
              | alice  | -----------------------------> |  bob   |
              | loop   |    bob.jsonl << {json_line}    |  loop  |
              +--------+                                +--------+
                   ^                                         |
                   |        BUS.read_inbox("alice")          |
                   +---- alice.jsonl -> read + drain ---------+

工作原理

  1. TeammateManager 通过 config.json 维护团队名册。
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}
  1. spawn() 创建队友并在线程中启动 agent loop。
def spawn(self, name: str, role: str, prompt: str) -> str:
    member = {"name": name, "role": role, "status""working"}
    self.config["members"].append(member)
    self._save_config()
    thread = threading.Thread(
        target=self._teammate_loop,
        args=(name, role, prompt), daemon=True)
    thread.start()
    return f"Spawned teammate '{name}' (role: {role})"
  1. MessageBus: append-only 的 JSONL 收件箱。send() 追加一行; read_inbox() 读取全部并清空。
class MessageBus:
    def send(self, sender, to, content, msg_type="message", extra=None):
        msg = {"type": msg_type, "from": sender,
               "content": content, "timestamp": time.time()}
        if extra:
            msg.update(extra)
        with open(self.dir / f"{to}.jsonl""a"as f:
            f.write(json.dumps(msg) + "\n")

    def read_inbox(self, name):
        path = self.dir / f"{name}.jsonl"
        ifnot path.exists(): return"[]"
        msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l]
        path.write_text("")  # drain
        return json.dumps(msgs, indent=2)
  1. 每个队友在每次 LLM 调用前检查收件箱, 将消息注入上下文。
def _teammate_loop(self, name, role, prompt):
    messages = [{"role""user""content": prompt}]
    for _ in range(50):
        inbox = BUS.read_inbox(name)
        if inbox != "[]":
            messages.append({"role""user",
                "content"f"<inbox>{inbox}</inbox>"})
            messages.append({"role""assistant",
                "content""Noted inbox messages."})
        response = client.messages.create(...)
        if response.stop_reason != "tool_use":
            break
        # execute tools, append results...
    self._find_member(name)["status"] = "idle"

相对 s08 的变更

组件
之前 (s08)
之后 (s09)
Tools
6
9 (+spawn/send/read_inbox)
智能体数量
单一
领导 + N 个队友
持久化
config.json + JSONL 收件箱
线程
后台命令
每线程完整 agent loop
生命周期
一次性
idle -> working -> idle
通信
message + broadcast

s10: Team Protocols (团队协议)

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > [ s10 ] s11 > s12

"队友之间要有统一的沟通规矩" -- 一个 request-response 模式驱动所有协商。

s09 中队友能干活能通信, 但缺少结构化协调:

关机: 直接杀线程会留下写了一半的文件和过期的 config.json。需要握手 -- 领导请求, 队友批准 (收尾退出) 或拒绝 (继续干)。

计划审批: 领导说 "重构认证模块", 队友立刻开干。高风险变更应该先过审。

两者结构一样: 一方发带唯一 ID 的请求, 另一方引用同一 ID 响应。

解决方案

Shutdown Protocol            Plan Approval Protocol
==================           ======================

Lead             Teammate    Teammate           Lead
  |                 |           |                 |
  |--shutdown_req-->|           |--plan_req------>|
  | {req_id:"abc"}  |           | {req_id:"xyz"}  |
  |                 |           |                 |
  |<--shutdown_resp-|           |<--plan_resp-----|
  | {req_id:"abc",  |           | {req_id:"xyz",  |
  |  approve:true}  |           |  approve:true}  |

Shared FSM:
  [pending] --approve--> [approved]
  [pending] --reject---> [rejected]

Trackers:
  shutdown_requests = {req_id: {target, status}}
  plan_requests     = {req_id: {from, plan, status}}

工作原理

  1. 领导生成 request_id, 通过收件箱发起关机请求。
shutdown_requests = {}

def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    shutdown_requests[req_id] = {"target": teammate, "status""pending"}
    BUS.send("lead", teammate, "Please shut down gracefully.",
             "shutdown_request", {"request_id": req_id})
    return f"Shutdown request {req_id} sent (status: pending)"
  1. 队友收到请求后, 用 approve/reject 响应。
if tool_name == "shutdown_response":
    req_id = args["request_id"]
    approve = args["approve"]
    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
    BUS.send(sender, "lead", args.get("reason"""),
             "shutdown_response",
             {"request_id": req_id, "approve": approve})
  1. 计划审批遵循完全相同的模式。队友提交计划 (生成 request_id), 领导审查 (引用同一个 request_id)。
plan_requests = {}

def handle_plan_review(request_id, approve, feedback=""):
    req = plan_requests[request_id]
    req["status"] = "approved" if approve else "rejected"
    BUS.send("lead", req["from"], feedback,
             "plan_approval_response",
             {"request_id": request_id, "approve": approve})

一个 FSM, 两种用途。同样的 pending -> approved | rejected 状态机可以套用到任何请求-响应协议上。

相对 s09 的变更

组件
之前 (s09)
之后 (s10)
Tools
9
12 (+shutdown_req/resp +plan)
关机
仅自然退出
请求-响应握手
计划门控
提交/审查与审批
关联
每个请求一个 request_id
FSM
pending -> approved/rejected

s11: Autonomous Agents (自治智能体)

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > [ s11 ] s12

"队友自己看看板, 有活就认领" -- 不需要领导逐个分配, 自组织。

s09-s10 中, 队友只在被明确指派时才动。领导得给每个队友写 prompt, 任务看板上 10 个未认领的任务得手动分配。这扩展不了。

真正的自治: 队友自己扫描任务看板, 认领没人做的任务, 做完再找下一个。

一个细节: 上下文压缩 (s06) 后智能体可能忘了自己是谁。身份重注入解决这个问题。

解决方案

Teammate lifecycle with idle cycle:

+-------+
| spawn |
+---+---+
    |
    v
+-------+   tool_use     +-------+
| WORK  | <------------- |  LLM  |
+---+---+                +-------+
    |
    | stop_reason != tool_use (or idle tool called)
    v
+--------+
|  IDLE  |  poll every 5s for up to 60s
+---+----+
    |
    +---> check inbox --> message? ----------> WORK
    |
    +---> scan .tasks/ --> unclaimed? -------> claim -> WORK
    |
    +---> 60s timeout ----------------------> SHUTDOWN

Identity re-injection after compression:
if len(messages) <= 3:
    messages.insert(0, identity_block)

工作原理

  1. 队友循环分两个阶段: WORK 和 IDLE。LLM 停止调用工具 (或调用了 idle) 时, 进入 IDLE。
def _loop(self, name, role, prompt):
    whileTrue:
        # -- WORK PHASE --
        messages = [{"role""user""content": prompt}]
        for _ in range(50):
            response = client.messages.create(...)
            if response.stop_reason != "tool_use":
                break
            # execute tools...
            if idle_requested:
                break

        # -- IDLE PHASE --
        self._set_status(name, "idle")
        resume = self._idle_poll(name, messages)
        ifnot resume:
            self._set_status(name, "shutdown")
            return
        self._set_status(name, "working")
  1. 空闲阶段循环轮询收件箱和任务看板。
def _idle_poll(self, name, messages):
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):  # 60s / 5s = 12
        time.sleep(POLL_INTERVAL)
        inbox = BUS.read_inbox(name)
        if inbox:
            messages.append({"role""user",
                "content"f"<inbox>{inbox}</inbox>"})
            returnTrue
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            claim_task(unclaimed[0]["id"], name)
            messages.append({"role""user",
                "content"f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                           f"{unclaimed[0]['subject']}</auto-claimed>"})
            returnTrue
    returnFalse# timeout -> shutdown
  1. 任务看板扫描: 找 pending 状态、无 owner、未被阻塞的任务。
def scan_unclaimed_tasks() -> list:
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed
  1. 身份重注入: 上下文过短 (说明发生了压缩) 时, 在开头插入身份块。
if len(messages) <= 3:
    messages.insert(0, {"role""user",
        "content"f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"})
    messages.insert(1, {"role""assistant",
        "content"f"I am {name}. Continuing."})

相对 s10 的变更

组件
之前 (s10)
之后 (s11)
Tools
12
14 (+idle, +claim_task)
自治性
领导指派
自组织
空闲阶段
轮询收件箱 + 任务看板
任务认领
仅手动
自动认领未分配任务
身份
系统提示
+ 压缩后重注入
超时
60 秒空闲 -> 自动关机

unsetunsets12: Worktree + Task Isolation (Worktree 任务隔离)unsetunset

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > [ s12 ]

"各干各的目录, 互不干扰" -- 任务管目标, worktree 管目录, 按 ID 绑定。

到 s11, 智能体已经能自主认领和完成任务。但所有任务共享一个目录。两个智能体同时重构不同模块 -- A 改 config.py, B 也改 config.py, 未提交的改动互相污染, 谁也没法干净回滚。

任务板管 "做什么" 但不管 "在哪做"。解法: 给每个任务一个独立的 git worktree 目录, 用任务 ID 把两边关联起来。

解决方案

Control plane (.tasks/)             Execution plane (.worktrees/)
+------------------+                +------------------------+
| task_1.json      |                | auth-refactor/         |
|   status: in_progress  <------>   branch: wt/auth-refactor
|   worktree: "auth-refactor"   |   task_id: 1             |
+------------------+                +------------------------+
| task_2.json      |                | ui-login/              |
|   status: pending    <------>     branch: wt/ui-login
|   worktree: "ui-login"       |   task_id: 2             |
+------------------+                +------------------------+
                                    |
                          index.json (worktree registry)
                          events.jsonl (lifecycle log)

State machines:
  Task:     pending -> in_progress -> completed
  Worktree: absent  -> active      -> removed | kept

工作原理

  1. 创建任务。 先把目标持久化。
TASKS.create("Implement auth refactor")
# -> .tasks/task_1.json  status=pending  worktree=""
  1. 创建 worktree 并绑定任务。 传入 task_id 自动将任务推进到 in_progress
WORKTREES.create("auth-refactor", task_id=1)
# -> git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
# -> index.json gets new entry, task_1.json gets worktree="auth-refactor"

绑定同时写入两侧状态:

def bind_worktree(self, task_id, worktree):
    task = self._load(task_id)
    task["worktree"] = worktree
    if task["status"] == "pending":
        task["status"] = "in_progress"
    self._save(task)
  1. 在 worktree 中执行命令。cwd 指向隔离目录。
subprocess.run(command, shell=True, cwd=worktree_path,
               capture_output=True, text=True, timeout=300)
  1. 收尾。 两种选择:
  • worktree_keep(name) -- 保留目录供后续使用。
  • worktree_remove(name, complete_task=True) -- 删除目录, 完成绑定任务, 发出事件。一个调用搞定拆除 + 完成。
def remove(self, name, force=False, complete_task=False):
    self._run_git(["worktree""remove", wt["path"]])
    if complete_task and wt.get("task_id"is not None:
        self.tasks.update(wt["task_id"], status="completed")
        self.tasks.unbind_worktree(wt["task_id"])
        self.events.emit("task.completed", ...)
  1. 事件流。 每个生命周期步骤写入 .worktrees/events.jsonl:
{
  "event""worktree.remove.after",
  "task": {"id"1"status""completed"},
  "worktree": {"name""auth-refactor""status""removed"},
  "ts"1730000000
}

事件类型: worktree.create.before/after/failedworktree.remove.before/after/failedworktree.keeptask.completed

崩溃后从 .tasks/ + .worktrees/index.json 重建现场。会话记忆是易失的; 磁盘状态是持久的。

相对 s11 的变更

组件
之前 (s11)
之后 (s12)
协调
任务板 (owner/status)
任务板 + worktree 显式绑定
执行范围
共享目录
每个任务独立目录
可恢复性
仅任务状态
任务状态 + worktree 索引
收尾
任务完成
任务完成 + 显式 keep/remove
生命周期可见性
隐式日志
.worktrees/events.jsonl
 显式事件流

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询