4.核心实现

# 4.1 重要组件
# host
host代表着真实的使用agent的应用,在这个应用中可能会集成多个A2AClient以调用不同的AI智能体,通过一定的Prompt和function calling能力来选择调用哪个远程Agent。
# A2AClient
每一个A2A Client对应着一个远程的Agent,通过该A2AClient可以与远程Agent进行通信交互,A2AClient中记录了远程Agent的url,可以通过http请求来发送信息。
class A2AClient:
def __init__(self, agent_card: AgentCard = None, url: str = None):
if agent_card:
self.url = agent_card.url
elif url:
self.url = url
else:
raise ValueError("Must provide either agent_card or url")
2
3
4
5
6
7
8
# A2ACardResolver
每一个A2AClient中包含了一个AgentCard信息,通过A2ACardResolver来获取AgentCard信息
class A2ACardResolver:
def __init__(self, base_url, agent_card_path="/.well-known/agent.json"):
self.base_url = base_url.rstrip("/")
self.agent_card_path = agent_card_path.lstrip("/")
def get_agent_card(self) -> AgentCard:
with httpx.Client() as client:
response = client.get(self.base_url + "/" + self.agent_card_path)
response.raise_for_status()
try:
return AgentCard(**response.json())
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
2
3
4
5
6
7
8
9
10
11
12
13
# A2AServer
A2AServer代表一个具体的Agent的实现,每一个A2AServer都是一个http服务,对外提供Agent信息
class A2AServer:
def __init__(
self,
host="0.0.0.0",
port=5000,
endpoint="/",
agent_card: AgentCard = None,
task_manager: TaskManager = None,
):
2
3
4
5
6
7
8
9
endpoint路径为/.well-known/agent.json,用于提供该agent的相关信息,A2AClient可以通过请求该信息来构造AgentCard,示例如下:
{
"name": "CurrencyAgent",
"description": "Helps with exchange rates for currencies",
"url": "http://localhost:8080/",
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "convert_currency",
"name": "Currency Exchange Rates Tool",
"description": "Helps with exchange values between various currencies",
"tags": [
"currency conversion",
"currency exchange"
],
"examples": [
"What is exchange rate between USD and GBP?"
]
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# AgentSkill
参数含义参见上文2.2.5 AgentSkill
class AgentSkill(BaseModel):
id: str
name: str
description: str | None = None
tags: List[str] | None = None
examples: List[str] | None = None
inputModes: List[str] | None = None
outputModes: List[str] | None = None
2
3
4
5
6
7
8
# TaskManager
用于对agent的任务进行管理:
class TaskManager(ABC):
@abstractmethod
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:
pass
@abstractmethod
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
pass
@abstractmethod
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
pass
@abstractmethod
async def on_send_task_subscribe(
self, request: SendTaskStreamingRequest
) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:
pass
@abstractmethod
async def on_set_task_push_notification(
self, request: SetTaskPushNotificationRequest
) -> SetTaskPushNotificationResponse:
pass
@abstractmethod
async def on_get_task_push_notification(
self, request: GetTaskPushNotificationRequest
) -> GetTaskPushNotificationResponse:
pass
@abstractmethod
async def on_resubscribe_to_task(
self, request: TaskResubscriptionRequest
) -> Union[AsyncIterable[SendTaskResponse], JSONRPCResponse]:
pass
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Task
Task对应着发布的一个具体的需要Agent协作完成的一个需求,是一种有状态的实体,并且执行后会生成一定的结果,结果称为制品(Artifacts)
多个任务可以通过设置一个共同的sessionId来归属于同一个会话(Conversation),任务的状态由不同的远程代理来决定,远程代理在接受到任务后可能会采取以下几种行动:
- 立即执行请求
- 安排延迟处理
- 拒绝请求
- 协商调整执行方式
- 要求客户端补充信息
- 委托其他代理或系统处理
class Task(BaseModel):
id: str # 任务唯一标识符
sessionId: str | None = None # 客户端生成的用于持有任务的会话 ID
status: TaskStatus # 任务当前状态
artifacts: List[Artifact] | None = None # 最终结果
history: List[Message] | None = None # 历史Agent创建的最终结果集合
metadata: dict[str, Any] | None = None # 元数据信息
# 任务状态以及伴随的消息
class TaskStatus(BaseModel):
state: TaskState
message: Message | None = None
timestamp: datetime = Field(default_factory=datetime.now)
# 任务状态更新事件,sendSubscribe 或 subscribe 请求期间由服务器发送
class TaskStatusUpdateEvent(BaseModel):
id: str
status: TaskStatus
final: bool = False
metadata: dict[str, Any] | None = None
# 任务最终结果更新事件,sendSubscribe 或 subscribe 请求期间由服务器发送
class TaskArtifactUpdateEvent(BaseModel):
id: str
artifact: Artifact
metadata: dict[str, Any] | None = None
# 创建、继续或者重启任务的入参
class TaskSendParams(BaseModel):
id: str
sessionId: str = Field(default_factory=lambda: uuid4().hex)
message: Message
acceptedOutputModes: Optional[List[str]] = None
pushNotification: PushNotificationConfig | None = None
historyLength: int | None = None
metadata: dict[str, Any] | None = None
# 任务状态
class TaskState(str, Enum):
SUBMITTED = "submitted"
WORKING = "working"
INPUT_REQUIRED = "input-required"
COMPLETED = "completed"
CANCELED = "canceled"
FAILED = "failed"
UNKNOWN = "unknown"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# Artifacts
Agent将Artifacts作为任务的最终成果生成,Artifacts具有以下特性:
- 不可变性: 生成后不可修改
- 可命名性: 支持自定义标识
- 多部分结构: 可由多个组件构成(流式响应可动态追加新组件至现有制品)
单个任务可能产出多个工件(Part)。例如执行"创建网页"任务时,可能分别生成HTML文件和图片资源两个独立的制品
class Artifact(BaseModel):
name: str | None = None
description: str | None = None
parts: List[Part]
metadata: dict[str, Any] | None = None
index: int = 0
append: bool | None = None
lastChunk: bool | None = None
2
3
4
5
6
7
8
# Part
客户端与远程代理之间交换消息或工件时传输的独立数据块。每个内容单元包含:
- 专属内容类型 (如JSON/二进制流)
- 元数据标头 (描述性信息)
class TextPart(BaseModel):
type: Literal["text"] = "text"
text: str
metadata: dict[str, Any] | None = None
class FilePart(BaseModel):
type: Literal["file"] = "file"
file: FileContent
metadata: dict[str, Any] | None = None
class DataPart(BaseModel):
type: Literal["data"] = "data"
data: dict[str, Any]
metadata: dict[str, Any] | None = None
Part = Annotated[Union[TextPart, FilePart, DataPart], Field(discriminator="type")]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 4.2 多Agent协作
# ApplicationManager
用于管理多个agent,核心方法如下
class ApplicationManager(ABC):
@abstractmethod
def create_conversation(self) -> Conversation:
pass
@abstractmethod
def sanitize_message(self, message: Message) -> Message:
pass
@abstractmethod
async def process_message(self, message: Message):
pass
@abstractmethod
def register_agent(self, url: str):
pass
@abstractmethod
def get_pending_messages(self) -> list[str]:
pass
@property
@abstractmethod
def conversations(self) -> list[Conversation]:
pass
@property
@abstractmethod
def tasks(self) -> list[Task]:
pass
@property
@abstractmethod
def agents(self) -> list[AgentCard]:
pass
@property
@abstractmethod
def events(self) -> list[Event]:
pass
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 多Agent管理 - HostAgent
可以注册多个A2AClient实现多个Agent的协作,通过Prompt和function calling能力来选择调用哪个远程Agent
class HostAgent:
"""The host agent.
This is the agent responsible for choosing which remote agents to send
tasks to and coordinate their work.
"""
def __init__(
self,
remote_agent_addresses: List[str],
task_callback: TaskUpdateCallback | None = None
):
self.task_callback = task_callback
self.remote_agent_connections: dict[str, RemoteAgentConnections] = {}
self.cards: dict[str, AgentCard] = {}
for address in remote_agent_addresses:
card_resolver = A2ACardResolver(address)
card = card_resolver.get_agent_card()
remote_connection = RemoteAgentConnections(card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for ra in self.list_remote_agents():
agent_info.append(json.dumps(ra))
self.agents = '\n'.join(agent_info)
# list_remote_agents: 获取所有的远程可用的agent信息
# send_task: 将指定任务发生到指定的agent上
def create_agent(self) -> Agent:
return Agent(
model="gemini-2.0-flash-001",
name="host_agent",
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
"This agent orchestrates the decomposition of the user request into"
" tasks that can be performed by the child agents."
),
tools=[
self.list_remote_agents,
self.send_task,
],
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 4.3 参考资料
可参考A2A项目实现: