
A2A Protocol: Building Multi-Agent Systems That Actually Work in 2026

Author: XiDao
XiDao provides stable, high-speed, and cost-effective LLM API gateway services for developers worldwide. One API Key to access OpenAI, Anthropic, Google, Meta models with smart routing and auto-retry.

The Multi-Agent Problem in 2026

By mid-2026, many development teams have adopted MCP (Model Context Protocol) for connecting AI models to tools. But a critical gap remains: how do AI agents talk to each other?

Consider a real-world scenario: An e-commerce platform deploys three specialized agents:

  • Inventory Agent — monitors stock levels, predicts demand
  • Pricing Agent — adjusts prices based on market conditions
  • Customer Support Agent — handles inquiries, processes returns

Each agent works brilliantly in isolation. But when the Pricing Agent needs to ask the Inventory Agent about stock availability before applying a discount, there’s no standard way for them to communicate. Teams end up building fragile, custom integrations that break at scale.

This is exactly what Google’s Agent-to-Agent (A2A) protocol solves.

What Is A2A?

A2A is an open protocol that enables AI agents to discover, communicate, and collaborate — regardless of which framework, vendor, or runtime they use. While MCP connects models to tools, A2A connects agents to agents.

Think of it this way:

Protocol | Connects       | Analogy
---------|----------------|------------------------------
MCP      | Model ↔ Tool   | USB-C (device to peripheral)
A2A      | Agent ↔ Agent  | HTTP (server to server)

Core Concepts

┌──────────────┐   A2A Protocol    ┌──────────────┐
│   Agent A    │ ◄───────────────► │   Agent B    │
│  (Client)    │   HTTP + JSON-RPC │  (Remote)    │
└──────┬───────┘                   └──────┬───────┘
       │                                  │
       ▼                                  ▼
  Agent Card                        Agent Card
  (Capability                       (Capability
   Discovery)                        Discovery)

A2A defines three key primitives:

  1. Agent Card — A JSON document published at /.well-known/agent.json that describes an agent’s capabilities, endpoints, and authentication requirements
  2. Task — A unit of work with a lifecycle (submitted → working → completed/failed); a wire-format sketch follows this list
  3. Message — Structured communication between agents, supporting text, files, and structured data
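
To make the Task lifecycle concrete, here is a minimal sketch of the stored task object at two points in its life. The field names match the server example later in this post; the timestamps and values are illustrative:

# A task right after submission (state: working) and after completion
task_working = {
    "id": "task-123",
    "status": {"state": "working", "timestamp": "2026-06-01T12:00:00Z"},
    "history": [
        {"role": "user", "parts": [{"type": "text", "text": "Check stock for SKU-67890"}]}
    ],
    "artifacts": []
}

task_completed = {
    **task_working,
    "status": {"state": "completed", "timestamp": "2026-06-01T12:00:04Z"},
    "artifacts": [{"parts": [{"type": "text", "text": "1,240 units across 3 warehouses"}]}]
}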

Agent Card Example

Every A2A-compliant agent publishes its capabilities:

{
  "name": "Inventory Intelligence Agent",
  "description": "Monitors inventory levels, predicts demand, and optimizes stock allocation",
  "url": "https://inventory-agent.example.com/a2a",
  "version": "2.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["Bearer"]
  },
  "defaultInputModes": ["text", "structured-data"],
  "defaultOutputModes": ["text", "structured-data", "chart"],
  "skills": [
    {
      "id": "demand-forecast",
      "name": "Demand Forecasting",
      "description": "Predict product demand for the next 7-90 days",
      "tags": ["inventory", "prediction", "analytics"],
      "examples": [
        "Predict demand for SKU-12345 for the next 30 days",
        "What products will need restocking next week?"
      ]
    },
    {
      "id": "stock-check",
      "name": "Stock Availability Check",
      "description": "Real-time stock levels across all warehouses",
      "tags": ["inventory", "realtime"],
      "examples": [
        "How many units of SKU-67890 are available?",
        "Check stock across all warehouses for Widget Pro"
      ]
    }
  ]
}
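
Discovery is a plain HTTP GET against that well-known path. A minimal sketch with httpx, reusing the example host above:

import asyncio
import httpx

async def fetch_agent_card(base_url: str) -> dict:
    """Fetch an agent's card from its well-known discovery path."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{base_url}/.well-known/agent.json")
        resp.raise_for_status()
        return resp.json()

card = asyncio.run(fetch_agent_card("https://inventory-agent.example.com"))
print([skill["id"] for skill in card["skills"]])  # ['demand-forecast', 'stock-check']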

Building an A2A Server

Let’s build a production-ready A2A server in Python. We’ll create a Code Review Agent that other agents can delegate code analysis tasks to.

Project Structure

code-review-agent/
├── agent_card.json
├── server.py
├── skills/
│   ├── security_scan.py
│   ├── performance_analysis.py
│   └── style_check.py
└── requirements.txt
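
A plausible requirements.txt for this layout (the version pins are assumptions, not tested minimums; fastapi and uvicorn serve the endpoints, openai calls the LLM through the gateway):

# requirements.txt
fastapi>=0.110
uvicorn>=0.29
openai>=1.30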

Core A2A Server Implementation

# server.py
import uuid
import asyncio
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
import json

app = FastAPI(title="Code Review A2A Agent")

# Task storage (use Redis in production)
tasks: dict[str, dict] = {}

# --- Agent Card Endpoint ---

@app.get("/.well-known/agent.json")
async def agent_card():
    return {
        "name": "Code Review Agent",
        "description": "Performs security scans, performance analysis, and style checks on code",
        "url": "https://code-review-agent.example.com/a2a",
        "version": "1.0.0",
        "capabilities": {
            "streaming": True,
            "pushNotifications": True,
            "stateTransitionHistory": True
        },
        "authentication": {"schemes": ["Bearer"]},
        "defaultInputModes": ["text", "structured-data"],
        "defaultOutputModes": ["text", "structured-data"],
        "skills": [
            {
                "id": "security-scan",
                "name": "Security Vulnerability Scan",
                "description": "Detect OWASP Top 10 vulnerabilities and common security issues",
                "tags": ["security", "code-review", "owasp"],
                "examples": [
                    "Scan this Python code for SQL injection risks",
                    "Check for XSS vulnerabilities in this React component"
                ]
            },
            {
                "id": "performance-analysis",
                "name": "Performance Analysis",
                "description": "Identify N+1 queries, memory leaks, and algorithmic inefficiencies",
                "tags": ["performance", "optimization"],
                "examples": [
                    "Find N+1 query issues in this Django view",
                    "Analyze this function for time complexity problems"
                ]
            }
        ]
    }

# --- Task Management ---

@app.post("/a2a")
async def handle_task(request: Request):
    body = await request.json()

    # Parse A2A JSON-RPC request
    method = body.get("method")
    params = body.get("params", {})
    request_id = body.get("id")

    if method == "tasks/send":
        return await handle_send_task(params, request_id)
    elif method == "tasks/sendSubscribe":
        return await handle_streaming_task(params, request_id)
    elif method == "tasks/get":
        return await handle_get_task(params, request_id)
    elif method == "tasks/cancel":
        return await handle_cancel_task(params, request_id)
    else:
        # Unknown method: return a JSON-RPC error (-32601 = method not found)
        # rather than a bare HTTP error, so clients get a spec-shaped response
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}
        }


async def handle_send_task(params: dict, request_id: str) -> dict:
    """Process a code review task."""
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})
    skill_id = params.get("skillId", "security-scan")

    # Initialize task
    tasks[task_id] = {
        "id": task_id,
        "status": {"state": "working", "timestamp": datetime.utcnow().isoformat()},
        "history": [message],
        "artifacts": []
    }

    # Process based on skill
    code_text = extract_code(message)
    result = await run_review_skill(skill_id, code_text)

    # Complete task
    tasks[task_id]["status"] = {
        "state": "completed",
        "timestamp": datetime.utcnow().isoformat()
    }
    tasks[task_id]["artifacts"] = [{
        "parts": [{"type": "text", "text": result}]
    }]

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": tasks[task_id]
    }
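

# The dispatcher above also routes tasks/get and tasks/cancel, so those
# handlers must exist for the server to run. Minimal sketches; the -32001
# "task not found" code follows A2A's JSON-RPC error conventions:
async def handle_get_task(params: dict, request_id: str) -> dict:
    """Return the current state of a stored task."""
    task_id = params.get("id")
    task = tasks.get(task_id)
    if task is None:
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32001, "message": f"Task not found: {task_id}"}
        }
    return {"jsonrpc": "2.0", "id": request_id, "result": task}


async def handle_cancel_task(params: dict, request_id: str) -> dict:
    """Mark a stored task as canceled."""
    task_id = params.get("id")
    task = tasks.get(task_id)
    if task is None:
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32001, "message": f"Task not found: {task_id}"}
        }
    task["status"] = {"state": "canceled", "timestamp": datetime.utcnow().isoformat()}
    return {"jsonrpc": "2.0", "id": request_id, "result": task}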


async def handle_streaming_task(params: dict, request_id: str):
    """Stream review progress via SSE."""
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})
    skill_id = params.get("skillId", "security-scan")

    async def event_stream():
        # Task started
        yield f"data: {json.dumps({'type': 'task_status', 'state': 'working'})}\n\n"

        code_text = extract_code(message)
        stages = [
            "Parsing code structure...",
            "Analyzing imports and dependencies...",
            "Running security pattern matching...",
            "Generating findings report..."
        ]

        for i, stage in enumerate(stages):
            await asyncio.sleep(0.5)  # Simulate processing
            progress = {"type": "progress", "stage": stage, "percent": (i + 1) * 25}
            yield f"data: {json.dumps(progress)}\n\n"

        # Final result
        result = await run_review_skill(skill_id, code_text)
        yield f"data: {json.dumps({'type': 'task_result', 'state': 'completed', 'result': result})}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

# --- Review Skills ---

async def run_review_skill(skill_id: str, code: str) -> str:
    """Execute a review skill using an LLM via XiDao API Gateway."""
    import openai

    client = openai.AsyncOpenAI(
        api_key="your-xidao-api-key",
        base_url="https://global.xidao.online/v1"
    )

    prompts = {
        "security-scan": (
            "You are a security expert. Analyze the following code for OWASP Top 10 "
            "vulnerabilities. For each finding, provide: severity (Critical/High/Medium/Low), "
            "location, description, and remediation code.\n\nCode:\n```\n{code}\n```"
        ),
        "performance-analysis": (
            "You are a performance engineer. Analyze this code for: N+1 queries, "
            "unnecessary allocations, algorithmic complexity issues, and async/await "
            "anti-patterns. Provide specific fixes with code examples.\n\nCode:\n```\n{code}\n```"
        ),
    }

    prompt = prompts.get(skill_id, prompts["security-scan"]).format(code=code)

    response = await client.chat.completions.create(
        model="claude-4-sonnet",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
        max_tokens=4096
    )

    return response.choices[0].message.content


def extract_code(message: dict) -> str:
    """Extract code text from A2A message parts."""
    parts = message.get("parts", [])
    for part in parts:
        if part.get("type") == "text":
            return part["text"]
    return ""


# --- Utility Endpoints ---

@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str):
    if task_id not in tasks:
        raise HTTPException(404, "Task not found")
    return tasks[task_id]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
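
With the server running, you can smoke-test it with a raw JSON-RPC call before wiring up any client. A minimal sketch that assumes the server is listening on localhost:8000:

import httpx

payload = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "tasks/send",
    "params": {
        "skillId": "security-scan",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": 'query = f"SELECT * FROM users WHERE id = {user_id}"'}]
        }
    }
}

resp = httpx.post("http://localhost:8000/a2a", json=payload, timeout=120)
print(resp.json()["result"]["status"]["state"])  # "completed"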

Building an A2A Client: Orchestrating Multiple Agents

Now let’s build an orchestrator that discovers agents and delegates work:

# orchestrator.py
import asyncio
import json
import uuid
from dataclasses import dataclass

import httpx

@dataclass
class AgentInfo:
    name: str
    url: str
    skills: list[dict]
    capabilities: dict

class A2AOrchestrator:
    """Discovers and coordinates multiple A2A agents."""

    def __init__(self, gateway_url: str = "https://global.xidao.online"):
        self.agents: dict[str, AgentInfo] = {}
        self.http = httpx.AsyncClient(timeout=120.0)
        self.gateway_url = gateway_url

    async def discover_agent(self, base_url: str) -> AgentInfo:
        """Discover an agent by fetching its Agent Card."""
        card_url = f"{base_url}/.well-known/agent.json"
        response = await self.http.get(card_url)
        response.raise_for_status()
        card = response.json()

        agent = AgentInfo(
            name=card["name"],
            url=card["url"],
            skills=card.get("skills", []),
            capabilities=card.get("capabilities", {}),
        )
        self.agents[agent.name] = agent
        print(f"Discovered: {agent.name} ({len(agent.skills)} skills)")
        return agent

    async def find_agent_for_task(self, task_description: str) -> tuple[str, str]:
        """Use an LLM to find the best agent and skill for a task."""
        import openai

        catalog = []
        for agent in self.agents.values():
            for skill in agent.skills:
                catalog.append({
                    "agent": agent.name,
                    "skill_id": skill["id"],
                    "skill_name": skill["name"],
                    "description": skill["description"],
                    "tags": skill.get("tags", []),
                })

        client = openai.AsyncOpenAI(
            api_key="your-xidao-api-key",
            base_url=f"{self.gateway_url}/v1"
        )

        response = await client.chat.completions.create(
            model="gpt-4o-mini",  # Fast model for routing
            messages=[
                {"role": "system", "content": (
                    "You are a routing agent. Given a task description and a catalog "
                    "of available agent skills, return the best match as JSON: "
                    '{"agent_name": "...", "skill_id": "..."}'
                )},
                {"role": "user", "content": (
                    f"Task: {task_description}\n\n"
                    f"Available skills:\n{_format_catalog(catalog)}"
                )}
            ],
            response_format={"type": "json_object"},
            temperature=0
        )

        match = json.loads(response.choices[0].message.content)
        return match["agent_name"], match["skill_id"]

    async def send_task(
        self, agent_name: str, skill_id: str, content: str, stream: bool = False
    ) -> dict | None:
        """Send a task to a specific agent."""
        agent = self.agents[agent_name]

        payload = {
            "jsonrpc": "2.0",
            "method": "tasks/sendSubscribe" if stream else "tasks/send",
            "id": f"req-{asyncio.get_event_loop().time():.0f}",
            "params": {
                "skillId": skill_id,
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": content}]
                }
            }
        }

        if stream:
            async with self.http.stream("POST", agent.url, json=payload) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        event = json.loads(line[6:])
                        print(f"  [{event.get('type', '?')}] {event.get('stage', event.get('result', ''))[:100]}")
        else:
            response = await self.http.post(agent.url, json=payload)
            return response.json()

    async def execute_workflow(self, plan: list[dict]) -> list[dict]:
        """Execute a multi-step agent workflow."""
        results = []

        for step in plan:
            task = step["task"]
            depends_on = step.get("depends_on")

            # Inject context from previous steps
            if depends_on is not None:
                context = results[depends_on].get("result", {})
                task = f"Context from previous step:\n{context}\n\nTask: {task}"

            agent_name, skill_id = await self.find_agent_for_task(task)
            print(f"\nStep {len(results)}: Delegating to {agent_name} (skill: {skill_id})")

            result = await self.send_task(agent_name, skill_id, task)
            results.append(result)

        return results


def _format_catalog(catalog: list[dict]) -> str:
    lines = []
    for entry in catalog:
        tags = ", ".join(entry["tags"])
        lines.append(
            f"- [{entry['agent']}] {entry['skill_name']} ({entry['skill_id']}): "
            f"{entry['description']} [tags: {tags}]"
        )
    return "\n".join(lines)


# Usage: Multi-Agent Workflow
async def main():
    orchestrator = A2AOrchestrator()

    # Discover agents from your infrastructure
    await asyncio.gather(
        orchestrator.discover_agent("https://code-review-agent.example.com"),
        orchestrator.discover_agent("https://deploy-agent.example.com"),
        orchestrator.discover_agent("https://monitoring-agent.example.com"),
    )

    # Define a workflow: review -> deploy -> monitor
    workflow = [
        {"task": "Scan /src/api/routes.py for security vulnerabilities"},
        {"task": "Deploy the latest version to staging environment", "depends_on": 0},
        {"task": "Set up error rate monitoring for the deployed service", "depends_on": 1},
    ]

    results = await orchestrator.execute_workflow(workflow)
    print("\nWorkflow completed!", len(results), "steps executed.")

if __name__ == "__main__":
    asyncio.run(main())

A2A + MCP: The Complete Agent Stack

The real power of 2026 agent architecture comes from combining both protocols:

┌────────────────────────────────────────────────────────┐
│                   User Request                         │
└───────────────────────┬────────────────────────────────┘
                        │
              ┌─────────▼────────┐
              │   Orchestrator   │  ← A2A Client
              │     Agent        │
              └────────┬─────────┘
          ┌────────────┼────────────┐
          ▼            ▼            ▼
    ┌──────────┐ ┌──────────┐ ┌──────────┐
    │  Code    │ │  Deploy  │ │ Monitor  │  ← A2A Agents
    │  Review  │ │  Agent   │ │  Agent   │
    │  Agent   │ │          │ │          │
    └────┬─────┘ └────┬─────┘ └────┬─────┘
         │            │            │
    ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐
    │ MCP      │ │ MCP      │ │ MCP      │  ← MCP Tools
    │ Servers: │ │ Servers: │ │ Servers: │
    │ • Git    │ │ • Docker │ │ • Grafana│
    │ • SAST   │ │ • K8s    │ │ • PagerD │
    │ • Semgrep│ │ • AWS    │ │ • Datadog│
    └──────────┘ └──────────┘ └──────────┘

The principle is clean separation of concerns:

  • MCP handles model-to-tool communication (how agents do their work)
  • A2A handles agent-to-agent communication (how agents coordinate); the sketch below shows one agent using both
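
To make the split concrete, here is a sketch of an A2A skill doing its actual work through an MCP tool, using the official mcp Python SDK's stdio client. The semgrep-mcp command and the scan_code tool name are hypothetical placeholders:

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def run_semgrep_via_mcp(code: str) -> str:
    """An A2A skill body: coordinate over A2A, do the work over MCP."""
    # Hypothetical MCP server binary exposing a scan_code tool
    server = StdioServerParameters(command="semgrep-mcp", args=[])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("scan_code", arguments={"code": code})
            return "\n".join(p.text for p in result.content if hasattr(p, "text"))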

API Gateway as A2A Infrastructure

When running multi-agent systems in production, an API gateway becomes essential. XiDao API Gateway provides critical infrastructure for A2A deployments:

# xidao-a2a-gateway.yaml
a2a_gateway:
  # Agent discovery proxy
  discovery:
    endpoint: https://gateway.xidao.online/agents
    auto_register: true
    health_check:
      interval: 30s
      path: /.well-known/agent.json

  # Rate limiting per agent pair
  rate_limits:
    default:
      requests_per_minute: 100
      max_concurrent_tasks: 10
    high_priority:
      requests_per_minute: 500
      max_concurrent_tasks: 50

  # Authentication and authorization
  auth:
    method: bearer
    token_rotation: true
    scopes:
      - agent:read    # Can discover agents
      - task:send     # Can send tasks
      - task:receive  # Can receive tasks

  # Observability
  observability:
    log_all_tasks: true
    trace_propagation: true
    metrics:
      - task_latency_p99
      - agent_success_rate
      - skill_invocation_count

Key Gateway Benefits for A2A

Feature           | Benefit
------------------|---------------------------------------------------------------
Service Discovery | Agents register and are discoverable via the gateway
Load Balancing    | Distribute tasks across multiple instances of the same agent
Circuit Breaking  | Prevent cascade failures when an agent goes down
Request Tracing   | Follow a task across multiple agent hops
Cost Attribution  | Track which agents consume the most LLM tokens
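
Circuit breaking in particular is worth duplicating client-side, even behind a gateway. A minimal sketch; the failure threshold and cooldown are illustrative defaults:

import time

class CircuitBreaker:
    """Stop calling a failing agent; probe again after a cooldown."""

    def __init__(self, max_failures: int = 5, reset_seconds: float = 30.0):
        self.max_failures = max_failures
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        """Is it OK to send a request right now?"""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_seconds:
            # Half-open: let one probe request through
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        """Track each call's outcome; open the circuit on repeated failures."""
        if success:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()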

Production Checklist for Multi-Agent Systems

1. Task Timeouts and Deadlines

async def send_task_with_deadline(
    self, agent_url: str, task: dict, timeout_seconds: int = 60
):
    """Send a task with a hard deadline."""
    task["params"]["configuration"] = {
        "blockingTimeoutSeconds": timeout_seconds,
    }

    try:
        result = await asyncio.wait_for(
            self.http.post(agent_url, json=task),
            timeout=timeout_seconds + 5
        )
        return result.json()
    except asyncio.TimeoutError:
        return {"error": "Agent did not respond within deadline",
                "state": "timeout"}

2. Idempotent Task Execution

import hashlib

def generate_deterministic_task_id(agent_name: str, content: str) -> str:
    """Generate a deterministic task ID for deduplication."""
    payload = f"{agent_name}:{content}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]
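
Passing this ID as the task's id lets a retried request land on the same task record instead of creating a duplicate. A brief sketch of both sides, reusing the payload shape and tasks store from the examples above:

# Client side: derive the task ID from the request content
payload["params"]["id"] = generate_deterministic_task_id("Code Review Agent", code_text)

# Server side (inside handle_send_task): short-circuit on a replay
if task_id in tasks and tasks[task_id]["status"]["state"] == "completed":
    return {"jsonrpc": "2.0", "id": request_id, "result": tasks[task_id]}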

3. Graceful Degradation

async def execute_with_fallback(
    self, primary_agent: str, fallback_agent: str,
    skill_id: str, task: str
):
    """Try primary agent, fall back to secondary."""
    try:
        return await self.send_task(primary_agent, skill_id, task)
    except Exception as e:
        print(f"Primary agent failed ({e}), switching to fallback...")
        return await self.send_task(fallback_agent, skill_id, task)
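
4. Bounded Retries

Fallback handles a dead agent; retries handle a flaky one. A minimal exponential-backoff sketch in the same method style as the snippets above (the attempt count and delays are illustrative):

import asyncio

async def send_task_with_retries(
    self, agent_name: str, skill_id: str, task: str, attempts: int = 3
) -> dict:
    """Retry transient failures with exponential backoff before giving up."""
    delay = 1.0
    for attempt in range(1, attempts + 1):
        try:
            return await self.send_task(agent_name, skill_id, task)
        except Exception as e:
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({e}); retrying in {delay:.0f}s...")
            await asyncio.sleep(delay)
            delay *= 2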

The 2026 A2A Ecosystem

The A2A ecosystem is growing rapidly:

Framework/Platform    | A2A Support
----------------------|----------------------------------------
Google Vertex AI      | Native A2A server and client
LangChain / LangGraph | A2A adapter for agent graphs
CrewAI                | A2A-based multi-agent orchestration
AutoGen (Microsoft)   | A2A transport layer
Semantic Kernel       | A2A agent connectors
XiDao API Gateway     | A2A infrastructure proxy

Key Takeaways

  1. A2A is the HTTP of agents — it provides the missing interoperability layer between AI agents from different vendors and frameworks
  2. MCP + A2A is the full stack — MCP for tools, A2A for agent-to-agent communication
  3. API gateways are essential — service discovery, rate limiting, tracing, and auth for multi-agent systems
  4. Start simple — discover one agent, send one task, then build up to orchestrated workflows
  5. Production matters — implement timeouts, retries, idempotency, and circuit breakers from day one

Get Started

Ready to build multi-agent systems? Here’s your action plan:

  1. Read the spec: google.github.io/A2A
  2. Try the SDK: pip install a2a-sdk or npm install @a2a/sdk
  3. Get an API key: Register at global.xidao.online for a unified API gateway that supports A2A traffic
  4. Build your first agent: Start with the Code Review Agent example above
  5. Connect agents: Use the Orchestrator pattern to coordinate workflows

Building multi-agent systems? Share your experience with the XiDao community at global.xidao.online or reach out at support@xidao.online.

Introduction: In 2026, AI Coding Assistants Have Fundamentally Transformed Software Development # In 2026, AI coding assistants have evolved from “helpful add-ons” into core productivity engines for developers worldwide. According to the Stack Overflow 2026 Developer Survey, 92% of developers now use at least one AI coding tool in their daily workflow—a dramatic leap from 65% in 2024. This year has witnessed several landmark milestones: Claude 4.7 launched with a 2-million-token context window, achieving unprecedented code comprehension GPT-5.5 Turbo integrated into GitHub Copilot, boosting code generation accuracy by 40% Cursor 2.0 introduced “Agent Mode”—autonomous multi-file refactoring from natural language descriptions Windsurf 3.0 debuted real-time collaborative AI, where team members and AI co-edit the same file simultaneously This article provides an in-depth review of the major AI coding assistants of 2026, comparing them across features, pricing, IDE support, and underlying model quality, followed by a complete tutorial for building your own custom coding assistant using the XiDao API.