The Multi-Agent Problem in 2026#
By mid-2026, most development teams have adopted MCP (Model Context Protocol) for connecting AI models to tools. But a critical gap remains: how do AI agents talk to each other?
Consider a real-world scenario: An e-commerce platform deploys three specialized agents:
- Inventory Agent — monitors stock levels, predicts demand
- Pricing Agent — adjusts prices based on market conditions
- Customer Support Agent — handles inquiries, processes returns
Each agent works brilliantly in isolation. But when the Pricing Agent needs to ask the Inventory Agent about stock availability before applying a discount, there’s no standard way for them to communicate. Teams end up building fragile, custom integrations that break at scale.
This is exactly what Google’s Agent-to-Agent (A2A) protocol solves.
What Is A2A?#
A2A is an open protocol that enables AI agents to discover, communicate, and collaborate — regardless of which framework, vendor, or runtime they use. While MCP connects models to tools, A2A connects agents to agents.
Think of it this way:
| Protocol | Connects | Analogy |
|---|---|---|
| MCP | Model ↔ Tool | USB-C (device to peripheral) |
| A2A | Agent ↔ Agent | HTTP (server to server) |
Core Concepts#
┌──────────────┐      A2A Protocol      ┌──────────────┐
│   Agent A    │ ◄────────────────────► │   Agent B    │
│   (Client)   │    HTTP + JSON-RPC     │   (Remote)   │
└──────┬───────┘                        └──────┬───────┘
       │                                       │
       ▼                                       ▼
  Agent Card                              Agent Card
 (Capability                             (Capability
  Discovery)                              Discovery)

A2A defines three key primitives:
- Agent Card — A JSON document published at `/.well-known/agent.json` that describes an agent’s capabilities, endpoints, and authentication requirements
- Task — A unit of work with a lifecycle (submitted → working → completed/failed)
- Message — Structured communication between agents, supporting text, files, and structured data
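The Task lifecycle can be modeled as a small state machine. The sketch below is illustrative rather than taken from any A2A SDK — the state names mirror the lifecycle in the list above, and the full spec defines further states (e.g. for cancellation) that are omitted here:

```python
from enum import Enum

class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions for the lifecycle above; terminal states have no outgoing edges.
TRANSITIONS = {
    TaskState.SUBMITTED: {TaskState.WORKING},
    TaskState.WORKING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
}

def advance(current: TaskState, new: TaskState) -> TaskState:
    """Reject illegal lifecycle transitions before recording them."""
    if new not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new

state = advance(TaskState.SUBMITTED, TaskState.WORKING)
state = advance(state, TaskState.COMPLETED)
print(state.value)  # → completed
```

Validating transitions like this catches protocol bugs early — e.g. a handler that tries to move a completed task back to working.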
Agent Card Example#
Every A2A-compliant agent publishes its capabilities:
{
"name": "Inventory Intelligence Agent",
"description": "Monitors inventory levels, predicts demand, and optimizes stock allocation",
"url": "https://inventory-agent.example.com/a2a",
"version": "2.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true
},
"authentication": {
"schemes": ["Bearer"]
},
"defaultInputModes": ["text", "structured-data"],
"defaultOutputModes": ["text", "structured-data", "chart"],
"skills": [
{
"id": "demand-forecast",
"name": "Demand Forecasting",
"description": "Predict product demand for the next 7-90 days",
"tags": ["inventory", "prediction", "analytics"],
"examples": [
"Predict demand for SKU-12345 for the next 30 days",
"What products will need restocking next week?"
]
},
{
"id": "stock-check",
"name": "Stock Availability Check",
"description": "Real-time stock levels across all warehouses",
"tags": ["inventory", "realtime"],
"examples": [
"How many units of SKU-67890 are available?",
"Check stock across all warehouses for Widget Pro"
]
}
]
}

Building an A2A Server#
Let’s build an A2A server in Python: a Code Review Agent that other agents can delegate code analysis tasks to. The example favors clarity over production hardening — note the in-memory task store, which you would replace with Redis or a database in production.
Project Structure#
code-review-agent/
├── agent_card.json
├── server.py
├── skills/
│ ├── security_scan.py
│ ├── performance_analysis.py
│ └── style_check.py
└── requirements.txt

Core A2A Server Implementation#
# server.py
import uuid
import asyncio
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import json
app = FastAPI(title="Code Review A2A Agent")
# Task storage (use Redis in production)
tasks: dict[str, dict] = {}
# --- Agent Card Endpoint ---
@app.get("/.well-known/agent.json")
async def agent_card():
return {
"name": "Code Review Agent",
"description": "Performs security scans, performance analysis, and style checks on code",
"url": "https://code-review-agent.example.com/a2a",
"version": "1.0.0",
"capabilities": {
"streaming": True,
"pushNotifications": True,
"stateTransitionHistory": True
},
"authentication": {"schemes": ["Bearer"]},
"defaultInputModes": ["text", "structured-data"],
"defaultOutputModes": ["text", "structured-data"],
"skills": [
{
"id": "security-scan",
"name": "Security Vulnerability Scan",
"description": "Detect OWASP Top 10 vulnerabilities and common security issues",
"tags": ["security", "code-review", "owasp"],
"examples": [
"Scan this Python code for SQL injection risks",
"Check for XSS vulnerabilities in this React component"
]
},
{
"id": "performance-analysis",
"name": "Performance Analysis",
"description": "Identify N+1 queries, memory leaks, and algorithmic inefficiencies",
"tags": ["performance", "optimization"],
"examples": [
"Find N+1 query issues in this Django view",
"Analyze this function for time complexity problems"
]
}
]
}
# --- Task Management ---
class TaskRequest(BaseModel):
id: str
message: dict
@app.post("/a2a")
async def handle_task(request: Request):
body = await request.json()
# Parse A2A JSON-RPC request
method = body.get("method")
params = body.get("params", {})
request_id = body.get("id")
if method == "tasks/send":
return await handle_send_task(params, request_id)
elif method == "tasks/sendSubscribe":
return await handle_streaming_task(params, request_id)
elif method == "tasks/get":
return await handle_get_task(params, request_id)
elif method == "tasks/cancel":
return await handle_cancel_task(params, request_id)
    else:
        # Per JSON-RPC 2.0, unknown methods should return a -32601 error object
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}
        }
async def handle_send_task(params: dict, request_id: str) -> dict:
"""Process a code review task."""
task_id = params.get("id", str(uuid.uuid4()))
message = params.get("message", {})
skill_id = params.get("skillId", "security-scan")
# Initialize task
tasks[task_id] = {
"id": task_id,
"status": {"state": "working", "timestamp": datetime.utcnow().isoformat()},
"history": [message],
"artifacts": []
}
# Process based on skill
code_text = extract_code(message)
result = await run_review_skill(skill_id, code_text)
# Complete task
tasks[task_id]["status"] = {
"state": "completed",
"timestamp": datetime.utcnow().isoformat()
}
tasks[task_id]["artifacts"] = [{
"parts": [{"type": "text", "text": result}]
}]
return {
"jsonrpc": "2.0",
"id": request_id,
"result": tasks[task_id]
}
async def handle_streaming_task(params: dict, request_id: str):
"""Stream review progress via SSE."""
task_id = params.get("id", str(uuid.uuid4()))
message = params.get("message", {})
skill_id = params.get("skillId", "security-scan")
async def event_stream():
# Task started
yield f"data: {json.dumps({'type': 'task_status', 'state': 'working'})}\n\n"
code_text = extract_code(message)
stages = [
"Parsing code structure...",
"Analyzing imports and dependencies...",
"Running security pattern matching...",
"Generating findings report..."
]
for i, stage in enumerate(stages):
await asyncio.sleep(0.5) # Simulate processing
progress = {"type": "progress", "stage": stage, "percent": (i + 1) * 25}
yield f"data: {json.dumps(progress)}\n\n"
# Final result
result = await run_review_skill(skill_id, code_text)
yield f"data: {json.dumps({'type': 'task_result', 'state': 'completed', 'result': result})}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
# --- Review Skills ---
async def run_review_skill(skill_id: str, code: str) -> str:
"""Execute a review skill using an LLM via XiDao API Gateway."""
import openai
client = openai.AsyncOpenAI(
api_key="your-xidao-api-key",
base_url="https://global.xidao.online/v1"
)
prompts = {
"security-scan": (
"You are a security expert. Analyze the following code for OWASP Top 10 "
"vulnerabilities. For each finding, provide: severity (Critical/High/Medium/Low), "
"location, description, and remediation code.\n\nCode:\n```\n{code}\n```"
),
"performance-analysis": (
"You are a performance engineer. Analyze this code for: N+1 queries, "
"unnecessary allocations, algorithmic complexity issues, and async/await "
"anti-patterns. Provide specific fixes with code examples.\n\nCode:\n```\n{code}\n```"
),
}
prompt = prompts.get(skill_id, prompts["security-scan"]).format(code=code)
response = await client.chat.completions.create(
model="claude-4-sonnet",
messages=[{"role": "user", "content": prompt}],
temperature=0.1,
max_tokens=4096
)
return response.choices[0].message.content
def extract_code(message: dict) -> str:
"""Extract code text from A2A message parts."""
parts = message.get("parts", [])
for part in parts:
if part.get("type") == "text":
return part["text"]
return ""
# --- Utility Endpoints ---
@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str):
if task_id not in tasks:
raise HTTPException(404, "Task not found")
return tasks[task_id]
if __name__ == "__main__":
import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Building an A2A Client: Orchestrating Multiple Agents#
Now let’s build an orchestrator that discovers agents and delegates work:
# orchestrator.py
import httpx
import asyncio
from dataclasses import dataclass
@dataclass
class AgentInfo:
name: str
url: str
skills: list[dict]
capabilities: dict
class A2AOrchestrator:
"""Discovers and coordinates multiple A2A agents."""
def __init__(self, gateway_url: str = "https://global.xidao.online"):
self.agents: dict[str, AgentInfo] = {}
self.http = httpx.AsyncClient(timeout=120.0)
self.gateway_url = gateway_url
async def discover_agent(self, base_url: str) -> AgentInfo:
"""Discover an agent by fetching its Agent Card."""
card_url = f"{base_url}/.well-known/agent.json"
response = await self.http.get(card_url)
card = response.json()
agent = AgentInfo(
name=card["name"],
url=card["url"],
skills=card.get("skills", []),
capabilities=card.get("capabilities", {}),
)
self.agents[agent.name] = agent
print(f"Discovered: {agent.name} ({len(agent.skills)} skills)")
return agent
async def find_agent_for_task(self, task_description: str) -> tuple[str, str]:
"""Use an LLM to find the best agent and skill for a task."""
import openai
catalog = []
for agent in self.agents.values():
for skill in agent.skills:
catalog.append({
"agent": agent.name,
"skill_id": skill["id"],
"skill_name": skill["name"],
"description": skill["description"],
"tags": skill.get("tags", []),
})
client = openai.AsyncOpenAI(
api_key="your-xidao-api-key",
base_url=f"{self.gateway_url}/v1"
)
response = await client.chat.completions.create(
model="gpt-4o-mini", # Fast model for routing
messages=[
{"role": "system", "content": (
"You are a routing agent. Given a task description and a catalog "
"of available agent skills, return the best match as JSON: "
'{"agent_name": "...", "skill_id": "..."}'
)},
{"role": "user", "content": (
f"Task: {task_description}\n\n"
f"Available skills:\n{_format_catalog(catalog)}"
)}
],
response_format={"type": "json_object"},
temperature=0
)
import json
match = json.loads(response.choices[0].message.content)
return match["agent_name"], match["skill_id"]
async def send_task(
self, agent_name: str, skill_id: str, content: str, stream: bool = False
) -> dict:
"""Send a task to a specific agent."""
agent = self.agents[agent_name]
payload = {
"jsonrpc": "2.0",
"method": "tasks/sendSubscribe" if stream else "tasks/send",
"id": f"req-{asyncio.get_event_loop().time():.0f}",
"params": {
"skillId": skill_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": content}]
}
}
}
        if stream:
            import json
            last_event: dict = {}
            async with self.http.stream("POST", agent.url, json=payload) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        event = json.loads(line[6:])
                        detail = str(event.get("stage", event.get("result", "")))[:100]
                        print(f"  [{event.get('type', '?')}] {detail}")
                        last_event = event
            # The last SSE event carries the final task state and result
            return last_event
        response = await self.http.post(agent.url, json=payload)
        return response.json()
async def execute_workflow(self, plan: list[dict]) -> list[dict]:
"""Execute a multi-step agent workflow."""
results = []
for step in plan:
task = step["task"]
depends_on = step.get("depends_on")
# Inject context from previous steps
if depends_on is not None:
context = results[depends_on].get("result", {})
task = f"Context from previous step:\n{context}\n\nTask: {task}"
agent_name, skill_id = await self.find_agent_for_task(task)
print(f"\nStep {len(results)}: Delegating to {agent_name} (skill: {skill_id})")
result = await self.send_task(agent_name, skill_id, task)
results.append(result)
return results
def _format_catalog(catalog: list[dict]) -> str:
lines = []
for entry in catalog:
tags = ", ".join(entry["tags"])
lines.append(
f"- [{entry['agent']}] {entry['skill_name']} ({entry['skill_id']}): "
f"{entry['description']} [tags: {tags}]"
)
return "\n".join(lines)
# Usage: Multi-Agent Workflow
async def main():
orchestrator = A2AOrchestrator()
# Discover agents from your infrastructure
await asyncio.gather(
orchestrator.discover_agent("https://code-review-agent.example.com"),
orchestrator.discover_agent("https://deploy-agent.example.com"),
orchestrator.discover_agent("https://monitoring-agent.example.com"),
)
# Define a workflow: review -> deploy -> monitor
workflow = [
{"task": "Scan /src/api/routes.py for security vulnerabilities"},
{"task": "Deploy the latest version to staging environment", "depends_on": 0},
{"task": "Set up error rate monitoring for the deployed service", "depends_on": 1},
]
results = await orchestrator.execute_workflow(workflow)
print("\nWorkflow completed!", len(results), "steps executed.")
asyncio.run(main())

A2A + MCP: The Complete Agent Stack#
The real power of 2026 agent architecture comes from combining both protocols:
┌────────────────────────────────────────────────────────┐
│                      User Request                      │
└───────────────────────┬────────────────────────────────┘
                        ▼
               ┌──────────────────┐
               │   Orchestrator   │  ← A2A Client
               │      Agent       │
               └────────┬─────────┘
          ┌─────────────┼─────────────┐
          ▼             ▼             ▼
     ┌──────────┐  ┌──────────┐  ┌──────────┐
     │   Code   │  │  Deploy  │  │ Monitor  │  ← A2A Agents
     │  Review  │  │  Agent   │  │  Agent   │
     │  Agent   │  │          │  │          │
     └────┬─────┘  └────┬─────┘  └────┬─────┘
          │             │             │
     ┌────┴─────┐  ┌────┴─────┐  ┌────┴─────┐
     │   MCP    │  │   MCP    │  │   MCP    │  ← MCP Tools
     │ Servers: │  │ Servers: │  │ Servers: │
     │ • Git    │  │ • Docker │  │ • Grafana│
     │ • SAST   │  │ • K8s    │  │ • PagerD │
     │ • Semgrep│  │ • AWS    │  │ • Datadog│
     └──────────┘  └──────────┘  └──────────┘

The principle is clean separation of concerns:
- MCP handles model-to-tool communication (how agents do their work)
- A2A handles agent-to-agent communication (how agents coordinate)
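To make the layering concrete, here is a sketch of an A2A skill handler that delegates its actual work to MCP tools. `call_mcp_tool`, the server names, and the tool names are all hypothetical stand-ins — a real implementation would go through an MCP client session from an MCP SDK:

```python
import asyncio

async def call_mcp_tool(server: str, tool: str, args: dict) -> dict:
    # Hypothetical placeholder: a real version would invoke the named tool
    # over an MCP client session instead of echoing the request back.
    return {"server": server, "tool": tool, "args": args, "ok": True}

async def security_scan_skill(code: str) -> dict:
    """A2A-facing skill handler that fans out to MCP tools for the real work."""
    semgrep = await call_mcp_tool("sast-server", "semgrep_scan", {"code": code})
    deps = await call_mcp_tool("git-server", "list_dependencies", {"code": code})
    # The calling agent only sees the aggregated artifact, not the MCP plumbing.
    return {"findings": [semgrep], "dependencies": deps, "state": "completed"}

result = asyncio.run(security_scan_skill("print('hello')"))
```

The point of the shape: A2A peers talk to `security_scan_skill` through tasks and messages, while the MCP calls stay an internal implementation detail of this one agent.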
API Gateway as A2A Infrastructure#
When running multi-agent systems in production, an API gateway becomes essential. XiDao API Gateway provides critical infrastructure for A2A deployments:
# xidao-a2a-gateway.yaml
a2a_gateway:
# Agent discovery proxy
discovery:
endpoint: https://gateway.xidao.online/agents
auto_register: true
health_check:
interval: 30s
path: /.well-known/agent.json
# Rate limiting per agent pair
rate_limits:
default:
requests_per_minute: 100
max_concurrent_tasks: 10
high_priority:
requests_per_minute: 500
max_concurrent_tasks: 50
# Authentication and authorization
auth:
method: bearer
token_rotation: true
scopes:
- agent:read # Can discover agents
- task:send # Can send tasks
- task:receive # Can receive tasks
# Observability
observability:
log_all_tasks: true
trace_propagation: true
metrics:
- task_latency_p99
- agent_success_rate
      - skill_invocation_count

Key Gateway Benefits for A2A#
| Feature | Benefit |
|---|---|
| Service Discovery | Agents register and are discoverable via the gateway |
| Load Balancing | Distribute tasks across multiple instances of the same agent |
| Circuit Breaking | Prevent cascade failures when an agent goes down |
| Request Tracing | Follow a task across multiple agent hops |
| Cost Attribution | Track which agents consume the most LLM tokens |
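Circuit breaking is worth seeing in code. Below is a minimal, gateway-agnostic sketch (in practice the gateway does this for you): the breaker opens after a run of consecutive failures, rejects calls while open, and permits a trial call after a cooldown.

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker for agent-to-agent calls (illustrative sketch)."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when the circuit opened

    def allow(self) -> bool:
        """Return True if a call to the downstream agent may proceed."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after:
            # Half-open: permit one trial call; another failure re-trips it.
            self.opened_at = None
            self.failures = self.max_failures - 1
            return True
        return False

    def record(self, success: bool) -> None:
        """Record the outcome of a call and open the circuit if needed."""
        if success:
            self.failures = 0
            self.opened_at = None
        else:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()

breaker = CircuitBreaker(max_failures=2, reset_after=30.0)
breaker.record(False)
breaker.record(False)   # second consecutive failure trips the breaker
print(breaker.allow())  # → False: circuit is open, calls are rejected
```

Wrap each `send_task` call in `if breaker.allow(): ... breaker.record(ok)` per downstream agent, so one dead agent can't stall the whole workflow.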
Production Checklist for Multi-Agent Systems#
1. Task Timeouts and Deadlines#
async def send_task_with_deadline(
self, agent_url: str, task: dict, timeout_seconds: int = 60
):
"""Send a task with a hard deadline."""
task["params"]["configuration"] = {
"blockingTimeoutSeconds": timeout_seconds,
}
try:
result = await asyncio.wait_for(
self.http.post(agent_url, json=task),
timeout=timeout_seconds + 5
)
return result.json()
except asyncio.TimeoutError:
return {"error": "Agent did not respond within deadline",
                "state": "timeout"}

2. Idempotent Task Execution#
import hashlib
def generate_deterministic_task_id(agent_name: str, content: str) -> str:
"""Generate a deterministic task ID for deduplication."""
payload = f"{agent_name}:{content}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

3. Graceful Degradation#
async def execute_with_fallback(
self, primary_agent: str, fallback_agent: str,
skill_id: str, task: str
):
"""Try primary agent, fall back to secondary."""
try:
return await self.send_task(primary_agent, skill_id, task)
except Exception as e:
print(f"Primary agent failed ({e}), switching to fallback...")
        return await self.send_task(fallback_agent, skill_id, task)

The 2026 A2A Ecosystem#
The A2A ecosystem is growing rapidly:
| Framework/Platform | A2A Support |
|---|---|
| Google Vertex AI | Native A2A server and client |
| LangChain / LangGraph | A2A adapter for agent graphs |
| CrewAI | A2A-based multi-agent orchestration |
| AutoGen (Microsoft) | A2A transport layer |
| Semantic Kernel | A2A agent connectors |
| XiDao API Gateway | A2A infrastructure proxy |
Key Takeaways#
- A2A is the HTTP of agents — it provides the missing interoperability layer between AI agents from different vendors and frameworks
- MCP + A2A is the full stack — MCP for tools, A2A for agent-to-agent communication
- API gateways are essential — service discovery, rate limiting, tracing, and auth for multi-agent systems
- Start simple — discover one agent, send one task, then build up to orchestrated workflows
- Production matters — implement timeouts, retries, idempotency, and circuit breakers from day one
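Retries are the one checklist item not shown above. A sketch of exponential backoff with jitter around any agent call — `send` is any zero-argument coroutine function (for example, a wrapped `orchestrator.send_task` call), and the flaky stub below merely simulates a transient failure:

```python
import asyncio
import random

async def send_with_retries(send, attempts: int = 3, base_delay: float = 0.5):
    """Retry an async task submission with exponential backoff and jitter.

    Transient failures are retried; the last error is re-raised once
    attempts are exhausted.
    """
    for attempt in range(attempts):
        try:
            return await send()
        except Exception:
            if attempt == attempts - 1:
                raise
            # Backoff doubles each attempt, with up to 10% random jitter.
            delay = base_delay * (2 ** attempt) * (1 + random.random() * 0.1)
            await asyncio.sleep(delay)

# Usage with a flaky stand-in for an agent call: fails twice, then succeeds.
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("agent unavailable")
    return {"state": "completed"}

result = asyncio.run(send_with_retries(flaky, attempts=4, base_delay=0.01))
```

Combine this with the deterministic task IDs from the idempotency snippet so a retried submission doesn't spawn a duplicate task on the remote agent.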
Get Started#
Ready to build multi-agent systems? Here’s your action plan:
- Read the spec: google.github.io/A2A
- Try the SDK: `pip install a2a-sdk` or `npm install @a2a/sdk`
- Get an API key: Register at global.xidao.online for a unified API gateway that supports A2A traffic
- Build your first agent: Start with the Code Review Agent example above
- Connect agents: Use the Orchestrator pattern to coordinate workflows
Building multi-agent systems? Share your experience with the XiDao community at global.xidao.online or reach out at support@xidao.online.