diff --git a/openhands-sdk/openhands/sdk/agent/base.py b/openhands-sdk/openhands/sdk/agent/base.py index 26ffd5204f..3fd6b3d5a3 100644 --- a/openhands-sdk/openhands/sdk/agent/base.py +++ b/openhands-sdk/openhands/sdk/agent/base.py @@ -22,7 +22,7 @@ from openhands.sdk.llm import LLM from openhands.sdk.llm.utils.model_prompt_spec import get_model_prompt_spec from openhands.sdk.logger import get_logger -from openhands.sdk.mcp import create_mcp_tools +from openhands.sdk.mcp import create_mcp_tools_graceful from openhands.sdk.tool import ( BUILT_IN_TOOL_CLASSES, BUILT_IN_TOOLS, @@ -318,6 +318,7 @@ def _initialize(self, state: ConversationState): return tools: list[ToolDefinition] = [] + mcp_errors: list[str] = [] # Use ThreadPoolExecutor to parallelize tool resolution with ThreadPoolExecutor(max_workers=4) as executor: @@ -328,16 +329,33 @@ def _initialize(self, state: ConversationState): future = executor.submit(resolve_tool, tool_spec, state) futures.append(future) - # Submit MCP tools creation if configured + # Submit MCP tools creation if configured (using graceful degradation) + mcp_future = None if self.mcp_config: - future = executor.submit(create_mcp_tools, self.mcp_config, 30) - futures.append(future) + mcp_future = executor.submit( + create_mcp_tools_graceful, self.mcp_config, 30 + ) - # Collect results as they complete + # Collect regular tool results for future in futures: result = future.result() tools.extend(result) + # Collect MCP results with graceful degradation + if mcp_future is not None: + mcp_result = mcp_future.result() + tools.extend(mcp_result.tools) + if mcp_result.has_errors: + for err in mcp_result.errors: + mcp_errors.append(f"{err.server_name}: {err}") + + # Log MCP errors but continue with available tools + if mcp_errors: + logger.warning( + "Some MCP servers failed to initialize:\n" + + "\n".join(f" - {e}" for e in mcp_errors) + ) + logger.info( f"Loaded {len(tools)} tools from spec: {[tool.name for tool in tools]}" ) diff --git a/openhands-sdk/openhands/sdk/mcp/__init__.py b/openhands-sdk/openhands/sdk/mcp/__init__.py index 4e0cfc11f4..a448d29cd9 100644 --- a/openhands-sdk/openhands/sdk/mcp/__init__.py +++ b/openhands-sdk/openhands/sdk/mcp/__init__.py @@ -2,13 +2,15 @@ from openhands.sdk.mcp.client import MCPClient from openhands.sdk.mcp.definition import MCPToolAction, MCPToolObservation -from openhands.sdk.mcp.exceptions import MCPError, MCPTimeoutError +from openhands.sdk.mcp.exceptions import MCPError, MCPServerError, MCPTimeoutError from openhands.sdk.mcp.tool import ( MCPToolDefinition, MCPToolExecutor, ) from openhands.sdk.mcp.utils import ( + MCPToolsResult, create_mcp_tools, + create_mcp_tools_graceful, ) @@ -18,7 +20,10 @@ "MCPToolAction", "MCPToolObservation", "MCPToolExecutor", + "MCPToolsResult", "create_mcp_tools", + "create_mcp_tools_graceful", "MCPError", + "MCPServerError", "MCPTimeoutError", ] diff --git a/openhands-sdk/openhands/sdk/mcp/exceptions.py b/openhands-sdk/openhands/sdk/mcp/exceptions.py index 6a627110c4..f6adeb30c9 100644 --- a/openhands-sdk/openhands/sdk/mcp/exceptions.py +++ b/openhands-sdk/openhands/sdk/mcp/exceptions.py @@ -17,3 +17,18 @@ def __init__(self, message: str, timeout: float, config: dict | None = None): self.timeout = timeout self.config = config super().__init__(message) + + +class MCPServerError(MCPError): + """Exception raised when an individual MCP server fails to initialize. + + Contains details about which server failed and the underlying cause. + """ + + server_name: str + cause: Exception | None + + def __init__(self, message: str, server_name: str, cause: Exception | None = None): + self.server_name = server_name + self.cause = cause + super().__init__(message) diff --git a/openhands-sdk/openhands/sdk/mcp/utils.py b/openhands-sdk/openhands/sdk/mcp/utils.py index fc53f77117..fe9072ddcb 100644 --- a/openhands-sdk/openhands/sdk/mcp/utils.py +++ b/openhands-sdk/openhands/sdk/mcp/utils.py @@ -1,6 +1,8 @@ """Utility functions for MCP integration.""" import logging +from dataclasses import dataclass, field +from typing import TYPE_CHECKING import mcp.types from fastmcp.client.logging import LogMessage @@ -8,14 +10,41 @@ from openhands.sdk.logger import get_logger from openhands.sdk.mcp.client import MCPClient -from openhands.sdk.mcp.exceptions import MCPTimeoutError +from openhands.sdk.mcp.exceptions import MCPServerError, MCPTimeoutError from openhands.sdk.mcp.tool import MCPToolDefinition +if TYPE_CHECKING: + from fastmcp.mcp_config import MCPServerTypes + logger = get_logger(__name__) LOGGING_LEVEL_MAP = logging.getLevelNamesMapping() +@dataclass +class MCPToolsResult: + """Result of creating MCP tools with graceful degradation. + + Contains both the successfully initialized tools and any errors that occurred + during initialization of individual servers. + """ + + tools: list[MCPToolDefinition] = field(default_factory=list) + errors: list[MCPServerError] = field(default_factory=list) + + @property + def has_errors(self) -> bool: + """Return True if any server failed to initialize.""" + return len(self.errors) > 0 + + def error_summary(self) -> str: + """Return a human-readable summary of all errors.""" + if not self.errors: + return "" + parts = [f"- {err.server_name}: {err}" for err in self.errors] + return "MCP server initialization failures:\n" + "\n".join(parts) + + async def log_handler(message: LogMessage): """ Handles incoming logs from the MCP server and forwards them @@ -89,3 +118,105 @@ def create_mcp_tools( logger.info(f"Created {len(client.tools)} MCP tools: {[t.name for t in client]}") return client + + +def _create_single_server_tools( + server_name: str, + server_config: "MCPServerTypes", + timeout: float, +) -> MCPClient: + """Create MCP tools for a single server. + + Internal helper used by create_mcp_tools_graceful. + Raises exceptions on failure (caller handles graceful degradation). + """ + single_server_config = MCPConfig(mcpServers={server_name: server_config}) + client = MCPClient(single_server_config, log_handler=log_handler) + + try: + client.call_async_from_sync( + _connect_and_list_tools, timeout=timeout, client=client + ) + except TimeoutError as e: + client.sync_close() + raise MCPTimeoutError( + f"MCP server '{server_name}' timed out after {timeout} seconds", + timeout=timeout, + config=single_server_config.model_dump(), + ) from e + except BaseException: + try: + client.sync_close() + except Exception as close_exc: + logger.warning( + f"Failed to close MCP client for '{server_name}' during error cleanup", + exc_info=close_exc, + ) + raise + + return client + + +def create_mcp_tools_graceful( + config: dict | MCPConfig, + timeout: float = 30.0, +) -> MCPToolsResult: + """Create MCP tools with per-server graceful degradation. + + Unlike create_mcp_tools() which fails if ANY server fails, this function + attempts to initialize each MCP server individually and continues even + if some servers fail. + + Args: + config: MCP configuration dictionary or MCPConfig object containing + server definitions. + timeout: Timeout in seconds for each server connection (default: 30.0). + + Returns: + MCPToolsResult containing: + - tools: List of successfully loaded MCPToolDefinitions from all servers + - errors: List of MCPServerError for any servers that failed + + Example: + result = create_mcp_tools_graceful(config) + if result.has_errors: + logger.warning(result.error_summary()) + for tool in result.tools: + # use tool + """ + if isinstance(config, dict): + config = MCPConfig.model_validate(config) + + if not config.mcpServers: + return MCPToolsResult() + + result = MCPToolsResult() + + for server_name, server_config in config.mcpServers.items(): + try: + client = _create_single_server_tools(server_name, server_config, timeout) + result.tools.extend(client.tools) + logger.info( + f"MCP server '{server_name}' connected: {len(client.tools)} tools" + ) + except Exception as e: + error = MCPServerError( + message=str(e), + server_name=server_name, + cause=e, + ) + result.errors.append(error) + logger.warning(f"MCP server '{server_name}' failed to initialize: {e}") + + if result.tools: + logger.info( + f"Created {len(result.tools)} MCP tools from " + f"{len(config.mcpServers) - len(result.errors)} servers" + ) + if result.errors: + logger.warning( + f"{len(result.errors)} MCP server(s) failed: " + f"{[e.server_name for e in result.errors]}" + ) + + return result diff --git a/tests/sdk/conversation/test_local_conversation_plugins.py b/tests/sdk/conversation/test_local_conversation_plugins.py index e7ae709fe4..ff386689f3 100644 --- a/tests/sdk/conversation/test_local_conversation_plugins.py +++ b/tests/sdk/conversation/test_local_conversation_plugins.py @@ -227,17 +227,21 @@ def test_plugin_mcp_config_is_initialized( This is a regression test for a bug where MCP tools from plugins were not being created because the agent was initialized before plugins were loaded. """ - # Mock create_mcp_tools to avoid actually starting MCP servers in tests + from openhands.sdk.mcp import MCPToolsResult + + # Mock create_mcp_tools_graceful to avoid actually starting MCP servers mcp_tools_created = [] - def mock_create_mcp_tools(config, timeout): + def mock_create_mcp_tools_graceful(config, timeout): mcp_tools_created.append(config) - return [] # Return empty list for testing + return MCPToolsResult(tools=[], errors=[]) # Return empty result import openhands.sdk.agent.base monkeypatch.setattr( - openhands.sdk.agent.base, "create_mcp_tools", mock_create_mcp_tools + openhands.sdk.agent.base, + "create_mcp_tools_graceful", + mock_create_mcp_tools_graceful, ) plugin_dir = create_test_plugin( @@ -269,7 +273,8 @@ def mock_create_mcp_tools(config, timeout): assert "test-server" in conversation.agent.mcp_config["mcpServers"] # The agent should have been initialized with the complete MCP config - # This verifies that create_mcp_tools was called with the plugin's MCP config + # This verifies that create_mcp_tools_graceful was called with the plugin's + # MCP config assert len(mcp_tools_created) > 0 assert "mcpServers" in mcp_tools_created[-1] assert "test-server" in mcp_tools_created[-1]["mcpServers"] diff --git a/tests/sdk/mcp/test_create_mcp_tools_graceful.py b/tests/sdk/mcp/test_create_mcp_tools_graceful.py new file mode 100644 index 0000000000..6a5b6f4d4a --- /dev/null +++ b/tests/sdk/mcp/test_create_mcp_tools_graceful.py @@ -0,0 +1,319 @@ +"""Tests for MCP graceful degradation functionality.""" + +import asyncio +import logging +import socket +import threading +import time +from collections.abc import Generator +from typing import Literal + +import httpx +import pytest +from fastmcp import FastMCP + +from openhands.sdk.mcp import ( + MCPServerError, + MCPToolsResult, + create_mcp_tools_graceful, +) + + +logger = logging.getLogger(__name__) + +MCPTransport = Literal["http", "streamable-http", "sse"] + + +def _find_free_port() -> int: + """Find an available port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def _wait_for_port(port: int, timeout: float = 5.0, interval: float = 0.1) -> None: + """Wait for a port to become available by polling with HTTP requests.""" + max_attempts = int(timeout / interval) + for _ in range(max_attempts): + try: + with httpx.Client(timeout=interval) as client: + client.get(f"http://127.0.0.1:{port}/") + return + except httpx.ConnectError: + pass + except (httpx.TimeoutException, httpx.HTTPStatusError): + return + except Exception: + return + time.sleep(interval) + raise RuntimeError(f"Server failed to start on port {port} within {timeout}s") + + +class MCPTestServer: + """Helper class to manage MCP test servers for testing.""" + + def __init__(self, name: str = "test-server"): + self.mcp = FastMCP(name) + self.port: int | None = None + self._server_thread: threading.Thread | None = None + + def add_tool(self, func): + """Add a tool to the server.""" + return self.mcp.tool()(func) + + def start(self, transport: MCPTransport = "http") -> int: + """Start the server and return the port.""" + self.port = _find_free_port() + path = "/sse" if transport == "sse" else "/mcp" + startup_error: list[Exception] = [] + + async def run_server(): + assert self.port is not None + await self.mcp.run_http_async( + host="127.0.0.1", + port=self.port, + transport=transport, + show_banner=False, + path=path, + ) + + def server_thread_target(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(run_server()) + except Exception as e: + logger.error(f"MCP test server failed: {e}") + startup_error.append(e) + finally: + loop.close() + + self._server_thread = threading.Thread(target=server_thread_target, daemon=True) + self._server_thread.start() + _wait_for_port(self.port) + + if startup_error: + raise startup_error[0] + + return self.port + + def stop(self): + """Stop the server and clean up resources.""" + if self._server_thread is not None: + self._server_thread = None + self.port = None + + +@pytest.fixture +def http_mcp_server() -> Generator[MCPTestServer]: + """Fixture providing a running HTTP MCP server with test tools.""" + server = MCPTestServer("http-test-server") + + @server.add_tool + def greet(name: str) -> str: + """Greet someone by name.""" + return f"Hello, {name}!" + + @server.add_tool + def add_numbers(a: int, b: int) -> int: + """Add two numbers together.""" + return a + b + + server.start(transport="http") + yield server + server.stop() + + +@pytest.fixture +def sse_mcp_server() -> Generator[MCPTestServer]: + """Fixture providing a running SSE MCP server with test tools.""" + server = MCPTestServer("sse-test-server") + + @server.add_tool + def echo(message: str) -> str: + """Echo a message back.""" + return message + + @server.add_tool + def multiply(x: int, y: int) -> int: + """Multiply two numbers.""" + return x * y + + server.start(transport="sse") + yield server + server.stop() + + +def test_create_mcp_tools_graceful_empty_config(): + """Test graceful creation with empty config returns empty result.""" + result = create_mcp_tools_graceful({}) + assert isinstance(result, MCPToolsResult) + assert len(result.tools) == 0 + assert len(result.errors) == 0 + assert not result.has_errors + + +def test_create_mcp_tools_graceful_single_server(http_mcp_server: MCPTestServer): + """Test graceful creation with a single working server.""" + config = { + "mcpServers": { + "http_server": { + "transport": "http", + "url": f"http://127.0.0.1:{http_mcp_server.port}/mcp", + } + } + } + + result = create_mcp_tools_graceful(config, timeout=10.0) + + assert len(result.tools) == 2 + assert not result.has_errors + tool_names = {t.name for t in result.tools} + assert "greet" in tool_names + assert "add_numbers" in tool_names + + +def test_create_mcp_tools_graceful_with_failing_server(): + """Test graceful creation when one server fails.""" + config = { + "mcpServers": { + "nonexistent": { + "transport": "http", + "url": "http://127.0.0.1:59999/mcp", + } + } + } + + result = create_mcp_tools_graceful(config, timeout=5.0) + + assert len(result.tools) == 0 + assert result.has_errors + assert len(result.errors) == 1 + assert result.errors[0].server_name == "nonexistent" + + +def test_create_mcp_tools_graceful_mixed_success_failure( + http_mcp_server: MCPTestServer, +): + """Test graceful degradation with one working and one failing server.""" + config = { + "mcpServers": { + "working_server": { + "transport": "http", + "url": f"http://127.0.0.1:{http_mcp_server.port}/mcp", + }, + "failing_server": { + "transport": "http", + "url": "http://127.0.0.1:59999/mcp", + }, + } + } + + result = create_mcp_tools_graceful(config, timeout=5.0) + + # Should have tools from working server + # Note: since each server is initialized independently, tools are NOT prefixed + assert len(result.tools) == 2 + tool_names = {t.name for t in result.tools} + assert "greet" in tool_names + assert "add_numbers" in tool_names + + # Should have error for failing server + assert result.has_errors + assert len(result.errors) == 1 + assert result.errors[0].server_name == "failing_server" + + +def test_create_mcp_tools_graceful_multiple_working_servers( + http_mcp_server: MCPTestServer, sse_mcp_server: MCPTestServer +): + """Test graceful creation with multiple working servers.""" + config = { + "mcpServers": { + "http_server": { + "transport": "http", + "url": f"http://127.0.0.1:{http_mcp_server.port}/mcp", + }, + "sse_server": { + "transport": "sse", + "url": f"http://127.0.0.1:{sse_mcp_server.port}/sse", + }, + } + } + + result = create_mcp_tools_graceful(config, timeout=10.0) + + # Should have tools from both servers + # Note: since each server is initialized independently (single-server config), + # tools are NOT prefixed with server names + assert len(result.tools) == 4 + assert not result.has_errors + tool_names = {t.name for t in result.tools} + assert "greet" in tool_names + assert "add_numbers" in tool_names + assert "echo" in tool_names + assert "multiply" in tool_names + + +def test_create_mcp_tools_graceful_all_servers_fail(): + """Test graceful degradation when all servers fail.""" + config = { + "mcpServers": { + "server1": { + "transport": "http", + "url": "http://127.0.0.1:59999/mcp", + }, + "server2": { + "transport": "http", + "url": "http://127.0.0.1:59998/mcp", + }, + } + } + + result = create_mcp_tools_graceful(config, timeout=5.0) + + # Should have no tools + assert len(result.tools) == 0 + + # Should have errors for both servers + assert result.has_errors + assert len(result.errors) == 2 + error_names = {e.server_name for e in result.errors} + assert "server1" in error_names + assert "server2" in error_names + + +def test_mcp_tools_result_error_summary(): + """Test that error_summary formats errors correctly.""" + result = MCPToolsResult( + tools=[], + errors=[ + MCPServerError("Connection refused", "server1", None), + MCPServerError("Timeout", "server2", None), + ], + ) + + summary = result.error_summary() + assert "MCP server initialization failures:" in summary + assert "server1" in summary + assert "server2" in summary + + +def test_mcp_tools_result_error_summary_empty(): + """Test that error_summary returns empty string when no errors.""" + result = MCPToolsResult(tools=[], errors=[]) + assert result.error_summary() == "" + + +def test_mcp_server_error_attributes(): + """Test MCPServerError exception attributes.""" + cause = ValueError("test error") + error = MCPServerError( + message="Server failed", + server_name="test_server", + cause=cause, + ) + + assert error.server_name == "test_server" + assert error.cause is cause + assert str(error) == "Server failed"