Replies: 1 comment
**Response from ADK Answering Agent** (experimental, answer may be inaccurate)

**TLDR:** The recommended approach is to maintain a shared dictionary at the application level that maps a unique identifier (like a `user_id`) to its active `LiveRequestQueue`.

Hello! That's an excellent question about how to architect communication between different agent types in ADK. Here's a breakdown of the recommended approach.

**The Challenge: Scoped vs. Shared State**

As you've correctly identified, the ADK doesn't provide a built-in function to directly look up a `LiveRequestQueue` from outside the WebSocket handler that created it.

**The Solution: A Shared Queue Manager**

You can implement a simple in-memory "manager" (using a Python dictionary) to keep track of the active `LiveRequestQueue` instances. Here is a step-by-step guide:

**Step 1: Create a Shared Dictionary**

In your main application file, create a global dictionary to store the queues.

```python
from fastapi import FastAPI, WebSocket
from google.adk.agents import LiveRequestQueue

app = FastAPI()

# A dictionary to map a user_id to their active LiveRequestQueue
active_live_queues: dict[str, LiveRequestQueue] = {}
```

**Step 2: Manage the Queue in Your WebSocket Endpoint**

In your WebSocket handler, add the user's queue to the dictionary when they connect and, most importantly, remove it when they disconnect. Using a `try`/`finally` block guarantees the cleanup runs even if the connection drops unexpectedly.

```python
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    # ...
    live_request_queue = LiveRequestQueue()
    active_live_queues[user_id] = live_request_queue
    try:
        # The run_live method will block here until the connection is closed
        await runner.run_live(..., live_request_queue=live_request_queue)
    finally:
        # Once the connection is closed, remove the queue from the dictionary
        del active_live_queues[user_id]
        print(f"Cleaned up queue for user {user_id}")
```

**Step 3: Access the Queue from Your HTTP Endpoint**

Now, your `/trigger` endpoint can look up the queue by `user_id` and push content to the live session.

```python
from google.genai import types

@app.post("/trigger")
async def trigger_workflow(user_id: str, message: str):
    # 1. Trigger your RootAgent's async workflow
    # runner.run_async(...)
    # ...imagine the workflow runs and produces a result...

    # 2. Inform the LiveAgent about the result
    if user_id in active_live_queues:
        live_queue = active_live_queues[user_id]
        # Create the content to send
        content = types.Content(parts=[types.Part(text=f"An update from the workflow: {message}")])
        # Send the content to the LiveAgent
        live_queue.send_content(content)
        return {"status": "Message sent to LiveAgent"}
    else:
        # Handle the case where the user is not connected via WebSocket
        return {"status": f"User {user_id} not connected"}
```

This pattern ensures that you can safely access the communication channel to the LiveAgent from anywhere in your application.

For more details you can check out the "Web Application" section in the ADK documentation, which describes a similar mechanism for sending events to a live agent. [1]

[1] https://google.github.io/adk-docs/streaming/custom-streaming/
I’m working with Google ADK using FastAPI, and I'm combining two types of agents in the same application:
- A CustomAgent (the RootAgent), which orchestrates a workflow of multiple regular agents (the `run_async` path).
- A LiveAgent, which is triggered inside a WebSocket endpoint following the ADK documentation.
This works correctly, and I am able to talk to the live agent through audio streaming.
The RootAgent workflow can be triggered from an HTTP endpoint `/trigger`, where I call the `run_async` workflow. During this workflow, some agents produce results that I would like to send to the LiveAgent so that the live agent stays up-to-date and can talk about those results to the end-user.
ADK allows sending messages to a live agent via `live_request_queue.send_content(...)`. However, the `live_request_queue` only exists inside the WebSocket handler, and I cannot access it from the `/trigger` endpoint or from inside the workflow.
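The scoping problem the question describes reduces to plain Python: an object created as a local variable inside one coroutine is invisible to every other coroutine unless it is published somewhere shared. A minimal sketch with standard-library pieces only (`asyncio.Queue` standing in for the live request queue; all names hypothetical):

```python
import asyncio

# Without this shared mapping, the queue below would be unreachable
# from trigger(): it is a local variable of websocket_handler.
shared: dict[str, asyncio.Queue] = {}

async def websocket_handler(user_id: str, ready: asyncio.Event):
    queue = asyncio.Queue()   # local to this coroutine...
    shared[user_id] = queue   # ...until published in the shared mapping
    ready.set()               # signal that the "connection" is up
    return await queue.get()  # wait for a message from the workflow

async def trigger(user_id: str, message: str):
    # A completely separate coroutine reaches the handler's queue
    # only through the shared mapping.
    await shared[user_id].put(message)

async def main():
    ready = asyncio.Event()
    task = asyncio.create_task(websocket_handler("bob", ready))
    await ready.wait()                  # handler has registered its queue
    await trigger("bob", "result ready")
    return await task

result = asyncio.run(main())
```

This is exactly the gap the shared-dictionary pattern in the answer fills for the `/trigger` endpoint.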