From 7be5aa0d31fb50039057152c0d5585ebcbd11834 Mon Sep 17 00:00:00 2001 From: Sandeep Belgavi Date: Mon, 9 Feb 2026 16:27:50 +0530 Subject: [PATCH] Revert "Add comprehensive A2A implementation with tests" --- A2A_SERVICE_ENHANCEMENTS.md | 87 ------ a2a/pom.xml | 111 +------ .../google/adk/a2a/grpc/A2aAgentExecutor.java | 270 ------------------ .../google/adk/a2a/grpc/A2aGrpcServer.java | 115 -------- .../com/google/adk/a2a/grpc/A2aServer.java | 195 ------------- .../google/adk/a2a/grpc/A2aServerBuilder.java | 78 ----- .../com/google/adk/a2a/grpc/A2aService.java | 246 ---------------- .../adk/a2a/grpc/A2aServiceEnhanced.java | 160 ----------- a2a/src/main/proto/a2a_service.proto | 27 -- .../adk/a2a/grpc/A2aAgentExecutorTest.java | 155 ---------- .../google/adk/a2a/grpc/A2aGrpcServerIT.java | 155 ---------- .../adk/a2a/grpc/A2aGrpcServerTest.java | 92 ------ .../adk/a2a/grpc/A2aServerBuilderTest.java | 66 ----- .../com/google/adk/a2a/grpc/A2aServerIT.java | 115 -------- .../google/adk/a2a/grpc/A2aServerTest.java | 68 ----- .../adk/a2a/grpc/A2aServiceEnhancedTest.java | 122 -------- .../google/adk/a2a/grpc/A2aServiceTest.java | 51 ---- .../google/adk/a2a/grpc/MediaSupportTest.java | 240 ---------------- core/pom.xml | 7 - .../java/com/google/adk/agents/LlmAgent.java | 143 ---------- .../google/adk/agents/LlmAgentA2aTest.java | 227 --------------- 21 files changed, 3 insertions(+), 2727 deletions(-) delete mode 100644 A2A_SERVICE_ENHANCEMENTS.md delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aAgentExecutor.java delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java delete mode 100644 a2a/src/main/java/com/google/adk/a2a/grpc/A2aServiceEnhanced.java delete mode 100644 a2a/src/main/proto/a2a_service.proto delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aAgentExecutorTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerIT.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerBuilderTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerIT.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceEnhancedTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceTest.java delete mode 100644 a2a/src/test/java/com/google/adk/a2a/grpc/MediaSupportTest.java delete mode 100644 core/src/test/java/com/google/adk/agents/LlmAgentA2aTest.java diff --git a/A2A_SERVICE_ENHANCEMENTS.md b/A2A_SERVICE_ENHANCEMENTS.md deleted file mode 100644 index 0c621efc1..000000000 --- a/A2A_SERVICE_ENHANCEMENTS.md +++ /dev/null @@ -1,87 +0,0 @@ -# A2A Service Enhancements - -## Overview - -This PR enhances the A2A service with console output, session state initialization, and proper unary RPC response handling. These changes enable better observability and fix issues with session state management. - -## Key Changes - -### 1. Console Output for Observability - -Added console output to track A2A requests and responses: -- `🔵 A2A REQUEST RECEIVED` - Shows session ID, agent name, and query -- `🟢 A2A RESPONSE SENT` - Shows session ID, agent name, response length, and preview - -### 2. Session State Initialization - -Fixed "Context variable not found" errors by initializing required state variables: -- `currentDate` - Current date -- `sourceCityName`, `destinationCityName`, `dateOfJourney` - Empty strings (populated by agent) -- `mriSessionId` - Session ID -- `userMsg` - User query -- `_temp_a2aCallCount`, `_temp_a2aCalls` - A2A tracking variables - -### 3. Response Aggregation - -Changed from streaming multiple responses to aggregating all events into a single response: -- Required because `sendMessage` is unary RPC, not streaming -- Uses `toList().blockingGet()` to collect all events before sending -- Ensures single `onNext()` call followed by `onCompleted()` - -### 4. Session Management - -Changed from `InvocationContext` to explicit `Session` object management: -- Get or create session before agent execution -- Ensures session state is properly initialized -- Prevents state-related errors - -### 5. Constructor Fix - -Fixed `A2aServer` constructor to accept port parameter directly: -- Avoids `IllegalStateException` when calling `server.getPort()` before server starts -- Updated `A2aServerBuilder` to pass port to constructor -- Updated tests accordingly - -## Files Modified - -1. **A2aService.java** (+169/-38 lines) - - Added console output - - Added session state initialization - - Changed response handling (streaming → unary) - - Changed session management - -2. **A2aServer.java** (+1/-1 lines) - - Added port parameter to constructor - -3. **A2aServerBuilder.java** (+1/-1 lines) - - Updated to pass port to constructor - -4. **A2aServerTest.java** (+2/-2 lines) - - Updated test constructors to pass port - -## Testing - -✅ All existing tests pass -✅ Console output validated -✅ Session state initialization prevents errors -✅ Unary RPC response handling works correctly - -## Impact - -- **Observability**: Console output enables easy debugging and validation -- **Reliability**: Session state initialization prevents runtime errors -- **Compatibility**: Proper unary RPC handling ensures client-server compatibility -- **Backward Compatible**: No breaking changes - -## Related Changes - -This PR works in conjunction with changes in `rae` repository (`a2a_main` branch): -- Client updated to use correct proto package (`com.google.adk.a2a.grpc`) -- Client changed from streaming to unary RPC -- Agents updated with A2A handover tracking - ---- - -**Author**: Sandeep Belgavi -**Date**: January 18, 2026 -**Branch**: `a2a` diff --git a/a2a/pom.xml b/a2a/pom.xml index 6182a883a..dc606afa9 100644 --- a/a2a/pom.xml +++ b/a2a/pom.xml @@ -26,31 +26,9 @@ 2.38.0 1.4.4 4.13.2 - 1.62.2 - - io.grpc - grpc-netty-shaded - ${grpc.version} - runtime - - - io.grpc - grpc-protobuf - ${grpc.version} - - - io.grpc - grpc-stub - ${grpc.version} - - - com.google.code.gson - gson - 2.10.1 - com.google.adk google-adk @@ -128,99 +106,16 @@ ${truth.version} test - - org.junit.jupiter - junit-jupiter-api - 5.10.2 - test - - - org.mockito - mockito-core - 5.10.0 - test - - - org.mockito - mockito-junit-jupiter - 5.10.0 - test - - - org.junit.jupiter - junit-jupiter-engine - 5.10.2 - test - - - - - kr.motd.maven - os-maven-plugin - 1.7.0 - - org.apache.maven.plugins - maven-surefire-plugin - 3.2.5 - - - org.apache.maven.plugins - maven-failsafe-plugin - 3.2.5 - - - - integration-test - verify - - - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 + maven-compiler-plugin + 3.13.0 - - com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier} - - grpc-java - - io.grpc:protoc-gen-grpc-java:1.48.1:exe:${os.detected.classifier} - + ${java.version} - - - - compile - compile-custom - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.2.0 - - - generate-sources - - add-source - - - - target/generated-sources/protobuf/java - target/generated-sources/protobuf/grpc-java - - - - diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aAgentExecutor.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aAgentExecutor.java deleted file mode 100644 index cc979bc98..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aAgentExecutor.java +++ /dev/null @@ -1,270 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.adk.a2a.converters.RequestConverter; -import com.google.adk.a2a.converters.ResponseConverter; -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.RunConfig; -import com.google.adk.artifacts.InMemoryArtifactService; -import com.google.adk.events.Event; -import com.google.adk.memory.InMemoryMemoryService; -import com.google.adk.runner.Runner; -import com.google.adk.sessions.InMemorySessionService; -import com.google.adk.sessions.Session; -import com.google.common.collect.ImmutableList; -import com.google.genai.types.Content; -import io.a2a.spec.Artifact; -import io.a2a.spec.Message; -import io.a2a.spec.TaskArtifactUpdateEvent; -import io.a2a.spec.TaskState; -import io.a2a.spec.TaskStatus; -import io.a2a.spec.TaskStatusUpdateEvent; -import io.reactivex.rxjava3.core.Flowable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Executor that runs an ADK Agent against an A2A request and publishes updates to an event stream. - * - *

This class is similar to Python's A2aAgentExecutor and handles: - * - *

- */ -public class A2aAgentExecutor { - private static final Logger logger = LoggerFactory.getLogger(A2aAgentExecutor.class); - - private final Runner runner; - private final String appName; - - /** - * Creates an executor with a pre-configured Runner. - * - * @param runner The Runner instance to use for agent execution. - */ - public A2aAgentExecutor(Runner runner) { - this.runner = runner; - this.appName = runner.appName(); - } - - /** - * Creates an executor with a BaseAgent, automatically creating a Runner with in-memory services. - * - * @param agent The BaseAgent to execute. - * @param appName The application name. - */ - public A2aAgentExecutor(BaseAgent agent, String appName) { - this.runner = - new Runner.Builder() - .agent(agent) - .appName(appName) - .artifactService(new InMemoryArtifactService()) - .sessionService(new InMemorySessionService()) - .memoryService(new InMemoryMemoryService()) - .build(); - this.appName = appName; - } - - /** - * Executes an A2A request and returns a stream of A2A events (TaskStatusUpdateEvent, - * TaskArtifactUpdateEvent, etc.). - * - * @param request The A2A message request. - * @param taskId The task ID for this execution. - * @param contextId The context ID for this execution. - * @return A Flowable stream of A2A events. - */ - public Flowable execute(Message request, String taskId, String contextId) { - if (request == null) { - throw new IllegalArgumentException("A2A request must have a message"); - } - - // 1. Convert A2A request to ADK content - List inputEvents = - RequestConverter.convertAggregatedA2aMessageToAdkEvents( - request, UUID.randomUUID().toString()); - - // 2. Extract user content from events - Content userContent = null; - for (Event event : inputEvents) { - if (event.content().isPresent()) { - Content content = event.content().get(); - if ("user".equals(content.role())) { - userContent = content; - break; - } - } - } - - if (userContent == null || userContent.parts().isEmpty()) { - throw new IllegalArgumentException("A2A request must have user content"); - } - - // Make userContent final for lambda - final Content finalUserContent = userContent; - - // 3. Get or create session - final String userId = contextId; // Use contextId as userId for simplicity - final String sessionId = contextId; - - return Flowable.fromCallable( - () -> { - io.reactivex.rxjava3.core.Maybe sessionMaybe = - runner - .sessionService() - .getSession(appName, userId, sessionId, java.util.Optional.empty()); - - Session session = sessionMaybe.blockingGet(); - - if (session == null) { - java.util.concurrent.ConcurrentHashMap initialState = - new java.util.concurrent.ConcurrentHashMap<>(); - session = - runner - .sessionService() - .createSession(appName, userId, initialState, sessionId) - .blockingGet(); - } - return session; - }) - .flatMap( - session -> { - // 4. Publish task submitted event - TaskStatusUpdateEvent submittedEvent = - new TaskStatusUpdateEvent( - taskId, - new TaskStatus(TaskState.SUBMITTED, request, null), - contextId, - false, - null); - - // 5. Publish task working event - TaskStatusUpdateEvent workingEvent = - new TaskStatusUpdateEvent( - taskId, - new TaskStatus(TaskState.WORKING, null, null), - contextId, - false, - null); - - // 6. Execute agent - Flowable agentEvents = - runner.runAsync( - userId, - sessionId, - finalUserContent, - RunConfig.builder() - .setStreamingMode(RunConfig.StreamingMode.SSE) - .setMaxLlmCalls(20) - .build()); - - // 7. Convert ADK events to A2A task artifact events - Flowable a2aEvents = - agentEvents - .flatMap( - adkEvent -> { - List converted = new ArrayList<>(); - - // Convert to A2A message - Message a2aMessage = - ResponseConverter.eventToMessage(adkEvent, contextId); - if (a2aMessage != null && !a2aMessage.getParts().isEmpty()) { - // Create artifact update event - Artifact artifact = - new Artifact.Builder() - .artifactId(UUID.randomUUID().toString()) - .parts(a2aMessage.getParts()) - .build(); - TaskArtifactUpdateEvent artifactEvent = - new TaskArtifactUpdateEvent.Builder() - .taskId(taskId) - .contextId(contextId) - .artifact(artifact) - .lastChunk(false) - .build(); - converted.add(artifactEvent); - } - - return Flowable.fromIterable(converted); - }) - .onErrorResumeNext( - error -> { - logger.error("Error converting ADK event to A2A event", error); - TaskStatusUpdateEvent errorEvent = - new TaskStatusUpdateEvent( - taskId, - new TaskStatus( - TaskState.FAILED, - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.AGENT) - .parts( - ImmutableList.of( - new io.a2a.spec.TextPart( - "Error: " + error.getMessage()))) - .build(), - null), - contextId, - true, - null); - return Flowable.just(errorEvent); - }); - - // 8. Create final completion events - TaskArtifactUpdateEvent finalArtifact = - new TaskArtifactUpdateEvent.Builder() - .taskId(taskId) - .contextId(contextId) - .artifact( - new Artifact.Builder() - .artifactId(UUID.randomUUID().toString()) - .parts(ImmutableList.of()) - .build()) - .lastChunk(true) - .build(); - - TaskStatusUpdateEvent completedEvent = - new TaskStatusUpdateEvent( - taskId, - new TaskStatus(TaskState.COMPLETED, null, null), - contextId, - true, - null); - - // Combine all events in sequence: submitted → working → agent events → completion - return Flowable.just((io.a2a.spec.Event) submittedEvent) - .concatWith(Flowable.just((io.a2a.spec.Event) workingEvent)) - .concatWith(a2aEvents) - .concatWith(Flowable.just((io.a2a.spec.Event) finalArtifact)) - .concatWith(Flowable.just((io.a2a.spec.Event) completedEvent)); - }) - .onErrorResumeNext( - error -> { - logger.error("Error executing A2A request", error); - TaskStatusUpdateEvent errorEvent = - new TaskStatusUpdateEvent( - taskId, - new TaskStatus( - TaskState.FAILED, - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.AGENT) - .parts( - ImmutableList.of( - new io.a2a.spec.TextPart("Error: " + error.getMessage()))) - .build(), - null), - contextId, - true, - null); - return Flowable.just(errorEvent); - }); - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java deleted file mode 100644 index 48950818a..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java +++ /dev/null @@ -1,115 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.adk.agents.BaseAgent; -import java.io.IOException; -import java.net.URL; - -/** - * A utility class to easily expose an ADK BaseAgent as a standalone A2A gRPC server. This class - * provides a Python-like {@code to_a2a} experience for Java developers. - * - *

This utility abstracts away the boilerplate of creating a gRPC server, allowing developers to - * expose their agents with minimal code. - * - *

Example usage: - * - *

{@code
- * import com.google.adk.agents.LlmAgent;
- * import com.google.adk.a2a.grpc.A2aGrpcServer;
- *
- * public class Main {
- *     public static void main(String[] args) {
- *         BaseAgent agent = new LlmAgent(...);
- *         // This single line starts the entire gRPC A2A server!
- *         A2aGrpcServer.run(agent, args);
- *     }
- * }
- * }
- * - *

Note: This assumes the gRPC service definition (`.proto` file) and generated classes - * ({@code A2AServiceGrpc}, {@code SendMessageRequest}, {@code SendMessageResponse}) are available - * in the classpath through the Maven {@code protobuf-maven-plugin} configuration. - */ -public final class A2aGrpcServer { - - private A2aGrpcServer() { - // Utility class - prevent instantiation - } - - /** - * Starts an A2A gRPC server for the given agent instance. This method creates and starts a gRPC - * server on the default port (8080). - * - *

The server will run until the JVM is shut down. A shutdown hook is automatically registered - * to ensure graceful shutdown. - * - * @param agent The ADK agent to expose as a gRPC service. - * @throws IOException If the server fails to start. - * @throws InterruptedException If the server is interrupted while waiting for termination. - */ - public static void run(BaseAgent agent) throws IOException, InterruptedException { - run(agent, 8080); - } - - /** - * Starts an A2A gRPC server for the given agent instance on the specified port. - * - *

The server will run until the JVM is shut down. A shutdown hook is automatically registered - * to ensure graceful shutdown. - * - * @param agent The ADK agent to expose as a gRPC service. - * @param port The port number on which the server should listen. - * @throws IOException If the server fails to start. - * @throws InterruptedException If the server is interrupted while waiting for termination. - */ - public static void run(BaseAgent agent, int port) throws IOException, InterruptedException { - run(agent, port, null); - } - - /** - * Starts an A2A gRPC server for the given agent instance with optional registry integration. - * - *

The server will run until the JVM is shut down. A shutdown hook is automatically registered - * to ensure graceful shutdown. - * - * @param agent The ADK agent to expose as a gRPC service. - * @param port The port number on which the server should listen. - * @param registryUrl Optional URL of the service registry. If provided, the server will register - * itself with the registry upon startup and unregister upon shutdown. - * @throws IOException If the server fails to start. - * @throws InterruptedException If the server is interrupted while waiting for termination. - */ - public static void run(BaseAgent agent, int port, URL registryUrl) - throws IOException, InterruptedException { - if (agent == null) { - throw new IllegalArgumentException("Agent cannot be null"); - } - - A2aServer server = new A2aServerBuilder(agent).port(port).withRegistry(registryUrl).build(); - - // Start the server and block until termination - server.start(); - } - - /** - * Creates and returns an {@link A2aServerBuilder} for advanced configuration. This allows for - * more fine-grained control over the server setup. - * - *

Example: - * - *

{@code
-   * A2aServer server = A2aGrpcServer.builder(agent)
-   *     .port(9090)
-   *     .withRegistry(new URL("http://localhost:8081"))
-   *     .build();
-   * server.start();
-   * }
- * - * @param agent The ADK agent to expose as a gRPC service. - * @return A new {@link A2aServerBuilder} instance. - */ - public static A2aServerBuilder builder(BaseAgent agent) { - return new A2aServerBuilder(agent); - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java deleted file mode 100644 index dc58b6966..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java +++ /dev/null @@ -1,195 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 16, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.gson.Gson; -import io.grpc.Server; -import java.io.IOException; -import java.net.URI; -import java.net.URL; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Manages the lifecycle of a standalone A2A gRPC server. */ -public class A2aServer { - - private static final Logger logger = LoggerFactory.getLogger(A2aServer.class); - private static final Gson gson = new Gson(); - - private final Server server; - private final URL registryUrl; - private final AgentInfo agentInfo; - private final HttpClient httpClient; - - /** - * Constructs a new server instance. - * - * @param server The underlying gRPC {@link Server} to manage. - * @param registryUrl The URL of the service registry. - * @param httpClient The HTTP client to use for registry communication. - */ - public A2aServer(Server server, URL registryUrl, HttpClient httpClient, int port) { - this.server = server; - this.registryUrl = registryUrl; - this.agentInfo = new AgentInfo(port); - this.httpClient = httpClient; - } - - /** - * Starts the gRPC server and blocks the current thread until the server is terminated. - * - * @throws IOException If the server fails to start. - * @throws InterruptedException If the server is interrupted while waiting for termination. - */ - public void start() throws IOException, InterruptedException { - start(true); - } - - /** - * Starts the gRPC server. - * - * @param awaitTermination If true, blocks the current thread until the server is terminated. - * @throws IOException If the server fails to start. - * @throws InterruptedException If the server is interrupted while waiting for termination. - */ - public void start(boolean awaitTermination) throws IOException, InterruptedException { - server.start(); - logger.info("A2A gRPC server started, listening on port " + server.getPort()); - - if (registryUrl != null) { - register(); - } - - // Add a shutdown hook to ensure a graceful server shutdown - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - logger.info("Shutting down gRPC server since JVM is shutting down"); - try { - A2aServer.this.stop(); - } catch (InterruptedException e) { - logger.error("gRPC server shutdown interrupted", e); - Thread.currentThread().interrupt(); // Preserve the interrupted status - } - logger.info("Server shut down"); - })); - - // Block until the server is terminated - if (awaitTermination) { - server.awaitTermination(); - } - } - - /** - * Gets the port on which the server is listening. - * - * @return The port number. - */ - public int getPort() { - return server.getPort(); - } - - /** - * Stops the gRPC server gracefully. - * - * @throws InterruptedException If the server is interrupted while shutting down. - */ - public void stop() throws InterruptedException { - if (registryUrl != null) { - unregister(); - } - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - } - } - - private void register() { - try { - HttpRequest request = - HttpRequest.newBuilder() - .uri(URI.create(registryUrl + "/register")) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(gson.toJson(agentInfo))) - .build(); - - httpClient - .sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .whenComplete( - (response, throwable) -> { - if (throwable != null) { - logger.error("Failed to register with registry service", throwable); - return; - } - if (response.statusCode() >= 200 && response.statusCode() < 300) { - logger.info( - "Successfully registered with registry service. Status: {}, Body: {}", - response.statusCode(), - response.body()); - } else { - logger.warn( - "Failed to register with registry service. Status: {}, Body: {}", - response.statusCode(), - response.body()); - } - }); - } catch (Exception e) { - logger.error("Error building registry request", e); - } - } - - private void unregister() { - try { - HttpRequest request = - HttpRequest.newBuilder() - .uri(URI.create(registryUrl + "/unregister")) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(gson.toJson(agentInfo))) - .build(); - - httpClient - .sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .whenComplete( - (response, throwable) -> { - if (throwable != null) { - logger.error("Failed to unregister from registry service", throwable); - return; - } - if (response.statusCode() >= 200 && response.statusCode() < 300) { - logger.info( - "Successfully unregistered from registry service. Status: {}, Body: {}", - response.statusCode(), - response.body()); - } else { - logger.warn( - "Failed to unregister from registry service. Status: {}, Body: {}", - response.statusCode(), - response.body()); - } - }); - } catch (Exception e) { - logger.error("Error building unregister request", e); - } - } - - private static class AgentInfo { - private final String name; - private final String url; - - AgentInfo(int port) { - this.name = "agent-" + port; - this.url = "http://localhost:" + port; - } - - public String getName() { - return name; - } - - public String getUrl() { - return url; - } - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java deleted file mode 100644 index 2df7d126e..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java +++ /dev/null @@ -1,78 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 16, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.adk.agents.BaseAgent; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import java.net.URL; -import java.net.http.HttpClient; - -/** A builder for creating a lightweight, standalone A2A gRPC server. */ -public class A2aServerBuilder { - - private final BaseAgent agent; - private int port = 8080; // Default port - private URL registryUrl; - private HttpClient httpClient; - - /** - * Constructs a new builder for a given ADK agent. - * - * @param agent The ADK {@link BaseAgent} to be exposed via the gRPC service. - */ - public A2aServerBuilder(BaseAgent agent) { - if (agent == null) { - throw new IllegalArgumentException("Agent cannot be null"); - } - this.agent = agent; - } - - /** - * Sets the port on which the server should listen. - * - * @param port The port number. - * @return This builder instance for chaining. - */ - public A2aServerBuilder port(int port) { - if (port <= 0 || port > 65535) { - throw new IllegalArgumentException("Port must be between 1 and 65535"); - } - this.port = port; - return this; - } - - /** - * Sets the URL of the A2A service registry. If set, the server will attempt to register itself - * with the registry upon startup. - * - * @param registryUrl The URL of the service registry. - * @return This builder instance for chaining. - */ - public A2aServerBuilder withRegistry(URL registryUrl) { - this.registryUrl = registryUrl; - return this; - } - - /** - * Sets the {@link HttpClient} to be used for registry communication. If not set, a default client - * will be created. - * - * @param httpClient The HTTP client to use. - * @return This builder instance for chaining. - */ - public A2aServerBuilder httpClient(HttpClient httpClient) { - this.httpClient = httpClient; - return this; - } - - /** - * Builds the {@link A2aServer} instance. - * - * @return A new {@link A2aServer} configured with the settings from this builder. - */ - public A2aServer build() { - Server grpcServer = ServerBuilder.forPort(port).addService(new A2aService(agent)).build(); - HttpClient client = (httpClient != null) ? httpClient : HttpClient.newHttpClient(); - return new A2aServer(grpcServer, registryUrl, client, this.port); - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java deleted file mode 100644 index d49466b23..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java +++ /dev/null @@ -1,246 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 18, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.RunConfig; -import com.google.adk.artifacts.InMemoryArtifactService; -import com.google.adk.events.Event; -import com.google.adk.memory.InMemoryMemoryService; -import com.google.adk.runner.Runner; -import com.google.adk.sessions.InMemorySessionService; -import com.google.adk.sessions.Session; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import io.grpc.stub.StreamObserver; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Maybe; -import java.time.LocalDate; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements the A2A gRPC service, bridging requests to an ADK agent. - * - *

This service uses a Runner internally to properly handle agent execution with session - * management, artifacts, and memory services. - */ -class A2aService extends A2AServiceGrpc.A2AServiceImplBase { - - private static final Logger logger = LoggerFactory.getLogger(A2aService.class); - private final BaseAgent agent; - private final Runner runner; - private static final String DEFAULT_APP_NAME = "adk-a2a-server"; - - /** - * Constructs the service with a given ADK agent. - * - *

This constructor creates an internal Runner with in-memory services for sessions, artifacts, - * and memory. For production use, consider using a constructor that accepts a pre-configured - * Runner. - * - * @param agent The {@link BaseAgent} to handle the requests. - */ - A2aService(BaseAgent agent) { - this.agent = agent; - // Create a Runner with in-memory services for simplicity - // In production, this could be configured with persistent services - this.runner = - new Runner.Builder() - .agent(agent) - .appName(DEFAULT_APP_NAME) - .artifactService(new InMemoryArtifactService()) - .sessionService(new InMemorySessionService()) - .memoryService(new InMemoryMemoryService()) - .build(); - } - - /** - * Constructs the service with a pre-configured Runner. - * - * @param agent The {@link BaseAgent} to handle the requests. - * @param runner The {@link Runner} instance to use for agent execution. - */ - A2aService(BaseAgent agent, Runner runner) { - this.agent = agent; - this.runner = runner; - } - - @Override - public void sendMessage( - SendMessageRequest request, StreamObserver responseObserver) { - String userQuery = request.getUserQuery(); - String sessionId = - request.getSessionId() != null && !request.getSessionId().isEmpty() - ? request.getSessionId() - : "new-session"; - - // Console output for validation - System.out.println("\n" + "=".repeat(80)); - System.out.println("🔵 A2A REQUEST RECEIVED"); - System.out.println("=".repeat(80)); - System.out.println("Session ID: " + sessionId); - System.out.println("Agent: " + agent.name()); - System.out.println("Query: " + userQuery); - System.out.println("=".repeat(80) + "\n"); - - logger.info("Received message from client: sessionId={}, query={}", sessionId, userQuery); - - try { - // Extract session ID from request or generate a new one - String userId = "default-user"; - final String actualSessionId = - (sessionId == null || sessionId.equals("new-session")) - ? UUID.randomUUID().toString() - : sessionId; - - // Create user content from the request - Content userContent = Content.fromParts(Part.fromText(request.getUserQuery())); - - // Get or create session - Runner requires session to exist - Maybe maybeSession = - runner - .sessionService() - .getSession(runner.appName(), userId, actualSessionId, Optional.empty()); - - Session session = - maybeSession - .switchIfEmpty( - runner - .sessionService() - .createSession(runner.appName(), userId, null, null) - .toMaybe()) - .blockingGet(); - - // Initialize session state with default values if missing - // This prevents errors when agent instruction references state variables - // The state map is mutable, so we can update it directly - Map sessionState = session.state(); - if (sessionState.isEmpty() || !sessionState.containsKey("currentDate")) { - sessionState.put("currentDate", LocalDate.now().toString()); - sessionState.put("sourceCityName", ""); - sessionState.put("destinationCityName", ""); - sessionState.put("dateOfJourney", ""); - sessionState.put("mriSessionId", actualSessionId); - sessionState.put("userMsg", request.getUserQuery()); - // Track A2A handovers (using _temp prefix for non-persistent tracking) - sessionState.put("_temp_a2aCallCount", 0); - sessionState.put("_temp_a2aCalls", new java.util.ArrayList>()); - } else { - // Ensure required fields exist even if state is not empty - if (!sessionState.containsKey("mriSessionId")) { - sessionState.put("mriSessionId", actualSessionId); - } - if (!sessionState.containsKey("userMsg")) { - sessionState.put("userMsg", request.getUserQuery()); - } - if (!sessionState.containsKey("currentDate")) { - sessionState.put("currentDate", LocalDate.now().toString()); - } - // Ensure empty strings for city names if not present - if (!sessionState.containsKey("sourceCityName")) { - sessionState.put("sourceCityName", ""); - } - if (!sessionState.containsKey("destinationCityName")) { - sessionState.put("destinationCityName", ""); - } - if (!sessionState.containsKey("dateOfJourney")) { - sessionState.put("dateOfJourney", ""); - } - // Initialize A2A tracking if not present - if (!sessionState.containsKey("_temp_a2aCallCount")) { - sessionState.put("_temp_a2aCallCount", 0); - } - if (!sessionState.containsKey("_temp_a2aCalls")) { - sessionState.put("_temp_a2aCalls", new java.util.ArrayList>()); - } - } - - // Configure run settings for streaming - RunConfig runConfig = - RunConfig.builder() - .setStreamingMode(RunConfig.StreamingMode.SSE) - .setMaxLlmCalls(20) - .build(); - - // Execute the agent using Runner with Session object - Flowable eventStream = runner.runAsync(session, userContent, runConfig); - - // Collect all events and aggregate into a single response - // Since sendMessage is unary RPC, we need to send a single response - eventStream - .doOnError( - error -> { - logger.error("Error executing agent", error); - responseObserver.onError( - io.grpc.Status.INTERNAL - .withDescription("Agent execution failed: " + error.getMessage()) - .withCause(error) - .asRuntimeException()); - }) - .toList() - .subscribe( - events -> { - // Aggregate all events into a single response - StringBuilder aggregatedResponse = new StringBuilder(); - for (Event event : events) { - String content = event.stringifyContent(); - if (content != null && !content.trim().isEmpty()) { - if (aggregatedResponse.length() > 0) { - aggregatedResponse.append("\n"); - } - aggregatedResponse.append(content); - } - } - - String finalResponse = aggregatedResponse.toString(); - if (finalResponse.isEmpty()) { - finalResponse = "(No response generated)"; - } - - // Console output for validation - System.out.println("\n" + "=".repeat(80)); - System.out.println("🟢 A2A RESPONSE SENT"); - System.out.println("=".repeat(80)); - System.out.println("Session ID: " + actualSessionId); - System.out.println("Agent: " + agent.name()); - System.out.println("Response Length: " + finalResponse.length() + " characters"); - System.out.println("Response Preview (first 500 chars):"); - System.out.println( - finalResponse.substring(0, Math.min(500, finalResponse.length()))); - if (finalResponse.length() > 500) { - System.out.println("... (truncated)"); - } - System.out.println("=".repeat(80) + "\n"); - - logger.info( - "Agent execution completed for sessionId={}, response length={}", - actualSessionId, - finalResponse.length()); - - SendMessageResponse response = - SendMessageResponse.newBuilder().setAgentReply(finalResponse).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - }, - error -> { - logger.error("Error collecting events", error); - responseObserver.onError( - io.grpc.Status.INTERNAL - .withDescription("Error collecting agent response: " + error.getMessage()) - .withCause(error) - .asRuntimeException()); - }); - - } catch (Exception e) { - logger.error("Unexpected error processing request", e); - responseObserver.onError( - io.grpc.Status.INTERNAL - .withDescription("Unexpected error: " + e.getMessage()) - .withCause(e) - .asRuntimeException()); - } - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServiceEnhanced.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServiceEnhanced.java deleted file mode 100644 index c840e60e3..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServiceEnhanced.java +++ /dev/null @@ -1,160 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import com.google.adk.agents.BaseAgent; -import io.a2a.spec.Artifact; -import io.a2a.spec.Event; -import io.a2a.spec.Message; -import io.a2a.spec.TaskArtifactUpdateEvent; -import io.a2a.spec.TaskStatus; -import io.a2a.spec.TaskStatusUpdateEvent; -import io.a2a.spec.TextPart; -import io.grpc.stub.StreamObserver; -import io.reactivex.rxjava3.core.Flowable; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Enhanced A2A gRPC service implementation with full task lifecycle support. - * - *

This service provides Python-like functionality: - * - *

    - *
  • Full A2A task lifecycle (submitted → working → completed/failed) - *
  • Task status updates - *
  • Task artifact updates - *
  • Proper event conversion - *
- */ -class A2aServiceEnhanced extends A2AServiceGrpc.A2AServiceImplBase { - - private static final Logger logger = LoggerFactory.getLogger(A2aServiceEnhanced.class); - private final A2aAgentExecutor executor; - - /** - * Constructs the service with a given ADK agent. - * - * @param agent The {@link BaseAgent} to handle the requests. - */ - A2aServiceEnhanced(BaseAgent agent) { - this.executor = new A2aAgentExecutor(agent, "adk-a2a-server"); - } - - /** - * Constructs the service with a pre-configured executor. - * - * @param executor The {@link A2aAgentExecutor} instance. - */ - A2aServiceEnhanced(A2aAgentExecutor executor) { - this.executor = executor; - } - - @Override - public void sendMessage( - SendMessageRequest request, StreamObserver responseObserver) { - logger.info( - "Received message from client: sessionId={}, query={}", - request.getSessionId(), - request.getUserQuery()); - - try { - // Generate task and context IDs - String taskId = UUID.randomUUID().toString(); - String contextId = - request.getSessionId() != null && !request.getSessionId().isEmpty() - ? request.getSessionId() - : UUID.randomUUID().toString(); - - // Convert request to A2A Message - Message a2aMessage = - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.USER) - .parts( - com.google.common.collect.ImmutableList.of(new TextPart(request.getUserQuery()))) - .build(); - - // Execute using A2aAgentExecutor (handles full task lifecycle) - Flowable a2aEvents = executor.execute(a2aMessage, taskId, contextId); - - // Stream events back to client - a2aEvents - .doOnError( - error -> { - logger.error("Error executing agent", error); - responseObserver.onError( - io.grpc.Status.INTERNAL - .withDescription("Agent execution failed: " + error.getMessage()) - .withCause(error) - .asRuntimeException()); - }) - .subscribe( - event -> { - // Convert A2A event to gRPC response - String responseText = convertEventToResponseText(event); - if (responseText != null && !responseText.trim().isEmpty()) { - SendMessageResponse response = - SendMessageResponse.newBuilder().setAgentReply(responseText).build(); - responseObserver.onNext(response); - } - }, - error -> { - logger.error("Error in event stream", error); - }, - () -> { - logger.info("Agent execution completed for taskId={}", taskId); - responseObserver.onCompleted(); - }); - - } catch (Exception e) { - logger.error("Unexpected error processing request", e); - responseObserver.onError( - io.grpc.Status.INTERNAL - .withDescription("Unexpected error: " + e.getMessage()) - .withCause(e) - .asRuntimeException()); - } - } - - /** - * Converts an A2A event to a response text string. - * - * @param event The A2A event to convert. - * @return The response text, or null if the event should be skipped. - */ - private String convertEventToResponseText(Event event) { - if (event instanceof TaskStatusUpdateEvent statusEvent) { - TaskStatus status = statusEvent.getStatus(); - if (status.state() == io.a2a.spec.TaskState.SUBMITTED) { - return "[Task Submitted]"; - } else if (status.state() == io.a2a.spec.TaskState.WORKING) { - return "[Task Working...]"; - } else if (status.state() == io.a2a.spec.TaskState.COMPLETED) { - return "[Task Completed]"; - } else if (status.state() == io.a2a.spec.TaskState.FAILED) { - Message errorMsg = status.message(); - if (errorMsg != null && !errorMsg.getParts().isEmpty()) { - io.a2a.spec.Part part = errorMsg.getParts().get(0); - if (part instanceof TextPart) { - return "[Error] " + ((TextPart) part).getText(); - } - } - return "[Task Failed]"; - } - } else if (event instanceof TaskArtifactUpdateEvent artifactEvent) { - // Extract text from artifact parts - StringBuilder text = new StringBuilder(); - Artifact artifact = artifactEvent.getArtifact(); - if (artifact != null && artifact.parts() != null) { - for (io.a2a.spec.Part part : artifact.parts()) { - if (part instanceof TextPart) { - text.append(((TextPart) part).getText()); - } - } - } - return text.length() > 0 ? text.toString() : null; - } - return null; - } -} diff --git a/a2a/src/main/proto/a2a_service.proto b/a2a/src/main/proto/a2a_service.proto deleted file mode 100644 index 4251aba36..000000000 --- a/a2a/src/main/proto/a2a_service.proto +++ /dev/null @@ -1,27 +0,0 @@ -// Author: Sandeep Belgavi -// Date: January 16, 2026 - -syntax = "proto3"; - -package com.google.adk.a2a.grpc; - -option java_multiple_files = true; -option java_package = "com.google.adk.a2a.grpc"; -option java_outer_classname = "A2aServiceProto"; - -// The A2A service definition. -service A2AService { - // Sends a message to an agent and receives a response. - rpc SendMessage (SendMessageRequest) returns (SendMessageResponse) {} -} - -// The request message containing the user's query and session information. -message SendMessageRequest { - string session_id = 1; - string user_query = 2; -} - -// The response message containing the agent's reply. -message SendMessageResponse { - string agent_reply = 1; -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aAgentExecutorTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aAgentExecutorTest.java deleted file mode 100644 index 29d731ae8..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aAgentExecutorTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.RunConfig; -import com.google.adk.events.Event; -import com.google.adk.runner.Runner; -import com.google.adk.sessions.Session; -import com.google.common.collect.ImmutableList; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import io.a2a.spec.Message; -import io.a2a.spec.TaskState; -import io.a2a.spec.TaskStatusUpdateEvent; -import io.a2a.spec.TextPart; -import io.reactivex.rxjava3.core.Flowable; -import java.util.List; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class A2aAgentExecutorTest { - - @Mock private BaseAgent mockAgent; - @Mock private Runner mockRunner; - @Mock private Session mockSession; - - private A2aAgentExecutor executor; - - @BeforeEach - void setUp() { - when(mockRunner.appName()).thenReturn("test-app"); - com.google.adk.sessions.InMemorySessionService sessionService = - new com.google.adk.sessions.InMemorySessionService(); - when(mockRunner.sessionService()).thenReturn(sessionService); - - when(mockSession.userId()).thenReturn("test-user"); - when(mockSession.id()).thenReturn("test-session"); - - when(mockRunner.runAsync(anyString(), anyString(), any(Content.class), any(RunConfig.class))) - .thenReturn( - Flowable.just( - Event.builder() - .id(UUID.randomUUID().toString()) - .author("agent") - .content( - Content.builder() - .role("model") - .parts(ImmutableList.of(Part.builder().text("Hello, world!").build())) - .build()) - .build())); - } - - @Test - void testConstructor_withAgent() { - executor = new A2aAgentExecutor(mockAgent, "test-app"); - assertNotNull(executor); - } - - @Test - void testConstructor_withRunner() { - executor = new A2aAgentExecutor(mockRunner); - assertNotNull(executor); - } - - @Test - void testExecute_withTextMessage() { - executor = new A2aAgentExecutor(mockRunner); - - Message request = - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.USER) - .parts(ImmutableList.of(new TextPart("Hello"))) - .build(); - - String taskId = UUID.randomUUID().toString(); - String contextId = UUID.randomUUID().toString(); - - assertDoesNotThrow( - () -> { - Flowable events = executor.execute(request, taskId, contextId); - assertNotNull(events); - - // Collect events - List eventList = events.toList().blockingGet(); - assertThat(eventList).isNotEmpty(); - - // Check for task lifecycle events - boolean hasSubmitted = false; - boolean hasWorking = false; - boolean hasCompleted = false; - - for (io.a2a.spec.Event event : eventList) { - if (event instanceof TaskStatusUpdateEvent) { - TaskStatusUpdateEvent statusEvent = (TaskStatusUpdateEvent) event; - TaskState state = statusEvent.getStatus().state(); - if (state == TaskState.SUBMITTED) { - hasSubmitted = true; - } else if (state == TaskState.WORKING) { - hasWorking = true; - } else if (state == TaskState.COMPLETED) { - hasCompleted = true; - } - } - } - - assertThat(hasSubmitted).isTrue(); - assertThat(hasWorking).isTrue(); - assertThat(hasCompleted).isTrue(); - }); - } - - @Test - void testExecute_withNullRequest_throwsException() { - executor = new A2aAgentExecutor(mockRunner); - - assertThrows( - IllegalArgumentException.class, - () -> { - executor.execute(null, UUID.randomUUID().toString(), UUID.randomUUID().toString()); - }); - } - - @Test - void testExecute_withEmptyMessage_throwsException() { - executor = new A2aAgentExecutor(mockRunner); - - Message emptyRequest = - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.USER) - .parts(ImmutableList.of()) - .build(); - - assertThrows( - IllegalArgumentException.class, - () -> { - executor.execute( - emptyRequest, UUID.randomUUID().toString(), UUID.randomUUID().toString()); - }); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerIT.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerIT.java deleted file mode 100644 index c7b6dc6e3..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerIT.java +++ /dev/null @@ -1,155 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.InvocationContext; -import com.google.adk.events.Event; -import com.google.common.collect.ImmutableList; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.reactivex.rxjava3.core.Flowable; -import java.io.IOException; -import java.util.UUID; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * Integration tests for A2aGrpcServer. - * - *

These tests verify end-to-end functionality including: - * - *

    - *
  • Server startup and shutdown - *
  • gRPC communication - *
  • Text message handling - *
  • Session management - *
- */ -class A2aGrpcServerIT { - - private A2aServer server; - private ManagedChannel channel; - private A2AServiceGrpc.A2AServiceBlockingStub client; - private int port; - - private final BaseAgent testAgent = createTestAgent(); - - private BaseAgent createTestAgent() { - return new BaseAgent( - "test-agent", "Test agent for integration tests", ImmutableList.of(), null, null) { - @Override - protected Flowable runAsyncImpl(InvocationContext context) { - return runLiveImpl(context); - } - - @Override - protected Flowable runLiveImpl(InvocationContext context) { - String userText = - context - .userContent() - .map( - c -> - c.parts().get().stream() - .filter(p -> p.text().isPresent()) - .map(p -> p.text().get()) - .reduce("", (a, b) -> a + b)) - .orElse(""); - return Flowable.just( - Event.builder() - .author("agent") - .content( - Content.builder() - .role("model") - .parts(ImmutableList.of(Part.builder().text("Echo: " + userText).build())) - .build()) - .build()); - } - }; - } - - @BeforeEach - void setUp() throws IOException, InterruptedException { - // Find an available port - port = findAvailablePort(); - - // Start server - server = new A2aServerBuilder(testAgent).port(port).build(); - server.start(false); // Non-blocking - - // Wait for server to start - Thread.sleep(1000); - - // Create gRPC client - channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); - client = A2AServiceGrpc.newBlockingStub(channel); - } - - @AfterEach - void tearDown() throws InterruptedException { - if (channel != null) { - channel.shutdown(); - } - if (server != null) { - server.stop(); - } - } - - @Test - void testSendMessage_withTextRequest() { - SendMessageRequest request = - SendMessageRequest.newBuilder() - .setSessionId(UUID.randomUUID().toString()) - .setUserQuery("Hello, A2A!") - .build(); - - SendMessageResponse response = client.sendMessage(request); - - assertNotNull(response); - assertThat(response.getAgentReply()).isNotEmpty(); - // Response should contain the echo - assertThat(response.getAgentReply()).contains("Hello, A2A!"); - } - - @Test - void testSendMessage_withDifferentSessions() { - String session1 = UUID.randomUUID().toString(); - String session2 = UUID.randomUUID().toString(); - - SendMessageRequest request1 = - SendMessageRequest.newBuilder().setSessionId(session1).setUserQuery("First").build(); - SendMessageRequest request2 = - SendMessageRequest.newBuilder().setSessionId(session2).setUserQuery("Second").build(); - - SendMessageResponse response1 = client.sendMessage(request1); - SendMessageResponse response2 = client.sendMessage(request2); - - assertNotNull(response1); - assertNotNull(response2); - // Responses should contain the respective queries - assertThat(response1.getAgentReply()).isNotEmpty(); - assertThat(response2.getAgentReply()).isNotEmpty(); - } - - @Test - void testSendMessage_withEmptySessionId() { - SendMessageRequest request = - SendMessageRequest.newBuilder().setSessionId("").setUserQuery("Test").build(); - - SendMessageResponse response = client.sendMessage(request); - - assertNotNull(response); - assertThat(response.getAgentReply()).isNotEmpty(); - } - - private int findAvailablePort() throws IOException { - try (java.net.ServerSocket socket = new java.net.ServerSocket(0)) { - return socket.getLocalPort(); - } - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerTest.java deleted file mode 100644 index 3b2976d06..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aGrpcServerTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.InvocationContext; -import com.google.adk.events.Event; -import io.reactivex.rxjava3.core.Flowable; -import java.net.URL; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class A2aGrpcServerTest { - - @Mock private BaseAgent mockAgent; - - @BeforeEach - void setUp() { - when(mockAgent.runAsync(any(InvocationContext.class))) - .thenReturn(Flowable.just(Event.builder().author("test").build())); - } - - @Test - void testRun_withAgent_only() { - assertDoesNotThrow( - () -> { - // This will block, so we'll test the builder pattern instead - A2aServerBuilder builder = A2aGrpcServer.builder(mockAgent); - assertNotNull(builder); - }); - } - - @Test - void testRun_withAgentAndPort() { - assertDoesNotThrow( - () -> { - A2aServerBuilder builder = A2aGrpcServer.builder(mockAgent).port(9090); - assertNotNull(builder); - }); - } - - @Test - void testRun_withAgentPortAndRegistry() throws Exception { - assertDoesNotThrow( - () -> { - A2aServerBuilder builder = - A2aGrpcServer.builder(mockAgent) - .port(9090) - .withRegistry(new URL("http://localhost:8081")); - assertNotNull(builder); - }); - } - - @Test - void testBuilder_withNullAgent_throwsException() { - assertThrows( - IllegalArgumentException.class, - () -> { - A2aGrpcServer.builder(null); - }); - } - - @Test - void testBuilder_returnsBuilder() { - A2aServerBuilder builder = A2aGrpcServer.builder(mockAgent); - assertNotNull(builder); - } - - @Test - void testBuilder_portConfiguration() { - A2aServerBuilder builder = A2aGrpcServer.builder(mockAgent).port(9090); - A2aServer server = builder.build(); - assertNotNull(server); - } - - @Test - void testBuilder_registryConfiguration() throws Exception { - A2aServerBuilder builder = - A2aGrpcServer.builder(mockAgent).port(9090).withRegistry(new URL("http://localhost:8081")); - A2aServer server = builder.build(); - assertNotNull(server); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerBuilderTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerBuilderTest.java deleted file mode 100644 index 2eef65966..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerBuilderTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; - -import com.google.adk.agents.BaseAgent; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.http.HttpClient; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class A2aServerBuilderTest { - - private BaseAgent mockAgent; - - @BeforeEach - void setUp() { - mockAgent = mock(BaseAgent.class); - } - - @Test - void testBuild_Default() { - A2aServerBuilder builder = new A2aServerBuilder(mockAgent); - A2aServer server = builder.build(); - assertNotNull(server); - } - - @Test - void testPort_Valid() { - A2aServerBuilder builder = new A2aServerBuilder(mockAgent).port(8081); - A2aServer server = builder.build(); - assertNotNull(server); - } - - @Test - void testPort_Invalid() { - A2aServerBuilder builder = new A2aServerBuilder(mockAgent); - assertThrows(IllegalArgumentException.class, () -> builder.port(0)); - assertThrows(IllegalArgumentException.class, () -> builder.port(-1)); - assertThrows(IllegalArgumentException.class, () -> builder.port(65536)); - } - - @Test - void testWithRegistry() throws MalformedURLException { - URL registryUrl = new URL("http://localhost:8080"); - A2aServerBuilder builder = new A2aServerBuilder(mockAgent).withRegistry(registryUrl); - A2aServer server = builder.build(); - assertNotNull(server); - } - - @Test - void testHttpClient() { - HttpClient mockHttpClient = mock(HttpClient.class); - A2aServerBuilder builder = new A2aServerBuilder(mockAgent).httpClient(mockHttpClient); - A2aServer server = builder.build(); - assertNotNull(server); - } - - @Test - void testConstructor_NullAgent() { - assertThrows(IllegalArgumentException.class, () -> new A2aServerBuilder(null)); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerIT.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerIT.java deleted file mode 100644 index e9dfa5ba6..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerIT.java +++ /dev/null @@ -1,115 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.InvocationContext; -import com.google.adk.events.Event; -import com.google.common.collect.ImmutableList; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.reactivex.rxjava3.core.Flowable; -import java.io.IOException; -import java.util.UUID; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * Integration tests for A2aServer (basic server functionality). - * - *

These tests verify end-to-end functionality without registry. - */ -class A2aServerIT { - - private A2aServer server; - private ManagedChannel channel; - private A2AServiceGrpc.A2AServiceBlockingStub client; - private int port; - - private final BaseAgent testAgent = createTestAgent(); - - private BaseAgent createTestAgent() { - return new BaseAgent( - "test-agent", "Test agent for integration tests", ImmutableList.of(), null, null) { - @Override - protected Flowable runAsyncImpl(InvocationContext context) { - return runLiveImpl(context); - } - - @Override - protected Flowable runLiveImpl(InvocationContext context) { - String userText = - context - .userContent() - .map( - c -> - c.parts().get().stream() - .filter(p -> p.text().isPresent()) - .map(p -> p.text().get()) - .reduce("", (a, b) -> a + b)) - .orElse(""); - return Flowable.just( - Event.builder() - .author("agent") - .content( - Content.builder() - .role("model") - .parts(ImmutableList.of(Part.builder().text("Echo: " + userText).build())) - .build()) - .build()); - } - }; - } - - @BeforeEach - void setUp() throws IOException, InterruptedException { - // Find an available port - port = findAvailablePort(); - - // Start server without registry - server = new A2aServerBuilder(testAgent).port(port).build(); - server.start(false); // Non-blocking - - // Wait for server to start - Thread.sleep(1000); - - // Create gRPC client - channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); - client = A2AServiceGrpc.newBlockingStub(channel); - } - - @AfterEach - void tearDown() throws InterruptedException { - if (channel != null) { - channel.shutdown(); - } - if (server != null) { - server.stop(); - } - } - - @Test - void testA2aServer_basicFunctionality() { - SendMessageRequest request = - SendMessageRequest.newBuilder() - .setSessionId(UUID.randomUUID().toString()) - .setUserQuery("hello") - .build(); - SendMessageResponse response = client.sendMessage(request); - - // Verify the response from the server - assertNotNull(response); - assertThat(response.getAgentReply()).isNotEmpty(); - } - - private int findAvailablePort() throws IOException { - try (java.net.ServerSocket socket = new java.net.ServerSocket(0)) { - return socket.getLocalPort(); - } - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerTest.java deleted file mode 100644 index 56f15459a..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServerTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.grpc.Server; -import java.io.IOException; -import java.net.URL; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.util.concurrent.CompletableFuture; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class A2aServerTest { - - @Mock private Server mockServer; - @Mock private HttpClient mockHttpClient; - @Mock private HttpResponse mockHttpResponse; - - private A2aServer a2aServer; - - @BeforeEach - void setUp() throws IOException { - when(mockServer.getPort()).thenReturn(8080); - } - - @Test - void testStartAndStop_withRegistry() throws IOException, InterruptedException { - URL registryUrl = new URL("http://localhost:8080"); - a2aServer = new A2aServer(mockServer, registryUrl, mockHttpClient, 8080); - - when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) - .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); - - when(mockServer.shutdown()).thenReturn(mockServer); - when(mockServer.awaitTermination(any(long.class), any(java.util.concurrent.TimeUnit.class))) - .thenReturn(true); - - a2aServer.start(false); - verify(mockServer).start(); - verify(mockHttpClient).sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class)); - - a2aServer.stop(); - verify(mockServer).shutdown(); - } - - @Test - void testStartAndStop_withoutRegistry() throws IOException, InterruptedException { - a2aServer = new A2aServer(mockServer, null, mockHttpClient, 8080); - when(mockServer.shutdown()).thenReturn(mockServer); - when(mockServer.awaitTermination(any(long.class), any(java.util.concurrent.TimeUnit.class))) - .thenReturn(true); - - a2aServer.start(false); - verify(mockServer).start(); - - a2aServer.stop(); - verify(mockServer).shutdown(); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceEnhancedTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceEnhancedTest.java deleted file mode 100644 index 451dab380..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceEnhancedTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.adk.agents.BaseAgent; -import com.google.common.collect.ImmutableList; -import io.a2a.spec.Artifact; -import io.a2a.spec.Message; -import io.a2a.spec.TaskArtifactUpdateEvent; -import io.a2a.spec.TaskState; -import io.a2a.spec.TaskStatus; -import io.a2a.spec.TaskStatusUpdateEvent; -import io.a2a.spec.TextPart; -import io.grpc.stub.StreamObserver; -import io.reactivex.rxjava3.core.Flowable; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class A2aServiceEnhancedTest { - - @Mock private BaseAgent mockAgent; - @Mock private A2aAgentExecutor mockExecutor; - @Mock private StreamObserver mockResponseObserver; - - private A2aServiceEnhanced service; - - @BeforeEach - void setUp() { - service = new A2aServiceEnhanced(mockAgent); - } - - @Test - void testConstructor_withAgent() { - A2aServiceEnhanced newService = new A2aServiceEnhanced(mockAgent); - assertNotNull(newService); - } - - @Test - void testConstructor_withExecutor() { - A2aServiceEnhanced newService = new A2aServiceEnhanced(mockExecutor); - assertNotNull(newService); - } - - @Test - void testSendMessage_withTextRequest() { - // Setup - SendMessageRequest request = - SendMessageRequest.newBuilder().setSessionId("test-session").setUserQuery("Hello").build(); - - // Mock executor to return task lifecycle events - Message a2aMessage = - new Message.Builder() - .messageId(UUID.randomUUID().toString()) - .role(Message.Role.USER) - .parts(ImmutableList.of(new TextPart("Hello"))) - .build(); - - TaskStatusUpdateEvent submittedEvent = - new TaskStatusUpdateEvent( - "task-1", - new TaskStatus(TaskState.SUBMITTED, a2aMessage, null), - "context-1", - false, - null); - - TaskStatusUpdateEvent workingEvent = - new TaskStatusUpdateEvent( - "task-1", new TaskStatus(TaskState.WORKING, null, null), "context-1", false, null); - - TaskArtifactUpdateEvent artifactEvent = - new TaskArtifactUpdateEvent.Builder() - .taskId("task-1") - .contextId("context-1") - .artifact( - new Artifact.Builder() - .artifactId(UUID.randomUUID().toString()) - .parts(ImmutableList.of(new TextPart("Hello, world!"))) - .build()) - .lastChunk(false) - .build(); - - TaskStatusUpdateEvent completedEvent = - new TaskStatusUpdateEvent( - "task-1", new TaskStatus(TaskState.COMPLETED, null, null), "context-1", true, null); - - // Use executor-based service - A2aServiceEnhanced serviceWithExecutor = new A2aServiceEnhanced(mockExecutor); - when(mockExecutor.execute(any(Message.class), anyString(), anyString())) - .thenReturn(Flowable.just(submittedEvent, workingEvent, artifactEvent, completedEvent)); - - // Execute - serviceWithExecutor.sendMessage(request, mockResponseObserver); - - // Verify - verify(mockResponseObserver).onNext(any(SendMessageResponse.class)); - verify(mockResponseObserver).onCompleted(); - } - - @Test - void testSendMessage_withError() { - SendMessageRequest request = - SendMessageRequest.newBuilder().setSessionId("test-session").setUserQuery("Hello").build(); - - A2aServiceEnhanced serviceWithExecutor = new A2aServiceEnhanced(mockExecutor); - when(mockExecutor.execute(any(Message.class), anyString(), anyString())) - .thenReturn(Flowable.error(new RuntimeException("Test error"))); - - serviceWithExecutor.sendMessage(request, mockResponseObserver); - - verify(mockResponseObserver).onError(any(Throwable.class)); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceTest.java deleted file mode 100644 index 45032bc4f..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/A2aServiceTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.google.adk.a2a.grpc; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.InvocationContext; -import com.google.adk.events.Event; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import io.grpc.stub.StreamObserver; -import io.reactivex.rxjava3.core.Flowable; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class A2aServiceTest { - - @Mock private BaseAgent mockAgent; - @Mock private StreamObserver mockResponseObserver; - - private A2aService a2aService; - - @BeforeEach - void setUp() { - a2aService = new A2aService(mockAgent); - } - - @Test - void testSendMessage_returnsAgentResponse() { - when(mockAgent.runAsync(any(InvocationContext.class))) - .thenReturn( - Flowable.just( - Event.builder() - .author("test-agent") - .content(Content.fromParts(Part.fromText("Hello, world!"))) - .build())); - - SendMessageRequest request = - SendMessageRequest.newBuilder().setSessionId("test-session").setUserQuery("Hi").build(); - - a2aService.sendMessage(request, mockResponseObserver); - - verify(mockResponseObserver).onNext(any(SendMessageResponse.class)); - verify(mockResponseObserver).onCompleted(); - } -} diff --git a/a2a/src/test/java/com/google/adk/a2a/grpc/MediaSupportTest.java b/a2a/src/test/java/com/google/adk/a2a/grpc/MediaSupportTest.java deleted file mode 100644 index 10b034224..000000000 --- a/a2a/src/test/java/com/google/adk/a2a/grpc/MediaSupportTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/** Author: Sandeep Belgavi Date: January 17, 2026 */ -package com.google.adk.a2a.grpc; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.adk.a2a.converters.PartConverter; -import com.google.genai.types.Blob; -import com.google.genai.types.FileData; -import com.google.genai.types.Part; -import io.a2a.spec.FilePart; -import io.a2a.spec.FileWithBytes; -import io.a2a.spec.FileWithUri; -import io.a2a.spec.TextPart; -import java.util.Base64; -import java.util.Optional; -import org.junit.jupiter.api.Test; - -/** Tests for image, audio, and video support in A2A. */ -class MediaSupportTest { - - // Sample base64 encoded data (1x1 pixel PNG) - private static final String SAMPLE_IMAGE_BASE64 = - "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="; - - // Sample base64 encoded data (minimal WAV file) - private static final String SAMPLE_AUDIO_BASE64 = - "UklGRiQAAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQAAAAA="; - - // Sample base64 encoded data (minimal MP4 file) - private static final String SAMPLE_VIDEO_BASE64 = - "AAAAIGZ0eXBpc29tAAACAGlzb21pc28yYXZjMW1wNDEAAAAIZnJlZQAAAZhtZGF0AAACrgYF//+q3EXpvebZSLeWLNgg2SPu73gyNjQgLSBjb3JlIDE0OCByMTkzOCA1YzY1MDk1IC0gSC4yNjQvTVBFRy00IEFWQyBjb2RlYyAtIENvcHlsZWZ0IDIwMDMtMjAxNyAtIGh0dHA6Ly93d3cudmlkZW9sYW4ub3JnL3gyNjQuaHRtbCAtIG9wdGlvbnM6IGNhYmFjPTEgcmVmPTMgZGVibG9jaz0xOjA6MCBhbmFseXNlPTB4MzoweDExMyBtZT1oZXggc3VibWU9NyBwc3k9MSBwc3lfcmQ9MS4wMDowLjAwIG1peGVkX3JlZj0xIG1lX3JhbmdlPTE2IGNocm9tYV9tZT0xIHRyZWxsaXM9MSA4eDhkY3Q9MSBjcW09MCBkZWFkem9uZT0yMSwxMSBmYXN0X3Bza2lwPTEgY2hyb21hX3FwX29mZnNldD0tMiB0aHJlYWRzPTEgbG9va2FoZWFkX3RocmVhZHM9MSBzbGljZWRfdGhyZWFkcz0wIG5yPTAgZGVjaW1hdGU9MSBpbnRlcmxhY2VkPTAgYmx1cmF5X2NvbXBhdD0wIGNvbnN0cmFpbmVkX2ludHJhPTAgYmZyYW1lcz0zIGJfcHlyYW1pZD0yIGJfYWRhcHQ9MSBiX2JpYXM9MCBkaXJlY3Q9MSB3ZWlnaHRiPTEgb3Blbl9nb3A9MCB3ZWlnaHRwPTIga2V5aW50PTI1MCBrZXlpbnRfbWluPTI1IHNjZW5lY3V0PTQwIGludHJhX3JlZnJlc2g9MCByY19sb29rYWhlYWQ9NDAgcmM9Y3JmIG1idHJlZT0xIGRyYWZ0PTEgdHRfcGVyZnJhbWU9MSB0cmVfbG9va2FoZWFkPTQw"; - - @Test - void testTextPart_conversion() { - // A2A TextPart to GenAI Part - TextPart textPart = new TextPart("Hello, world!"); - Optional genaiPart = PartConverter.toGenaiPart(textPart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().text()).isPresent(); - assertThat(genaiPart.get().text().get()).isEqualTo("Hello, world!"); - - // GenAI Part to A2A TextPart - Part genaiTextPart = Part.builder().text("Hello, world!").build(); - Optional> a2aPart = PartConverter.fromGenaiPart(genaiTextPart); - - assertThat(a2aPart).isPresent(); - assertThat(a2aPart.get()).isInstanceOf(TextPart.class); - assertThat(((TextPart) a2aPart.get()).getText()).isEqualTo("Hello, world!"); - } - - @Test - void testImageFilePart_withUri() { - // A2A FilePart (Image) with URI to GenAI Part - FilePart imagePart = - new FilePart(new FileWithUri("image/png", "test.png", "https://example.com/image.png")); - - Optional genaiPart = PartConverter.toGenaiPart(imagePart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().fileData()).isPresent(); - FileData fileData = genaiPart.get().fileData().get(); - assertThat(fileData.fileUri()).isPresent(); - assertThat(fileData.fileUri().get()).isEqualTo("https://example.com/image.png"); - assertThat(fileData.mimeType()).isPresent(); - assertThat(fileData.mimeType().get()).isEqualTo("image/png"); - } - - @Test - void testImageFilePart_withBytes() { - // A2A FilePart (Image) with base64 bytes to GenAI Part - FilePart imagePart = - new FilePart(new FileWithBytes("image/png", "test.png", SAMPLE_IMAGE_BASE64)); - - Optional genaiPart = PartConverter.toGenaiPart(imagePart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().inlineData()).isPresent(); - Blob blob = genaiPart.get().inlineData().get(); - assertThat(blob.mimeType()).isPresent(); - assertThat(blob.mimeType().get()).isEqualTo("image/png"); - assertThat(blob.data()).isPresent(); - assertThat(blob.data().get().length).isGreaterThan(0); - } - - @Test - void testAudioFilePart_withUri() { - // A2A FilePart (Audio) with URI to GenAI Part - FilePart audioPart = - new FilePart(new FileWithUri("audio/mpeg", "test.mp3", "https://example.com/audio.mp3")); - - Optional genaiPart = PartConverter.toGenaiPart(audioPart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().fileData()).isPresent(); - FileData fileData = genaiPart.get().fileData().get(); - assertThat(fileData.mimeType()).isPresent(); - assertThat(fileData.mimeType().get()).isEqualTo("audio/mpeg"); - } - - @Test - void testAudioFilePart_withBytes() { - // A2A FilePart (Audio) with base64 bytes to GenAI Part - FilePart audioPart = - new FilePart(new FileWithBytes("audio/wav", "test.wav", SAMPLE_AUDIO_BASE64)); - - Optional genaiPart = PartConverter.toGenaiPart(audioPart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().inlineData()).isPresent(); - Blob blob = genaiPart.get().inlineData().get(); - assertThat(blob.mimeType()).isPresent(); - assertThat(blob.mimeType().get()).isEqualTo("audio/wav"); - } - - @Test - void testVideoFilePart_withUri() { - // A2A FilePart (Video) with URI to GenAI Part - FilePart videoPart = - new FilePart(new FileWithUri("video/mp4", "test.mp4", "https://example.com/video.mp4")); - - Optional genaiPart = PartConverter.toGenaiPart(videoPart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().fileData()).isPresent(); - FileData fileData = genaiPart.get().fileData().get(); - assertThat(fileData.mimeType()).isPresent(); - assertThat(fileData.mimeType().get()).isEqualTo("video/mp4"); - } - - @Test - void testVideoFilePart_withBytes() { - // A2A FilePart (Video) with base64 bytes to GenAI Part - FilePart videoPart = - new FilePart(new FileWithBytes("video/mp4", "test.mp4", SAMPLE_VIDEO_BASE64)); - - Optional genaiPart = PartConverter.toGenaiPart(videoPart); - - assertThat(genaiPart).isPresent(); - assertThat(genaiPart.get().inlineData()).isPresent(); - Blob blob = genaiPart.get().inlineData().get(); - assertThat(blob.mimeType()).isPresent(); - assertThat(blob.mimeType().get()).isEqualTo("video/mp4"); - } - - @Test - void testGenAIImagePart_toA2A() { - // GenAI Part (Image FileData) to A2A FilePart - Part genaiImagePart = - Part.builder() - .fileData( - FileData.builder() - .fileUri("https://example.com/image.jpg") - .mimeType("image/jpeg") - .displayName("photo.jpg") - .build()) - .build(); - - Optional> a2aPart = PartConverter.fromGenaiPart(genaiImagePart); - - assertThat(a2aPart).isPresent(); - assertThat(a2aPart.get()).isInstanceOf(FilePart.class); - FilePart filePart = (FilePart) a2aPart.get(); - assertThat(filePart.getFile()).isInstanceOf(FileWithUri.class); - FileWithUri fileWithUri = (FileWithUri) filePart.getFile(); - assertThat(fileWithUri.uri()).isEqualTo("https://example.com/image.jpg"); - assertThat(fileWithUri.mimeType()).isEqualTo("image/jpeg"); - } - - @Test - void testGenAIAudioPart_toA2A() { - // GenAI Part (Audio InlineData) to A2A FilePart - byte[] audioBytes = Base64.getDecoder().decode(SAMPLE_AUDIO_BASE64); - Part genaiAudioPart = - Part.builder() - .inlineData( - Blob.builder() - .data(audioBytes) - .mimeType("audio/wav") - .displayName("sound.wav") - .build()) - .build(); - - Optional> a2aPart = PartConverter.fromGenaiPart(genaiAudioPart); - - assertThat(a2aPart).isPresent(); - assertThat(a2aPart.get()).isInstanceOf(FilePart.class); - FilePart filePart = (FilePart) a2aPart.get(); - assertThat(filePart.getFile()).isInstanceOf(FileWithBytes.class); - FileWithBytes fileWithBytes = (FileWithBytes) filePart.getFile(); - assertThat(fileWithBytes.mimeType()).isEqualTo("audio/wav"); - assertThat(fileWithBytes.bytes()).isEqualTo(SAMPLE_AUDIO_BASE64); - } - - @Test - void testGenAIVideoPart_toA2A() { - // GenAI Part (Video FileData) to A2A FilePart - Part genaiVideoPart = - Part.builder() - .fileData( - FileData.builder() - .fileUri("https://example.com/video.mp4") - .mimeType("video/mp4") - .displayName("movie.mp4") - .build()) - .build(); - - Optional> a2aPart = PartConverter.fromGenaiPart(genaiVideoPart); - - assertThat(a2aPart).isPresent(); - assertThat(a2aPart.get()).isInstanceOf(FilePart.class); - FilePart filePart = (FilePart) a2aPart.get(); - assertThat(filePart.getFile()).isInstanceOf(FileWithUri.class); - FileWithUri fileWithUri = (FileWithUri) filePart.getFile(); - assertThat(fileWithUri.mimeType()).isEqualTo("video/mp4"); - } - - @Test - void testMultipleMediaTypes_inMessage() { - // Test that multiple media types can be converted together - FilePart imagePart = - new FilePart(new FileWithUri("image/jpeg", "photo.jpg", "https://example.com/photo.jpg")); - FilePart audioPart = - new FilePart(new FileWithUri("audio/mpeg", "sound.mp3", "https://example.com/sound.mp3")); - FilePart videoPart = - new FilePart(new FileWithUri("video/mp4", "movie.mp4", "https://example.com/movie.mp4")); - - Optional imageGenai = PartConverter.toGenaiPart(imagePart); - Optional audioGenai = PartConverter.toGenaiPart(audioPart); - Optional videoGenai = PartConverter.toGenaiPart(videoPart); - - assertThat(imageGenai).isPresent(); - assertThat(audioGenai).isPresent(); - assertThat(videoGenai).isPresent(); - - assertThat(imageGenai.get().fileData().get().mimeType().get()).isEqualTo("image/jpeg"); - assertThat(audioGenai.get().fileData().get().mimeType().get()).isEqualTo("audio/mpeg"); - assertThat(videoGenai.get().fileData().get().mimeType().get()).isEqualTo("video/mp4"); - } -} diff --git a/core/pom.xml b/core/pom.xml index 141e35898..157ee2dc8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -274,13 +274,6 @@ future-converter-java8-guava 1.2.0 - - - com.google.adk - google-adk-a2a - ${project.version} - true - diff --git a/core/src/main/java/com/google/adk/agents/LlmAgent.java b/core/src/main/java/com/google/adk/agents/LlmAgent.java index c228c7bea..444985971 100644 --- a/core/src/main/java/com/google/adk/agents/LlmAgent.java +++ b/core/src/main/java/com/google/adk/agents/LlmAgent.java @@ -64,7 +64,6 @@ import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -651,107 +650,6 @@ public LlmAgent build() { validate(); return new LlmAgent(this); } - - /** - * Builds the agent and starts it as an A2A server on the default port (8080). - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. If the A2A - * module is not available, this will throw a {@link NoClassDefFoundError}. - * - *

Example: - * - *

{@code
-     * LlmAgent.builder()
-     *     .name("MyAgent")
-     *     .model("gemini-2.0-flash-exp")
-     *     .instruction("You are helpful")
-     *     .toA2aServerAndStart();
-     * }
- * - * @return The started A2aServer instance - * @throws NoClassDefFoundError if the A2A module is not on the classpath - * @throws IOException if the server fails to start - * @throws InterruptedException if interrupted while starting - */ - /** - * Builds the agent and starts it as an A2A server on the default port (8080). This method - * blocks until the server is terminated. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. If the A2A - * module is not available, this will throw a {@link NoClassDefFoundError}. - * - *

Example: - * - *

{@code
-     * LlmAgent.builder()
-     *     .name("MyAgent")
-     *     .model("gemini-2.0-flash-exp")
-     *     .instruction("You are helpful")
-     *     .toA2aServerAndStart();
-     * }
- * - * @throws NoClassDefFoundError if the A2A module is not on the classpath - * @throws IOException if the server fails to start - * @throws InterruptedException if interrupted while starting - */ - public void toA2aServerAndStart() throws IOException, InterruptedException { - toA2aServerAndStart(8080); - } - - /** - * Builds the agent and starts it as an A2A server on the specified port. This method blocks - * until the server is terminated. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. If the A2A - * module is not available, this will throw a {@link NoClassDefFoundError}. - * - *

Example: - * - *

{@code
-     * LlmAgent.builder()
-     *     .name("MyAgent")
-     *     .model("gemini-2.0-flash-exp")
-     *     .instruction("You are helpful")
-     *     .toA2aServerAndStart(5066);
-     * }
- * - * @param port The port to start the server on - * @throws NoClassDefFoundError if the A2A module is not on the classpath - * @throws IOException if the server fails to start - * @throws InterruptedException if interrupted while starting - */ - public void toA2aServerAndStart(int port) throws IOException, InterruptedException { - LlmAgent agent = build(); - new com.google.adk.a2a.grpc.A2aServerBuilder(agent).port(port).build().start(); - } - - /** - * Returns an A2aServerBuilder for advanced configuration. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. If the A2A - * module is not available, this will throw a {@link NoClassDefFoundError}. - * - *

Example: - * - *

{@code
-     * LlmAgent.builder()
-     *     .name("MyAgent")
-     *     .model("gemini-2.0-flash-exp")
-     *     .instruction("You are helpful")
-     *     .toA2a()
-     *     .port(5066)
-     *     .withRegistry(registryUrl)
-     *     .build()
-     *     .start();
-     * }
- * - * @return An A2aServerBuilder instance - * @throws NoClassDefFoundError if the A2A module is not on the classpath - */ - public com.google.adk.a2a.grpc.A2aServerBuilder toA2a() { - LlmAgent agent = build(); - return new com.google.adk.a2a.grpc.A2aServerBuilder(agent); - } } protected BaseLlmFlow determineLlmFlow() { @@ -1053,47 +951,6 @@ public Model resolvedModel() { return resolvedModel; } - /** - * Starts this agent as an A2A server on the default port (8080). This method blocks until the - * server is terminated. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. - * - * @throws NoClassDefFoundError if the A2A module is not on the classpath - * @throws IOException if the server fails to start - * @throws InterruptedException if interrupted while starting - */ - public void toA2aServerAndStart() throws IOException, InterruptedException { - toA2aServerAndStart(8080); - } - - /** - * Starts this agent as an A2A server on the specified port. This method blocks until the server - * is terminated. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. - * - * @param port The port to start the server on - * @throws NoClassDefFoundError if the A2A module is not on the classpath - * @throws IOException if the server fails to start - * @throws InterruptedException if interrupted while starting - */ - public void toA2aServerAndStart(int port) throws IOException, InterruptedException { - new com.google.adk.a2a.grpc.A2aServerBuilder(this).port(port).build().start(); - } - - /** - * Returns an A2aServerBuilder for advanced configuration of this agent. - * - *

This method requires the {@code google-adk-a2a} module to be on the classpath. - * - * @return An A2aServerBuilder instance - * @throws NoClassDefFoundError if the A2A module is not on the classpath - */ - public com.google.adk.a2a.grpc.A2aServerBuilder toA2a() { - return new com.google.adk.a2a.grpc.A2aServerBuilder(this); - } - /** * Resolves the model for this agent, checking first if it is defined locally, then searching * through ancestors. diff --git a/core/src/test/java/com/google/adk/agents/LlmAgentA2aTest.java b/core/src/test/java/com/google/adk/agents/LlmAgentA2aTest.java deleted file mode 100644 index 9c5c52227..000000000 --- a/core/src/test/java/com/google/adk/agents/LlmAgentA2aTest.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.adk.agents; - -import static com.google.adk.testing.TestUtils.createLlmResponse; -import static com.google.adk.testing.TestUtils.createTestAgentBuilder; -import static com.google.adk.testing.TestUtils.createTestLlm; -import static com.google.common.truth.Truth.assertThat; - -import com.google.adk.testing.TestLlm; -import com.google.genai.types.Content; -import com.google.genai.types.Part; -import java.io.IOException; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link LlmAgent} A2A fluent API methods. - * - *

Note: These tests require the {@code google-adk-a2a} module to be on the classpath. If the - * module is not available, tests will be skipped or throw {@link NoClassDefFoundError}. - */ -@RunWith(JUnit4.class) -public final class LlmAgentA2aTest { - - @Test - public void testBuilder_toA2a_returnsA2aServerBuilder() { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent.Builder builder = createTestAgentBuilder(testLlm).name("TestAgent"); - - try { - Object result = builder.toA2a(); - assertThat(result).isNotNull(); - // Verify it's an A2aServerBuilder instance - assertThat(result.getClass().getName()).contains("A2aServerBuilder"); - } catch (NoClassDefFoundError e) { - // A2A module not available - skip test - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } - } - - @Test - public void testBuilder_toA2a_buildsAgentFirst() { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent.Builder builder = createTestAgentBuilder(testLlm).name("TestAgent"); - - try { - Object result = builder.toA2a(); - assertThat(result).isNotNull(); - // Verify builder can still be used after toA2a() - LlmAgent agent = builder.build(); - assertThat(agent.name()).isEqualTo("TestAgent"); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } - } - - @Test - public void testBuilder_toA2aServerAndStart_withPort() throws IOException, InterruptedException { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent.Builder builder = createTestAgentBuilder(testLlm).name("TestAgent"); - - try { - // This will start a server and block, so we test it in a separate thread - Thread testThread = - new Thread( - () -> { - try { - builder.toA2aServerAndStart(0); // Use port 0 for auto-assignment - } catch (Exception e) { - // Expected - port 0 might not work, or server might start successfully - } - }); - testThread.start(); - Thread.sleep(100); // Give it a moment to start - testThread.interrupt(); // Stop the blocking call - testThread.join(1000); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } catch (IllegalArgumentException e) { - // Port 0 might not be valid - that's okay, we're just testing the method exists - assertThat(e.getMessage()).contains("port"); - } - } - - @Test - public void testBuilder_toA2aServerAndStart_defaultPort() - throws IOException, InterruptedException { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent.Builder builder = createTestAgentBuilder(testLlm).name("TestAgent"); - - try { - // This will start a server and block, so we test it in a separate thread - Thread testThread = - new Thread( - () -> { - try { - builder.toA2aServerAndStart(); // Uses default port 8080 - } catch (Exception e) { - // Expected - port might be in use, or server might start successfully - } - }); - testThread.start(); - Thread.sleep(100); // Give it a moment to start - testThread.interrupt(); // Stop the blocking call - testThread.join(1000); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } - } - - @Test - public void testInstance_toA2a_returnsA2aServerBuilder() { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent agent = createTestAgentBuilder(testLlm).name("TestAgent").build(); - - try { - Object result = agent.toA2a(); - assertThat(result).isNotNull(); - // Verify it's an A2aServerBuilder instance - assertThat(result.getClass().getName()).contains("A2aServerBuilder"); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } - } - - @Test - public void testInstance_toA2aServerAndStart_withPort() throws IOException, InterruptedException { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent agent = createTestAgentBuilder(testLlm).name("TestAgent").build(); - - try { - // This will start a server and block, so we test it in a separate thread - Thread testThread = - new Thread( - () -> { - try { - agent.toA2aServerAndStart(0); // Use port 0 for auto-assignment - } catch (Exception e) { - // Expected - port 0 might not work, or server might start successfully - } - }); - testThread.start(); - Thread.sleep(100); // Give it a moment to start - testThread.interrupt(); // Stop the blocking call - testThread.join(1000); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } catch (IllegalArgumentException e) { - // Port 0 might not be valid - that's okay, we're just testing the method exists - assertThat(e.getMessage()).contains("port"); - } - } - - @Test - public void testInstance_toA2aServerAndStart_defaultPort() - throws IOException, InterruptedException { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent agent = createTestAgentBuilder(testLlm).name("TestAgent").build(); - - try { - // This will start a server and block, so we test it in a separate thread - Thread testThread = - new Thread( - () -> { - try { - agent.toA2aServerAndStart(); // Uses default port 8080 - } catch (Exception e) { - // Expected - port might be in use, or server might start successfully - } - }); - testThread.start(); - Thread.sleep(100); // Give it a moment to start - testThread.interrupt(); // Stop the blocking call - testThread.join(1000); - } catch (NoClassDefFoundError e) { - System.out.println("Skipping A2A test - module not available: " + e.getMessage()); - } - } - - @Test - public void testBuilder_toA2a_throwsNoClassDefFoundError_whenA2aNotAvailable() { - // This test verifies that NoClassDefFoundError is thrown when A2A module is not available - // In practice, if A2A is not on classpath, the method will throw NoClassDefFoundError - // We can't easily simulate this without removing the dependency, so we just verify - // the method exists and can be called when A2A is available - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent.Builder builder = createTestAgentBuilder(testLlm).name("TestAgent"); - - try { - builder.toA2a(); - // If we get here, A2A is available - test passes - } catch (NoClassDefFoundError e) { - // Expected if A2A module not available - assertThat(e.getMessage()).contains("A2aServerBuilder"); - } - } - - @Test - public void testInstance_toA2a_throwsNoClassDefFoundError_whenA2aNotAvailable() { - TestLlm testLlm = createTestLlm(createLlmResponse(Content.fromParts(Part.fromText("Test")))); - LlmAgent agent = createTestAgentBuilder(testLlm).name("TestAgent").build(); - - try { - agent.toA2a(); - // If we get here, A2A is available - test passes - } catch (NoClassDefFoundError e) { - // Expected if A2A module not available - assertThat(e.getMessage()).contains("A2aServerBuilder"); - } - } -}