Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None:
self._started = False
self._closed = False
self._scheduling_paused = True
self._new_turns_blocked = False

self._current_speech: SpeechHandle | None = None
self._speech_q: list[tuple[int, float, SpeechHandle]] = []
Expand Down Expand Up @@ -758,6 +759,7 @@ async def _resume_scheduling_task(self) -> None:
return

self._scheduling_paused = False
self._new_turns_blocked = False
self._scheduling_atask = asyncio.create_task(
self._scheduling_task(), name="_scheduling_task"
)
Expand Down Expand Up @@ -1401,7 +1403,7 @@ def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None:
# user_initiated generations are directly handled inside _realtime_reply_task
return

if self._scheduling_paused:
if self._scheduling_paused or self._new_turns_blocked:
# TODO(theomonnom): should we "forward" this new turn to the next agent?
logger.warning("skipping new realtime generation, the speech scheduling is not running")
return
Expand Down Expand Up @@ -1663,6 +1665,7 @@ def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None:
if (
not self._session.options.preemptive_generation
or self._scheduling_paused
or self._new_turns_blocked
or (self._current_speech is not None and not self._current_speech.interrupted)
or not isinstance(self.llm, llm.LLM)
):
Expand Down Expand Up @@ -1699,7 +1702,7 @@ def on_end_of_turn(self, info: _EndOfTurnInfo) -> bool:
# IMPORTANT: This method is sync to avoid it being cancelled by the AudioRecognition
# We explicitly create a new task here

if self._scheduling_paused:
if self._scheduling_paused or self._new_turns_blocked:
self._cancel_preemptive_generation()
logger.warning(
"skipping user input, speech scheduling is paused",
Expand Down Expand Up @@ -1800,7 +1803,7 @@ async def _user_turn_completed_task(
if self._rt_session is not None:
self._rt_session.interrupt()

if self._scheduling_paused:
if self._scheduling_paused or self._new_turns_blocked:
logger.warning(
"skipping on_user_turn_completed, speech scheduling is paused",
extra={"user_input": info.new_transcript},
Expand Down Expand Up @@ -1833,7 +1836,7 @@ async def _user_turn_completed_task(
elif self.llm is None:
return # skip response if no llm is set

if self._scheduling_paused:
if self._scheduling_paused or self._new_turns_blocked:
logger.warning(
"skipping reply to user input, speech scheduling is paused",
extra={"user_input": info.new_transcript},
Expand Down
5 changes: 5 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,11 @@ def update_agent(self, agent: Agent) -> None:
self._agent = agent

if self._started:
# immediately block the old activity from accepting new user turns
# during the transition window (before drain() formally pauses scheduling)
if self._activity is not None:
self._activity._new_turns_blocked = True

self._update_activity_atask = task = asyncio.create_task(
self._update_activity_task(self._update_activity_atask, self._agent),
name="_update_activity_task",
Expand Down
Loading