跳过正文
  1. 文章/

A2A协议:2026年构建真正可用的多Agent系统

作者
XiDao
XiDao 为全球开发者提供稳定、高速、低成本的大模型 API 网关服务。一个 API Key 接入 OpenAI、Anthropic、Google、Meta 等主流模型,智能路由、自动重试、成本优化。

2026年的多Agent难题
#

到2026年中,大多数开发团队已经采用MCP(Model Context Protocol)来连接AI模型和工具。但一个关键的空白仍然存在:AI Agent之间如何相互通信?

来看一个真实场景:一个电商平台部署了三个专业Agent:

  • 库存Agent — 监控库存水平,预测需求
  • 定价Agent — 根据市场情况调整价格
  • 客服Agent — 处理咨询,处理退货

每个Agent独立运行时都表现出色。但当定价Agent需要在应用折扣前询问库存Agent关于库存可用性的信息时,它们之间没有标准的通信方式。团队最终只能构建脆弱的、定制化的集成方案,在规模化时频繁崩溃。

这正是Google的 Agent-to-Agent(A2A)协议 所解决的问题。

什么是A2A?
#

A2A是一个开放协议,使AI Agent能够相互发现、通信和协作——无论它们使用什么框架、供应商或运行时。MCP连接模型和工具,而A2A连接 Agent与Agent

这样理解:

协议连接对象类比
MCP模型 ↔ 工具USB-C(设备到外设)
A2AAgent ↔ AgentHTTP(服务器到服务器)

核心概念
#

┌──────────────┐   A2A 协议        ┌──────────────┐
│   Agent A    │ ◄───────────────► │   Agent B    │
│  (客户端)    │  HTTP + JSON-RPC  │  (远程端)    │
└──────┬───────┘                   └──────┬───────┘
       │                                  │
       ▼                                  ▼
  Agent Card                        Agent Card
  (能力发现)                        (能力发现)

A2A定义了三个核心原语:

  1. Agent Card — 发布在 /.well-known/agent.json 的JSON文档,描述Agent的能力、端点和认证要求
  2. Task — 带有生命周期的工作单元(已提交 → 处理中 → 已完成/失败)
  3. Message — Agent之间的结构化通信,支持文本、文件和结构化数据

Agent Card示例
#

每个符合A2A标准的Agent都会发布其能力:

{
  "name": "库存智能Agent",
  "description": "监控库存水平、预测需求并优化库存分配",
  "url": "https://inventory-agent.example.com/a2a",
  "version": "2.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["Bearer"]
  },
  "defaultInputModes": ["text", "structured-data"],
  "defaultOutputModes": ["text", "structured-data", "chart"],
  "skills": [
    {
      "id": "demand-forecast",
      "name": "需求预测",
      "description": "预测未来7-90天的产品需求",
      "tags": ["库存", "预测", "分析"],
      "examples": [
        "预测SKU-12345未来30天的需求",
        "下周哪些产品需要补货?"
      ]
    },
    {
      "id": "stock-check",
      "name": "库存查询",
      "description": "所有仓库的实时库存水平",
      "tags": ["库存", "实时"],
      "examples": [
        "SKU-67890还有多少库存?",
        "检查Widget Pro在所有仓库的库存"
      ]
    }
  ]
}

构建A2A服务器
#

让我们用Python构建一个生产级的A2A服务器。我们将创建一个代码审查Agent,其他Agent可以向它委派代码分析任务。

项目结构
#

code-review-agent/
├── agent_card.json
├── server.py
├── skills/
│   ├── security_scan.py
│   ├── performance_analysis.py
│   └── style_check.py
└── requirements.txt

核心A2A服务器实现
#

# server.py
import uuid
import asyncio
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import json

app = FastAPI(title="代码审查 A2A Agent")

# 任务存储(生产环境使用Redis)
tasks: dict[str, dict] = {}

# --- Agent Card 端点 ---

@app.get("/.well-known/agent.json")
async def agent_card():
    return {
        "name": "Code Review Agent",
        "description": "执行安全扫描、性能分析和代码风格检查",
        "url": "https://code-review-agent.example.com/a2a",
        "version": "1.0.0",
        "capabilities": {
            "streaming": True,
            "pushNotifications": True,
            "stateTransitionHistory": True
        },
        "authentication": {"schemes": ["Bearer"]},
        "defaultInputModes": ["text", "structured-data"],
        "defaultOutputModes": ["text", "structured-data"],
        "skills": [
            {
                "id": "security-scan",
                "name": "安全漏洞扫描",
                "description": "检测OWASP Top 10漏洞和常见安全问题",
                "tags": ["security", "code-review", "owasp"],
                "examples": [
                    "扫描这段Python代码的SQL注入风险",
                    "检查这个React组件的XSS漏洞"
                ]
            },
            {
                "id": "performance-analysis",
                "name": "性能分析",
                "description": "识别N+1查询、内存泄漏和算法效率问题",
                "tags": ["performance", "optimization"],
                "examples": [
                    "查找这个Django视图中的N+1查询问题",
                    "分析这个函数的时间复杂度问题"
                ]
            }
        ]
    }

# --- 任务管理 ---

@app.post("/a2a")
async def handle_task(request: Request):
    body = await request.json()

    method = body.get("method")
    params = body.get("params", {})
    request_id = body.get("id")

    if method == "tasks/send":
        return await handle_send_task(params, request_id)
    elif method == "tasks/sendSubscribe":
        return await handle_streaming_task(params, request_id)
    elif method == "tasks/get":
        return await handle_get_task(params, request_id)
    elif method == "tasks/cancel":
        return await handle_cancel_task(params, request_id)
    else:
        raise HTTPException(400, f"Unknown method: {method}")


async def handle_send_task(params: dict, request_id: str) -> dict:
    """处理代码审查任务。"""
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})
    skill_id = params.get("skillId", "security-scan")

    tasks[task_id] = {
        "id": task_id,
        "status": {"state": "working", "timestamp": datetime.utcnow().isoformat()},
        "history": [message],
        "artifacts": []
    }

    code_text = extract_code(message)
    result = await run_review_skill(skill_id, code_text)

    tasks[task_id]["status"] = {
        "state": "completed",
        "timestamp": datetime.utcnow().isoformat()
    }
    tasks[task_id]["artifacts"] = [{
        "parts": [{"type": "text", "text": result}]
    }]

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": tasks[task_id]
    }


async def handle_streaming_task(params: dict, request_id: str):
    """通过SSE流式传输审查进度。"""
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})
    skill_id = params.get("skillId", "security-scan")

    async def event_stream():
        yield f"data: {json.dumps({'type': 'task_status', 'state': 'working'})}\n\n"

        code_text = extract_code(message)
        stages = [
            "解析代码结构...",
            "分析导入和依赖...",
            "运行安全模式匹配...",
            "生成发现报告..."
        ]

        for i, stage in enumerate(stages):
            await asyncio.sleep(0.5)
            progress = {"type": "progress", "stage": stage, "percent": (i + 1) * 25}
            yield f"data: {json.dumps(progress)}\n\n"

        result = await run_review_skill(skill_id, code_text)
        yield f"data: {json.dumps({'type': 'task_result', 'state': 'completed', 'result': result})}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

# --- 审查技能 ---

async def run_review_skill(skill_id: str, code: str) -> str:
    """通过XiDao API Gateway使用LLM执行审查技能。"""
    import openai

    client = openai.AsyncOpenAI(
        api_key="your-xidao-api-key",
        base_url="https://global.xidao.online/v1"
    )

    prompts = {
        "security-scan": (
            "你是安全专家。分析以下代码的OWASP Top 10漏洞。"
            "对每个发现,提供:严重程度(Critical/High/Medium/Low)、"
            "位置、描述和修复代码。\n\n代码:\n```\n{code}\n```"
        ),
        "performance-analysis": (
            "你是性能工程师。分析这段代码的:N+1查询、"
            "不必要的分配、算法复杂度问题和async/await反模式。"
            "提供具体的修复代码示例。\n\n代码:\n```\n{code}\n```"
        ),
    }

    prompt = prompts.get(skill_id, prompts["security-scan"]).format(code=code)

    response = await client.chat.completions.create(
        model="claude-4-sonnet",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
        max_tokens=4096
    )

    return response.choices[0].message.content


def extract_code(message: dict) -> str:
    """从A2A消息中提取代码文本。"""
    parts = message.get("parts", [])
    for part in parts:
        if part.get("type") == "text":
            return part["text"]
    return ""


@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str):
    if task_id not in tasks:
        raise HTTPException(404, "Task not found")
    return tasks[task_id]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

构建A2A客户端:编排多个Agent
#

现在让我们构建一个编排器,用于发现Agent并委派工作:

# orchestrator.py
import httpx
import asyncio
from dataclasses import dataclass

@dataclass
class AgentInfo:
    name: str
    url: str
    skills: list[dict]
    capabilities: dict

class A2AOrchestrator:
    """发现和协调多个A2A Agent。"""

    def __init__(self, gateway_url: str = "https://global.xidao.online"):
        self.agents: dict[str, AgentInfo] = {}
        self.http = httpx.AsyncClient(timeout=120.0)
        self.gateway_url = gateway_url

    async def discover_agent(self, base_url: str) -> AgentInfo:
        """通过获取Agent Card来发现Agent。"""
        card_url = f"{base_url}/.well-known/agent.json"
        response = await self.http.get(card_url)
        card = response.json()

        agent = AgentInfo(
            name=card["name"],
            url=card["url"],
            skills=card.get("skills", []),
            capabilities=card.get("capabilities", {}),
        )
        self.agents[agent.name] = agent
        print(f"已发现: {agent.name} ({len(agent.skills)} 个技能)")
        return agent

    async def find_agent_for_task(self, task_description: str) -> tuple[str, str]:
        """使用LLM找到最适合任务的Agent和技能。"""
        import openai

        catalog = []
        for agent in self.agents.values():
            for skill in agent.skills:
                catalog.append({
                    "agent": agent.name,
                    "skill_id": skill["id"],
                    "skill_name": skill["name"],
                    "description": skill["description"],
                    "tags": skill.get("tags", []),
                })

        client = openai.AsyncOpenAI(
            api_key="your-xidao-api-key",
            base_url=f"{self.gateway_url}/v1"
        )

        response = await client.chat.completions.create(
            model="gpt-4o-mini",  # 快速路由模型
            messages=[
                {"role": "system", "content": (
                    "你是一个路由Agent。给定任务描述和可用Agent技能目录,"
                    "返回最佳匹配的JSON: "
                    '{"agent_name": "...", "skill_id": "..."}'
                )},
                {"role": "user", "content": (
                    f"任务: {task_description}\n\n"
                    f"可用技能:\n{_format_catalog(catalog)}"
                )}
            ],
            response_format={"type": "json_object"},
            temperature=0
        )

        import json
        match = json.loads(response.choices[0].message.content)
        return match["agent_name"], match["skill_id"]

    async def send_task(
        self, agent_name: str, skill_id: str, content: str, stream: bool = False
    ) -> dict:
        """向特定Agent发送任务。"""
        agent = self.agents[agent_name]

        payload = {
            "jsonrpc": "2.0",
            "method": "tasks/sendSubscribe" if stream else "tasks/send",
            "id": f"req-{asyncio.get_event_loop().time():.0f}",
            "params": {
                "skillId": skill_id,
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": content}]
                }
            }
        }

        if stream:
            async with self.http.stream("POST", agent.url, json=payload) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        import json
                        event = json.loads(line[6:])
                        print(f"  [{event.get('type', '?')}] {event.get('stage', '')[:100]}")
        else:
            response = await self.http.post(agent.url, json=payload)
            return response.json()

    async def execute_workflow(self, plan: list[dict]) -> list[dict]:
        """执行多步骤Agent工作流。"""
        results = []

        for step in plan:
            task = step["task"]
            depends_on = step.get("depends_on")

            if depends_on is not None:
                context = results[depends_on].get("result", {})
                task = f"上一步的上下文:\n{context}\n\n任务: {task}"

            agent_name, skill_id = await self.find_agent_for_task(task)
            print(f"\n步骤 {len(results)}: 委派给 {agent_name} (技能: {skill_id})")

            result = await self.send_task(agent_name, skill_id, task)
            results.append(result)

        return results


def _format_catalog(catalog: list[dict]) -> str:
    lines = []
    for entry in catalog:
        tags = ", ".join(entry["tags"])
        lines.append(
            f"- [{entry['agent']}] {entry['skill_name']} ({entry['skill_id']}): "
            f"{entry['description']} [标签: {tags}]"
        )
    return "\n".join(lines)


# 使用:多Agent工作流
async def main():
    orchestrator = A2AOrchestrator()

    # 从基础设施发现Agent
    await asyncio.gather(
        orchestrator.discover_agent("https://code-review-agent.example.com"),
        orchestrator.discover_agent("https://deploy-agent.example.com"),
        orchestrator.discover_agent("https://monitoring-agent.example.com"),
    )

    # 定义工作流:审查 -> 部署 -> 监控
    workflow = [
        {"task": "扫描 /src/api/routes.py 的安全漏洞"},
        {"task": "将最新版本部署到预发布环境", "depends_on": 0},
        {"task": "为已部署的服务设置错误率监控", "depends_on": 1},
    ]

    results = await orchestrator.execute_workflow(workflow)
    print("\n工作流完成!", len(results), "个步骤已执行。")

asyncio.run(main())

A2A + MCP:完整的Agent技术栈
#

2026年Agent架构的真正威力来自于两个协议的结合:

┌────────────────────────────────────────────────────────┐
│                    用户请求                             │
└───────────────────────┬────────────────────────────────┘
              ┌──────────────────┐
              │    编排器 Agent   │  ← A2A 客户端
              └────────┬─────────┘
          ┌────────────┼────────────┐
          ▼            ▼            ▼
    ┌──────────┐ ┌──────────┐ ┌──────────┐
    │ 代码审查  │ │  部署    │ │  监控    │  ← A2A Agent
    │  Agent   │ │  Agent   │ │  Agent   │
    └────┬─────┘ └────┬─────┘ └────┬─────┘
         │            │            │
    ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐
    │ MCP      │ │ MCP      │ │ MCP      │  ← MCP 工具
    │ 服务器:  │ │ 服务器:  │ │ 服务器:  │
    │ • Git    │ │ • Docker │ │ • Grafana│
    │ • SAST   │ │ • K8s    │ │ • PagerD │
    │ • Semgrep│ │ • AWS    │ │ • Datadog│
    └──────────┘ └──────────┘ └──────────┘

核心原则是关注点的清晰分离:

  • MCP 处理模型到工具的通信(Agent如何执行工作)
  • A2A 处理Agent到Agent的通信(Agent如何协调配合)

API网关作为A2A基础设施
#

在生产环境中运行多Agent系统时,API网关变得至关重要。XiDao API网关为A2A部署提供关键基础设施:

# xidao-a2a-gateway.yaml
a2a_gateway:
  # Agent发现代理
  discovery:
    endpoint: https://gateway.xidao.online/agents
    auto_register: true
    health_check:
      interval: 30s
      path: /.well-known/agent.json

  # 每对Agent的速率限制
  rate_limits:
    default:
      requests_per_minute: 100
      max_concurrent_tasks: 10
    high_priority:
      requests_per_minute: 500
      max_concurrent_tasks: 50

  # 认证和授权
  auth:
    method: bearer
    token_rotation: true
    scopes:
      - agent:read    # 可以发现Agent
      - task:send     # 可以发送任务
      - task:receive  # 可以接收任务

  # 可观测性
  observability:
    log_all_tasks: true
    trace_propagation: true
    metrics:
      - task_latency_p99
      - agent_success_rate
      - skill_invocation_count

API网关对A2A的关键优势
#

特性收益
服务发现Agent通过网关注册并可被发现
负载均衡在同一Agent的多个实例间分配任务
熔断器当Agent宕机时防止级联故障
请求追踪跨多个Agent节点追踪任务
成本归属追踪哪些Agent消耗了最多的LLM Token

多Agent系统生产清单
#

1. 任务超时和截止时间
#

async def send_task_with_deadline(
    self, agent_url: str, task: dict, timeout_seconds: int = 60
):
    """发送带有硬截止时间的任务。"""
    task["params"]["configuration"] = {
        "blockingTimeoutSeconds": timeout_seconds,
    }

    try:
        result = await asyncio.wait_for(
            self.http.post(agent_url, json=task),
            timeout=timeout_seconds + 5
        )
        return result.json()
    except asyncio.TimeoutError:
        return {"error": "Agent未在截止时间内响应", "state": "timeout"}

2. 幂等任务执行
#

import hashlib

def generate_deterministic_task_id(agent_name: str, content: str) -> str:
    """生成确定性任务ID用于去重。"""
    payload = f"{agent_name}:{content}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

3. 优雅降级
#

async def execute_with_fallback(
    self, primary_agent: str, fallback_agent: str,
    skill_id: str, task: str
):
    """尝试主Agent,失败则切换到备用Agent。"""
    try:
        return await self.send_task(primary_agent, skill_id, task)
    except Exception as e:
        print(f"主Agent失败 ({e}),切换到备用...")
        return await self.send_task(fallback_agent, skill_id, task)

2026年A2A生态
#

A2A生态正在快速增长:

框架/平台A2A支持
Google Vertex AI原生A2A服务器和客户端
LangChain / LangGraphAgent图的A2A适配器
CrewAI基于A2A的多Agent编排
AutoGen (Microsoft)A2A传输层
Semantic KernelA2A Agent连接器
XiDao API网关A2A基础设施代理

核心要点
#

  1. A2A是Agent的HTTP — 它提供了不同供应商和框架之间AI Agent缺失的互操作层
  2. MCP + A2A是完整技术栈 — MCP用于工具,A2A用于Agent间通信
  3. API网关不可或缺 — 为多Agent系统提供服务发现、速率限制、追踪和认证
  4. 从简单开始 — 先发现一个Agent,发送一个任务,然后逐步构建编排工作流
  5. 生产环境至关重要 — 从第一天就实施超时、重试、幂等性和熔断器

立即开始
#

准备好构建多Agent系统了吗?这是你的行动计划:

  1. 阅读规范: google.github.io/A2A
  2. 尝试SDK: pip install a2a-sdknpm install @a2a/sdk
  3. 获取API密钥: 在 global.xidao.online 注册,获取支持A2A流量的统一API网关
  4. 构建你的第一个Agent: 从上面的代码审查Agent示例开始
  5. 连接Agent: 使用编排器模式来协调工作流

正在构建多Agent系统?在 global.xidao.online 与XiDao社区分享你的经验,或通过 support@xidao.online 联系我们。

相关文章

A2A Protocol: Building Multi-Agent Systems That Actually Work in 2026

The Multi-Agent Problem in 2026 # By mid-2026, most development teams have adopted MCP (Model Context Protocol) for connecting AI models to tools. But a critical gap remains: how do AI agents talk to each other? Consider a real-world scenario: An e-commerce platform deploys three specialized agents: Inventory Agent — monitors stock levels, predicts demand Pricing Agent — adjusts prices based on market conditions Customer Support Agent — handles inquiries, processes returns Each agent works brilliantly in isolation. But when the Pricing Agent needs to ask the Inventory Agent about stock availability before applying a discount, there’s no standard way for them to communicate. Teams end up building fragile, custom integrations that break at scale.