From 7b0966b3579fbf1b596fcc61d2500de942b9cfe4 Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 25 Feb 2026 22:01:24 +0000 Subject: [PATCH] Add interrupt callbacks for terminal tool termination - Add _interrupt_callbacks list to LocalConversation with register/unregister methods - Invoke all registered callbacks when conversation.interrupt() is called - Add request_interrupt() method to TerminalSession that sets flag and sends Ctrl+C - Add interrupt check in TerminalSession execute loop to return early when interrupted - Register interrupt callback in TerminalExecutor.__call__ to terminate terminal commands - Add tests for interrupt callback registration and terminal session interruption Co-authored-by: openhands --- .../conversation/impl/local_conversation.py | 39 ++++++ .../openhands/tools/terminal/impl.py | 117 ++++++++++-------- .../terminal/terminal/terminal_session.py | 68 ++++++++++ .../test_conversation_interrupt.py | 95 ++++++++++++++ tests/tools/terminal/test_terminal_session.py | 45 +++++++ 5 files changed, 315 insertions(+), 49 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index c848572611..5b6309b5b5 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -79,6 +79,8 @@ class LocalConversation(BaseConversation): _resolved_plugins: list[ResolvedPluginSource] | None _plugins_loaded: bool _pending_hook_config: HookConfig | None # Hook config to combine with plugin hooks + # Interrupt callbacks - invoked when interrupt() is called + _interrupt_callbacks: list def __init__( self, @@ -146,6 +148,9 @@ def __init__( # initialized instances during interpreter shutdown. self._cleanup_initiated = False + # Initialize interrupt callbacks list (thread-safe via list operations) + self._interrupt_callbacks = [] + # Store plugin specs for lazy loading (no IO in constructor) # Plugins will be loaded on first run() or send_message() call self._plugin_specs = plugins @@ -745,6 +750,9 @@ def interrupt(self) -> None: This method is thread-safe and can be called from any thread. After interruption, the conversation status is set to PAUSED. + + Additionally, all registered interrupt callbacks are invoked to allow + other components (like terminal sessions) to terminate their operations. """ from openhands.sdk.event.user_action import InterruptEvent @@ -753,6 +761,14 @@ def interrupt(self) -> None: for llm in self.llm_registry.usage_to_llm.values(): llm.cancel() + # Invoke all registered interrupt callbacks + # Copy the list to avoid issues if callbacks modify it during iteration + for callback in list(self._interrupt_callbacks): + try: + callback() + except Exception as e: + logger.warning(f"Error in interrupt callback: {e}") + # Set paused status with self._state: if self._state.execution_status in [ @@ -764,6 +780,29 @@ def interrupt(self) -> None: self._on_event(interrupt_event) logger.info("Agent execution interrupted") + def register_interrupt_callback(self, callback) -> None: + """Register a callback to be invoked when interrupt() is called. + + Callbacks are invoked synchronously during interrupt() and should be + fast, non-blocking operations (e.g., setting a flag). They are called + before the conversation status is changed. + + Args: + callback: A no-argument callable to invoke on interrupt. + """ + self._interrupt_callbacks.append(callback) + + def unregister_interrupt_callback(self, callback) -> None: + """Remove a previously registered interrupt callback. + + Args: + callback: The callback to remove. If not found, this is a no-op. + """ + try: + self._interrupt_callbacks.remove(callback) + except ValueError: + pass # Callback not in list, ignore + def update_secrets(self, secrets: Mapping[str, SecretValue]) -> None: """Add secrets to the conversation. diff --git a/openhands-tools/openhands/tools/terminal/impl.py b/openhands-tools/openhands/tools/terminal/impl.py index b5d12d2d37..c758594edf 100644 --- a/openhands-tools/openhands/tools/terminal/impl.py +++ b/openhands-tools/openhands/tools/terminal/impl.py @@ -141,59 +141,78 @@ def __call__( if action.reset and action.is_input: raise ValueError("Cannot use reset=True with is_input=True") - if action.reset or self.session._closed: - reset_result = self.reset() - - # Handle command execution after reset - if action.command.strip(): - command_action = TerminalAction( - command=action.command, - timeout=action.timeout, - is_input=False, # is_input validated to be False when reset=True - ) - self._export_envs(command_action, conversation) - command_result = self.session.execute(command_action) - - # Extract text from content - reset_text = reset_result.text - command_text = command_result.text - - observation = command_result.model_copy( - update={ - "content": [ - TextContent(text=f"{reset_text}\n\n{command_text}") - ], - "command": f"[RESET] {action.command}", - } - ) - else: - # Reset only, no command to execute - observation = reset_result - else: - # If env keys detected, export env values to bash as a separate action first - self._export_envs(action, conversation) - observation = self.session.execute(action) + # Register interrupt callback if conversation is available + # This allows conversation.interrupt() to terminate the terminal command + interrupt_callback = None + if conversation is not None: - # Apply automatic secrets masking - content_text = observation.text + def _on_interrupt() -> None: + self.session.request_interrupt() - if content_text and conversation is not None: - try: - secret_registry = conversation.state.secret_registry - masked_content = secret_registry.mask_secrets_in_output(content_text) - if masked_content: - data = observation.model_dump( - exclude={"content", "full_output_save_dir"} + conversation.register_interrupt_callback(_on_interrupt) + interrupt_callback = _on_interrupt + + try: + if action.reset or self.session._closed: + reset_result = self.reset() + + # Handle command execution after reset + if action.command.strip(): + # is_input validated to be False when reset=True + command_action = TerminalAction( + command=action.command, + timeout=action.timeout, + is_input=False, ) - return TerminalObservation.from_text( - text=masked_content, - full_output_save_dir=self.full_output_save_dir, - **data, + self._export_envs(command_action, conversation) + command_result = self.session.execute(command_action) + + # Extract text from content + reset_text = reset_result.text + command_text = command_result.text + + observation = command_result.model_copy( + update={ + "content": [ + TextContent(text=f"{reset_text}\n\n{command_text}") + ], + "command": f"[RESET] {action.command}", + } ) - except Exception: - pass - - return observation + else: + # Reset only, no command to execute + observation = reset_result + else: + # If env keys detected, export env values to bash as a separate action + self._export_envs(action, conversation) + observation = self.session.execute(action) + + # Apply automatic secrets masking + content_text = observation.text + + if content_text and conversation is not None: + try: + secret_registry = conversation.state.secret_registry + masked_content = secret_registry.mask_secrets_in_output( + content_text + ) + if masked_content: + data = observation.model_dump( + exclude={"content", "full_output_save_dir"} + ) + return TerminalObservation.from_text( + text=masked_content, + full_output_save_dir=self.full_output_save_dir, + **data, + ) + except Exception: + pass + + return observation + finally: + # Always unregister the interrupt callback + if interrupt_callback is not None and conversation is not None: + conversation.unregister_interrupt_callback(interrupt_callback) def close(self) -> None: """Close the terminal session and clean up resources.""" diff --git a/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py b/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py index 87d19b8dc5..3fa9279aa6 100644 --- a/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py +++ b/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py @@ -55,6 +55,7 @@ class TerminalSession(TerminalSessionBase): terminal: TerminalInterface prev_status: TerminalCommandStatus | None prev_output: str + _interrupt_requested: bool def __init__( self, @@ -79,6 +80,8 @@ def __init__( # Store the last command for interactive input handling self.prev_status = None self.prev_output = "" + # Flag for external interruption (e.g., from conversation.interrupt()) + self._interrupt_requested = False def initialize(self) -> None: """Initialize the terminal backend.""" @@ -97,6 +100,20 @@ def interrupt(self) -> bool: """Interrupt the currently running command (equivalent to Ctrl+C).""" return self.terminal.interrupt() + def request_interrupt(self) -> None: + """Request interruption of the current command execution. + + This sets an internal flag that the execute loop checks. When set, + the execute loop will terminate early and also send Ctrl+C to the + underlying terminal process. + + This method is thread-safe and can be called from any thread + (e.g., from a conversation interrupt callback). + """ + self._interrupt_requested = True + # Also send Ctrl+C to the process to stop it + self.terminal.interrupt() + def is_running(self) -> bool: """Check if a command is currently running.""" if not self._initialized: @@ -276,6 +293,43 @@ def _handle_hard_timeout_command( metadata=metadata, ) + def _handle_interrupted_command( + self, + command: str, + terminal_content: str, + ps1_matches: list[re.Match], + ) -> TerminalObservation: + """Handle a command that was interrupted by an external request.""" + self.prev_status = TerminalCommandStatus.INTERRUPTED + if len(ps1_matches) < 1: + logger.warning( + "Expected at least one PS1 metadata block, but got none.\n" + f"---\n{terminal_content!r}\n---" + ) + raw_command_output = self._combine_outputs_between_matches( + terminal_content, ps1_matches + ) + metadata = CmdOutputMetadata() # No metadata available + metadata.suffix = ( + "\n[The command was interrupted by user request. " + "The process received SIGINT (Ctrl+C).]" + ) + command_output = self._get_command_output( + command, + raw_command_output, + metadata, + continue_prefix="[Below is the partial output before interruption.]\n", + ) + command_output = maybe_truncate( + command_output, truncate_after=MAX_CMD_OUTPUT_SIZE + ) + return TerminalObservation.from_text( + command=command, + exit_code=-1, # Convention for interrupted commands + text=command_output, + metadata=metadata, + ) + def _ready_for_next_command(self) -> None: """Reset the content buffer for a new command.""" # Clear the current content @@ -314,6 +368,9 @@ def execute(self, action: TerminalAction) -> TerminalObservation: if not self._initialized: raise RuntimeError("Unified session is not initialized") + # Reset interrupt flag at the start of each execution + self._interrupt_requested = False + # Strip the command of any leading/trailing whitespace logger.debug(f"RECEIVED ACTION: {action}") command = action.command.strip() @@ -517,5 +574,16 @@ def execute(self, action: TerminalAction) -> TerminalObservation: logger.debug(f"RETURNING OBSERVATION (hard-timeout): {obs}") return obs + # 4) Check for external interrupt request (from conversation.interrupt()) + if self._interrupt_requested: + logger.info(f"Interrupt requested for command: {command}") + obs = self._handle_interrupted_command( + command, + terminal_content=cur_terminal_output, + ps1_matches=ps1_matches, + ) + logger.debug(f"RETURNING OBSERVATION (interrupted): {obs}") + return obs + # Sleep before next check time.sleep(POLL_INTERVAL) diff --git a/tests/sdk/conversation/test_conversation_interrupt.py b/tests/sdk/conversation/test_conversation_interrupt.py index 4eebea9130..5b24901cfb 100644 --- a/tests/sdk/conversation/test_conversation_interrupt.py +++ b/tests/sdk/conversation/test_conversation_interrupt.py @@ -212,3 +212,98 @@ def test_conversation_interrupt_is_thread_safe(agent: Agent, tmp_path): # Should not raise any errors and status should be PAUSED assert conv.state.execution_status == ConversationExecutionStatus.PAUSED + + +def test_conversation_register_interrupt_callback(agent: Agent, tmp_path): + """Test that interrupt callbacks can be registered.""" + conv = LocalConversation(agent=agent, workspace=str(tmp_path)) + + called = [] + + def callback(): + called.append(True) + + conv.register_interrupt_callback(callback) + + # Callback should not be called yet + assert len(called) == 0 + + # Call interrupt + conv.interrupt() + + # Callback should have been called + assert len(called) == 1 + + +def test_conversation_unregister_interrupt_callback(agent: Agent, tmp_path): + """Test that interrupt callbacks can be unregistered.""" + conv = LocalConversation(agent=agent, workspace=str(tmp_path)) + + called = [] + + def callback(): + called.append(True) + + conv.register_interrupt_callback(callback) + conv.unregister_interrupt_callback(callback) + + # Call interrupt + conv.interrupt() + + # Callback should not have been called since it was unregistered + assert len(called) == 0 + + +def test_conversation_multiple_interrupt_callbacks(agent: Agent, tmp_path): + """Test that multiple interrupt callbacks are all invoked.""" + conv = LocalConversation(agent=agent, workspace=str(tmp_path)) + + results = [] + + def callback1(): + results.append("callback1") + + def callback2(): + results.append("callback2") + + conv.register_interrupt_callback(callback1) + conv.register_interrupt_callback(callback2) + + conv.interrupt() + + # Both callbacks should have been called + assert "callback1" in results + assert "callback2" in results + + +def test_conversation_interrupt_callback_exception_handling(agent: Agent, tmp_path): + """Test that exceptions in callbacks don't prevent other callbacks.""" + conv = LocalConversation(agent=agent, workspace=str(tmp_path)) + + results = [] + + def bad_callback(): + raise RuntimeError("Intentional error") + + def good_callback(): + results.append("good") + + conv.register_interrupt_callback(bad_callback) + conv.register_interrupt_callback(good_callback) + + # Should not raise despite bad_callback raising + conv.interrupt() + + # good_callback should still have been called + assert "good" in results + + +def test_conversation_unregister_nonexistent_callback(agent: Agent, tmp_path): + """Test that unregistering a non-existent callback is a no-op.""" + conv = LocalConversation(agent=agent, workspace=str(tmp_path)) + + def callback(): + pass + + # Should not raise + conv.unregister_interrupt_callback(callback) diff --git a/tests/tools/terminal/test_terminal_session.py b/tests/tools/terminal/test_terminal_session.py index 45c64ae55e..b9bf3ed28d 100644 --- a/tests/tools/terminal/test_terminal_session.py +++ b/tests/tools/terminal/test_terminal_session.py @@ -1093,3 +1093,48 @@ def test_bash_remove_prefix(terminal_type): assert "git remote -v" not in obs.text finally: session.close() + + +@parametrize_terminal_types +def test_request_interrupt_sets_flag(terminal_type): + """Test that request_interrupt sets the interrupt flag and sends Ctrl+C.""" + with tempfile.TemporaryDirectory() as temp_dir: + session = create_terminal_session( + work_dir=temp_dir, terminal_type=terminal_type + ) + session.initialize() + try: + # Initially, the interrupt flag should be False + assert session._interrupt_requested is False + + # Call request_interrupt + session.request_interrupt() + + # The interrupt flag should now be True + assert session._interrupt_requested is True + finally: + session.close() + + +@parametrize_terminal_types +def test_execute_resets_interrupt_flag(terminal_type): + """Test that execute() resets the interrupt flag at the start.""" + with tempfile.TemporaryDirectory() as temp_dir: + session = create_terminal_session( + work_dir=temp_dir, terminal_type=terminal_type + ) + session.initialize() + try: + # Set the interrupt flag manually + session._interrupt_requested = True + + # Execute a command + obs = session.execute(TerminalAction(command="echo test")) + + # The interrupt flag should be reset + assert session._interrupt_requested is False + # The command should complete normally since the flag was reset + # before the loop started + assert obs.metadata.exit_code == 0 + finally: + session.close()