The Multi-Agent Problem in 2026
By mid-2026, most development teams have adopted MCP (Model Context Protocol) to connect AI models to tools. But a critical gap remains: how do AI agents communicate with each other?
Consider a real-world scenario: an e-commerce platform deploys three specialized agents:
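- Inventory Agent: monitors stock levels and forecasts demand
- Pricing Agent: adjusts prices based on market conditions
- Customer Service Agent: answers inquiries and processes returns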
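Each agent performs well on its own. But when the Pricing Agent needs to ask the Inventory Agent about stock availability before applying a discount, there is no standard way for them to talk. Teams end up building brittle, bespoke integrations that break down as the system scales.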
This is the problem Google's Agent-to-Agent (A2A) protocol is designed to solve.
What Is A2A?
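A2A is an open protocol that lets AI agents discover one another, communicate, and collaborate, regardless of the framework, vendor, or runtime they use. Where MCP connects models to tools, A2A connects agents to agents.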
Think of it this way:
| Protocol | Connects | Analogy |
|---|---|---|
| MCP | Model ↔ Tools | USB-C (device to peripherals) |
| A2A | Agent ↔ Agent | HTTP (server to server) |
Core Concepts
```
┌──────────────┐      A2A protocol      ┌──────────────┐
│   Agent A    │ ◄────────────────────► │   Agent B    │
│   (client)   │    HTTP + JSON-RPC     │   (remote)   │
└──────┬───────┘                        └──────┬───────┘
       │                                       │
       ▼                                       ▼
  Agent Card                              Agent Card
 (capability                             (capability
  discovery)                              discovery)
```
A2A defines three core primitives:
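- Agent Card: a JSON document published at `/.well-known/agent.json` that describes the agent's capabilities, endpoint, and authentication requirements
- Task: a unit of work with a lifecycle (submitted → working → completed/failed)
- Message: structured communication between agents, supporting text, files, and structured data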
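The Task lifecycle above can be pictured as a small state machine. The sketch below is an illustration, not the A2A SDK's data model: the states follow the lifecycle listed above plus `canceled` (implied by the `tasks/cancel` method used later), and `TaskRecord` and `ALLOWED` are hypothetical names chosen for this example.

```python
from dataclasses import dataclass, field

# Allowed transitions for the lifecycle above (illustrative, not exhaustive)
ALLOWED: dict[str, set[str]] = {
    "submitted": {"working", "canceled"},
    "working": {"completed", "failed", "canceled"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
    "canceled": set(),   # terminal
}


@dataclass
class TaskRecord:
    """Hypothetical in-memory task that enforces the lifecycle."""
    id: str
    state: str = "submitted"
    history: list[str] = field(default_factory=lambda: ["submitted"])

    def transition(self, new_state: str) -> None:
        """Move to new_state, rejecting transitions the lifecycle does not allow."""
        if new_state not in ALLOWED[self.state]:
            raise ValueError(f"illegal transition: {self.state} -> {new_state}")
        self.state = new_state
        self.history.append(new_state)

    @property
    def is_terminal(self) -> bool:
        # A state with no outgoing transitions is terminal
        return not ALLOWED[self.state]


task = TaskRecord(id="task-1")
task.transition("working")
task.transition("completed")
print(task.state, task.is_terminal, task.history)
# completed True ['submitted', 'working', 'completed']
```

Rejecting illegal transitions up front means a late "completed" update cannot silently overwrite a task that was already canceled.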
Agent Card Example
Every A2A-compliant agent publishes its capabilities:
```json
{
  "name": "Inventory Intelligence Agent",
  "description": "Monitors stock levels, forecasts demand, and optimizes inventory allocation",
  "url": "https://inventory-agent.example.com/a2a",
  "version": "2.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["Bearer"]
  },
  "defaultInputModes": ["text", "structured-data"],
  "defaultOutputModes": ["text", "structured-data", "chart"],
  "skills": [
    {
      "id": "demand-forecast",
      "name": "Demand Forecast",
      "description": "Forecasts product demand 7-90 days ahead",
      "tags": ["inventory", "forecasting", "analytics"],
      "examples": [
        "Forecast demand for SKU-12345 over the next 30 days",
        "Which products need restocking next week?"
      ]
    },
    {
      "id": "stock-check",
      "name": "Stock Check",
      "description": "Real-time stock levels across all warehouses",
      "tags": ["inventory", "real-time"],
      "examples": [
        "How much stock is left for SKU-67890?",
        "Check Widget Pro stock across all warehouses"
      ]
    }
  ]
}
```
Building an A2A Server
Let's build an A2A server in Python. We'll create a code review agent that other agents can delegate code analysis tasks to.
Project Structure
```
code-review-agent/
├── agent_card.json
├── server.py
├── skills/
│   ├── security_scan.py
│   ├── performance_analysis.py
│   └── style_check.py
└── requirements.txt
```
Core A2A Server Implementation
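```python
# server.py
import os
import json
import uuid
import asyncio
from datetime import datetime, timezone

import openai
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI(title="Code Review A2A Agent")

# Task store (in memory here; use Redis or a database in production)
tasks: dict[str, dict] = {}

# LLM client routed through the XiDao API Gateway (key read from the environment)
llm = openai.AsyncOpenAI(
    api_key=os.environ.get("XIDAO_API_KEY", "your-xidao-api-key"),
    base_url="https://global.xidao.online/v1",
)


def now_iso() -> str:
    """Current UTC time in ISO 8601 format."""
    return datetime.now(timezone.utc).isoformat()


def jsonrpc_error(request_id, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}


def sse_event(event: dict) -> str:
    """Format one Server-Sent Event: a `data:` line followed by a blank line."""
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"


# --- Agent Card endpoint ---
@app.get("/.well-known/agent.json")
async def agent_card():
    return {
        "name": "Code Review Agent",
        "description": "Performs security scanning, performance analysis, and code style checks",
        "url": "https://code-review-agent.example.com/a2a",
        "version": "1.0.0",
        "capabilities": {
            "streaming": True,
            "pushNotifications": False,       # not implemented in this example
            "stateTransitionHistory": False,  # not implemented in this example
        },
        "authentication": {"schemes": ["Bearer"]},
        "defaultInputModes": ["text", "structured-data"],
        "defaultOutputModes": ["text", "structured-data"],
        "skills": [
            {
                "id": "security-scan",
                "name": "Security Vulnerability Scan",
                "description": "Detects OWASP Top 10 vulnerabilities and common security issues",
                "tags": ["security", "code-review", "owasp"],
                "examples": [
                    "Scan this Python code for SQL injection risks",
                    "Check this React component for XSS vulnerabilities",
                ],
            },
            {
                "id": "performance-analysis",
                "name": "Performance Analysis",
                "description": "Identifies N+1 queries, memory leaks, and algorithmic inefficiencies",
                "tags": ["performance", "optimization"],
                "examples": [
                    "Find N+1 query problems in this Django view",
                    "Analyze this function for time-complexity problems",
                ],
            },
        ],
    }


# --- Task management (JSON-RPC 2.0 dispatch) ---
@app.post("/a2a")
async def handle_task(request: Request):
    body = await request.json()
    method = body.get("method")
    params = body.get("params", {})
    request_id = body.get("id")

    if method == "tasks/send":
        return await handle_send_task(params, request_id)
    elif method == "tasks/sendSubscribe":
        return await handle_streaming_task(params, request_id)
    elif method == "tasks/get":
        return await handle_get_task(params, request_id)
    elif method == "tasks/cancel":
        return await handle_cancel_task(params, request_id)
    # -32601 is the standard JSON-RPC "Method not found" code
    return jsonrpc_error(request_id, -32601, f"Unknown method: {method}")


def new_task(params: dict) -> dict:
    """Create and store a task in the `working` state."""
    task_id = params.get("id") or str(uuid.uuid4())
    tasks[task_id] = {
        "id": task_id,
        "status": {"state": "working", "timestamp": now_iso()},
        "history": [params.get("message", {})],
        "artifacts": [],
    }
    return tasks[task_id]


def complete_task(task: dict, result: str) -> None:
    """Mark a task completed and attach the review as a text artifact."""
    task["status"] = {"state": "completed", "timestamp": now_iso()}
    task["artifacts"] = [{"parts": [{"type": "text", "text": result}]}]


async def handle_send_task(params: dict, request_id) -> dict:
    """Run a code review task and return the finished task."""
    task = new_task(params)
    # `skillId` is a custom parameter used by this example
    skill_id = params.get("skillId", "security-scan")
    try:
        result = await run_review_skill(skill_id, extract_code(params.get("message", {})))
    except Exception as exc:
        # Record the failure on the task instead of losing it
        task["status"] = {"state": "failed", "timestamp": now_iso()}
        return jsonrpc_error(request_id, -32603, f"Review failed: {exc}")
    complete_task(task, result)
    return {"jsonrpc": "2.0", "id": request_id, "result": task}


async def handle_streaming_task(params: dict, request_id):
    """Stream review progress to the caller over Server-Sent Events (SSE)."""
    task = new_task(params)
    skill_id = params.get("skillId", "security-scan")
    code_text = extract_code(params.get("message", {}))

    async def event_stream():
        yield sse_event({"type": "task_status", "state": "working"})
        # Simulated progress stages, for illustration only
        stages = [
            "Parsing code structure...",
            "Analyzing imports and dependencies...",
            "Running security pattern matching...",
            "Generating findings report...",
        ]
        for i, stage in enumerate(stages):
            await asyncio.sleep(0.5)
            yield sse_event({"type": "progress", "stage": stage, "percent": (i + 1) * 25})
        result = await run_review_skill(skill_id, code_text)
        complete_task(task, result)
        yield sse_event({"type": "task_result", "state": "completed", "result": result})

    return StreamingResponse(event_stream(), media_type="text/event-stream")


async def handle_get_task(params: dict, request_id) -> dict:
    """tasks/get: return the current state of a task."""
    task = tasks.get(params.get("id"))
    if task is None:
        return jsonrpc_error(request_id, -32001, "Task not found")
    return {"jsonrpc": "2.0", "id": request_id, "result": task}


async def handle_cancel_task(params: dict, request_id) -> dict:
    """tasks/cancel: mark a task canceled (does not interrupt an in-flight LLM call)."""
    task = tasks.get(params.get("id"))
    if task is None:
        return jsonrpc_error(request_id, -32001, "Task not found")
    if task["status"]["state"] in ("completed", "failed", "canceled"):
        return jsonrpc_error(request_id, -32002, "Task can no longer be canceled")
    task["status"] = {"state": "canceled", "timestamp": now_iso()}
    return {"jsonrpc": "2.0", "id": request_id, "result": task}


# --- Review skills ---
async def run_review_skill(skill_id: str, code: str) -> str:
    """Run a review skill with an LLM through the XiDao API Gateway."""
    prompts = {
        "security-scan": (
            "You are a security expert. Analyze the following code for OWASP Top 10 "
            "vulnerabilities. For each finding, provide: severity "
            "(Critical/High/Medium/Low), location, description, and fixed code."
            "\n\nCode:\n```\n{code}\n```"
        ),
        "performance-analysis": (
            "You are a performance engineer. Analyze this code for N+1 queries, "
            "unnecessary allocations, algorithmic complexity problems, and "
            "async/await anti-patterns. Provide concrete fixed-code examples."
            "\n\nCode:\n```\n{code}\n```"
        ),
    }
    prompt = prompts.get(skill_id, prompts["security-scan"]).format(code=code)
    response = await llm.chat.completions.create(
        model="claude-4-sonnet",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
        max_tokens=4096,
    )
    return response.choices[0].message.content or ""


def extract_code(message: dict) -> str:
    """Extract the code from an A2A message by joining all of its text parts."""
    return "\n".join(
        part["text"] for part in message.get("parts", []) if part.get("type") == "text"
    )


# Convenience REST endpoint for inspecting a task
@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str):
    if task_id not in tasks:
        raise HTTPException(404, "Task not found")
    return tasks[task_id]


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
Building an A2A Client: Orchestrating Multiple Agents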
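At the wire level, every call an A2A client makes is a JSON-RPC 2.0 request/response pair. Before adding discovery and routing, here is a sketch of just that layer using only the standard library. `build_request`, `text_message`, `unwrap_response`, and `A2AError` are hypothetical helper names, and the `tasks/send` parameters mirror what the server above reads.

```python
import itertools
import json

# Monotonic request IDs so each response can be matched to its request
_request_ids = itertools.count(1)


class A2AError(Exception):
    """Raised when the remote agent answers with a JSON-RPC error object."""

    def __init__(self, code: int, message: str):
        super().__init__(f"{code}: {message}")
        self.code = code


def build_request(method: str, params: dict) -> dict:
    """Wrap params in a JSON-RPC 2.0 request envelope."""
    return {"jsonrpc": "2.0", "id": next(_request_ids), "method": method, "params": params}


def text_message(text: str) -> dict:
    """A user message with a single text part."""
    return {"role": "user", "parts": [{"type": "text", "text": text}]}


def unwrap_response(request: dict, response: dict) -> dict:
    """Return the response's `result`, or raise A2AError if it carries an `error`."""
    if response.get("id") != request["id"]:
        raise ValueError("response id does not match request id")
    if "error" in response:
        err = response["error"]
        raise A2AError(err.get("code", -32603), err.get("message", "unknown error"))
    return response["result"]


req = build_request("tasks/send", {
    "id": "task-42",
    "skillId": "security-scan",
    "message": text_message("def f(x): return eval(x)"),
})
print(json.dumps(req, indent=2))
```

Checking for an `error` member matters because JSON-RPC errors often arrive with HTTP status 200, so an HTTP-level status check alone may not surface them.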
Now let's build an orchestrator that discovers agents and delegates work to them:
```python
# orchestrator.py
import json
import uuid
import asyncio
from dataclasses import dataclass

import httpx
import openai


@dataclass
class AgentInfo:
    name: str
    url: str
    skills: list[dict]
    capabilities: dict


class A2AOrchestrator:
    """Discovers and coordinates multiple A2A agents."""

    def __init__(self, gateway_url: str = "https://global.xidao.online"):
        self.agents: dict[str, AgentInfo] = {}
        self.http = httpx.AsyncClient(timeout=120.0)
        self.gateway_url = gateway_url
        # LLM client used to route tasks to agents
        self.llm = openai.AsyncOpenAI(
            api_key="your-xidao-api-key",
            base_url=f"{gateway_url}/v1",
        )

    async def discover_agent(self, base_url: str) -> AgentInfo:
        """Discover an agent by fetching its Agent Card."""
        card_url = f"{base_url.rstrip('/')}/.well-known/agent.json"
        response = await self.http.get(card_url)
        response.raise_for_status()
        card = response.json()
        agent = AgentInfo(
            name=card["name"],
            url=card["url"],
            skills=card.get("skills", []),
            capabilities=card.get("capabilities", {}),
        )
        self.agents[agent.name] = agent
        print(f"Discovered: {agent.name} ({len(agent.skills)} skills)")
        return agent

    async def find_agent_for_task(self, task_description: str) -> tuple[str, str]:
        """Use an LLM to pick the best agent and skill for a task."""
        catalog = [
            {
                "agent": agent.name,
                "skill_id": skill["id"],
                "skill_name": skill["name"],
                "description": skill["description"],
                "tags": skill.get("tags", []),
            }
            for agent in self.agents.values()
            for skill in agent.skills
        ]
        response = await self.llm.chat.completions.create(
            model="gpt-4o-mini",  # fast routing model
            messages=[
                {"role": "system", "content": (
                    "You are a routing agent. Given a task description and a catalog "
                    "of available agent skills, return the best match as JSON: "
                    '{"agent_name": "...", "skill_id": "..."}'
                )},
                {"role": "user", "content": (
                    f"Task: {task_description}\n\n"
                    f"Available skills:\n{_format_catalog(catalog)}"
                )},
            ],
            response_format={"type": "json_object"},
            temperature=0,
        )
        match = json.loads(response.choices[0].message.content)
        # Guard against the LLM naming an agent that was never discovered
        if match.get("agent_name") not in self.agents:
            raise ValueError(f"Router returned an unknown agent: {match}")
        return match["agent_name"], match["skill_id"]

    async def send_task(
        self, agent_name: str, skill_id: str, content: str, stream: bool = False
    ) -> dict:
        """Send a task to a specific agent and return the result."""
        agent = self.agents[agent_name]
        payload = {
            "jsonrpc": "2.0",
            "method": "tasks/sendSubscribe" if stream else "tasks/send",
            "id": f"req-{uuid.uuid4()}",
            "params": {
                "id": str(uuid.uuid4()),  # task ID
                "skillId": skill_id,
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": content}],
                },
            },
        }

        if not stream:
            response = await self.http.post(agent.url, json=payload)
            response.raise_for_status()
            return response.json()

        # Streaming: print progress events and return the final one
        final_event: dict = {}
        async with self.http.stream("POST", agent.url, json=payload) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(f"  [{event.get('type', '?')}] {event.get('stage', '')[:100]}")
                    final_event = event
        return final_event

    async def execute_workflow(self, plan: list[dict]) -> list[dict]:
        """Run a multi-step agent workflow, feeding earlier results forward."""
        results: list[dict] = []
        for step in plan:
            task = step["task"]
            depends_on = step.get("depends_on")
            if depends_on is not None:
                context = results[depends_on].get("result", {})
                task = f"Context from previous step:\n{context}\n\nTask: {task}"
            # Route on the step's own description, not on the injected context
            agent_name, skill_id = await self.find_agent_for_task(step["task"])
            print(f"\nStep {len(results)}: delegating to {agent_name} (skill: {skill_id})")
            results.append(await self.send_task(agent_name, skill_id, task))
        return results

    async def close(self) -> None:
        await self.http.aclose()
        await self.llm.close()


def _format_catalog(catalog: list[dict]) -> str:
    lines = []
    for entry in catalog:
        tags = ", ".join(entry["tags"])
        lines.append(
            f"- [{entry['agent']}] {entry['skill_name']} ({entry['skill_id']}): "
            f"{entry['description']} [tags: {tags}]"
        )
    return "\n".join(lines)


# Usage: a multi-agent workflow
async def main():
    orchestrator = A2AOrchestrator()
    try:
        # Discover agents across the infrastructure
        await asyncio.gather(
            orchestrator.discover_agent("https://code-review-agent.example.com"),
            orchestrator.discover_agent("https://deploy-agent.example.com"),
            orchestrator.discover_agent("https://monitoring-agent.example.com"),
        )
        # Workflow: review -> deploy -> monitor
        workflow = [
            {"task": "Scan /src/api/routes.py for security vulnerabilities"},
            {"task": "Deploy the latest version to staging", "depends_on": 0},
            {"task": "Set up error-rate monitoring for the deployed service", "depends_on": 1},
        ]
        results = await orchestrator.execute_workflow(workflow)
        print(f"\nWorkflow complete: {len(results)} steps executed.")
    finally:
        await orchestrator.close()


if __name__ == "__main__":
    asyncio.run(main())
```
A2A + MCP: The Complete Agent Stack
The real power of the 2026 agent architecture comes from combining the two protocols:
```
┌──────────────────────────────────────────────────┐
│                   User request                   │
└────────────────────────┬─────────────────────────┘
                         ▼
              ┌────────────────────┐
              │ Orchestrator Agent │  ← A2A client
              └──────────┬─────────┘
       ┌─────────────────┼─────────────────┐
       ▼                 ▼                 ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  Code Review │  │    Deploy    │  │   Monitor    │  ← A2A agents
│    Agent     │  │    Agent     │  │    Agent     │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
┌──────┴───────┐  ┌──────┴───────┐  ┌──────┴───────┐
│ MCP servers: │  │ MCP servers: │  │ MCP servers: │  ← MCP tools
│ • Git        │  │ • Docker     │  │ • Grafana    │
│ • SAST       │  │ • K8s        │  │ • PagerDuty  │
│ • Semgrep    │  │ • AWS        │  │ • Datadog    │
└──────────────┘  └──────────────┘  └──────────────┘
```
The core principle is a clean separation of concerns:
- MCP handles model-to-tool communication (how an agent gets its work done)
- A2A handles agent-to-agent communication (how agents coordinate with each other)
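To make the split concrete, here is a toy sketch of one agent's internals: the A2A-facing handler receives a task and decides what to do, while tool calls do the actual work. The tools are plain Python stubs standing in for MCP servers (a real agent would invoke them through an MCP client, whose API is not shown), and every name here (`TOOLS`, `git_diff`, `sast_scan`, `handle_a2a_task`) is hypothetical.

```python
from typing import Callable


# Stubs standing in for MCP tools; a real agent would call MCP servers instead
def git_diff(path: str) -> str:
    return f"+ query = 'SELECT * FROM users WHERE id = ' + user_id  # {path}"


def sast_scan(diff: str) -> list[str]:
    # Toy rule: string-concatenated SQL is flagged as a possible injection
    return ["possible SQL injection"] if "SELECT" in diff and "+" in diff else []


TOOLS: dict[str, Callable] = {"git.diff": git_diff, "sast.scan": sast_scan}


def handle_a2a_task(message: dict) -> dict:
    """A2A side: accept a task message, orchestrate tools, return a task result."""
    path = message["parts"][0]["text"]
    diff = TOOLS["git.diff"](path)       # tool call: fetch the change
    findings = TOOLS["sast.scan"](diff)  # tool call: analyze it
    summary = "; ".join(findings) or "no findings"
    return {
        "status": {"state": "completed"},
        "artifacts": [{"parts": [{"type": "text", "text": summary}]}],
    }
```

Callers on the A2A side only ever see the task result; which tools ran, and how, stays private to the agent.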
API Gateway as A2A Infrastructure
Once you run multi-agent systems in production, an API gateway becomes essential. The XiDao API Gateway provides key infrastructure for A2A deployments:
```yaml
# xidao-a2a-gateway.yaml
a2a_gateway:
  # Agent discovery proxy
  discovery:
    endpoint: https://gateway.xidao.online/agents
    auto_register: true
    health_check:
      interval: 30s
      path: /.well-known/agent.json

  # Rate limits per agent pair
  rate_limits:
    default:
      requests_per_minute: 100
      max_concurrent_tasks: 10
    high_priority:
      requests_per_minute: 500
      max_concurrent_tasks: 50

  # Authentication and authorization
  auth:
    method: bearer
    token_rotation: true
    scopes:
      - agent:read     # may discover agents
      - task:send      # may send tasks
      - task:receive   # may receive tasks

  # Observability
  observability:
    log_all_tasks: true
    trace_propagation: true
    metrics:
      - task_latency_p99
      - agent_success_rate
      - skill_invocation_count
```
Key Benefits of an API Gateway for A2A
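| Feature | Benefit |
|---|---|
| Service discovery | Agents register with the gateway and become discoverable |
| Load balancing | Distributes tasks across multiple instances of the same agent |
| Circuit breaking | Prevents cascading failures when an agent goes down |
| Request tracing | Traces a task across multiple agent hops |
| Cost attribution | Tracks which agents consume the most LLM tokens |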
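The circuit-breaking row above can also be approximated on the client side. Below is a minimal sketch of the classic three-state breaker (closed, open, half-open) for async calls; the class names, default thresholds, and the injectable `clock` are assumptions for illustration, not settings of the XiDao gateway.

```python
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling the agent while the circuit is open."""


class CircuitBreaker:
    """Minimal closed/open/half-open circuit breaker for one remote agent."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_after = reset_after              # seconds to stay open before a trial call
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_after:
            return "half-open"  # let a trial call through
        return "open"

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        """Await fn() unless the circuit is open; track successes and failures."""
        if self.state == "open":
            raise CircuitOpenError("agent marked unavailable; failing fast")
        try:
            result = await fn()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, (re)opens the circuit
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Keep one breaker per agent and wrap calls as `await breaker.call(lambda: orchestrator.send_task(name, skill, text))`, so a dead agent fails fast instead of tying up every caller until its timeout.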
Production Checklist for Multi-Agent Systems
1. Task Timeouts and Deadlines
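```python
import asyncio

import httpx


# Method of A2AOrchestrator (uses self.http)
async def send_task_with_deadline(
    self, agent_url: str, task: dict, timeout_seconds: int = 60
) -> dict:
    """Send a task with a hard deadline."""
    # Pass the deadline to the remote agent too; whether it honors
    # this hint depends on the agent's implementation
    task["params"].setdefault("configuration", {})["blockingTimeoutSeconds"] = timeout_seconds
    try:
        # Enforce the deadline locally, with a 5-second grace period
        result = await asyncio.wait_for(
            self.http.post(agent_url, json=task),
            timeout=timeout_seconds + 5,
        )
        return result.json()
    except (asyncio.TimeoutError, httpx.TimeoutException):
        return {"error": "Agent did not respond before the deadline", "state": "timeout"}
```
2. Idempotent Task Execution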
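```python
import hashlib


def generate_deterministic_task_id(agent_name: str, content: str) -> str:
    """Derive a deterministic task ID so retries of the same request can be deduplicated."""
    payload = f"{agent_name}:{content}"
    # First 16 hex characters (64 bits) of the SHA-256 digest
    return hashlib.sha256(payload.encode()).hexdigest()[:16]
```
3. Graceful Degradation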
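```python
# Method of A2AOrchestrator (uses self.send_task)
async def execute_with_fallback(
    self, primary_agent: str, fallback_agent: str,
    skill_id: str, task: str
) -> dict:
    """Try the primary agent; switch to the fallback agent if it fails."""
    try:
        result = await self.send_task(primary_agent, skill_id, task)
        if "error" in result:
            # A JSON-RPC error counts as a failure even when HTTP succeeded
            raise RuntimeError(result["error"])
        return result
    except Exception as e:
        print(f"Primary agent failed ({e}), switching to fallback...")
        return await self.send_task(fallback_agent, skill_id, task)
```
The A2A Ecosystem in 2026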
The A2A ecosystem is growing quickly:
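| Framework / Platform | A2A Support |
|---|---|
| Google Vertex AI | Native A2A server and client |
| LangChain / LangGraph | A2A adapters for agent graphs |
| CrewAI | A2A-based multi-agent orchestration |
| AutoGen (Microsoft) | A2A transport layer |
| Semantic Kernel | A2A agent connector |
| XiDao API Gateway | A2A infrastructure proxy |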
Key Takeaways
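- A2A is HTTP for agents: it provides the missing interoperability layer between AI agents from different vendors and frameworks
- MCP + A2A is the complete stack: MCP for tools, A2A for agent-to-agent communication
- API gateways are indispensable: they give multi-agent systems service discovery, rate limiting, tracing, and authentication
- Start simple: discover one agent, send one task, then build up to orchestrated workflows
- Production readiness matters: implement timeouts, retries, idempotency, and circuit breakers from day one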
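The last takeaway lists retries alongside timeouts and idempotency. A minimal sketch of retrying an async call with capped exponential backoff and full jitter follows; the function name, the defaults, and which exceptions count as retryable are assumptions to adapt (with httpx you would likely pass `retry_on=(httpx.TransportError,)`). Retries are only safe when the task is idempotent, for example when it carries a deterministic task ID.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await fn() until it succeeds, sleeping with capped exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep a random time in [0, min(max_delay, base_delay * 2^attempt)]
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

Wrap a call as `await retry_with_backoff(lambda: orchestrator.send_task(name, skill, text))`; the lambda creates a fresh coroutine on every attempt, which is why the function takes a callable rather than a coroutine object. Jitter spreads retries from many callers so a recovering agent is not hit by a synchronized burst.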
Get Started
Ready to build a multi-agent system? Here's your action plan:
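- Read the spec: google.github.io/A2A
- Try the SDK: `pip install a2a-sdk` or `npm install @a2a/sdk`
- Get an API key: sign up at global.xidao.online for a unified API gateway that supports A2A traffic
- Build your first agent: start from the code review agent example above
- Connect your agents: use the orchestrator pattern to coordinate workflows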
Building a multi-agent system? Share your experience with the XiDao community at global.xidao.online, or reach us at support@xidao.online.