AGI围城
首页
基础知识
工程实践
所见所思
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

AGI围城

是的,这个世界还是很有趣的。
首页
基础知识
工程实践
所见所思
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 提示词工程

  • 大模型API调用

  • OpenAI工具

  • 嵌入向量

  • 检索增强生成(RAG)

  • LlamaIndex

  • LangChain

  • Agent

  • Workflow

  • Transformer

  • 微调

  • MCP

  • A2A

    • 1.A2A概述
    • 2.A2A核心架构
    • 3.A2A开发实践
    • 4.核心实现
      • 4.1 重要组件
        • host
        • A2AClient
        • A2ACardResolver
        • A2AServer
        • AgentSkill
        • TaskManager
        • Task
        • Artifacts
        • Part
      • 4.2 多Agent协作
        • ApplicationManager
        • 多Agent管理 - HostAgent
      • 4.3 参考资料
  • 基础知识
  • A2A
xiao_sl
2025-05-26
目录

4.核心实现

a2a-core

# 4.1 重要组件

# host

host代表着真实的使用agent的应用,在这个应用中可能会集成多个A2AClient以调用不同的AI智能体,通过一定的Prompt和function calling能力来选择调用哪个远程Agent。

# A2AClient

每一个A2A Client对应着一个远程的Agent,通过该A2AClient可以与远程Agent进行通信交互,A2AClient中记录了远程Agent的url,可以通过http请求来发送信息。

class A2AClient:
    def __init__(self, agent_card: AgentCard = None, url: str = None):
        if agent_card:
            self.url = agent_card.url
        elif url:
            self.url = url
        else:
            raise ValueError("Must provide either agent_card or url")
1
2
3
4
5
6
7
8

# A2ACardResolver

每一个A2AClient中包含了一个AgentCard信息,通过A2ACardResolver来获取AgentCard信息

class A2ACardResolver:
    def __init__(self, base_url, agent_card_path="/.well-known/agent.json"):
        self.base_url = base_url.rstrip("/")
        self.agent_card_path = agent_card_path.lstrip("/")

    def get_agent_card(self) -> AgentCard:
        with httpx.Client() as client:
            response = client.get(self.base_url + "/" + self.agent_card_path)
            response.raise_for_status()
            try:
                return AgentCard(**response.json())
            except json.JSONDecodeError as e:
                raise A2AClientJSONError(str(e)) from e
1
2
3
4
5
6
7
8
9
10
11
12
13

# A2AServer

A2AServer代表一个具体的Agent的实现,每一个A2AServer都是一个http服务,对外提供Agent信息

class A2AServer:
    def __init__(
        self,
        host="0.0.0.0",
        port=5000,
        endpoint="/",
        agent_card: AgentCard = None,
        task_manager: TaskManager = None,
    ):
1
2
3
4
5
6
7
8
9

endpoint路径为/.well-known/agent.json,用于提供该agent的相关信息,A2AClient可以通过请求该信息来构造AgentCard,示例如下:

{
    "name": "CurrencyAgent",
    "description": "Helps with exchange rates for currencies",
    "url": "http://localhost:8080/",
    "version": "1.0.0",
    "capabilities": {
        "streaming": true,
        "pushNotifications": true,
        "stateTransitionHistory": false
    },
    "defaultInputModes": [
        "text",
        "text/plain"
    ],
    "defaultOutputModes": [
        "text",
        "text/plain"
    ],
    "skills": [
        {
            "id": "convert_currency",
            "name": "Currency Exchange Rates Tool",
            "description": "Helps with exchange values between various currencies",
            "tags": [
                "currency conversion",
                "currency exchange"
            ],
            "examples": [
                "What is exchange rate between USD and GBP?"
            ]
        }
    ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# AgentSkill

参数含义参见上文2.2.5 AgentSkill

class AgentSkill(BaseModel):
    id: str
    name: str
    description: str | None = None
    tags: List[str] | None = None
    examples: List[str] | None = None
    inputModes: List[str] | None = None
    outputModes: List[str] | None = None
1
2
3
4
5
6
7
8

# TaskManager

用于对agent的任务进行管理:

class TaskManager(ABC):
    @abstractmethod
    async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:
        pass

    @abstractmethod
    async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
        pass

    @abstractmethod
    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        pass

    @abstractmethod
    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:
        pass

    @abstractmethod
    async def on_set_task_push_notification(
        self, request: SetTaskPushNotificationRequest
    ) -> SetTaskPushNotificationResponse:
        pass

    @abstractmethod
    async def on_get_task_push_notification(
        self, request: GetTaskPushNotificationRequest
    ) -> GetTaskPushNotificationResponse:
        pass

    @abstractmethod
    async def on_resubscribe_to_task(
        self, request: TaskResubscriptionRequest
    ) -> Union[AsyncIterable[SendTaskResponse], JSONRPCResponse]:
        pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# Task

Task对应着发布的一个具体的需要Agent协作完成的一个需求,是一种有状态的实体,并且执行后会生成一定的结果,结果称为制品(Artifacts)

多个任务可以通过设置一个共同的sessionId来归属于同一个会话(Conversation),任务的状态由不同的远程代理来决定,远程代理在接受到任务后可能会采取以下几种行动:

  • 立即执行请求
  • 安排延迟处理
  • 拒绝请求
  • 协商调整执行方式
  • 要求客户端补充信息
  • 委托其他代理或系统处理
class Task(BaseModel):
    id: str  # 任务唯一标识符
    sessionId: str | None = None  # 客户端生成的用于持有任务的会话 ID
    status: TaskStatus  # 任务当前状态
    artifacts: List[Artifact] | None = None  # 最终结果
    history: List[Message] | None = None  # 历史Agent创建的最终结果集合
    metadata: dict[str, Any] | None = None  # 元数据信息

# 任务状态以及伴随的消息
class TaskStatus(BaseModel):
    state: TaskState
    message: Message | None = None
    timestamp: datetime = Field(default_factory=datetime.now)

# 任务状态更新事件,sendSubscribe 或 subscribe 请求期间由服务器发送
class TaskStatusUpdateEvent(BaseModel):
    id: str
    status: TaskStatus
    final: bool = False
    metadata: dict[str, Any] | None = None

# 任务最终结果更新事件,sendSubscribe 或 subscribe 请求期间由服务器发送
class TaskArtifactUpdateEvent(BaseModel):
    id: str
    artifact: Artifact
    metadata: dict[str, Any] | None = None

# 创建、继续或者重启任务的入参
class TaskSendParams(BaseModel):
    id: str
    sessionId: str = Field(default_factory=lambda: uuid4().hex)
    message: Message
    acceptedOutputModes: Optional[List[str]] = None
    pushNotification: PushNotificationConfig | None = None
    historyLength: int | None = None
    metadata: dict[str, Any] | None = None

# 任务状态
class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# Artifacts

Agent将Artifacts作为任务的最终成果生成,Artifacts具有以下特性:

  • 不可变性: 生成后不可修改
  • 可命名性: 支持自定义标识
  • 多部分结构: 可由多个组件构成(流式响应可动态追加新组件至现有制品)

单个任务可能产出多个工件(Part)。例如执行"创建网页"任务时,可能分别生成HTML文件和图片资源两个独立的制品

class Artifact(BaseModel):
    name: str | None = None
    description: str | None = None
    parts: List[Part]
    metadata: dict[str, Any] | None = None
    index: int = 0
    append: bool | None = None
    lastChunk: bool | None = None
1
2
3
4
5
6
7
8

# Part

客户端与远程代理之间交换消息或工件时传输的独立数据块。每个内容单元包含:

  • 专属内容类型 (如JSON/二进制流)
  • 元数据标头 (描述性信息)
class TextPart(BaseModel):
    type: Literal["text"] = "text"
    text: str
    metadata: dict[str, Any] | None = None

class FilePart(BaseModel):
    type: Literal["file"] = "file"
    file: FileContent
    metadata: dict[str, Any] | None = None

class DataPart(BaseModel):
    type: Literal["data"] = "data"
    data: dict[str, Any]
    metadata: dict[str, Any] | None = None

Part = Annotated[Union[TextPart, FilePart, DataPart], Field(discriminator="type")]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 4.2 多Agent协作

# ApplicationManager

用于管理多个agent,核心方法如下

class ApplicationManager(ABC):
    @abstractmethod
    def create_conversation(self) -> Conversation:
        pass

    @abstractmethod
    def sanitize_message(self, message: Message) -> Message:
        pass

    @abstractmethod
    async def process_message(self, message: Message):
        pass

    @abstractmethod
    def register_agent(self, url: str):
        pass

    @abstractmethod
    def get_pending_messages(self) -> list[str]:
        pass

    @property
    @abstractmethod
    def conversations(self) -> list[Conversation]:
        pass

    @property
    @abstractmethod
    def tasks(self) -> list[Task]:
        pass

    @property
    @abstractmethod
    def agents(self) -> list[AgentCard]:
        pass

    @property
    @abstractmethod
    def events(self) -> list[Event]:
        pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 多Agent管理 - HostAgent

可以注册多个A2AClient实现多个Agent的协作,通过Prompt和function calling能力来选择调用哪个远程Agent

class HostAgent:
    """The host agent.

    This is the agent responsible for choosing which remote agents to send
    tasks to and coordinate their work.
    """

    def __init__(
        self,
        remote_agent_addresses: List[str],
        task_callback: TaskUpdateCallback | None = None
    ):
        self.task_callback = task_callback
        self.remote_agent_connections: dict[str, RemoteAgentConnections] = {}
        self.cards: dict[str, AgentCard] = {}
        for address in remote_agent_addresses:
            card_resolver = A2ACardResolver(address)
            card = card_resolver.get_agent_card()
            remote_connection = RemoteAgentConnections(card)
            self.remote_agent_connections[card.name] = remote_connection
            self.cards[card.name] = card
        agent_info = []
        for ra in self.list_remote_agents():
            agent_info.append(json.dumps(ra))
        self.agents = '\n'.join(agent_info)

    # list_remote_agents: 获取所有的远程可用的agent信息
    # send_task: 将指定任务发生到指定的agent上

    def create_agent(self) -> Agent:
        return Agent(
            model="gemini-2.0-flash-001",
            name="host_agent",
            instruction=self.root_instruction,
            before_model_callback=self.before_model_callback,
            description=(
                "This agent orchestrates the decomposition of the user request into"
                " tasks that can be performed by the child agents."
            ),
            tools=[
                self.list_remote_agents,
                self.send_task,
            ],
        )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 4.3 参考资料

可参考A2A项目实现:

  • Python项目: https://github.com/themanojdesai/python-a2a (opens new window)
  • GoLang项目: https://github.com/trpc-group/trpc-a2a-go (opens new window)
  • A2A相关资料整理项目: https://github.com/ai-boost/awesome-a2a (opens new window)
编辑 (opens new window)
#A2A
上次更新: 2025/12/19, 15:17:48
3.A2A开发实践

← 3.A2A开发实践

最近更新
01
我是如何发现临时邮箱的?一个真实的故事
06-12
02
3.A2A开发实践
05-22
03
2.A2A核心架构
05-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 AGI围城 | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式