feat(realtime): reuse realtime session across agent handoffs if supported#5229
chenghao-mou
left a comment
I think it is hitting an issue with function calls when the handoff happens before the tool call result is added:
11:45:11.071 DEBUG livekit.agents executing tool
{"function": "transfer_to_weather", "arguments": "{}", "speech_id": "speech_a183e40b8c57", "room":
"console"}
11:45:11.074 INFO basic-agent handing off to WeatherAgent {"room": "console"}
11:45:11.961 INFO livekit.agents RealtimeModel metrics
{"model_name": "gpt-realtime", "model_provider": "api.openai.com", "ttft": -1.0, "input_tokens": 219,
"cached_input_tokens": 0, "input_text_tokens": 113, "input_cached_text_tokens": 0, "input_image_tokens": 0, "input_cached_image_tokens": 0,
"input_audio_tokens": 106, "input_cached_audio_tokens": 0, "output_tokens": 12, "output_text_tokens": 12, "output_audio_tokens": 0,
"output_image_tokens": 0, "total_tokens": 231, "tokens_per_second": 9.8, "room": "console"}
11:45:11.964 DEBUG livekit.agents tools execution completed {"speech_id": "speech_a183e40b8c57", "room": "console"}
11:45:16.973 WARNING livekit.agents failed to update chat context before generating the function calls results
{"error": "update_chat_ctx timed out.", "room": "console"}
11:45:16.977 DEBUG livekit.agents reusing realtime session from previous activity {"room": "console"}
11:45:17.490 ERROR livekit.…ns.openai failed to handle event
{"event": {"type": "conversation.item.added", "event_id": "event_DRVBjFMStnIRUZry4pSYE",
"previous_item_id": "item_DRVBe7B7O7BGvRaTMEa50", "item": {"id": "item_e2ca6d129cd5", "type": "function_call_output", "call_id":
"call_9UoYr1iIQM8DltFx", "output": ""}}, "room": "console"}
Traceback (most recent call last):
File "/Users/chenghao/Developer/agents-review/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py",
line 1023, in _recv_task
self._handle_conversion_item_added(ConversationItemAdded.construct(**event))
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chenghao/Developer/agents-review/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py",
line 1675, in _handle_conversion_item_added
fut.set_result(None)
~~~~~~~~~~~~~~^^^^^^
asyncio.exceptions.InvalidStateError: invalid state
Code
import logging

from dotenv import load_dotenv

from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    MetricsCollectedEvent,
    RunContext,
    cli,
    metrics,
)
from livekit.agents.llm import function_tool
from livekit.plugins import openai, silero

logger = logging.getLogger("basic-agent")

load_dotenv("../agents/.env")


class GreeterAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a friendly greeter named Kelly. "
                "Keep responses concise. No emojis or markdown. "
                "Your job is to greet the user and ask what they need help with. "
                "When the user asks about weather, use the transfer_to_weather tool."
            ),
        )

    async def on_enter(self) -> None:
        logger.info("GreeterAgent entered")
        self.session.generate_reply(instructions="greet the user and ask how you can help")

    @function_tool
    async def transfer_to_weather(self, context: RunContext) -> Agent:
        """Transfer to the weather agent when the user asks about weather."""
        logger.info("handing off to WeatherAgent")
        return WeatherAgent()


class WeatherAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a weather specialist named Sunny. "
                "Keep responses concise. No emojis or markdown. "
                "Help the user with weather questions. "
                "When the user wants to talk about something else, use transfer_to_greeter."
            ),
        )

    async def on_enter(self) -> None:
        logger.info("WeatherAgent entered")
        self.session.generate_reply(
            instructions="introduce yourself as the weather specialist and ask what city"
        )

    @function_tool
    async def lookup_weather(self, context: RunContext, location: str) -> str:
        """Called when the user asks for weather information.

        Args:
            location: The city or region to check weather for
        """
        logger.info(f"Looking up weather for {location}")
        return f"The weather in {location} is sunny, 70 degrees Fahrenheit."

    @function_tool
    async def transfer_to_greeter(self, context: RunContext) -> Agent:
        """Transfer back to the greeter when the user is done with weather."""
        logger.info("handing off back to GreeterAgent")
        return GreeterAgent()


server = AgentServer()


def prewarm(proc: JobProcess) -> None:
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def entrypoint(ctx: JobContext) -> None:
    ctx.log_context_fields = {"room": ctx.room.name}

    # shared realtime model -- both agents inherit this from the session,
    # so `self.llm is new_activity.llm` is True and the RT session is reused across handoffs
    rt_model = openai.realtime.RealtimeModel(voice="echo")

    session: AgentSession = AgentSession(
        llm=rt_model,
        vad=ctx.proc.userdata["vad"],
    )

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent) -> None:
        metrics.log_metrics(ev.metrics)

    async def log_usage():
        logger.info(f"Usage: {session.usage}")

    ctx.add_shutdown_callback(log_usage)

    await session.start(
        agent=GreeterAgent(),
        room=ctx.room,
    )


if __name__ == "__main__":
    cli.run_app(server)
    """Whether the instructions can be updated mid-session"""
    mid_session_tools_update: bool = False
    """Whether the tools can be updated mid-session"""
    per_response_tool_choice: bool = False
IMO we should find a better name for per_response_tool_choice
How about mutable_*: mutable_chat_context, mutable_instructions, mutable_tools, and mutable_tool_choice?
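For comparison, the proposed `mutable_*` naming could look roughly like the sketch below. The class name `RealtimeCapabilitiesSketch` and the helper `updatable_fields` are hypothetical, and the defaults and docstrings are assumptions for illustration, not the values in this PR.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class RealtimeCapabilitiesSketch:
    """Hypothetical stand-in that uses the proposed mutable_* names."""

    mutable_chat_context: bool = False
    """Whether the chat context can be updated mid-session"""
    mutable_instructions: bool = False
    """Whether the instructions can be updated mid-session"""
    mutable_tools: bool = False
    """Whether the tools can be updated mid-session"""
    mutable_tool_choice: bool = False
    """Whether the tool choice can be changed mid-session (assumed meaning)"""


def updatable_fields(caps: RealtimeCapabilitiesSketch) -> list:
    """Return the names of the flags that are enabled, in declaration order."""
    return [f.name for f in fields(caps) if getattr(caps, f.name)]
```

With a shared prefix, code that decides what to reconfigure on a reused session can filter on the flags uniformly, as `updatable_fields` does here.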
    async def update_session(
        self,
        *,
        instructions: NotGivenOr[str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
        tools: NotGivenOr[list[Tool]] = NOT_GIVEN,
    ) -> None:
        if is_given(instructions):
            try:
                await self.update_instructions(instructions)
            except RealtimeError:
                logger.exception("failed to update the instructions")

        if is_given(chat_ctx):
            try:
                await self.update_chat_ctx(chat_ctx)
            except RealtimeError:
                logger.exception("failed to update the chat_ctx")

        if is_given(tools):
            try:
                await self.update_tools(tools)
            except RealtimeError:
                logger.exception("failed to update the tools")
Any advantage of having this utility?
It was requested by @tinalenguyen in #5303. I guess some plugins can override this method to customize the init process, for example by sending all configs in a single event.
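As a rough illustration of that kind of override, here is a minimal sketch. `BaseSessionSketch` and `BatchedSessionSketch` are hypothetical stand-ins, not the livekit classes; `None` replaces `NOT_GIVEN` to keep the example self-contained, and `sent_events` only simulates what would go over the wire.

```python
import asyncio


class BaseSessionSketch:
    """Hypothetical stand-in for a realtime session; not the livekit class."""

    def __init__(self) -> None:
        # Records each simulated outbound event so the difference is visible.
        self.sent_events = []

    async def update_instructions(self, instructions) -> None:
        self.sent_events.append({"instructions": instructions})

    async def update_tools(self, tools) -> None:
        self.sent_events.append({"tools": tools})

    async def update_session(self, *, instructions=None, tools=None) -> None:
        # Default behaviour: one event per field that was provided.
        if instructions is not None:
            await self.update_instructions(instructions)
        if tools is not None:
            await self.update_tools(tools)


class BatchedSessionSketch(BaseSessionSketch):
    """A plugin-style override that sends every provided field in one event."""

    async def update_session(self, *, instructions=None, tools=None) -> None:
        event = {}
        if instructions is not None:
            event["instructions"] = instructions
        if tools is not None:
            event["tools"] = tools
        if event:
            self.sent_events.append(event)


async def demo() -> tuple:
    base, batched = BaseSessionSketch(), BatchedSessionSketch()
    await base.update_session(instructions="be brief", tools=["lookup_weather"])
    await batched.update_session(instructions="be brief", tools=["lookup_weather"])
    return len(base.sent_events), len(batched.sent_events)
```

Running `asyncio.run(demo())` shows the base class recording two events and the override recording one, which is the customization point the utility method would provide.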
@chenghao-mou I think the issue was the update_chat_ctx timeout: after the timeout, the futures are cancelled but not removed from the … It's not caused by this PR, but I can clean the …
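The failure mode in the traceback can be reproduced with plain asyncio: `asyncio.wait_for` cancels the future it was waiting on when the timeout fires, and calling `set_result` on a cancelled future raises `InvalidStateError`. The sketch below shows one possible cleanup, assuming the pending futures live in a dict keyed by item ID; `wait_for_item_ack`, `on_item_added`, and `pending` are hypothetical names, not the plugin's actual code.

```python
import asyncio


async def wait_for_item_ack(pending: dict, item_id: str, timeout: float) -> bool:
    """Wait until `item_id` is acknowledged; return False on timeout."""
    fut = asyncio.get_running_loop().create_future()
    pending[item_id] = fut
    try:
        await asyncio.wait_for(fut, timeout)
        return True
    except asyncio.TimeoutError:
        # wait_for has already cancelled `fut` at this point.
        return False
    finally:
        # Always drop the entry so a late event cannot find a dead future.
        pending.pop(item_id, None)


def on_item_added(pending: dict, item_id: str) -> bool:
    """Resolve the waiter for `item_id` if one is still pending."""
    fut = pending.pop(item_id, None)
    if fut is None or fut.done():
        # No waiter, or it was already cancelled or resolved: nothing to do.
        return False
    fut.set_result(None)
    return True
```

Either guard is enough on its own (removing the entry on timeout, or checking `fut.done()` before `set_result`); using both keeps the event handler safe even if another code path leaves a stale future behind.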
chenghao-mou
left a comment
Tested locally and it worked well handing off back and forth. Maybe we need less verbose capability names.
theomonnom
left a comment
lgtm, we can try to find a better name for _update_session when we make it public.
Otherwise nit:
per_response_tool_choice -> mutable_tool_choice?
Just thinking out loud, it kind of means the same thing if you edit it mid-session through generate_reply
When an agent hands off to another agent that uses the same RealtimeModel, reuse the existing WebSocket session instead of closing it and opening a new one. The session is detached from the old activity (event listeners removed), transferred to the new activity, and reconfigured via update_session, which updates instructions, chat context, and tools as needed based on the model's mid_session_* capabilities.
Handoff Benchmark
Test with 10 conversation turns, then handoff: reuse vs fresh session, with copied or empty chat context.
Handoff = time for update_session or new session creation. Reply = time from handoff start to generate_reply response.
CI (good network):
Local network: