converted = new ArrayList<>();
-
- // Convert to A2A message
- Message a2aMessage =
- ResponseConverter.eventToMessage(adkEvent, contextId);
- if (a2aMessage != null && !a2aMessage.getParts().isEmpty()) {
- // Create artifact update event
- Artifact artifact =
- new Artifact.Builder()
- .artifactId(UUID.randomUUID().toString())
- .parts(a2aMessage.getParts())
- .build();
- TaskArtifactUpdateEvent artifactEvent =
- new TaskArtifactUpdateEvent.Builder()
- .taskId(taskId)
- .contextId(contextId)
- .artifact(artifact)
- .lastChunk(false)
- .build();
- converted.add(artifactEvent);
- }
-
- return Flowable.fromIterable(converted);
- })
- .onErrorResumeNext(
- error -> {
- logger.error("Error converting ADK event to A2A event", error);
- TaskStatusUpdateEvent errorEvent =
- new TaskStatusUpdateEvent(
- taskId,
- new TaskStatus(
- TaskState.FAILED,
- new Message.Builder()
- .messageId(UUID.randomUUID().toString())
- .role(Message.Role.AGENT)
- .parts(
- ImmutableList.of(
- new io.a2a.spec.TextPart(
- "Error: " + error.getMessage())))
- .build(),
- null),
- contextId,
- true,
- null);
- return Flowable.just(errorEvent);
- });
-
- // 8. Create final completion events
- TaskArtifactUpdateEvent finalArtifact =
- new TaskArtifactUpdateEvent.Builder()
- .taskId(taskId)
- .contextId(contextId)
- .artifact(
- new Artifact.Builder()
- .artifactId(UUID.randomUUID().toString())
- .parts(ImmutableList.of())
- .build())
- .lastChunk(true)
- .build();
-
- TaskStatusUpdateEvent completedEvent =
- new TaskStatusUpdateEvent(
- taskId,
- new TaskStatus(TaskState.COMPLETED, null, null),
- contextId,
- true,
- null);
-
- // Combine all events in sequence: submitted → working → agent events → completion
- return Flowable.just((io.a2a.spec.Event) submittedEvent)
- .concatWith(Flowable.just((io.a2a.spec.Event) workingEvent))
- .concatWith(a2aEvents)
- .concatWith(Flowable.just((io.a2a.spec.Event) finalArtifact))
- .concatWith(Flowable.just((io.a2a.spec.Event) completedEvent));
- })
- .onErrorResumeNext(
- error -> {
- logger.error("Error executing A2A request", error);
- TaskStatusUpdateEvent errorEvent =
- new TaskStatusUpdateEvent(
- taskId,
- new TaskStatus(
- TaskState.FAILED,
- new Message.Builder()
- .messageId(UUID.randomUUID().toString())
- .role(Message.Role.AGENT)
- .parts(
- ImmutableList.of(
- new io.a2a.spec.TextPart("Error: " + error.getMessage())))
- .build(),
- null),
- contextId,
- true,
- null);
- return Flowable.just(errorEvent);
- });
- }
-}
diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java
deleted file mode 100644
index 48950818a..000000000
--- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aGrpcServer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/** Author: Sandeep Belgavi Date: January 17, 2026 */
-package com.google.adk.a2a.grpc;
-
-import com.google.adk.agents.BaseAgent;
-import java.io.IOException;
-import java.net.URL;
-
-/**
- * A utility class to easily expose an ADK BaseAgent as a standalone A2A gRPC server. This class
- * provides a Python-like {@code to_a2a} experience for Java developers.
- *
- * This utility abstracts away the boilerplate of creating a gRPC server, allowing developers to
- * expose their agents with minimal code.
- *
- *
Example usage:
- *
- *
{@code
- * import com.google.adk.agents.LlmAgent;
- * import com.google.adk.a2a.grpc.A2aGrpcServer;
- *
- * public class Main {
- * public static void main(String[] args) {
- * BaseAgent agent = new LlmAgent(...);
- * // This single line starts the entire gRPC A2A server!
- * A2aGrpcServer.run(agent, args);
- * }
- * }
- * }
- *
- * Note: This assumes the gRPC service definition (`.proto` file) and generated classes
- * ({@code A2AServiceGrpc}, {@code SendMessageRequest}, {@code SendMessageResponse}) are available
- * in the classpath through the Maven {@code protobuf-maven-plugin} configuration.
- */
-public final class A2aGrpcServer {
-
- private A2aGrpcServer() {
- // Utility class - prevent instantiation
- }
-
- /**
- * Starts an A2A gRPC server for the given agent instance. This method creates and starts a gRPC
- * server on the default port (8080).
- *
- *
The server will run until the JVM is shut down. A shutdown hook is automatically registered
- * to ensure graceful shutdown.
- *
- * @param agent The ADK agent to expose as a gRPC service.
- * @throws IOException If the server fails to start.
- * @throws InterruptedException If the server is interrupted while waiting for termination.
- */
- public static void run(BaseAgent agent) throws IOException, InterruptedException {
- run(agent, 8080);
- }
-
- /**
- * Starts an A2A gRPC server for the given agent instance on the specified port.
- *
- *
The server will run until the JVM is shut down. A shutdown hook is automatically registered
- * to ensure graceful shutdown.
- *
- * @param agent The ADK agent to expose as a gRPC service.
- * @param port The port number on which the server should listen.
- * @throws IOException If the server fails to start.
- * @throws InterruptedException If the server is interrupted while waiting for termination.
- */
- public static void run(BaseAgent agent, int port) throws IOException, InterruptedException {
- run(agent, port, null);
- }
-
- /**
- * Starts an A2A gRPC server for the given agent instance with optional registry integration.
- *
- *
The server will run until the JVM is shut down. A shutdown hook is automatically registered
- * to ensure graceful shutdown.
- *
- * @param agent The ADK agent to expose as a gRPC service.
- * @param port The port number on which the server should listen.
- * @param registryUrl Optional URL of the service registry. If provided, the server will register
- * itself with the registry upon startup and unregister upon shutdown.
- * @throws IOException If the server fails to start.
- * @throws InterruptedException If the server is interrupted while waiting for termination.
- */
- public static void run(BaseAgent agent, int port, URL registryUrl)
- throws IOException, InterruptedException {
- if (agent == null) {
- throw new IllegalArgumentException("Agent cannot be null");
- }
-
- A2aServer server = new A2aServerBuilder(agent).port(port).withRegistry(registryUrl).build();
-
- // Start the server and block until termination
- server.start();
- }
-
- /**
- * Creates and returns an {@link A2aServerBuilder} for advanced configuration. This allows for
- * more fine-grained control over the server setup.
- *
- *
Example:
- *
- *
{@code
- * A2aServer server = A2aGrpcServer.builder(agent)
- * .port(9090)
- * .withRegistry(new URL("http://localhost:8081"))
- * .build();
- * server.start();
- * }
- *
- * @param agent The ADK agent to expose as a gRPC service.
- * @return A new {@link A2aServerBuilder} instance.
- */
- public static A2aServerBuilder builder(BaseAgent agent) {
- return new A2aServerBuilder(agent);
- }
-}
diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java
deleted file mode 100644
index dc58b6966..000000000
--- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServer.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/** Author: Sandeep Belgavi Date: January 16, 2026 */
-package com.google.adk.a2a.grpc;
-
-import com.google.gson.Gson;
-import io.grpc.Server;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Manages the lifecycle of a standalone A2A gRPC server. */
-public class A2aServer {
-
- private static final Logger logger = LoggerFactory.getLogger(A2aServer.class);
- private static final Gson gson = new Gson();
-
- private final Server server;
- private final URL registryUrl;
- private final AgentInfo agentInfo;
- private final HttpClient httpClient;
-
- /**
- * Constructs a new server instance.
- *
- * @param server The underlying gRPC {@link Server} to manage.
- * @param registryUrl The URL of the service registry.
- * @param httpClient The HTTP client to use for registry communication.
- */
- public A2aServer(Server server, URL registryUrl, HttpClient httpClient, int port) {
- this.server = server;
- this.registryUrl = registryUrl;
- this.agentInfo = new AgentInfo(port);
- this.httpClient = httpClient;
- }
-
- /**
- * Starts the gRPC server and blocks the current thread until the server is terminated.
- *
- * @throws IOException If the server fails to start.
- * @throws InterruptedException If the server is interrupted while waiting for termination.
- */
- public void start() throws IOException, InterruptedException {
- start(true);
- }
-
- /**
- * Starts the gRPC server.
- *
- * @param awaitTermination If true, blocks the current thread until the server is terminated.
- * @throws IOException If the server fails to start.
- * @throws InterruptedException If the server is interrupted while waiting for termination.
- */
- public void start(boolean awaitTermination) throws IOException, InterruptedException {
- server.start();
- logger.info("A2A gRPC server started, listening on port " + server.getPort());
-
- if (registryUrl != null) {
- register();
- }
-
- // Add a shutdown hook to ensure a graceful server shutdown
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- logger.info("Shutting down gRPC server since JVM is shutting down");
- try {
- A2aServer.this.stop();
- } catch (InterruptedException e) {
- logger.error("gRPC server shutdown interrupted", e);
- Thread.currentThread().interrupt(); // Preserve the interrupted status
- }
- logger.info("Server shut down");
- }));
-
- // Block until the server is terminated
- if (awaitTermination) {
- server.awaitTermination();
- }
- }
-
- /**
- * Gets the port on which the server is listening.
- *
- * @return The port number.
- */
- public int getPort() {
- return server.getPort();
- }
-
- /**
- * Stops the gRPC server gracefully.
- *
- * @throws InterruptedException If the server is interrupted while shutting down.
- */
- public void stop() throws InterruptedException {
- if (registryUrl != null) {
- unregister();
- }
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- }
- }
-
- private void register() {
- try {
- HttpRequest request =
- HttpRequest.newBuilder()
- .uri(URI.create(registryUrl + "/register"))
- .header("Content-Type", "application/json")
- .POST(HttpRequest.BodyPublishers.ofString(gson.toJson(agentInfo)))
- .build();
-
- httpClient
- .sendAsync(request, HttpResponse.BodyHandlers.ofString())
- .whenComplete(
- (response, throwable) -> {
- if (throwable != null) {
- logger.error("Failed to register with registry service", throwable);
- return;
- }
- if (response.statusCode() >= 200 && response.statusCode() < 300) {
- logger.info(
- "Successfully registered with registry service. Status: {}, Body: {}",
- response.statusCode(),
- response.body());
- } else {
- logger.warn(
- "Failed to register with registry service. Status: {}, Body: {}",
- response.statusCode(),
- response.body());
- }
- });
- } catch (Exception e) {
- logger.error("Error building registry request", e);
- }
- }
-
- private void unregister() {
- try {
- HttpRequest request =
- HttpRequest.newBuilder()
- .uri(URI.create(registryUrl + "/unregister"))
- .header("Content-Type", "application/json")
- .POST(HttpRequest.BodyPublishers.ofString(gson.toJson(agentInfo)))
- .build();
-
- httpClient
- .sendAsync(request, HttpResponse.BodyHandlers.ofString())
- .whenComplete(
- (response, throwable) -> {
- if (throwable != null) {
- logger.error("Failed to unregister from registry service", throwable);
- return;
- }
- if (response.statusCode() >= 200 && response.statusCode() < 300) {
- logger.info(
- "Successfully unregistered from registry service. Status: {}, Body: {}",
- response.statusCode(),
- response.body());
- } else {
- logger.warn(
- "Failed to unregister from registry service. Status: {}, Body: {}",
- response.statusCode(),
- response.body());
- }
- });
- } catch (Exception e) {
- logger.error("Error building unregister request", e);
- }
- }
-
- private static class AgentInfo {
- private final String name;
- private final String url;
-
- AgentInfo(int port) {
- this.name = "agent-" + port;
- this.url = "http://localhost:" + port;
- }
-
- public String getName() {
- return name;
- }
-
- public String getUrl() {
- return url;
- }
- }
-}
diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java
deleted file mode 100644
index 2df7d126e..000000000
--- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aServerBuilder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/** Author: Sandeep Belgavi Date: January 16, 2026 */
-package com.google.adk.a2a.grpc;
-
-import com.google.adk.agents.BaseAgent;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import java.net.URL;
-import java.net.http.HttpClient;
-
-/** A builder for creating a lightweight, standalone A2A gRPC server. */
-public class A2aServerBuilder {
-
- private final BaseAgent agent;
- private int port = 8080; // Default port
- private URL registryUrl;
- private HttpClient httpClient;
-
- /**
- * Constructs a new builder for a given ADK agent.
- *
- * @param agent The ADK {@link BaseAgent} to be exposed via the gRPC service.
- */
- public A2aServerBuilder(BaseAgent agent) {
- if (agent == null) {
- throw new IllegalArgumentException("Agent cannot be null");
- }
- this.agent = agent;
- }
-
- /**
- * Sets the port on which the server should listen.
- *
- * @param port The port number.
- * @return This builder instance for chaining.
- */
- public A2aServerBuilder port(int port) {
- if (port <= 0 || port > 65535) {
- throw new IllegalArgumentException("Port must be between 1 and 65535");
- }
- this.port = port;
- return this;
- }
-
- /**
- * Sets the URL of the A2A service registry. If set, the server will attempt to register itself
- * with the registry upon startup.
- *
- * @param registryUrl The URL of the service registry.
- * @return This builder instance for chaining.
- */
- public A2aServerBuilder withRegistry(URL registryUrl) {
- this.registryUrl = registryUrl;
- return this;
- }
-
- /**
- * Sets the {@link HttpClient} to be used for registry communication. If not set, a default client
- * will be created.
- *
- * @param httpClient The HTTP client to use.
- * @return This builder instance for chaining.
- */
- public A2aServerBuilder httpClient(HttpClient httpClient) {
- this.httpClient = httpClient;
- return this;
- }
-
- /**
- * Builds the {@link A2aServer} instance.
- *
- * @return A new {@link A2aServer} configured with the settings from this builder.
- */
- public A2aServer build() {
- Server grpcServer = ServerBuilder.forPort(port).addService(new A2aService(agent)).build();
- HttpClient client = (httpClient != null) ? httpClient : HttpClient.newHttpClient();
- return new A2aServer(grpcServer, registryUrl, client, this.port);
- }
-}
diff --git a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java b/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java
deleted file mode 100644
index d49466b23..000000000
--- a/a2a/src/main/java/com/google/adk/a2a/grpc/A2aService.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/** Author: Sandeep Belgavi Date: January 18, 2026 */
-package com.google.adk.a2a.grpc;
-
-import com.google.adk.agents.BaseAgent;
-import com.google.adk.agents.RunConfig;
-import com.google.adk.artifacts.InMemoryArtifactService;
-import com.google.adk.events.Event;
-import com.google.adk.memory.InMemoryMemoryService;
-import com.google.adk.runner.Runner;
-import com.google.adk.sessions.InMemorySessionService;
-import com.google.adk.sessions.Session;
-import com.google.genai.types.Content;
-import com.google.genai.types.Part;
-import io.grpc.stub.StreamObserver;
-import io.reactivex.rxjava3.core.Flowable;
-import io.reactivex.rxjava3.core.Maybe;
-import java.time.LocalDate;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the A2A gRPC service, bridging requests to an ADK agent.
- *
- * This service uses a Runner internally to properly handle agent execution with session
- * management, artifacts, and memory services.
- */
-class A2aService extends A2AServiceGrpc.A2AServiceImplBase {
-
- private static final Logger logger = LoggerFactory.getLogger(A2aService.class);
- private final BaseAgent agent;
- private final Runner runner;
- private static final String DEFAULT_APP_NAME = "adk-a2a-server";
-
- /**
- * Constructs the service with a given ADK agent.
- *
- *
This constructor creates an internal Runner with in-memory services for sessions, artifacts,
- * and memory. For production use, consider using a constructor that accepts a pre-configured
- * Runner.
- *
- * @param agent The {@link BaseAgent} to handle the requests.
- */
- A2aService(BaseAgent agent) {
- this.agent = agent;
- // Create a Runner with in-memory services for simplicity
- // In production, this could be configured with persistent services
- this.runner =
- new Runner.Builder()
- .agent(agent)
- .appName(DEFAULT_APP_NAME)
- .artifactService(new InMemoryArtifactService())
- .sessionService(new InMemorySessionService())
- .memoryService(new InMemoryMemoryService())
- .build();
- }
-
- /**
- * Constructs the service with a pre-configured Runner.
- *
- * @param agent The {@link BaseAgent} to handle the requests.
- * @param runner The {@link Runner} instance to use for agent execution.
- */
- A2aService(BaseAgent agent, Runner runner) {
- this.agent = agent;
- this.runner = runner;
- }
-
- @Override
- public void sendMessage(
- SendMessageRequest request, StreamObserver responseObserver) {
- String userQuery = request.getUserQuery();
- String sessionId =
- request.getSessionId() != null && !request.getSessionId().isEmpty()
- ? request.getSessionId()
- : "new-session";
-
- // Console output for validation
- System.out.println("\n" + "=".repeat(80));
- System.out.println("🔵 A2A REQUEST RECEIVED");
- System.out.println("=".repeat(80));
- System.out.println("Session ID: " + sessionId);
- System.out.println("Agent: " + agent.name());
- System.out.println("Query: " + userQuery);
- System.out.println("=".repeat(80) + "\n");
-
- logger.info("Received message from client: sessionId={}, query={}", sessionId, userQuery);
-
- try {
- // Extract session ID from request or generate a new one
- String userId = "default-user";
- final String actualSessionId =
- (sessionId == null || sessionId.equals("new-session"))
- ? UUID.randomUUID().toString()
- : sessionId;
-
- // Create user content from the request
- Content userContent = Content.fromParts(Part.fromText(request.getUserQuery()));
-
- // Get or create session - Runner requires session to exist
- Maybe maybeSession =
- runner
- .sessionService()
- .getSession(runner.appName(), userId, actualSessionId, Optional.empty());
-
- Session session =
- maybeSession
- .switchIfEmpty(
- runner
- .sessionService()
- .createSession(runner.appName(), userId, null, null)
- .toMaybe())
- .blockingGet();
-
- // Initialize session state with default values if missing
- // This prevents errors when agent instruction references state variables
- // The state map is mutable, so we can update it directly
- Map sessionState = session.state();
- if (sessionState.isEmpty() || !sessionState.containsKey("currentDate")) {
- sessionState.put("currentDate", LocalDate.now().toString());
- sessionState.put("sourceCityName", "");
- sessionState.put("destinationCityName", "");
- sessionState.put("dateOfJourney", "");
- sessionState.put("mriSessionId", actualSessionId);
- sessionState.put("userMsg", request.getUserQuery());
- // Track A2A handovers (using _temp prefix for non-persistent tracking)
- sessionState.put("_temp_a2aCallCount", 0);
- sessionState.put("_temp_a2aCalls", new java.util.ArrayList