Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bdf1338
Implement A2A (Agent-to-Agent) protocol with EventMesh publish/subscr…
qqeasonchen Sep 25, 2025
f9f21cd
Fix compilation errors in A2A protocol implementation
qqeasonchen Sep 28, 2025
e23bea4
feat(a2a): implement MCP over CloudEvents architecture
qqeasonchen Dec 8, 2025
bf5d1b0
refactor(a2a): cleanup legacy code, add SPI config and integration tests
qqeasonchen Dec 8, 2025
4e88d3b
Merge branch 'apache:master' into feature-a2a
qqeasonchen Dec 8, 2025
8abe649
docs(a2a): update documentation for v2.0 MCP architecture
qqeasonchen Dec 8, 2025
caef605
feat(a2a): implement native pub/sub, streaming, and dual-mode support
qqeasonchen Dec 9, 2025
3cb60d0
chore(a2a): cleanup runtime legacy implementation
qqeasonchen Dec 9, 2025
27f31ee
style(a2a): apply code formatting
qqeasonchen Dec 9, 2025
8be3aca
Fix build failures: Unit Tests, Checkstyle, Javadoc, and PMD
qqeasonchen Dec 9, 2025
5fee57a
Fix A2A Protocol SPI: Move to correct directory and fix content format
qqeasonchen Dec 9, 2025
a105a8a
Fix license headers for A2A protocol config and SPI file
qqeasonchen Dec 9, 2025
1578b85
Remove old SPI file location
qqeasonchen Dec 9, 2025
bbe8691
Enable removeUnusedImports in Spotless configuration
qqeasonchen Dec 9, 2025
f98da0d
Update A2A protocol configuration to match implementation capabilities
qqeasonchen Dec 9, 2025
8196310
Add A2A protocol demo examples
qqeasonchen Dec 9, 2025
eaca624
Add A2A protocol Provider demo examples
qqeasonchen Dec 9, 2025
a3ed0e7
Fix Checkstyle violations in A2A demo examples
qqeasonchen Dec 9, 2025
c805fe3
Fix ObjectConverterTest failures in eventmesh-common
qqeasonchen Dec 9, 2025
cb56c1c
Fix potential NPE in ObjectConverter.init
qqeasonchen Dec 9, 2025
31ed3cf
Update A2A Protocol documentation with usage examples for MCP/JSON-RP…
qqeasonchen Dec 9, 2025
e844ad2
Revert System Context mermaid graph and fix Native Pub/Sub Semantics …
qqeasonchen Dec 9, 2025
de18250
Fix ObjectConverterTest to resolve variable declaration usage distanc…
qqeasonchen Dec 9, 2025
3be1300
modify mermaid code
qqeasonchen Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ subprojects {
tasks.register('printAllDependencyTrees', DependencyReportTask) {}

jacoco {
toolVersion = "0.8.6"
toolVersion = "0.8.11"
}

jacocoTestReport {
Expand Down
155 changes: 155 additions & 0 deletions docs/a2a-protocol/ARCHITECTURE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# EventMesh A2A Protocol Architecture & Functional Specification

## 1. Overview

The **EventMesh A2A (Agent-to-Agent) Protocol** is a specialized, high-performance protocol plugin designed to enable asynchronous communication, collaboration, and task coordination between autonomous agents.

With the release of v2.0, A2A adopts the **MCP (Model Context Protocol)** architecture, transforming EventMesh into a robust **Agent Collaboration Bus**. It bridges the gap between synchronous LLM-based tool calls (JSON-RPC 2.0) and asynchronous Event-Driven Architectures (EDA), enabling scalable, distributed, and decoupled agent systems.

## 2. Core Philosophy

The architecture adheres to the principles outlined in the broader agent community (e.g., A2A Project, FIPA-ACL, and CloudEvents):

1. **JSON-RPC 2.0 as Lingua Franca**: Uses standard JSON-RPC for payload semantics, ensuring compatibility with modern LLM ecosystems (LangChain, AutoGen).
2. **Transport Agnostic**: Encapsulates all messages within **CloudEvents**, allowing transport over any EventMesh-supported protocol (HTTP, TCP, gRPC, Kafka).
3. **Async by Default**: Maps synchronous Request/Response patterns to asynchronous Event streams using correlation IDs.
4. **Native Pub/Sub Semantics**: Supports O(1) broadcast complexity, temporal decoupling (Late Join), and backpressure isolation, solving the scalability limits of traditional P2P webhook callbacks.

### 2.1 Native Pub/Sub Semantics

Traditional A2A implementations often rely on HTTP Webhooks (`POST /inbox`) for asynchronous callbacks. While functional, this **Point-to-Point (P2P)** model suffers from significant scaling issues:

* **Insufficient Fan-Out**: A publisher must send $N$ requests to reach $N$ subscribers, leading to $O(N)$ complexity.
* **Temporal Coupling**: Consumers must be online at the exact moment of publication.
* **Backpressure Propagation**: A slow subscriber can block the publisher.

**EventMesh A2A** solves this by introducing **Native Pub/Sub** capabilities:

```mermaid
graph LR
Publisher[Publisher Agent] -->|1. Publish (Once)| Bus[EventMesh Bus]

subgraph Fanout_Layer [EventMesh Fanout Layer]
Queue[Topic Queue]
end

Bus --> Queue

Queue -->|Push| Sub1[Subscriber 1]
Queue -->|Push| Sub2[Subscriber 2]
Queue -->|Push| Sub3[Subscriber 3]

style Bus fill:#f9f,stroke:#333
style Fanout_Layer fill:#ccf,stroke:#333
```

## 3. Architecture Design

### 3.1 System Context

```mermaid
graph TD
Client[Client Agent / LLM] -- "JSON-RPC Request" --> EM[EventMesh Runtime]
EM -- "CloudEvent (Request)" --> Server[Server Agent / Tool]
Server -- "CloudEvent (Response)" --> EM
EM -- "JSON-RPC Response" --> Client

subgraph Runtime [EventMesh Runtime]
Plugin[A2A Protocol Plugin]
end

style EM fill:#f9f,stroke:#333,stroke-width:4px
style Plugin fill:#ccf,stroke:#333,stroke-width:2px
```

### 3.2 Component Design (`eventmesh-protocol-a2a`)

The core logic resides in the `eventmesh-protocol-plugin` module.

* **`EnhancedA2AProtocolAdaptor`**: The central brain of the protocol.
* **Intelligent Parsing**: Automatically detects message format (MCP vs. Raw CloudEvent).
* **Protocol Delegation**: Delegates to `CloudEvents` or `HTTP` adaptors when necessary.
* **Semantic Mapping**: Transforms JSON-RPC methods and IDs into CloudEvent attributes.
* **`A2AProtocolConstants`**: Defines standard operations like `task/get`, `message/sendStream`.
* **`JsonRpc*` Models**: Strictly typed POJOs for JSON-RPC 2.0 compliance.

### 3.3 Asynchronous RPC Mapping ( The "Async Bridge" )

To support MCP on an Event Bus, synchronous RPC concepts are mapped to asynchronous events:

| Concept | MCP / JSON-RPC | CloudEvent Mapping |
| :--- | :--- | :--- |
| **Action** | `method` (e.g., `tools/call`) | **Type**: `org.apache.eventmesh.a2a.tools.call.req`<br>**Extension**: `a2amethod` |
| **Correlation** | `id` (e.g., `req-123`) | **Extension**: `collaborationid` (on Response)<br>**ID**: Preserved on Request |
| **Direction** | Implicit (Request vs Result) | **Extension**: `mcptype` (`request` or `response`) |
| **P2P Routing** | `params._agentId` | **Extension**: `targetagent` |
| **Pub/Sub Topic** | `params._topic` | **Subject**: The topic value (e.g. `market.btc`) |
| **Streaming Seq** | `params._seq` | **Extension**: `seq` |

## 4. Functional Specification

### 4.1 Message Processing Flow

1. **Ingestion**: The adaptor receives a `ProtocolTransportObject` (byte array/string).
2. **Detection**: Checks for `jsonrpc: "2.0"`.
3. **Transformation (MCP Mode)**:
* **Request**: Parses `method`.
* If `message/sendStream`, sets type suffix to `.stream` and extracts `_seq`.
* If `_topic` present, sets `subject` (Pub/Sub).
* If `_agentId` present, sets `targetagent` (P2P).
* **Response**: Parses `result`/`error`. Sets `collaborationid` = `id`.
4. **Batch Processing**: Splits JSON Array into a `List<CloudEvent>`.

### 4.2 Key Features

#### A. Intelligent Routing Support
* **Mechanism**: Promotes `_agentId` or `_topic` from JSON body to CloudEvent attributes.
* **Benefit**: Enables EventMesh Router to perform content-based routing (CBR) efficiently.

#### B. Batching
* **Benefit**: Significantly increases throughput for high-frequency interactions.

#### C. Streaming Support
* **Operation**: `message/sendStream`
* **Mechanism**: Maps to `.stream` event type and preserves sequence order via `seq` extension attribute.

## 5. Usage Examples

### 5.1 Sending a Tool Call (Request)

**Raw Payload:**
```json
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "weather_service",
"arguments": { "city": "New York" }
},
"id": "msg-101"
}
```

### 5.2 Pub/Sub Broadcast

**Raw Payload:**
```json
{
"jsonrpc": "2.0",
"method": "market/update",
"params": {
"symbol": "BTC",
"price": 50000,
"_topic": "market.crypto.btc"
}
}
```

**Generated CloudEvent:**
* `subject`: `market.crypto.btc`
* `targetagent`: (Empty)

## 6. Future Roadmap

* **Schema Registry**: Implement dynamic discovery of Agent capabilities via `methods/list`.
* **Sidecar Injection**: Fully integrate the adaptor into the EventMesh Sidecar.
37 changes: 37 additions & 0 deletions docs/a2a-protocol/IMPLEMENTATION_SUMMARY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# 实现总结:EventMesh A2A 协议 v2.0 (MCP 版)

## 核心成果

A2A 协议已成功重构为采用 **MCP (Model Context Protocol)** 架构,将 EventMesh 定位为现代化的 **智能体协作总线 (Agent Collaboration Bus)**。

### 1. 核心协议重构 (`EnhancedA2AProtocolAdaptor`)
- **混合引擎 (JSON-RPC & CloudEvents)**: 实现了智能解析引擎,支持:
- **MCP/JSON-RPC 2.0**: 面向 LLM 和脚本的低门槛接入,自动封装 CloudEvent。
- **原生 CloudEvents**: 面向 EventMesh 原生应用的灵活接入,支持自定义元数据和透传。
- 适配器根据 `jsonrpc` 字段自动分发处理逻辑。
- **异步 RPC 映射**: 建立了同步 RPC 语义与异步事件驱动架构 (EDA) 之间的桥梁。
- **请求 (Requests)** 映射为 `*.req` 事件,属性 `mcptype=request`。
- **响应 (Responses)** 映射为 `*.resp` 事件,属性 `mcptype=response`。
- **关联 (Correlation)** 通过将 JSON-RPC `id` 映射到 CloudEvent `collaborationid` 来处理。
- **路由优化**: 实现了“深度内容路由提取”:
- `params._agentId` -> CloudEvent 扩展属性 `targetagent` (P2P)。
- `params._topic` -> CloudEvent Subject (Pub/Sub)。

### 2. 原生 Pub/Sub 与流式支持
- **Pub/Sub**: 通过将 `_topic` 映射到 CloudEvent Subject,支持 O(1) 广播复杂度。
- **流式 (Streaming)**: 支持 `message/sendStream` 操作,映射为 `.stream` 事件类型,并通过 `_seq` -> `seq` 扩展属性保证顺序。

### 3. 标准化与兼容性
- **数据模型**: 定义了符合 JSON-RPC 2.0 规范的 `JsonRpcRequest`、`JsonRpcResponse`、`JsonRpcError` POJO 对象。
- **方法定义**: 引入了 `McpMethods` 常量,支持标准操作如 `tools/call`、`resources/read`。

### 4. 测试与质量
- **单元测试**: 在 `EnhancedA2AProtocolAdaptorTest` 中实现了对请求/响应循环、错误处理、通知和批处理的全面覆盖。
- **集成演示**: `McpIntegrationDemoTest` 模拟了 P2P RPC 闭环。
- **模式测试**: `McpPatternsIntegrationTest` 模拟了 Pub/Sub 和 Streaming 流程。

## 下一步计划

1. **路由集成**: 更新 EventMesh Runtime Router,利用新的 `targetagent` 和 `a2amethod` 扩展属性实现高级路由规则。
2. **Schema 注册中心**: 实现一个“注册中心智能体 (Registry Agent)”,允许智能体动态发布其 MCP 能力 (`methods/list`)。
3. **Sidecar 支持**: 将 A2A 适配器逻辑暴露在 Sidecar 代理中,允许非 Java 智能体 (Python, Node.js) 通过简单的 HTTP/JSON 进行交互。
38 changes: 38 additions & 0 deletions docs/a2a-protocol/IMPLEMENTATION_SUMMARY_EN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Implementation Summary: EventMesh A2A Protocol v2.0 (MCP Edition)

## Key Achievements

The A2A protocol has been successfully refactored to adopt the **MCP (Model Context Protocol)** architecture, positioning EventMesh as a modern **Agent Collaboration Bus**.

### 1. Core Protocol Refactoring (`EnhancedA2AProtocolAdaptor`)
- **Hybrid Engine (JSON-RPC & CloudEvents)**: Implemented a smart parsing engine that supports:
- **MCP/JSON-RPC 2.0**: For LLM-friendly, low-code integration.
- **Native CloudEvents**: For advanced, protocol-compliant integration.
- The adaptor automatically delegates processing based on the payload content (`jsonrpc` detection).
- **Async RPC Mapping**: Established a bridge between synchronous RPC semantics and asynchronous Event-Driven Architecture (EDA).
- **Requests** map to `*.req` events with `mcptype=request`.
- **Responses** map to `*.resp` events with `mcptype=response`.
- **Correlation** is handled by mapping JSON-RPC `id` to CloudEvent `collaborationid`.
- **Routing Optimization**: Implemented "Deep Body Routing" extraction:
- `params._agentId` -> CloudEvent Extension `targetagent` (P2P).
- `params._topic` -> CloudEvent Subject (Pub/Sub).

### 2. Native Pub/Sub & Streaming
- **Pub/Sub**: Added support for O(1) broadcast complexity by mapping `_topic` to CloudEvent Subject.
- **Streaming**: Added support for `message/sendStream` operation, mapping to `.stream` event type and preserving sequence via `_seq` -> `seq` extension.

### 3. Standardization & Compatibility
- **Models**: Defined `JsonRpcRequest`, `JsonRpcResponse`, `JsonRpcError` POJOs compliant with JSON-RPC 2.0 spec.
- **Methods**: Introduced `McpMethods` constants for standard operations like `tools/call`, `resources/read`.
- **Backward Compatibility**: Legacy A2A support is preserved where applicable, but deprecated in favor of MCP.

### 4. Testing & Quality
- **Unit Tests**: Comprehensive coverage for Request/Response cycles, Error handling, Notifications, and Batching in `EnhancedA2AProtocolAdaptorTest`.
- **Integration Demo**: `McpIntegrationDemoTest` simulates P2P RPC.
- **Patterns Test**: `McpPatternsIntegrationTest` simulates Pub/Sub and Streaming flows.

## Next Steps

1. **Router Integration**: Update EventMesh Runtime Router to leverage the new `targetagent` and `a2amethod` extension attributes for advanced routing rules.
2. **Schema Registry**: Implement a "Registry Agent" that allows agents to publish their MCP capabilities (`methods/list`) dynamically.
3. **Sidecar Support**: Expose the A2A adaptor logic in the Sidecar proxy to allow non-Java agents (Python, Node.js) to interact via simple HTTP/JSON.
Loading
Loading