diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 885ec5abdf..5acb84794b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -34,7 +34,7 @@ from openhands.sdk.hooks import HookConfig, HookEventProcessor, create_hook_callback from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.llm.llm_registry import LLMRegistry -from openhands.sdk.logger import get_logger +from openhands.sdk.logger import flush_stdin, get_logger from openhands.sdk.observability.laminar import observe from openhands.sdk.plugin import ( Plugin, @@ -621,6 +621,12 @@ def run(self) -> None: ) iteration += 1 + # Flush any pending terminal query responses that may have + # accumulated during the step (Rich display, tool execution, etc.) + # This prevents ANSI escape codes from leaking to stdin. + # See: https://github.com/OpenHands/software-agent-sdk/issues/2244 + flush_stdin() + # Check for non-finished terminal conditions # Note: We intentionally do NOT check for FINISHED status here. # This allows concurrent user messages to be processed: diff --git a/openhands-sdk/openhands/sdk/conversation/visualizer/default.py b/openhands-sdk/openhands/sdk/conversation/visualizer/default.py index 2314d6560b..e1f0823ca4 100644 --- a/openhands-sdk/openhands/sdk/conversation/visualizer/default.py +++ b/openhands-sdk/openhands/sdk/conversation/visualizer/default.py @@ -23,6 +23,7 @@ ) from openhands.sdk.event.base import Event from openhands.sdk.event.condenser import Condensation, CondensationRequest +from openhands.sdk.logger import flush_stdin logger = logging.getLogger(__name__) @@ -248,6 +249,12 @@ def __init__( def on_event(self, event: Event) -> None: """Main event handler that displays events with Rich formatting.""" + # Flush any pending terminal query responses before rendering. + # This prevents ANSI escape codes from accumulating in stdin + # and corrupting subsequent input() calls. + # See: https://github.com/OpenHands/software-agent-sdk/issues/2244 + flush_stdin() + output = self._create_event_block(event) if output: self._console.print(output) diff --git a/openhands-sdk/openhands/sdk/logger/__init__.py b/openhands-sdk/openhands/sdk/logger/__init__.py index 42b1b09c78..337ee11b6a 100644 --- a/openhands-sdk/openhands/sdk/logger/__init__.py +++ b/openhands-sdk/openhands/sdk/logger/__init__.py @@ -4,6 +4,9 @@ ENV_LOG_DIR, ENV_LOG_LEVEL, IN_CI, + clear_buffered_input, + flush_stdin, + get_buffered_input, get_logger, setup_logging, ) @@ -13,6 +16,9 @@ __all__ = [ "get_logger", "setup_logging", + "flush_stdin", + "get_buffered_input", + "clear_buffered_input", "DEBUG", "ENV_JSON", "ENV_LOG_LEVEL", diff --git a/openhands-sdk/openhands/sdk/logger/logger.py b/openhands-sdk/openhands/sdk/logger/logger.py index b728923d85..66511243b7 100644 --- a/openhands-sdk/openhands/sdk/logger/logger.py +++ b/openhands-sdk/openhands/sdk/logger/logger.py @@ -9,8 +9,10 @@ logger.info("Hello from this module!") """ +import atexit import logging import os +import select from logging.handlers import TimedRotatingFileHandler import litellm @@ -193,3 +195,258 @@ def get_logger(name: str) -> logging.Logger: # Auto-configure if desired if ENV_AUTO_CONFIG: setup_logging() + + +# ========= TERMINAL CLEANUP ========= +# Prevents ANSI escape code leaks during operation and at exit. +# See: https://github.com/OpenHands/software-agent-sdk/issues/2244 +# +# The issue: Terminal queries (like DSR for cursor position) get responses +# written to stdin. If not consumed, these leak as garbage to the shell or +# corrupt the next input() call in CLI applications. +# +# This implementation uses SELECTIVE FLUSHING: +# - Parse pending stdin data byte-by-byte +# - Discard only recognized escape sequences (CSI, OSC) +# - Preserve all other data (likely user typeahead) in a buffer +# - Provide get_buffered_input() for SDK to retrieve preserved data + +_cleanup_registered = False +_preserved_input_buffer: bytes = b"" + + +def _is_csi_final_byte(byte: int) -> bool: + """Check if byte is a CSI sequence final character (0x40-0x7E). + + Per ECMA-48, CSI sequences end with a byte in the range 0x40-0x7E. + Common finals: 'R' (0x52) for cursor position, 'n' for device status. + """ + return 0x40 <= byte <= 0x7E + + +def _find_csi_end(data: bytes, start: int) -> int: + """Find end position of a CSI sequence. + + CSI format: ESC [ [params] [intermediates] final + - Params: 0x30-0x3F (digits, semicolons, etc.) + - Intermediates: 0x20-0x2F (space, !"#$%&'()*+,-./) + - Final: 0x40-0x7E (@A-Z[\\]^_`a-z{|}~) + + Args: + data: The byte buffer to parse. + start: Position of the ESC byte (start of \\x1b[ sequence). + + Returns: + Position AFTER the sequence ends. If incomplete, returns start + (preserving the incomplete sequence as potential user input). + """ + pos = start + 2 # Skip \x1b[ + while pos < len(data): + byte = data[pos] + if _is_csi_final_byte(byte): + return pos + 1 # Include the final byte + if byte < 0x20 or byte > 0x3F and byte < 0x40: + # Invalid byte in CSI sequence - treat as end + return pos + pos += 1 + # Incomplete sequence at end of buffer - preserve it (might be user input) + return start + + +def _find_osc_end(data: bytes, start: int) -> int: + """Find end position of an OSC sequence. + + OSC format: ESC ] ... (BEL or ST) + - BEL terminator: 0x07 + - ST terminator: ESC \\ (0x1b 0x5c) + + Args: + data: The byte buffer to parse. + start: Position of the ESC byte (start of \\x1b] sequence). + + Returns: + Position AFTER the sequence ends. If incomplete, returns start + (preserving the incomplete sequence as potential user input). + """ + pos = start + 2 # Skip \x1b] + while pos < len(data): + if data[pos] == 0x07: # BEL terminator + return pos + 1 + if data[pos] == 0x1B and pos + 1 < len(data) and data[pos + 1] == 0x5C: + return pos + 2 # ST terminator \x1b\\ + pos += 1 + # Incomplete sequence - preserve it + return start + + +def _parse_stdin_data(data: bytes) -> tuple[bytes, int]: + """Parse stdin data, separating escape sequences from user input. + + This function implements selective flushing: it identifies and discards + terminal escape sequence responses (CSI and OSC) while preserving any + other data that may be legitimate user typeahead. + + Args: + data: Raw bytes read from stdin. + + Returns: + Tuple of (preserved_user_input, flushed_escape_sequence_bytes). + """ + preserved = b"" + flushed = 0 + i = 0 + + while i < len(data): + # Check for CSI sequence: \x1b[ + if i + 1 < len(data) and data[i] == 0x1B and data[i + 1] == 0x5B: + end = _find_csi_end(data, i) + if end > i: # Complete sequence found + flushed += end - i + i = end + else: # Incomplete - preserve as potential user input + preserved += data[i : i + 1] + i += 1 + + # Check for OSC sequence: \x1b] + elif i + 1 < len(data) and data[i] == 0x1B and data[i + 1] == 0x5D: + end = _find_osc_end(data, i) + if end > i: # Complete sequence found + flushed += end - i + i = end + else: # Incomplete - preserve + preserved += data[i : i + 1] + i += 1 + + # Single ESC followed by another character - could be escape sequence + # Be conservative: if next byte looks like start of known sequence type, + # preserve both bytes for next iteration or as user input + elif data[i] == 0x1B and i + 1 < len(data): + next_byte = data[i + 1] + # Known sequence starters we don't fully parse: SS2, SS3, DCS, PM, APC + # SS2=N, SS3=O, DCS=P, PM=^, APC=_ + if next_byte in (0x4E, 0x4F, 0x50, 0x5E, 0x5F): + # These are less common; preserve as user input + preserved += data[i : i + 1] + i += 1 + else: + # Unknown escape sequence type - preserve it + preserved += data[i : i + 1] + i += 1 + + # Regular byte - preserve it (likely user input) + else: + preserved += data[i : i + 1] + i += 1 + + return preserved, flushed + + +def flush_stdin() -> int: + """Flush terminal escape sequences from stdin, preserving user input. + + On macOS (and some Linux terminals), terminal query responses can leak + to stdin. If not consumed before exit or between conversation turns, + they corrupt input or appear as garbage in the shell. + + This function uses SELECTIVE FLUSHING: + - Only discards recognized escape sequences (CSI `\\x1b[...`, OSC `\\x1b]...`) + - Preserves all other data in an internal buffer + - Use get_buffered_input() to retrieve preserved user input + + This function is called automatically: + 1. At exit (registered via atexit) + 2. After each agent step in LocalConversation.run() + 3. Before rendering events in DefaultConversationVisualizer + + It can also be called manually if needed. + + Returns: + Number of escape sequence bytes flushed from stdin. + """ # noqa: E501 + global _preserved_input_buffer + import sys as _sys # Import locally to avoid issues at atexit time + + if not _sys.stdin.isatty(): + return 0 + + try: + import termios + except ImportError: + return 0 # Windows + + flushed = 0 + old = None + try: + old = termios.tcgetattr(_sys.stdin) + # Deep copy required: old[6] is a list (cc), and list(old) only + # does a shallow copy. Without deep copy, modifying new[6][VMIN] + # would also modify old[6][VMIN], corrupting the restore. + new = [item[:] if isinstance(item, list) else item for item in old] + # termios attrs: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] + # Index 3 is lflag (int), index 6 is cc (list) + lflag = new[3] + assert isinstance(lflag, int) # Help type checker + new[3] = lflag & ~(termios.ICANON | termios.ECHO) + new[6][termios.VMIN] = 0 + new[6][termios.VTIME] = 0 + termios.tcsetattr(_sys.stdin, termios.TCSANOW, new) + + while select.select([_sys.stdin], [], [], 0)[0]: + data = os.read(_sys.stdin.fileno(), 4096) + if not data: + break + # Parse data: discard escape sequences, preserve user input + preserved, seq_flushed = _parse_stdin_data(data) + flushed += seq_flushed + _preserved_input_buffer += preserved + + except (OSError, termios.error): + pass + finally: + if old is not None: + try: + termios.tcsetattr(_sys.stdin, termios.TCSANOW, old) + except (OSError, termios.error): + pass + return flushed + + +def get_buffered_input() -> bytes: + """Get any user input that was preserved during flush_stdin calls. + + When flush_stdin() discards escape sequences, it preserves any other + data that might be legitimate user typeahead. This function retrieves + and clears that buffered input. + + SDK components that read user input should call this function to + prepend any buffered data to their input. + + Returns: + Bytes that were preserved from stdin during flush operations. + The internal buffer is cleared after this call. + + Example: + >>> # In code that reads user input: + >>> buffered = get_buffered_input() + >>> user_input = buffered.decode('utf-8', errors='replace') + input() + """ + global _preserved_input_buffer + data = _preserved_input_buffer + _preserved_input_buffer = b"" + return data + + +def clear_buffered_input() -> None: + """Clear any buffered input without returning it. + + Use this when you want to discard any preserved input, for example + at the start of a new conversation or after a timeout. + """ + global _preserved_input_buffer + _preserved_input_buffer = b"" + + +# Register cleanup at module load time +if not _cleanup_registered: + atexit.register(flush_stdin) + _cleanup_registered = True diff --git a/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py b/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py index 87d19b8dc5..f3d43e5b50 100644 --- a/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py +++ b/openhands-tools/openhands/tools/terminal/terminal/terminal_session.py @@ -26,6 +26,7 @@ escape_bash_special_chars, split_bash_commands, ) +from openhands.tools.terminal.utils.escape_filter import filter_terminal_queries logger = get_logger(__name__) @@ -120,7 +121,12 @@ def _get_command_output( metadata: CmdOutputMetadata, continue_prefix: str = "", ) -> str: - """Get the command output with the previous command output removed.""" + """Get the command output with the previous command output removed. + + Also filters terminal query sequences that could cause visible escape + code garbage when the output is displayed. + See: https://github.com/OpenHands/software-agent-sdk/issues/2244 + """ # remove the previous command output from the new output if any if self.prev_output: command_output = raw_command_output.removeprefix(self.prev_output) @@ -129,6 +135,11 @@ def _get_command_output( command_output = raw_command_output self.prev_output = raw_command_output # update current command output anyway command_output = _remove_command_prefix(command_output, command) + + # Filter terminal query sequences that would cause the terminal to + # respond when displayed, producing visible garbage + command_output = filter_terminal_queries(command_output) + return command_output.rstrip() def _handle_completed_command( diff --git a/openhands-tools/openhands/tools/terminal/utils/__init__.py b/openhands-tools/openhands/tools/terminal/utils/__init__.py new file mode 100644 index 0000000000..0ebade7057 --- /dev/null +++ b/openhands-tools/openhands/tools/terminal/utils/__init__.py @@ -0,0 +1,14 @@ +"""Terminal tool utilities.""" + +from openhands.tools.terminal.utils.command import ( + escape_bash_special_chars, + split_bash_commands, +) +from openhands.tools.terminal.utils.escape_filter import filter_terminal_queries + + +__all__ = [ + "escape_bash_special_chars", + "split_bash_commands", + "filter_terminal_queries", +] diff --git a/openhands-tools/openhands/tools/terminal/utils/escape_filter.py b/openhands-tools/openhands/tools/terminal/utils/escape_filter.py new file mode 100644 index 0000000000..53543c44d2 --- /dev/null +++ b/openhands-tools/openhands/tools/terminal/utils/escape_filter.py @@ -0,0 +1,83 @@ +"""Filter terminal query sequences from captured output. + +When CLI tools (like `gh`, `npm`, etc.) run inside a PTY, they may send +terminal query sequences as part of their progress/spinner UI. These queries +get captured as output. When displayed, the terminal processes them and +responds, causing visible escape code garbage. + +This module provides filtering to remove these query sequences while +preserving legitimate formatting escape codes (colors, bold, etc.). + +See: https://github.com/OpenHands/software-agent-sdk/issues/2244 +""" + +import re + + +# Terminal query sequences that trigger responses (and cause visible garbage) +# These should be stripped from captured output before display. +# +# Reference: ECMA-48, XTerm Control Sequences +# https://invisible-island.net/xterm/ctlseqs/ctlseqs.html + +# DSR (Device Status Report) - cursor position query +# Format: ESC [ 6 n -> Response: ESC [ row ; col R +_DSR_PATTERN = re.compile(rb"\x1b\[6n") + +# OSC (Operating System Command) queries +# Format: ESC ] Ps ; Pt (BEL | ST) +# Common queries: +# OSC 10 ; ? - foreground color query +# OSC 11 ; ? - background color query +# OSC 4 ; index ; ? - palette color query +# Terminators: BEL (\x07) or ST (ESC \) +_OSC_QUERY_PATTERN = re.compile( + rb"\x1b\]" # OSC introducer + rb"(?:10|11|4)" # Color query codes (10=fg, 11=bg, 4=palette) + rb"[^" # Match until terminator + rb"\x07\x1b]*" # (not BEL or ESC) + rb"(?:\x07|\x1b\\)" # BEL or ST terminator +) + +# DA (Device Attributes) primary query +# Format: ESC [ c or ESC [ 0 c +_DA_PATTERN = re.compile(rb"\x1b\[0?c") + +# DA2 (Secondary Device Attributes) query +# Format: ESC [ > c or ESC [ > 0 c +_DA2_PATTERN = re.compile(rb"\x1b\[>0?c") + +# DECRQSS (Request Selection or Setting) - various terminal state queries +# Format: ESC P $ q ST +_DECRQSS_PATTERN = re.compile( + rb"\x1bP\$q" # DCS introducer + DECRQSS + rb"[^\x1b]*" # Setting identifier + rb"\x1b\\" # ST terminator +) + + +def filter_terminal_queries(output: str) -> str: + """Filter terminal query sequences from captured terminal output. + + Removes escape sequences that would cause the terminal to respond + when the output is displayed, while preserving legitimate formatting + sequences (colors, cursor movement, etc.). + + Args: + output: Raw terminal output that may contain query sequences. + + Returns: + Filtered output with query sequences removed. + """ + # Convert to bytes for regex matching (escape sequences are byte-level) + output_bytes = output.encode("utf-8", errors="surrogateescape") + + # Remove each type of query sequence + output_bytes = _DSR_PATTERN.sub(b"", output_bytes) + output_bytes = _OSC_QUERY_PATTERN.sub(b"", output_bytes) + output_bytes = _DA_PATTERN.sub(b"", output_bytes) + output_bytes = _DA2_PATTERN.sub(b"", output_bytes) + output_bytes = _DECRQSS_PATTERN.sub(b"", output_bytes) + + # Convert back to string + return output_bytes.decode("utf-8", errors="surrogateescape") diff --git a/tests/sdk/logger/test_flush_stdin.py b/tests/sdk/logger/test_flush_stdin.py new file mode 100644 index 0000000000..231079e134 --- /dev/null +++ b/tests/sdk/logger/test_flush_stdin.py @@ -0,0 +1,441 @@ +"""Tests for flush_stdin terminal cleanup functionality. + +See: https://github.com/OpenHands/software-agent-sdk/issues/2244 +""" + +import importlib.util +import os +import sys +from unittest import mock + +import pytest + +from openhands.sdk.logger import ( + clear_buffered_input, + flush_stdin, + get_buffered_input, +) +from openhands.sdk.logger.logger import ( + _find_csi_end, + _find_osc_end, + _is_csi_final_byte, + _parse_stdin_data, +) + + +# Check if pty module is available (Unix only) +PTY_AVAILABLE = importlib.util.find_spec("pty") is not None + + +class TestFlushStdin: + """Tests for the flush_stdin function.""" + + def test_flush_stdin_returns_zero_when_not_tty(self): + """flush_stdin should return 0 when stdin is not a tty.""" + with mock.patch.object(sys.stdin, "isatty", return_value=False): + result = flush_stdin() + assert result == 0 + + def test_flush_stdin_returns_zero_when_termios_unavailable(self): + """flush_stdin should return 0 when termios is not available (e.g., Windows). + + Note: This test verifies the code path by checking the function's behavior + when termios import fails. On platforms where termios is available, we + simulate its absence by patching the import mechanism. + """ + if importlib.util.find_spec("termios") is None: + # On Windows/platforms without termios, test the real behavior + with mock.patch.object(sys.stdin, "isatty", return_value=True): + result = flush_stdin() + assert result == 0 + else: + # On Unix, we can't easily unload termios since it's already imported. + # The termios unavailable path is covered by Windows CI. + pytest.skip("termios is available; Windows CI covers the unavailable path") + + def test_flush_stdin_is_exported(self): + """flush_stdin should be available in the public API.""" + from openhands.sdk.logger import flush_stdin as exported_flush_stdin + + assert callable(exported_flush_stdin) + + def test_flush_stdin_handles_oserror_gracefully(self): + """flush_stdin should handle OSError gracefully.""" + if importlib.util.find_spec("termios") is None: + pytest.skip("termios not available on this platform") + + with mock.patch.object(sys.stdin, "isatty", return_value=True): + with mock.patch("termios.tcgetattr", side_effect=OSError("test error")): + result = flush_stdin() + assert result == 0 + + def test_flush_stdin_handles_termios_error_gracefully(self): + """flush_stdin should handle termios.error gracefully.""" + if importlib.util.find_spec("termios") is None: + pytest.skip("termios not available on this platform") + + import termios + + with mock.patch.object(sys.stdin, "isatty", return_value=True): + with mock.patch( + "termios.tcgetattr", + side_effect=termios.error("test error"), + ): + result = flush_stdin() + assert result == 0 + + +class TestSelectiveFlushing: + """Tests for selective flushing - parsing escape sequences vs user input.""" + + def test_is_csi_final_byte(self): + """Test CSI final byte detection (0x40-0x7E).""" + # Valid final bytes + assert _is_csi_final_byte(0x40) # @ + assert _is_csi_final_byte(0x52) # R (cursor position) + assert _is_csi_final_byte(0x6E) # n (device status) + assert _is_csi_final_byte(0x7E) # ~ (end of range) + + # Invalid final bytes + assert not _is_csi_final_byte(0x3F) # ? (below range) + assert not _is_csi_final_byte(0x7F) # DEL (above range) + assert not _is_csi_final_byte(0x30) # 0 (parameter byte) + + def test_find_csi_end_complete_sequence(self): + """Test finding end of complete CSI sequences.""" + # DSR response: \x1b[5;10R + data = b"\x1b[5;10R" + assert _find_csi_end(data, 0) == 7 + + # Simple sequence: \x1b[H (cursor home) + data = b"\x1b[H" + assert _find_csi_end(data, 0) == 3 + + # With offset + data = b"abc\x1b[5;10Rxyz" + assert _find_csi_end(data, 3) == 10 + + def test_find_csi_end_incomplete_sequence(self): + """Test handling of incomplete CSI sequences (preserved).""" + # Incomplete - no final byte + data = b"\x1b[5;10" + assert _find_csi_end(data, 0) == 0 # Returns start = preserve + + # Just the introducer + data = b"\x1b[" + assert _find_csi_end(data, 0) == 0 + + def test_find_osc_end_with_bel_terminator(self): + """Test finding end of OSC sequences with BEL terminator.""" + # Background color response: \x1b]11;rgb:XXXX/XXXX/XXXX\x07 + data = b"\x1b]11;rgb:30fb/3708/41af\x07" + assert _find_osc_end(data, 0) == len(data) + + def test_find_osc_end_with_st_terminator(self): + """Test finding end of OSC sequences with ST terminator.""" + # OSC with ST terminator: \x1b]...\x1b\\ + data = b"\x1b]11;rgb:30fb/3708/41af\x1b\\" + assert _find_osc_end(data, 0) == len(data) + + def test_find_osc_end_incomplete_sequence(self): + """Test handling of incomplete OSC sequences (preserved).""" + # No terminator + data = b"\x1b]11;rgb:30fb/3708/41af" + assert _find_osc_end(data, 0) == 0 # Returns start = preserve + + def test_parse_stdin_data_csi_only(self): + """Test parsing data with only CSI sequences - all flushed.""" + data = b"\x1b[5;10R" + preserved, flushed = _parse_stdin_data(data) + assert preserved == b"" + assert flushed == 7 + + def test_parse_stdin_data_osc_only(self): + """Test parsing data with only OSC sequences - all flushed.""" + data = b"\x1b]11;rgb:30fb/3708/41af\x07" + preserved, flushed = _parse_stdin_data(data) + assert preserved == b"" + assert flushed == len(data) + + def test_parse_stdin_data_user_input_only(self): + """Test parsing data with only user input - all preserved.""" + data = b"hello world" + preserved, flushed = _parse_stdin_data(data) + assert preserved == b"hello world" + assert flushed == 0 + + def test_parse_stdin_data_mixed_content(self): + """Test parsing mixed escape sequences and user input.""" + # User types "ls" while terminal response arrives + data = b"l\x1b[5;10Rs" + preserved, flushed = _parse_stdin_data(data) + assert preserved == b"ls" + assert flushed == 7 # The CSI sequence + + def test_parse_stdin_data_multiple_sequences(self): + """Test parsing multiple escape sequences.""" + # Two DSR responses: \x1b[5;10R (7 bytes) + \x1b[6;1R (6 bytes) + data = b"\x1b[5;10R\x1b[6;1R" + preserved, flushed = _parse_stdin_data(data) + assert preserved == b"" + assert flushed == 13 # 7 + 6 + + def test_parse_stdin_data_preserves_incomplete_csi(self): + """Test that incomplete CSI sequences are preserved as user input.""" + # User typing escape followed by [ (could be arrow key start) + data = b"\x1b[5;10" # Incomplete - no final byte + preserved, flushed = _parse_stdin_data(data) + # Incomplete sequence preserved byte by byte + assert b"\x1b" in preserved + assert flushed == 0 + + def test_parse_stdin_data_arrow_keys_preserved(self): + """Test that arrow key sequences are handled correctly. + + Note: Arrow keys are CSI sequences like \\x1b[A, \\x1b[B, etc. + They WILL be flushed because they are complete CSI sequences. + This is acceptable because arrow keys during agent execution + are unlikely to be meaningful user input. + """ + # Up arrow + data = b"\x1b[A" + preserved, flushed = _parse_stdin_data(data) + # Arrow key is a complete CSI sequence - gets flushed + assert flushed == 3 + assert preserved == b"" + + +class TestBufferedInput: + """Tests for get_buffered_input and clear_buffered_input.""" + + def test_get_buffered_input_is_exported(self): + """get_buffered_input should be available in the public API.""" + from openhands.sdk.logger import ( + get_buffered_input as exported_get_buffered_input, + ) + + assert callable(exported_get_buffered_input) + + def test_clear_buffered_input_is_exported(self): + """clear_buffered_input should be available in the public API.""" + from openhands.sdk.logger import ( + clear_buffered_input as exported_clear, + ) + + assert callable(exported_clear) + + def test_get_buffered_input_returns_and_clears(self): + """get_buffered_input should return buffer and clear it.""" + from openhands.sdk.logger import logger as logger_module + + # Set up some buffered data + logger_module._preserved_input_buffer = b"test data" + + # Get the data + data = get_buffered_input() + assert data == b"test data" + + # Buffer should now be empty + assert logger_module._preserved_input_buffer == b"" + + # Second call returns empty + data2 = get_buffered_input() + assert data2 == b"" + + def test_clear_buffered_input_clears_buffer(self): + """clear_buffered_input should clear the buffer without returning.""" + from openhands.sdk.logger import logger as logger_module + + # Set up some buffered data + logger_module._preserved_input_buffer = b"test data" + + # Clear it + clear_buffered_input() + + # Buffer should be empty + assert logger_module._preserved_input_buffer == b"" + + +class TestFlushStdinIntegration: + """Integration tests for flush_stdin in conversation flow.""" + + def test_flush_stdin_called_in_visualizer_on_event(self): + """Verify flush_stdin is called before rendering events in the visualizer.""" + from openhands.sdk.conversation.visualizer.default import ( + DefaultConversationVisualizer, + ) + from openhands.sdk.event import PauseEvent + + visualizer = DefaultConversationVisualizer() + + with mock.patch( + "openhands.sdk.conversation.visualizer.default.flush_stdin" + ) as mock_flush: + # Create a simple event and trigger on_event + event = PauseEvent() + visualizer.on_event(event) + + # Verify flush_stdin was called + mock_flush.assert_called_once() + + def test_flush_stdin_registered_with_atexit(self): + """Verify flush_stdin is registered as an atexit handler.""" + + # Get registered atexit functions + # Note: atexit._run_exitfuncs() would run them, but we just check registration + # The atexit module doesn't expose a clean way to inspect handlers, + # but we can verify by checking the module-level flag + from openhands.sdk.logger import logger as logger_module + + assert logger_module._cleanup_registered is True + + +@pytest.mark.skipif(not PTY_AVAILABLE, reason="pty module not available (Windows)") +class TestFlushStdinPTY: + """PTY-based tests for flush_stdin to verify real terminal behavior.""" + + def test_flush_stdin_restores_termios_settings(self): + """Verify termios settings are properly restored after flush_stdin. + + This test uses a PTY to create a real terminal environment and verifies + that the termios settings (especially VMIN/VTIME in cc array) are + correctly restored after flush_stdin modifies them temporarily. + + This catches the shallow-copy bug where list(old) would cause old[6] + and new[6] to share the same reference, corrupting the restore. + """ + import pty + import termios + + # Create a pseudo-terminal + master_fd, slave_fd = pty.openpty() + slave_file = None + + try: + # Open the slave as a file object to use as stdin + slave_file = os.fdopen(slave_fd, "r") + + # Get original termios settings + original_settings = termios.tcgetattr(slave_file) + original_vmin = original_settings[6][termios.VMIN] + original_vtime = original_settings[6][termios.VTIME] + + # Patch stdin to use our PTY slave + with mock.patch.object(sys, "stdin", slave_file): + # Call flush_stdin - this modifies VMIN/VTIME temporarily + flush_stdin() + + # Get settings after flush_stdin + restored_settings = termios.tcgetattr(slave_file) + restored_vmin = restored_settings[6][termios.VMIN] + restored_vtime = restored_settings[6][termios.VTIME] + + # Verify settings were restored correctly + assert restored_vmin == original_vmin, ( + f"VMIN not restored: {original_vmin!r} -> {restored_vmin!r}" + ) + assert restored_vtime == original_vtime, ( + f"VTIME not restored: {original_vtime!r} -> {restored_vtime!r}" + ) + + finally: + os.close(master_fd) + # slave_fd is closed when slave_file is closed + if slave_file is not None: + try: + slave_file.close() + except OSError: + pass # May already be closed + + def test_flush_stdin_drains_escape_sequences_only(self): + """Verify flush_stdin drains only escape sequences from stdin. + + This test writes mixed data (escape sequences + user input) to a PTY + and verifies that flush_stdin only counts escape sequences as flushed, + while preserving user input in the buffer. + """ + import pty + import time + + from openhands.sdk.logger import logger as logger_module + + # Clear any existing buffer + logger_module._preserved_input_buffer = b"" + + master_fd, slave_fd = pty.openpty() + slave_file = None + + try: + slave_file = os.fdopen(slave_fd, "r") + + # Write mixed data: CSI sequence + user text + # \x1b[5;10R is a DSR response (7 bytes) + # "hello" is user input (5 bytes) + test_data = b"\x1b[5;10Rhello" + os.write(master_fd, test_data) + + time.sleep(0.05) + + with mock.patch.object(sys, "stdin", slave_file): + bytes_flushed = flush_stdin() + + # Only the CSI sequence should be counted as flushed + assert bytes_flushed == 7, f"Expected 7 bytes flushed, got {bytes_flushed}" + + # "hello" should be preserved in buffer + buffered = get_buffered_input() + assert buffered == b"hello", f"Expected b'hello' buffered, got {buffered!r}" + + finally: + os.close(master_fd) + if slave_file is not None: + try: + slave_file.close() + except OSError: + pass + + def test_flush_stdin_drains_pending_data(self): + """Verify flush_stdin actually drains pending escape sequence data. + + This test writes pure escape sequence data to a PTY and verifies + that flush_stdin reads and discards it, returning the correct byte count. + """ + import pty + import time + + from openhands.sdk.logger import logger as logger_module + + # Clear any existing buffer + logger_module._preserved_input_buffer = b"" + + master_fd, slave_fd = pty.openpty() + slave_file = None + + try: + slave_file = os.fdopen(slave_fd, "r") + + # Write escape sequence data to the master (simulating terminal response) + test_data = b"\x1b[5;10R" # Simulated DSR response + os.write(master_fd, test_data) + + time.sleep(0.05) + + with mock.patch.object(sys, "stdin", slave_file): + bytes_flushed = flush_stdin() + + # Verify data was flushed + assert bytes_flushed == len(test_data), ( + f"Expected {len(test_data)} bytes flushed, got {bytes_flushed}" + ) + + # Nothing should be buffered (it was all escape sequences) + buffered = get_buffered_input() + assert buffered == b"", f"Expected empty buffer, got {buffered!r}" + + finally: + os.close(master_fd) + if slave_file is not None: + try: + slave_file.close() + except OSError: + pass diff --git a/tests/tools/terminal/test_escape_filter.py b/tests/tools/terminal/test_escape_filter.py new file mode 100644 index 0000000000..f82d3dc2a0 --- /dev/null +++ b/tests/tools/terminal/test_escape_filter.py @@ -0,0 +1,114 @@ +"""Tests for terminal escape sequence filtering.""" + +from openhands.tools.terminal.utils.escape_filter import filter_terminal_queries + + +class TestFilterTerminalQueries: + """Tests for filter_terminal_queries function.""" + + def test_removes_dsr_query(self): + """DSR (cursor position query) should be removed.""" + # ESC [ 6 n is the DSR query + output = "before\x1b[6nafter" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_multiple_dsr_queries(self): + """Multiple DSR queries should all be removed.""" + output = "\x1b[6n\x1b[6ntext\x1b[6n" + result = filter_terminal_queries(output) + assert result == "text" + + def test_removes_osc_11_query_with_bel(self): + """OSC 11 (background color query) with BEL terminator should be removed.""" + # ESC ] 11 ; ? BEL + output = "before\x1b]11;?\x07after" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_osc_11_query_with_st(self): + """OSC 11 (background color query) with ST terminator should be removed.""" + # ESC ] 11 ; ? ESC \ + output = "before\x1b]11;?\x1b\\after" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_osc_10_query(self): + """OSC 10 (foreground color query) should be removed.""" + output = "before\x1b]10;?\x07after" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_osc_4_query(self): + """OSC 4 (palette color query) should be removed.""" + output = "before\x1b]4;0;?\x07after" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_da_query(self): + """DA (Device Attributes) primary query should be removed.""" + output = "before\x1b[cafter" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_da_query_with_zero(self): + """DA query with explicit 0 parameter should be removed.""" + output = "before\x1b[0cafter" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_removes_da2_query(self): + """DA2 (Secondary Device Attributes) query should be removed.""" + output = "before\x1b[>cafter" + result = filter_terminal_queries(output) + assert result == "beforeafter" + + def test_preserves_color_codes(self): + """ANSI color codes should be preserved.""" + # Red text: ESC [ 31 m + output = "\x1b[31mred text\x1b[0m" + result = filter_terminal_queries(output) + assert result == "\x1b[31mred text\x1b[0m" + + def test_preserves_cursor_movement(self): + """Cursor movement sequences should be preserved.""" + # Move cursor up: ESC [ A + output = "line1\x1b[Aline2" + result = filter_terminal_queries(output) + assert result == "line1\x1b[Aline2" + + def test_preserves_bold_formatting(self): + """Bold/bright formatting should be preserved.""" + output = "\x1b[1mbold\x1b[0m" + result = filter_terminal_queries(output) + assert result == "\x1b[1mbold\x1b[0m" + + def test_real_gh_output(self): + """Test with actual captured gh command output pattern.""" + # This is a simplified version of what gh outputs + output = ( + "\x1b]11;?\x1b\\\x1b[6n" # Query sequences at start + "\nShowing 3 of 183 open pull requests\n" + "\x1b[0;32m#13199\x1b[0m feat: feature \x1b[0;36mbranch\x1b[0m" + ) + result = filter_terminal_queries(output) + # Query sequences removed, colors preserved + assert "\x1b[6n" not in result + assert "\x1b]11;?" not in result + assert "\x1b[0;32m" in result # Color preserved + assert "Showing 3 of 183" in result + + def test_empty_string(self): + """Empty string should return empty string.""" + assert filter_terminal_queries("") == "" + + def test_no_escape_sequences(self): + """Plain text without escape sequences should pass through unchanged.""" + output = "Hello, world!\nLine 2" + assert filter_terminal_queries(output) == output + + def test_unicode_content(self): + """Unicode content should be preserved.""" + output = "Hello 🌍\x1b[6nδΈ–η•Œ" + result = filter_terminal_queries(output) + assert result == "Hello πŸŒδΈ–η•Œ"