Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions docs/design-document/04-a2a-protocol/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
---
title: EventMesh A2A Protocol
sidebar_position: 4
---

# EventMesh A2A Protocol Architecture & Functional Specification

## 1. Overview

The **EventMesh A2A (Agent-to-Agent) Protocol** is a specialized, high-performance protocol plugin designed to enable asynchronous communication, collaboration, and task coordination between autonomous agents.

With the release of v2.0, A2A adopts the **MCP (Model Context Protocol)** architecture, transforming EventMesh into a robust **Agent Collaboration Bus**. It bridges the gap between synchronous LLM-based tool calls (JSON-RPC 2.0) and asynchronous Event-Driven Architectures (EDA), enabling scalable, distributed, and decoupled agent systems.

## 2. Core Philosophy

The architecture adheres to the principles outlined in the broader agent community (e.g., A2A Project, FIPA-ACL, and CloudEvents):

1. **JSON-RPC 2.0 as Lingua Franca**: Uses standard JSON-RPC for payload semantics, ensuring compatibility with modern LLM ecosystems (LangChain, AutoGen).
2. **Transport Agnostic**: Encapsulates all messages within **CloudEvents**, allowing transport over any EventMesh-supported protocol (HTTP, TCP, gRPC, Kafka).
3. **Async by Default**: Maps synchronous Request/Response patterns to asynchronous Event streams using correlation IDs.
4. **Native Pub/Sub Semantics**: Supports O(1) broadcast complexity, temporal decoupling (Late Join), and backpressure isolation, solving the scalability limits of traditional P2P webhook callbacks.

### 2.1 Native Pub/Sub Semantics

Traditional A2A implementations often rely on HTTP Webhooks (`POST /inbox`) for asynchronous callbacks. While functional, this **Point-to-Point (P2P)** model suffers from significant scaling issues:

* **Insufficient Fan-Out**: A publisher must send $N$ requests to reach $N$ subscribers, leading to $O(N)$ complexity.
* **Temporal Coupling**: Consumers must be online at the exact moment of publication.
* **Backpressure Propagation**: A slow subscriber can block the publisher.

**EventMesh A2A** solves this by introducing **Native Pub/Sub** capabilities:

```mermaid
flowchart LR
Publisher["Publisher Agent"] -->|1. Publish (Once)| Bus["EventMesh Bus"]

subgraph FanoutLayer ["EventMesh Fanout Layer"]
Queue["Topic Queue"]
end

Bus --> Queue

Queue -->|"Push"| Sub1["Subscriber 1"]
Queue -->|"Push"| Sub2["Subscriber 2"]
Queue -->|"Push"| Sub3["Subscriber 3"]

style Bus fill:#f9f,stroke:#333
style FanoutLayer fill:#ccf,stroke:#333
```

### 2.1 Hybrid Protocol Support (JSON-RPC & CloudEvents)

A2A Protocol introduces a unique **Hybrid Architecture** that bridges the gap between the AI ecosystem (which prefers simple JSON) and the Cloud Native ecosystem (which prefers structured CloudEvents).

| Feature | JSON-RPC 2.0 Mode | Native CloudEvents Mode |
| :--- | :--- | :--- |
| **Primary Audience** | LLMs, Scripts (Python/JS), LangChain | EventMesh Apps, Knative, Java SDK |
| **Philosophy** | **"Battery Included"** | **"Power User"** |
| **Usage** | Send raw JSON (`{"method":...}`) | Send `CloudEvent` object |
| **Complexity** | Low (No SDK required) | Medium (Requires CE SDK) |
| **Mechanism** | Adaptor automatically wraps JSON in CE | Adaptor passes through the event |

**Benefits:**
* **Zero-Barrier Entry**: Developers can interact with the mesh using just `curl` or simple JSON libraries.
* **Full Flexibility**: Advanced users retain full control over CloudEvent attributes (Source, Type, Extensions) for complex routing or tracing scenarios.

## 3. Architecture Design

### 3.1 System Context

```mermaid
graph TD
Client["Client Agent / LLM"] -- "JSON-RPC Request" --> EM["EventMesh Runtime"]
EM -- "CloudEvent (Request)" --> Server["Server Agent / Tool"]
Server -- "CloudEvent (Response)" --> EM
EM -- "JSON-RPC Response" --> Client

subgraph Runtime ["EventMesh Runtime"]
Plugin["A2A Protocol Plugin"]
end

style EM fill:#f9f,stroke:#333,stroke-width:4px
style Plugin fill:#ccf,stroke:#333,stroke-width:2px
```

### 3.2 Component Design (`eventmesh-protocol-a2a`)

The core logic resides in the `eventmesh-protocol-plugin` module.

* **`EnhancedA2AProtocolAdaptor`**: The central brain of the protocol.
* **Intelligent Parsing**: Automatically detects message format (MCP vs. Raw CloudEvent).
* **Protocol Delegation**: Delegates to `CloudEvents` or `HTTP` adaptors when necessary.
* **Semantic Mapping**: Transforms JSON-RPC methods and IDs into CloudEvent attributes.
* **`A2AProtocolConstants`**: Defines standard operations like `task/get`, `message/sendStream`.
* **`JsonRpc*` Models**: Strictly typed POJOs for JSON-RPC 2.0 compliance.

### 3.3 Asynchronous RPC Mapping ( The "Async Bridge" )

To support MCP on an Event Bus, synchronous RPC concepts are mapped to asynchronous events:

| Concept | MCP / JSON-RPC | CloudEvent Mapping |
| :--- | :--- | :--- |
| **Action** | `method` (e.g., `tools/call`) | **Type**: `org.apache.eventmesh.a2a.tools.call.req`<br>**Extension**: `a2amethod` |
| **Correlation** | `id` (e.g., `req-123`) | **Extension**: `collaborationid` (on Response)<br>**ID**: Preserved on Request |
| **Direction** | Implicit (Request vs Result) | **Extension**: `mcptype` (`request` or `response`) |
| **P2P Routing** | `params._agentId` | **Extension**: `targetagent` |
| **Pub/Sub Topic** | `params._topic` | **Subject**: The topic value (e.g. `market.btc`) |
| **Streaming Seq** | `params._seq` | **Extension**: `seq` |

## 4. Functional Specification

### 4.1 Message Processing Flow

1. **Ingestion**: The adaptor receives a `ProtocolTransportObject` (byte array/string).
2. **Detection**: Checks for `jsonrpc: "2.0"`.
3. **Transformation (MCP Mode)**:
* **Request**: Parses `method`.
* If `message/sendStream`, sets type suffix to `.stream` and extracts `_seq`.
* If `_topic` present, sets `subject` (Pub/Sub).
* If `_agentId` present, sets `targetagent` (P2P).
* **Response**: Parses `result`/`error`. Sets `collaborationid` = `id`.
4. **Batch Processing**: Splits JSON Array into a `List<CloudEvent>`.

### 4.2 Key Features

#### A. Intelligent Routing Support
* **Mechanism**: Promotes `_agentId` or `_topic` from JSON body to CloudEvent attributes.
* **Benefit**: Enables EventMesh Router to perform content-based routing (CBR) efficiently.

#### B. Batching
* **Benefit**: Significantly increases throughput for high-frequency interactions.

#### C. Streaming Support
* **Operation**: `message/sendStream`
* **Mechanism**: Maps to `.stream` event type and preserves sequence order via `seq` extension attribute.

## 5. Usage Examples

### 5.1 JSON-RPC 2.0 (MCP) Mode

This mode is ideal for LLMs, scripts, and simple integrations where you want to send raw JSON without worrying about CloudEvent headers.

#### 5.1.1 Sending a Tool Call (RPC Request)

**Client Sends (Raw JSON):**
```json
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "weather",
"city": "Shanghai",
"_agentId": "weather-agent"
},
"id": "req-101"
}
```

**EventMesh Converts to:**
* **Type**: `org.apache.eventmesh.a2a.tools.call.req`
* **Extension (targetagent)**: `weather-agent`
* **Extension (mcptype)**: `request`

#### 5.1.2 Pub/Sub Broadcast (Notification)

**Client Sends (Raw JSON):**
```json
{
"jsonrpc": "2.0",
"method": "notifications/alert",
"params": {
"message": "System Maintenance in 10 mins",
"_topic": "system.alerts"
}
}
```

**EventMesh Converts to:**
* **Type**: `org.apache.eventmesh.a2a.notifications.alert`
* **Subject**: `system.alerts`
* **Extension (mcptype)**: `notification`

#### 5.1.3 Java SDK Example (MCP Mode)

```java
// See eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/mcp/McpCaller.java

Map<String, Object> request = new HashMap<>();
request.put("jsonrpc", "2.0");
request.put("method", "tools/call");
request.put("params", Map.of("name", "weather", "_agentId", "weather-agent"));
request.put("id", UUID.randomUUID().toString());

CloudEvent event = CloudEventBuilder.v1()
.withType("org.apache.eventmesh.a2a.tools.call.req")
.withData(JsonUtils.toJSONString(request).getBytes())
.withExtension("protocol", "A2A") // Critical to trigger A2A adaptor
.build();

producer.publish(event);
```

### 5.2 Native CloudEvents Mode

This mode provides full control over all CloudEvent attributes and is recommended for robust, typed applications using the EventMesh SDK.

#### 5.2.1 Native RPC Request

**Client Sends (CloudEvent):**
```json
{
"specversion": "1.0",
"type": "com.example.rpc.request",
"source": "my-app",
"id": "evt-123",
"data": "...",
"protocol": "A2A",
"targetagent": "target-agent-001"
}
```

**Java SDK Example:**
```java
// See eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/ce/CloudEventsCaller.java

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("ce-client"))
.withType("com.example.rpc.request")
.withData("application/text", "RPC Payload".getBytes())
.withExtension("protocol", "A2A")
.withExtension("targetagent", "target-agent-001") // Explicit routing
.build();

producer.publish(event);
```

#### 5.2.2 Native Pub/Sub

**Client Sends (CloudEvent):**
```json
{
"specversion": "1.0",
"type": "com.example.notification",
"source": "my-app",
"subject": "broadcast.topic",
"protocol": "A2A"
}
```

#### 5.2.3 Native Streaming

**Client Sends (CloudEvent):**
```json
{
"specversion": "1.0",
"type": "com.example.stream",
"source": "my-app",
"subject": "stream-topic",
"protocol": "A2A",
"sessionid": "session-555",
"seq": "1"
}
```

## 6. Future Roadmap

* **Schema Registry**: Implement dynamic discovery of Agent capabilities via `methods/list`.
* **Sidecar Injection**: Fully integrate the adaptor into the EventMesh Sidecar.
Loading