Explore the A2A protocol and learn how to build an A2A server in Python. Core topics: 1. A2A protocol basics and a guide to creating a Python server 2. Detailed steps for environment setup and project initialization 3. Adding agent capabilities and using a local Ollama model
• Introduction
• Setting up the environment
• Creating the project
• Agent skills
• Agent card
• A2A server
• Interacting with the A2A server
• Adding agent capabilities
• Using a local Ollama model
In this tutorial, you will build a simple echo A2A server in Python. This bare-bones implementation will demonstrate everything A2A has to offer. After this tutorial, you will be able to add agent capabilities using Ollama or Google's Agent Development Kit.
You will learn:
• The basic concepts of A2A
• How to create an A2A server in Python
• How to interact with an A2A server
• How to add a trained model as an agent
You'll also need:
• A code editor, such as Cursor or VS Code
• A command prompt, such as Terminal (Linux), iTerm/Warp (Mac), or the terminal inside Cursor
We will use uv as our package manager and to set up our project.
The A2A library we will use requires python >= 3.12; uv can install it if you don't already have a matching version. We will use python 3.12.
Run the following command to make sure you're ready for the next step:
echo 'import sys; print(sys.version)' | uv run -
If you see something like the following, you're good to go!
3.12.3 (main, Feb 4 2025, 14:48:35) [GCC 13.3.0]
For reference, this article was written with:
• Python 3.13
• uv: uv 0.7.2 (Homebrew 2025-04-30)
• Warp
• Ollama 0.6.7 (with Qwen3 support)
• macOS Sequoia 15.4.1
First, create a project with uv. We'll add the --package flag so you can add tests or publish the project later:
uv init --package my-project
cd my-project
We'll create a virtual environment for this project. This only needs to be done once:
uv venv .venv
For this terminal window, and any future ones you open, you'll need to activate the virtual environment:
source .venv/bin/activate
If you're using a code editor such as VS Code, you'll want to set the Python interpreter so you get code completion. In VS Code, press Ctrl-Shift-P and select Python: Select Interpreter. Then pick your project my-project, followed by the correct python interpreter: Python 3.12.3 ('.venv':venv) ./.venv/bin/python
The source tree should now look something like this:
# my-project
tree
.
|____pyproject.toml
|____README.md
|____.venv
| |____bin
| | |____activate.bat
| | |____activate.ps1
| | |____python3
| | |____python
| | |____activate.fish
| | |____pydoc.bat
| | |____activate_this.py
| | |____activate
| | |____activate.nu
| | |____deactivate.bat
| | |____python3.13
| | |____activate.csh
| |____pyvenv.cfg
| |____CACHEDIR.TAG
| |____.gitignore
| |____lib
| | |____python3.13
| | | |____site-packages
| | | | |_____virtualenv.py
| | | | |_____virtualenv.pth
|____.python-version
|____src
| |____my_project
| | |______init__.py
Next, we'll add the sample A2A python library from Google:
uv add git+https://github.com/google/A2A#subdirectory=samples/python
pyproject.toml:
[project]
name = "my-project"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "zhangcheng", email = "zh.milo@gmail.com" }
]
requires-python = ">=3.13"
dependencies = [
"a2a-samples",
]
[project.scripts]
my-project = "my_project:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.uv.sources]
a2a-samples = { git = "https://github.com/google/A2A", subdirectory = "samples/python" }
Now create some files we'll use later:
touch src/my_project/agent.py
touch src/my_project/task_manager.py
If everything is set up correctly, you should now be able to run your application:
uv run my-project
The output should look something like this:
Hello from my-project!
An agent skill is a set of capabilities the agent can perform. Here's an example for our echo agent:
{
id: "my-project-echo-skill",
name: "Echo Tool",
description: "Echos the input given",
tags: ["echo", "repeater"],
examples: ["I will see this echoed back to me"],
inputModes: ["text"],
outputModes: ["text"]
}
This matches the skills section of the agent card:
{
id: string; // unique identifier for the agent's skill
name: string; // human readable name of the skill
// description of the skill - will be used by the client or a human
// as a hint to understand what the skill does
description: string;
// Set of tag words describing classes of capabilities for this
// specific skill (e.g. "cooking", "customer support", "billing")
tags: string[];
// The set of example scenarios that the skill can perform.
// Will be used by the client as a hint to understand how the skill can be used.
// (e.g. "I need a recipe for bread")
examples?: string[]; // example prompts for tasks
// The set of interaction modes that the skill supports
// (if different than the default)
inputModes?: string[]; // supported mime types for input
outputModes?: string[]; // supported mime types for output
}
Let's create this agent skill in code. Open src/my_project/__init__.py and replace its contents with the following code:
import google_a2a
from google_a2a.common.types import AgentSkill
def main():
skill = AgentSkill(
id="my-project-echo-skill",
name="Echo Tool",
description="Echos the input given",
tags=["echo", "repeater"],
examples=["I will see this echoed back to me"],
inputModes=["text"],
outputModes=["text"],
)
print(skill)
if __name__ == "__main__":
main()
If you run into a module import error, try importing it this way instead:
from common.types import AgentSkill
# the rest of the code stays the same
Let's run it:
uv run my-project
The output should look something like this:
id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
Now that we have a skill defined, we can create the agent card.
Remote agents are required to publish an agent card in JSON format, describing the agent's capabilities and skills as well as its authentication mechanism. In other words, this tells the world about your agent and how to interact with it.
First, add some helpers for parsing command-line arguments. These will come in handy later for starting the server:
uv add click
Then update our code:
import logging
import click
import google_a2a
from google_a2a.common.types import AgentSkill, AgentCapabilities, AgentCard
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
skill = AgentSkill(
id="my-project-echo-skill",
name="Echo Tool",
description="Echos the input given",
tags=["echo", "repeater"],
examples=["I will see this echoed back to me"],
inputModes=["text"],
outputModes=["text"],
)
logging.info(skill)
if __name__ == "__main__":
main()
Next, let's add the agent card:
# ...
def main(host, port):
# ...
capabilities = AgentCapabilities()
agent_card = AgentCard(
name="Echo Agent",
description="This agent echos the input given",
url=f"http://{host}:{port}/",
version="0.1.0",
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=capabilities,
skills=[skill]
)
logging.info(agent_card)
if __name__ == "__main__":
main()
Let's run it:
uv run my-project
The output should look something like this:
INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
We're almost ready to start the server! We'll use the A2AServer class from Google-A2A, which under the hood starts a uvicorn server.
Before creating the server, we need a task manager to handle incoming requests.
We'll implement the InMemoryTaskManager interface, which requires us to implement two methods:
async def on_send_task(
self,
request: SendTaskRequest
) -> SendTaskResponse:
"""
This method queries or creates a task for the agent.
The caller will receive exactly one response.
"""
pass
async def on_send_task_subscribe(
self,
request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
"""
This method subscribes the caller to future updates regarding a task.
The caller will receive one response, and additionally receive subscription
updates over a session established between the client and the server.
"""
pass
Open src/my_project/task_manager.py and add the following code. We'll simply return an immediate echo response and mark the task as complete right away, without using any sessions or subscriptions:
from typing import AsyncIterable
import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
Artifact,
JSONRPCResponse,
Message,
SendTaskRequest,
SendTaskResponse,
SendTaskStreamingRequest,
SendTaskStreamingResponse,
Task,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
class MyAgentTaskManager(InMemoryTaskManager):
def __init__(self):
super().__init__()
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
# Upsert the task stored by InMemoryTaskManager
await self.upsert_task(request.params)
task_id = request.params.id
# Our custom logic: simply mark the task as complete
# and return the echoed text
received_text = request.params.message.parts[0].text
task = await self._update_task(
task_id=task_id,
task_state=TaskState.COMPLETED,
response_text=f"on_send_task received: {received_text}"
)
# Send the response
return SendTaskResponse(id=request.id, result=task)
async def on_send_task_subscribe(
self,
request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
pass
async def _update_task(
self,
task_id: str,
task_state: TaskState,
response_text: str,
) -> Task:
task = self.tasks[task_id]
agent_response_parts = [
{
"type": "text",
"text": response_text,
}
]
task.status = TaskStatus(
state=task_state,
message=Message(
role="agent",
parts=agent_response_parts,
)
)
task.artifacts = [
Artifact(
parts=agent_response_parts,
)
]
return task
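To make the shape of the data concrete, here is a small dependency-free sketch that mimics what _update_task assembles, using plain dicts in place of the google_a2a types (Task, TaskStatus, Artifact). The helper name build_completed_task is ours, for illustration only; the field layout mirrors the JSON-RPC responses you'll see from the server later:

```python
# Illustrative stand-in for _update_task: build the same status/artifact
# shape with plain dicts instead of the google_a2a Pydantic models.

def build_completed_task(task_id: str, response_text: str) -> dict:
    parts = [{"type": "text", "text": response_text}]
    return {
        "id": task_id,
        "status": {
            "state": "completed",
            "message": {"role": "agent", "parts": parts},
        },
        "artifacts": [{"parts": parts, "index": 0}],
    }

task = build_completed_task("task-1", "on_send_task received: hello")
print(task["status"]["state"])  # completed
```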
With a complete task manager in place, we can now create the server.
Open src/my_project/__init__.py and add the following code:
# ...
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
# ...
def main(host, port):
# ...
task_manager = MyAgentTaskManager()
server = A2AServer(
agent_card=agent_card,
task_manager=task_manager,
host=host,
port=port,
)
server.start()
Let's run it:
uv run my-project
The output should look something like this:
INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
INFO: Started server process [582]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)
Congratulations! Your A2A server is now running!
First, we'll use Google-A2A's command-line tool to send requests to our A2A server. After trying it out, we'll write our own basic client to see how it works under the hood.
Keep your A2A server from the previous step running:
# This should already be running in your terminal
$ uv run my-project
INFO: Started server process [20538]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)
Open a new terminal in the same directory:
source .venv/bin/activate
uv run google-a2a-cli --agent http://localhost:10002
# If that errors, try this instead (make sure there is a hosts directory in .venv/lib/python3.13/site-packages):
uv run python -m hosts.cli --agent http://localhost:10002
Note: this only works if you installed google-a2a from this pull request, since the cli was not previously exposed.
Otherwise, you'll have to check out the Google/A2A repository directly, navigate to the samples/python directory, and run the cli from there.
You can also run the (rather bare-bones) cli like this:
git clone git@github.com:google/A2A.git
cd A2A/samples/python/hosts/cli
uv run . --agent [url-of-your-a2a-server]
You can then send messages to the server by typing something and pressing Enter:
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): Hello!
If everything is working, you'll see this in the response:
$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): hello
Select a file path to attach? (press enter to skip):
{"jsonrpc":"2.0","id":"5b3b74b7ea80495daff4047ee48a6c48","result":{"id":"740f1e21465b4ee2af4af7b8c6cacad5","sessionId":"7fbd065264cb4d6c91ed96909589fc35","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"on_send_task received: hello"}]},"timestamp":"2025-05-03T22:18:41.649600"},"artifacts":[{"parts":[{"type":"text","text":"on_send_task received: hello"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hello"}]}]}}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit):
To exit, type :q and press Enter.
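Before writing a full client, it helps to see the raw request the cli is sending. Under the hood, each prompt becomes a JSON-RPC 2.0 call, as the responses above suggest. The sketch below builds such a payload and posts it with the standard library; the method name "tasks/send" and the params layout are inferred from the sample library and the responses shown above, so treat this as a sketch rather than a reference client:

```python
import json
import urllib.request
import uuid

def build_send_task_payload(text: str) -> dict:
    # One JSON-RPC 2.0 request per task, mirroring what the cli sends
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "tasks/send",
        "params": {
            "id": uuid.uuid4().hex,          # task id
            "sessionId": uuid.uuid4().hex,   # session id
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }

def send_task(agent_url: str, text: str) -> dict:
    # POST the payload to the A2A server's root endpoint
    data = json.dumps(build_send_task_payload(text)).encode()
    req = urllib.request.Request(
        agent_url, data=data, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

With the echo server from this tutorial running, calling send_task("http://localhost:10002/", "hello") should return a JSON-RPC response whose result.status.state is "completed", like the cli output above.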
Now that we have a basic A2A server running, let's add some more capabilities. We'll explore how A2A can work asynchronously and stream responses.
This lets a client subscribe to the server and receive multiple updates, instead of a single response. It can be useful for long-running agent tasks, or when multiple Artifacts may be streamed back to the client.
First, we'll declare that our agent is ready for streaming. Open src/my_project/__init__.py and update AgentCapabilities:
# ...
def main(host, port):
# ...
capabilities = AgentCapabilities(
streaming=True
)
# ...
Now in src/my_project/task_manager.py, we have to implement on_send_task_subscribe:
import asyncio
# ...
class MyAgentTaskManager(InMemoryTaskManager):
# ...
async def _stream_3_messages(self, request: SendTaskStreamingRequest):
task_id = request.params.id
received_text = request.params.message.parts[0].text
text_messages = ["one", "two", "three"]
for text in text_messages:
parts = [
{
"type": "text",
"text": f"{received_text}: {text}",
}
]
message = Message(role="agent", parts=parts)
is_last = text == text_messages[-1]
task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
task_status = TaskStatus(
state=task_state,
message=message
)
task_update_event = TaskStatusUpdateEvent(
id=request.params.id,
status=task_status,
final=is_last,
)
await self.enqueue_events_for_sse(
request.params.id,
task_update_event
)
async def on_send_task_subscribe(
self,
request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
# Upsert the task stored by InMemoryTaskManager
await self.upsert_task(request.params)
task_id = request.params.id
# Create a queue of work to be done for this task
sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
# Start the asynchronous work for this task
asyncio.create_task(self._stream_3_messages(request))
# Tell the client to expect future streaming responses
return self.dequeue_events_for_sse(
request_id=request.id,
task_id=task_id,
sse_event_queue=sse_event_queue,
)
Restart your A2A server to pick up the new changes, then rerun the cli:
$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-03T22:22:31.354656"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-03T22:22:31.354684"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-03T22:22:31.354698"},"final":true}}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit):
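The stream event lines above arrive as Server-Sent Events: each JSON-RPC payload is delivered on a `data:` line of the HTTP response. A minimal, dependency-free parser for such a body can be sketched as follows (the sample body is hand-written here to mirror the events above):

```python
import json

def parse_sse_events(body: str) -> list[dict]:
    """Collect the JSON payloads from the `data:` lines of an SSE body."""
    events = []
    for line in body.splitlines():
        if line.startswith("data:"):
            events.append(json.loads(line[len("data:"):].strip()))
    return events

# Hand-written sample mirroring the stream events shown above
body = (
    'data: {"jsonrpc": "2.0", "result": {"status": {"state": "working"}, "final": false}}\n'
    '\n'
    'data: {"jsonrpc": "2.0", "result": {"status": {"state": "completed"}, "final": true}}\n'
)
events = parse_sse_events(body)
print(events[-1]["result"]["final"])  # True
```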
Sometimes an agent might need additional input. For example, the agent might ask the client whether it wants to keep repeating the 3 messages. In that case, the agent responds with TaskState.INPUT_REQUIRED, and the client then resends send_task_streaming with the same task_id and session_id, but with an updated message providing the input the agent asked for. On the server side, we'll update on_send_task_subscribe to handle this case:
import asyncio
from typing import AsyncIterable
from common.server.task_manager import InMemoryTaskManager
from common.types import (
Artifact,
JSONRPCResponse,
Message,
SendTaskRequest,
SendTaskResponse,
SendTaskStreamingRequest,
SendTaskStreamingResponse,
Task,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
class MyAgentTaskManager(InMemoryTaskManager):
def __init__(self):
super().__init__()
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
# Upsert the task stored by InMemoryTaskManager
await self.upsert_task(request.params)
task_id = request.params.id
# Our custom logic: simply mark the task as complete
# and return the echoed text
received_text = request.params.message.parts[0].text
task = await self._update_task(
task_id=task_id,
task_state=TaskState.COMPLETED,
response_text=f"on_send_task received: {received_text}"
)
# Send the response
return SendTaskResponse(id=request.id, result=task)
async def _stream_3_messages(self, request: SendTaskStreamingRequest):
task_id = request.params.id
received_text = request.params.message.parts[0].text
text_messages = ["one", "two", "three"]
for text in text_messages:
parts = [
{
"type": "text",
"text": f"{received_text}: {text}",
}
]
message = Message(role="agent", parts=parts)
# is_last = text == text_messages[-1]
task_state = TaskState.WORKING
# task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
task_status = TaskStatus(
state=task_state,
message=message
)
task_update_event = TaskStatusUpdateEvent(
id=request.params.id,
status=task_status,
final=False,
)
await self.enqueue_events_for_sse(
request.params.id,
task_update_event
)
ask_message = Message(
role="agent",
parts=[
{
"type": "text",
"text": "Would you like more messages? (Y/N)"
}
]
)
task_update_event = TaskStatusUpdateEvent(
id=request.params.id,
status=TaskStatus(
state=TaskState.INPUT_REQUIRED,
message=ask_message
),
final=True,
)
await self.enqueue_events_for_sse(
request.params.id,
task_update_event
)
async def on_send_task_subscribe(
self,
request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
task_id = request.params.id
is_new_task = task_id in self.tasks
# Upsert the task stored by InMemoryTaskManager
await self.upsert_task(request.params)
received_text = request.params.message.parts[0].text
sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
if not is_new_task and received_text == "N":
task_update_event = TaskStatusUpdateEvent(
id=request.params.id,
status=TaskStatus(
state=TaskState.COMPLETED,
message=Message(
role="agent",
parts=[
{
"type": "text",
"text": "All done!"
}
]
)
),
final=True,
)
await self.enqueue_events_for_sse(
request.params.id,
task_update_event,
)
else:
asyncio.create_task(self._stream_3_messages(request))
return self.dequeue_events_for_sse(
request_id=request.id,
task_id=task_id,
sse_event_queue=sse_event_queue,
)
async def _update_task(
self,
task_id: str,
task_state: TaskState,
response_text: str,
) -> Task:
task = self.tasks[task_id]
agent_response_parts = [
{
"type": "text",
"text": response_text,
}
]
task.status = TaskStatus(
state=task_state,
message=Message(
role="agent",
parts=agent_response_parts,
)
)
task.artifacts = [
Artifact(
parts=agent_response_parts,
)
]
return task
Now restart the server and rerun the cli. The task will keep running until we tell the agent N:
uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-04T09:18:18.235994"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-04T09:18:18.236021"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-04T09:18:18.236033"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would you like more messages? (Y/N)"}]},"timestamp":"2025-05-04T09:18:18.236044"},"final":true}}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): N
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"86ce510ba68b4797a5b68061c8c4780b","result":{"id":"64e51665dc354d2da7c31bcc45abc8f9","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All done!"}]},"timestamp":"2025-05-04T09:22:24.598749"},"final":true}}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit):
Congratulations! You now have an agent that can perform work asynchronously and ask the user for input when it needs it.
Now we come to the exciting part: adding AI to our A2A server.
In this tutorial, we'll set up a local Ollama model and integrate it with our A2A server.
We'll install ollama and langchain, and download an ollama model that supports MCP tools (for a future tutorial).
1. Download ollama
2. Run the ollama server:
# Note: if ollama is already running, you may get an error such as
# Error: listen tcp 127.0.0.1:11434: bind: address already in use
# On Linux you can run systemctl stop ollama to stop it
ollama serve
3. Download a model from this list. We'll use qwen3:
ollama pull qwen3:4b
# only 2.4G
4. Install langchain:
uv add langchain langchain-ollama langgraph
Now that ollama is set up, we can start integrating it into our A2A server.
First, open src/my_project/__init__.py:
# ...
@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
@click.option("--ollama-host", default="http://127.0.0.1:11434")
@click.option("--ollama-model", default=None)
def main(host, port, ollama_host, ollama_model):
# ...
capabilities = AgentCapabilities(
streaming=False # We'll leave streaming capabilities as an exercise for the reader
)
# ...
task_manager = MyAgentTaskManager(
ollama_host=ollama_host,
ollama_model=ollama_model,
)
# ...
Now let's add the AI functionality in src/my_project/agent.py:
from langchain_ollama import ChatOllama
from langgraph.prebuilt import create_react_agent
from langgraph.graph.graph import CompiledGraph
def create_ollama_agent(ollama_base_url: str, ollama_model: str):
ollama_chat_llm = ChatOllama(
base_url=ollama_base_url,
model=ollama_model,
temperature=0.2
)
agent = create_react_agent(ollama_chat_llm, tools=[])
return agent
async def run_ollama(ollama_agent: CompiledGraph, prompt: str):
agent_response = await ollama_agent.ainvoke(
{"messages": prompt }
)
message = agent_response["messages"][-1].content
return str(message)
Finally, let's call our ollama agent from src/my_project/task_manager.py:
# ...
from my_project.agent import create_ollama_agent, run_ollama
class MyAgentTaskManager(InMemoryTaskManager):
def __init__(
self,
ollama_host: str,
ollama_model: typing.Union[None, str]
):
super().__init__()
if ollama_model is not None:
self.ollama_agent = create_ollama_agent(
ollama_base_url=ollama_host,
ollama_model=ollama_model
)
else:
self.ollama_agent = None
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
# ...
received_text = request.params.message.parts[0].text
response_text = f"on_send_task received: {received_text}"
if self.ollama_agent is not None:
response_text = await run_ollama(ollama_agent=self.ollama_agent, prompt=received_text)
task = await self._update_task(
task_id=task_id,
task_state=TaskState.COMPLETED,
response_text=response_text
)
# Send the response
return SendTaskResponse(id=request.id, result=task)
# ...
Let's test it out!
First rerun our A2A server, replacing qwen3:4b with whichever ollama model you downloaded:
uv run my-project --ollama-host http://127.0.0.1:11434 --ollama-model qwen3:4b
Then rerun the cli:
uv run python -m hosts.cli --agent http://localhost:10002
Note that if you're using a large model, it may take a while to load and the cli may time out. In that case, rerun the cli once the ollama server has finished loading the model.
You should see something like the following:
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit): hey
Select a file path to attach? (press enter to skip):
{"jsonrpc":"2.0","id":"eca7ecf4d6da4a65a4ff99ab0954b957","result":{"id":"62636e021ac0483bb31d40c1473796fa","sessionId":"438927e3540f459389f3d3cb216dd945","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? ?"}]},"timestamp":"2025-05-04T10:01:55.068049"},"artifacts":[{"parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? ?"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hey"}]}]}}
========= starting a new task ========
What do you want to send to the agent? (:q or quit to exit):
Congratulations! You now have an A2A server that uses an AI model to generate its responses!
Current A2A implementations, including the official one, are still fairly bare-bones. Some other implementations for reference:
Name | Author | Description
a2ajava | @vishalmysore | A pure Java implementation of Google's A2A protocol for Spring Boot applications, featuring both client and server implementations
legion-a2a | @TheRaLabs | A TypeScript implementation of the A2A protocol with a focus on modularity and extensibility
trpc-a2a-go | @trpc-group | Go A2A implementation by the tRPC team featuring full client/server support, in-memory task management, streaming responses, session management, multiple auth methods (JWT, API Key, OAuth2), and comprehensive examples
jira-a2a | @tuannvm | The Jira A2A system is a DevOps workflow automation platform using the tRPC-A2A-Go framework. It consists of independent Go agents that communicate via A2A messages.
a2a-go | @a2aserver | A Go library for building A2A servers, with example implementations
a2a-rs | @EmilLindfors | An idiomatic Rust implementation following hexagonal architecture principles
a2a_min | @pcingola | A minimalistic Python SDK for A2A communication
a2adotnet | @azixaka | A C#/.NET implementation of the A2A protocol
nestjs-a2a | @thestupd | A module for integrating the A2A protocol into NestJS applications
python-a2a | @themanojdesai | An easy-to-use Python library for implementing the A2A protocol
Aira | @IhateCreatingUserNames2 | An A2A network implementation for hosting, registering, discovering, and interacting with agents
Cognisphere | @IhateCreatingUserNames2 | An AI agent development framework built on Google's ADK, facilitating agent creation potentially for A2A networks
a2a-server | @chrishayuk | A lightweight A2A python implementation
a2a-cli | @chrishayuk | A command-line client for the A2A
A2A Test Suite | @robert-at-pretension-io | A2A Test Suite
Grasp | @adcentury | A self-hosted browser-using agent with built-in MCP and A2A support
swissknife | @daltonnyx | A multi-agent chat application with MCP support, aiming to expose agents via the A2A protocol and connect to remote A2A agents as a client
artinet-sdk | @the-artinet-project | A JS/TS SDK for the Agent2Agent Protocol with a focus on developer experience and comprehensive features
Some community examples:
Name | Author | Description
a2a-agent-coder | @sing1ee | A Coder Agent implementation with A2A Server and Client
agentic-trading | @kweinmeister | A sample application demonstrating Google ADK and A2A interoperability for trading automation
python-a2a-tutorial | @sing1ee | A comprehensive tutorial for implementing A2A in Python with practical examples