Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class LocalConversation(BaseConversation):
_resolved_plugins: list[ResolvedPluginSource] | None
_plugins_loaded: bool
_pending_hook_config: HookConfig | None # Hook config to combine with plugin hooks
# Interrupt callbacks - invoked when interrupt() is called
_interrupt_callbacks: list

def __init__(
self,
Expand Down Expand Up @@ -146,6 +148,9 @@ def __init__(
# initialized instances during interpreter shutdown.
self._cleanup_initiated = False

# Initialize interrupt callbacks list (thread-safe via list operations)
self._interrupt_callbacks = []

# Store plugin specs for lazy loading (no IO in constructor)
# Plugins will be loaded on first run() or send_message() call
self._plugin_specs = plugins
Expand Down Expand Up @@ -745,6 +750,9 @@ def interrupt(self) -> None:

This method is thread-safe and can be called from any thread.
After interruption, the conversation status is set to PAUSED.

Additionally, all registered interrupt callbacks are invoked to allow
other components (like terminal sessions) to terminate their operations.
"""
from openhands.sdk.event.user_action import InterruptEvent

Expand All @@ -753,6 +761,14 @@ def interrupt(self) -> None:
for llm in self.llm_registry.usage_to_llm.values():
llm.cancel()

# Invoke all registered interrupt callbacks
# Copy the list to avoid issues if callbacks modify it during iteration
for callback in list(self._interrupt_callbacks):
try:
callback()
except Exception as e:
logger.warning(f"Error in interrupt callback: {e}")

# Set paused status
with self._state:
if self._state.execution_status in [
Expand All @@ -764,6 +780,29 @@ def interrupt(self) -> None:
self._on_event(interrupt_event)
logger.info("Agent execution interrupted")

def register_interrupt_callback(self, callback) -> None:
"""Register a callback to be invoked when interrupt() is called.

Callbacks are invoked synchronously during interrupt() and should be
fast, non-blocking operations (e.g., setting a flag). They are called
before the conversation status is changed.

Args:
callback: A no-argument callable to invoke on interrupt.
"""
self._interrupt_callbacks.append(callback)

def unregister_interrupt_callback(self, callback) -> None:
"""Remove a previously registered interrupt callback.

Args:
callback: The callback to remove. If not found, this is a no-op.
"""
try:
self._interrupt_callbacks.remove(callback)
except ValueError:
pass # Callback not in list, ignore

def update_secrets(self, secrets: Mapping[str, SecretValue]) -> None:
"""Add secrets to the conversation.

Expand Down
117 changes: 68 additions & 49 deletions openhands-tools/openhands/tools/terminal/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,59 +141,78 @@ def __call__(
if action.reset and action.is_input:
raise ValueError("Cannot use reset=True with is_input=True")

if action.reset or self.session._closed:
reset_result = self.reset()

# Handle command execution after reset
if action.command.strip():
command_action = TerminalAction(
command=action.command,
timeout=action.timeout,
is_input=False, # is_input validated to be False when reset=True
)
self._export_envs(command_action, conversation)
command_result = self.session.execute(command_action)

# Extract text from content
reset_text = reset_result.text
command_text = command_result.text

observation = command_result.model_copy(
update={
"content": [
TextContent(text=f"{reset_text}\n\n{command_text}")
],
"command": f"[RESET] {action.command}",
}
)
else:
# Reset only, no command to execute
observation = reset_result
else:
# If env keys detected, export env values to bash as a separate action first
self._export_envs(action, conversation)
observation = self.session.execute(action)
# Register interrupt callback if conversation is available
# This allows conversation.interrupt() to terminate the terminal command
interrupt_callback = None
if conversation is not None:

# Apply automatic secrets masking
content_text = observation.text
def _on_interrupt() -> None:
self.session.request_interrupt()

if content_text and conversation is not None:
try:
secret_registry = conversation.state.secret_registry
masked_content = secret_registry.mask_secrets_in_output(content_text)
if masked_content:
data = observation.model_dump(
exclude={"content", "full_output_save_dir"}
conversation.register_interrupt_callback(_on_interrupt)
interrupt_callback = _on_interrupt

try:
if action.reset or self.session._closed:
reset_result = self.reset()

# Handle command execution after reset
if action.command.strip():
# is_input validated to be False when reset=True
command_action = TerminalAction(
command=action.command,
timeout=action.timeout,
is_input=False,
)
return TerminalObservation.from_text(
text=masked_content,
full_output_save_dir=self.full_output_save_dir,
**data,
self._export_envs(command_action, conversation)
command_result = self.session.execute(command_action)

# Extract text from content
reset_text = reset_result.text
command_text = command_result.text

observation = command_result.model_copy(
update={
"content": [
TextContent(text=f"{reset_text}\n\n{command_text}")
],
"command": f"[RESET] {action.command}",
}
)
except Exception:
pass

return observation
else:
# Reset only, no command to execute
observation = reset_result
else:
# If env keys detected, export env values to bash as a separate action
self._export_envs(action, conversation)
observation = self.session.execute(action)

# Apply automatic secrets masking
content_text = observation.text

if content_text and conversation is not None:
try:
secret_registry = conversation.state.secret_registry
masked_content = secret_registry.mask_secrets_in_output(
content_text
)
if masked_content:
data = observation.model_dump(
exclude={"content", "full_output_save_dir"}
)
return TerminalObservation.from_text(
text=masked_content,
full_output_save_dir=self.full_output_save_dir,
**data,
)
except Exception:
pass

return observation
finally:
# Always unregister the interrupt callback
if interrupt_callback is not None and conversation is not None:
conversation.unregister_interrupt_callback(interrupt_callback)

def close(self) -> None:
"""Close the terminal session and clean up resources."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TerminalSession(TerminalSessionBase):
terminal: TerminalInterface
prev_status: TerminalCommandStatus | None
prev_output: str
_interrupt_requested: bool

def __init__(
self,
Expand All @@ -79,6 +80,8 @@ def __init__(
# Store the last command for interactive input handling
self.prev_status = None
self.prev_output = ""
# Flag for external interruption (e.g., from conversation.interrupt())
self._interrupt_requested = False

def initialize(self) -> None:
"""Initialize the terminal backend."""
Expand All @@ -97,6 +100,20 @@ def interrupt(self) -> bool:
"""Interrupt the currently running command (equivalent to Ctrl+C)."""
return self.terminal.interrupt()

def request_interrupt(self) -> None:
"""Request interruption of the current command execution.

This sets an internal flag that the execute loop checks. When set,
the execute loop will terminate early and also send Ctrl+C to the
underlying terminal process.

This method is thread-safe and can be called from any thread
(e.g., from a conversation interrupt callback).
"""
self._interrupt_requested = True
# Also send Ctrl+C to the process to stop it
self.terminal.interrupt()

def is_running(self) -> bool:
"""Check if a command is currently running."""
if not self._initialized:
Expand Down Expand Up @@ -276,6 +293,43 @@ def _handle_hard_timeout_command(
metadata=metadata,
)

def _handle_interrupted_command(
self,
command: str,
terminal_content: str,
ps1_matches: list[re.Match],
) -> TerminalObservation:
"""Handle a command that was interrupted by an external request."""
self.prev_status = TerminalCommandStatus.INTERRUPTED
if len(ps1_matches) < 1:
logger.warning(
"Expected at least one PS1 metadata block, but got none.\n"
f"---\n{terminal_content!r}\n---"
)
raw_command_output = self._combine_outputs_between_matches(
terminal_content, ps1_matches
)
metadata = CmdOutputMetadata() # No metadata available
metadata.suffix = (
"\n[The command was interrupted by user request. "
"The process received SIGINT (Ctrl+C).]"
)
command_output = self._get_command_output(
command,
raw_command_output,
metadata,
continue_prefix="[Below is the partial output before interruption.]\n",
)
command_output = maybe_truncate(
command_output, truncate_after=MAX_CMD_OUTPUT_SIZE
)
return TerminalObservation.from_text(
command=command,
exit_code=-1, # Convention for interrupted commands
text=command_output,
metadata=metadata,
)

def _ready_for_next_command(self) -> None:
"""Reset the content buffer for a new command."""
# Clear the current content
Expand Down Expand Up @@ -314,6 +368,9 @@ def execute(self, action: TerminalAction) -> TerminalObservation:
if not self._initialized:
raise RuntimeError("Unified session is not initialized")

# Reset interrupt flag at the start of each execution
self._interrupt_requested = False

# Strip the command of any leading/trailing whitespace
logger.debug(f"RECEIVED ACTION: {action}")
command = action.command.strip()
Expand Down Expand Up @@ -517,5 +574,16 @@ def execute(self, action: TerminalAction) -> TerminalObservation:
logger.debug(f"RETURNING OBSERVATION (hard-timeout): {obs}")
return obs

# 4) Check for external interrupt request (from conversation.interrupt())
if self._interrupt_requested:
logger.info(f"Interrupt requested for command: {command}")
obs = self._handle_interrupted_command(
command,
terminal_content=cur_terminal_output,
ps1_matches=ps1_matches,
)
logger.debug(f"RETURNING OBSERVATION (interrupted): {obs}")
return obs

# Sleep before next check
time.sleep(POLL_INTERVAL)
Loading
Loading