diff --git a/docs/design-document/04-a2a-protocol/README.md b/docs/design-document/04-a2a-protocol/README.md new file mode 100644 index 000000000..11589f754 --- /dev/null +++ b/docs/design-document/04-a2a-protocol/README.md @@ -0,0 +1,269 @@ +--- +title: EventMesh A2A Protocol +sidebar_position: 4 +--- + +# EventMesh A2A Protocol Architecture & Functional Specification + +## 1. Overview + +The **EventMesh A2A (Agent-to-Agent) Protocol** is a specialized, high-performance protocol plugin designed to enable asynchronous communication, collaboration, and task coordination between autonomous agents. + +With the release of v2.0, A2A adopts the **MCP (Model Context Protocol)** architecture, transforming EventMesh into a robust **Agent Collaboration Bus**. It bridges the gap between synchronous LLM-based tool calls (JSON-RPC 2.0) and asynchronous Event-Driven Architectures (EDA), enabling scalable, distributed, and decoupled agent systems. + +## 2. Core Philosophy + +The architecture adheres to the principles outlined in the broader agent community (e.g., A2A Project, FIPA-ACL, and CloudEvents): + +1. **JSON-RPC 2.0 as Lingua Franca**: Uses standard JSON-RPC for payload semantics, ensuring compatibility with modern LLM ecosystems (LangChain, AutoGen). +2. **Transport Agnostic**: Encapsulates all messages within **CloudEvents**, allowing transport over any EventMesh-supported protocol (HTTP, TCP, gRPC, Kafka). +3. **Async by Default**: Maps synchronous Request/Response patterns to asynchronous Event streams using correlation IDs. +4. **Native Pub/Sub Semantics**: Supports O(1) broadcast complexity, temporal decoupling (Late Join), and backpressure isolation, solving the scalability limits of traditional P2P webhook callbacks. + +### 2.1 Native Pub/Sub Semantics + +Traditional A2A implementations often rely on HTTP Webhooks (`POST /inbox`) for asynchronous callbacks. While functional, this **Point-to-Point (P2P)** model suffers from significant scaling issues: + +* **Insufficient Fan-Out**: A publisher must send $N$ requests to reach $N$ subscribers, leading to $O(N)$ complexity. +* **Temporal Coupling**: Consumers must be online at the exact moment of publication. +* **Backpressure Propagation**: A slow subscriber can block the publisher. + +**EventMesh A2A** solves this by introducing **Native Pub/Sub** capabilities: + +```mermaid +flowchart LR + Publisher["Publisher Agent"] -->|1. Publish (Once)| Bus["EventMesh Bus"] + + subgraph FanoutLayer ["EventMesh Fanout Layer"] + Queue["Topic Queue"] + end + + Bus --> Queue + + Queue -->|"Push"| Sub1["Subscriber 1"] + Queue -->|"Push"| Sub2["Subscriber 2"] + Queue -->|"Push"| Sub3["Subscriber 3"] + + style Bus fill:#f9f,stroke:#333 + style FanoutLayer fill:#ccf,stroke:#333 +``` + +### 2.1 Hybrid Protocol Support (JSON-RPC & CloudEvents) + +A2A Protocol introduces a unique **Hybrid Architecture** that bridges the gap between the AI ecosystem (which prefers simple JSON) and the Cloud Native ecosystem (which prefers structured CloudEvents). + +| Feature | JSON-RPC 2.0 Mode | Native CloudEvents Mode | +| :--- | :--- | :--- | +| **Primary Audience** | LLMs, Scripts (Python/JS), LangChain | EventMesh Apps, Knative, Java SDK | +| **Philosophy** | **"Battery Included"** | **"Power User"** | +| **Usage** | Send raw JSON (`{"method":...}`) | Send `CloudEvent` object | +| **Complexity** | Low (No SDK required) | Medium (Requires CE SDK) | +| **Mechanism** | Adaptor automatically wraps JSON in CE | Adaptor passes through the event | + +**Benefits:** +* **Zero-Barrier Entry**: Developers can interact with the mesh using just `curl` or simple JSON libraries. +* **Full Flexibility**: Advanced users retain full control over CloudEvent attributes (Source, Type, Extensions) for complex routing or tracing scenarios. + +## 3. Architecture Design + +### 3.1 System Context + +```mermaid +graph TD + Client["Client Agent / LLM"] -- "JSON-RPC Request" --> EM["EventMesh Runtime"] + EM -- "CloudEvent (Request)" --> Server["Server Agent / Tool"] + Server -- "CloudEvent (Response)" --> EM + EM -- "JSON-RPC Response" --> Client + + subgraph Runtime ["EventMesh Runtime"] + Plugin["A2A Protocol Plugin"] + end + + style EM fill:#f9f,stroke:#333,stroke-width:4px + style Plugin fill:#ccf,stroke:#333,stroke-width:2px +``` + +### 3.2 Component Design (`eventmesh-protocol-a2a`) + +The core logic resides in the `eventmesh-protocol-plugin` module. + +* **`EnhancedA2AProtocolAdaptor`**: The central brain of the protocol. + * **Intelligent Parsing**: Automatically detects message format (MCP vs. Raw CloudEvent). + * **Protocol Delegation**: Delegates to `CloudEvents` or `HTTP` adaptors when necessary. + * **Semantic Mapping**: Transforms JSON-RPC methods and IDs into CloudEvent attributes. +* **`A2AProtocolConstants`**: Defines standard operations like `task/get`, `message/sendStream`. +* **`JsonRpc*` Models**: Strictly typed POJOs for JSON-RPC 2.0 compliance. + +### 3.3 Asynchronous RPC Mapping ( The "Async Bridge" ) + +To support MCP on an Event Bus, synchronous RPC concepts are mapped to asynchronous events: + +| Concept | MCP / JSON-RPC | CloudEvent Mapping | +| :--- | :--- | :--- | +| **Action** | `method` (e.g., `tools/call`) | **Type**: `org.apache.eventmesh.a2a.tools.call.req`
**Extension**: `a2amethod` | +| **Correlation** | `id` (e.g., `req-123`) | **Extension**: `collaborationid` (on Response)
**ID**: Preserved on Request | +| **Direction** | Implicit (Request vs Result) | **Extension**: `mcptype` (`request` or `response`) | +| **P2P Routing** | `params._agentId` | **Extension**: `targetagent` | +| **Pub/Sub Topic** | `params._topic` | **Subject**: The topic value (e.g. `market.btc`) | +| **Streaming Seq** | `params._seq` | **Extension**: `seq` | + +## 4. Functional Specification + +### 4.1 Message Processing Flow + +1. **Ingestion**: The adaptor receives a `ProtocolTransportObject` (byte array/string). +2. **Detection**: Checks for `jsonrpc: "2.0"`. +3. **Transformation (MCP Mode)**: + * **Request**: Parses `method`. + * If `message/sendStream`, sets type suffix to `.stream` and extracts `_seq`. + * If `_topic` present, sets `subject` (Pub/Sub). + * If `_agentId` present, sets `targetagent` (P2P). + * **Response**: Parses `result`/`error`. Sets `collaborationid` = `id`. +4. **Batch Processing**: Splits JSON Array into a `List`. + +### 4.2 Key Features + +#### A. Intelligent Routing Support +* **Mechanism**: Promotes `_agentId` or `_topic` from JSON body to CloudEvent attributes. +* **Benefit**: Enables EventMesh Router to perform content-based routing (CBR) efficiently. + +#### B. Batching +* **Benefit**: Significantly increases throughput for high-frequency interactions. + +#### C. Streaming Support +* **Operation**: `message/sendStream` +* **Mechanism**: Maps to `.stream` event type and preserves sequence order via `seq` extension attribute. + +## 5. Usage Examples + +### 5.1 JSON-RPC 2.0 (MCP) Mode + +This mode is ideal for LLMs, scripts, and simple integrations where you want to send raw JSON without worrying about CloudEvent headers. + +#### 5.1.1 Sending a Tool Call (RPC Request) + +**Client Sends (Raw JSON):** +```json +{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "weather", + "city": "Shanghai", + "_agentId": "weather-agent" + }, + "id": "req-101" +} +``` + +**EventMesh Converts to:** +* **Type**: `org.apache.eventmesh.a2a.tools.call.req` +* **Extension (targetagent)**: `weather-agent` +* **Extension (mcptype)**: `request` + +#### 5.1.2 Pub/Sub Broadcast (Notification) + +**Client Sends (Raw JSON):** +```json +{ + "jsonrpc": "2.0", + "method": "notifications/alert", + "params": { + "message": "System Maintenance in 10 mins", + "_topic": "system.alerts" + } +} +``` + +**EventMesh Converts to:** +* **Type**: `org.apache.eventmesh.a2a.notifications.alert` +* **Subject**: `system.alerts` +* **Extension (mcptype)**: `notification` + +#### 5.1.3 Java SDK Example (MCP Mode) + +```java +// See eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/mcp/McpCaller.java + +Map request = new HashMap<>(); +request.put("jsonrpc", "2.0"); +request.put("method", "tools/call"); +request.put("params", Map.of("name", "weather", "_agentId", "weather-agent")); +request.put("id", UUID.randomUUID().toString()); + +CloudEvent event = CloudEventBuilder.v1() + .withType("org.apache.eventmesh.a2a.tools.call.req") + .withData(JsonUtils.toJSONString(request).getBytes()) + .withExtension("protocol", "A2A") // Critical to trigger A2A adaptor + .build(); + +producer.publish(event); +``` + +### 5.2 Native CloudEvents Mode + +This mode provides full control over all CloudEvent attributes and is recommended for robust, typed applications using the EventMesh SDK. + +#### 5.2.1 Native RPC Request + +**Client Sends (CloudEvent):** +```json +{ + "specversion": "1.0", + "type": "com.example.rpc.request", + "source": "my-app", + "id": "evt-123", + "data": "...", + "protocol": "A2A", + "targetagent": "target-agent-001" +} +``` + +**Java SDK Example:** +```java +// See eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/ce/CloudEventsCaller.java + +CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("ce-client")) + .withType("com.example.rpc.request") + .withData("application/text", "RPC Payload".getBytes()) + .withExtension("protocol", "A2A") + .withExtension("targetagent", "target-agent-001") // Explicit routing + .build(); + +producer.publish(event); +``` + +#### 5.2.2 Native Pub/Sub + +**Client Sends (CloudEvent):** +```json +{ + "specversion": "1.0", + "type": "com.example.notification", + "source": "my-app", + "subject": "broadcast.topic", + "protocol": "A2A" +} +``` + +#### 5.2.3 Native Streaming + +**Client Sends (CloudEvent):** +```json +{ + "specversion": "1.0", + "type": "com.example.stream", + "source": "my-app", + "subject": "stream-topic", + "protocol": "A2A", + "sessionid": "session-555", + "seq": "1" +} +``` + +## 6. Future Roadmap + +* **Schema Registry**: Implement dynamic discovery of Agent capabilities via `methods/list`. +* **Sidecar Injection**: Fully integrate the adaptor into the EventMesh Sidecar.