From 8a7afa1116b5049d9db94ca528b04adb1f279320 Mon Sep 17 00:00:00 2001 From: Daniel DeGroff Date: Thu, 19 Feb 2026 17:00:02 -0700 Subject: [PATCH 1/2] feat: add MaybeDontAnalyzer security analyzer Implements a SecurityAnalyzerBase that validates agent actions against policy rules configured in a Maybe Don't Gateway instance. Calls the gateway's POST /api/v1/action/validate endpoint and maps the response risk_level directly to SecurityRisk. - MaybeDontAnalyzer class following GraySwan patterns - 41 tests covering init, request building, risk mapping, error handling, HTTP lifecycle, and end-to-end security_risk flow - Example script (40_maybedont_security_analyzer.py) - Exported from openhands.sdk.security module Co-Authored-By: Claude Opus 4.6 --- .../40_maybedont_security_analyzer.py | 146 ++++ .../openhands/sdk/security/__init__.py | 2 + .../sdk/security/maybedont/__init__.py | 4 + .../sdk/security/maybedont/analyzer.py | 264 ++++++++ tests/sdk/security/maybedont/__init__.py | 0 .../maybedont/test_maybedont_analyzer.py | 638 ++++++++++++++++++ 6 files changed, 1054 insertions(+) create mode 100644 examples/01_standalone_sdk/40_maybedont_security_analyzer.py create mode 100644 openhands-sdk/openhands/sdk/security/maybedont/__init__.py create mode 100644 openhands-sdk/openhands/sdk/security/maybedont/analyzer.py create mode 100644 tests/sdk/security/maybedont/__init__.py create mode 100644 tests/sdk/security/maybedont/test_maybedont_analyzer.py diff --git a/examples/01_standalone_sdk/40_maybedont_security_analyzer.py b/examples/01_standalone_sdk/40_maybedont_security_analyzer.py new file mode 100644 index 0000000000..c18755f2cf --- /dev/null +++ b/examples/01_standalone_sdk/40_maybedont_security_analyzer.py @@ -0,0 +1,146 @@ +"""OpenHands Agent SDK — Maybe Don't Security Analyzer Example + +This example shows how to use the MaybeDontAnalyzer to validate agent actions +against policy rules configured in a Maybe Don't Gateway before execution. + +Prerequisites: + 1. 
A running Maybe Don't Gateway instance. Quick start with Docker: + + docker run -p 8080:8080 ghcr.io/maybedont/maybe-dont:latest + + For configuration, see: https://maybedont.ai/docs + + 2. Set environment variables: + - LLM_API_KEY: Your LLM provider API key + - MAYBE_DONT_GATEWAY_URL: Gateway URL (default: http://localhost:8080) + +The Maybe Don't Gateway supports two layers of protection: + - Security Analyzer (this example): Pre-execution validation of ALL actions + - MCP Proxy (separate config): Execution-time validation of MCP tool calls + +For more information, see: https://maybedont.ai/docs +""" + +import os +import signal +from collections.abc import Callable + +from pydantic import SecretStr + +from openhands.sdk import LLM, Agent, BaseConversation, Conversation +from openhands.sdk.conversation.state import ( + ConversationExecutionStatus, + ConversationState, +) +from openhands.sdk.security.confirmation_policy import ConfirmRisky +from openhands.sdk.security.maybedont import MaybeDontAnalyzer +from openhands.sdk.tool import Tool +from openhands.tools.file_editor import FileEditorTool +from openhands.tools.terminal import TerminalTool + + +# Clean ^C exit: no stack trace noise +signal.signal(signal.SIGINT, lambda *_: (_ for _ in ()).throw(KeyboardInterrupt())) + + +def _print_blocked_actions(pending_actions) -> None: + print(f"\n🔒 Maybe Don't blocked {len(pending_actions)} high-risk action(s):") + for i, action in enumerate(pending_actions, start=1): + snippet = str(action.action)[:100].replace("\n", " ") + print(f" {i}. {action.tool_name}: {snippet}...") + + +def confirm_high_risk_in_console(pending_actions) -> bool: + """ + Return True to approve, False to reject. + Defaults to 'no' on EOF/KeyboardInterrupt. + """ + _print_blocked_actions(pending_actions) + while True: + try: + ans = ( + input( + "\nThese actions were flagged as HIGH RISK by Maybe Don't. " + "Do you want to execute them anyway? 
(yes/no): " + ) + .strip() + .lower() + ) + except (EOFError, KeyboardInterrupt): + print("\n❌ No input received; rejecting by default.") + return False + + if ans in ("yes", "y"): + print("✅ Approved — executing high-risk actions...") + return True + if ans in ("no", "n"): + print("❌ Rejected — skipping high-risk actions...") + return False + print("Please enter 'yes' or 'no'.") + + +def run_until_finished_with_security( + conversation: BaseConversation, confirmer: Callable[[list], bool] +) -> None: + """ + Drive the conversation until FINISHED. + - If WAITING_FOR_CONFIRMATION: ask the confirmer. + * On approve: set execution_status = IDLE. + * On reject: conversation.reject_pending_actions(...). + """ + while conversation.state.execution_status != ConversationExecutionStatus.FINISHED: + if ( + conversation.state.execution_status + == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION + ): + pending = ConversationState.get_unmatched_actions(conversation.state.events) + if not pending: + raise RuntimeError( + "⚠️ Agent is waiting for confirmation but no pending actions " + "were found. This should not happen." + ) + if not confirmer(pending): + conversation.reject_pending_actions("User rejected high-risk actions") + continue + + print("▶️ Running conversation.run()...") + conversation.run() + + +# Configure LLM +api_key = os.getenv("LLM_API_KEY") +assert api_key is not None, "LLM_API_KEY environment variable is not set." +model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929") +base_url = os.getenv("LLM_BASE_URL") +llm = LLM( + usage_id="maybedont-security", + model=model, + base_url=base_url, + api_key=SecretStr(api_key), +) + +# Tools +tools = [ + Tool(name=TerminalTool.name), + Tool(name=FileEditorTool.name), +] + +# Agent +agent = Agent(llm=llm, tools=tools) + +# Conversation with Maybe Don't security analyzer +# The analyzer calls the Maybe Don't Gateway to validate actions before execution. 
+# Gateway URL defaults to http://localhost:8080, or set MAYBE_DONT_GATEWAY_URL. +conversation = Conversation( + agent=agent, persistence_dir="./.conversations", workspace="." +) +conversation.set_security_analyzer(MaybeDontAnalyzer()) +conversation.set_confirmation_policy(ConfirmRisky()) + +print("\n1) Safe command (LOW risk - should execute automatically)...") +conversation.send_message("List files in the current directory") +conversation.run() + +print("\n2) Potentially risky command (may require confirmation)...") +conversation.send_message("Delete all files in the /tmp directory recursively") +run_until_finished_with_security(conversation, confirm_high_risk_in_console) diff --git a/openhands-sdk/openhands/sdk/security/__init__.py b/openhands-sdk/openhands/sdk/security/__init__.py index 8d3f204d31..8036c662af 100644 --- a/openhands-sdk/openhands/sdk/security/__init__.py +++ b/openhands-sdk/openhands/sdk/security/__init__.py @@ -7,6 +7,7 @@ ) from openhands.sdk.security.grayswan import GraySwanAnalyzer from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer +from openhands.sdk.security.maybedont import MaybeDontAnalyzer from openhands.sdk.security.risk import SecurityRisk @@ -15,6 +16,7 @@ "SecurityAnalyzerBase", "LLMSecurityAnalyzer", "GraySwanAnalyzer", + "MaybeDontAnalyzer", "ConfirmationPolicyBase", "AlwaysConfirm", "NeverConfirm", diff --git a/openhands-sdk/openhands/sdk/security/maybedont/__init__.py b/openhands-sdk/openhands/sdk/security/maybedont/__init__.py new file mode 100644 index 0000000000..ddb9698194 --- /dev/null +++ b/openhands-sdk/openhands/sdk/security/maybedont/__init__.py @@ -0,0 +1,4 @@ +from openhands.sdk.security.maybedont.analyzer import MaybeDontAnalyzer + + +__all__ = ["MaybeDontAnalyzer"] diff --git a/openhands-sdk/openhands/sdk/security/maybedont/analyzer.py b/openhands-sdk/openhands/sdk/security/maybedont/analyzer.py new file mode 100644 index 0000000000..906cc50b00 --- /dev/null +++ 
b/openhands-sdk/openhands/sdk/security/maybedont/analyzer.py @@ -0,0 +1,264 @@ +"""Maybe Don't Gateway security analyzer for OpenHands SDK. + +This module provides a security analyzer that validates agent actions against +policy rules configured in the Maybe Don't Gateway. It calls the gateway's +action validation endpoint before actions are executed. + +For more information, see: https://maybedont.ai/docs +""" + +from __future__ import annotations + +import json +import os +from typing import Any + +import httpx +from pydantic import Field, PrivateAttr + +from openhands.sdk.event import ActionEvent, LLMConvertibleEvent +from openhands.sdk.logger import get_logger +from openhands.sdk.security.analyzer import SecurityAnalyzerBase +from openhands.sdk.security.risk import SecurityRisk + + +logger = get_logger(__name__) + +_RISK_MAP: dict[str, SecurityRisk] = { + "high": SecurityRisk.HIGH, + "medium": SecurityRisk.MEDIUM, + "low": SecurityRisk.LOW, + "unknown": SecurityRisk.UNKNOWN, +} + + +class MaybeDontAnalyzer(SecurityAnalyzerBase): + """Security analyzer using the Maybe Don't Gateway for policy-based validation. + + This analyzer sends agent actions to a Maybe Don't Gateway instance for + evaluation against configured CEL and AI-powered policy rules. The gateway + returns a risk level that maps directly to SecurityRisk. 
+ + The Maybe Don't Gateway supports two layers of protection: + - **Security Analyzer** (this class): Pre-execution validation of ALL actions + (shell commands, file ops, browser, tool calls) + - **MCP Proxy** (separate config): Execution-time validation of MCP tool calls + + Environment Variables: + MAYBE_DONT_GATEWAY_URL: Gateway base URL (default: http://localhost:8080) + + Example: + >>> from openhands.sdk.security.maybedont import MaybeDontAnalyzer + >>> analyzer = MaybeDontAnalyzer() + >>> risk = analyzer.security_risk(action_event) + """ + + gateway_url: str | None = Field( + default=None, + description="Maybe Don't Gateway base URL (via MAYBE_DONT_GATEWAY_URL env var)", + ) + timeout: float = Field( + default=30.0, + description="Request timeout in seconds", + ) + client_id: str = Field( + default="openhands", + description="Client identifier for audit attribution", + ) + + _client: httpx.Client | None = PrivateAttr(default=None) + _events: list[LLMConvertibleEvent] = PrivateAttr(default_factory=list) + + def model_post_init(self, __context: Any) -> None: + """Initialize the analyzer after model creation.""" + # Resolve gateway URL: explicit param > env var > default + if self.gateway_url is None: + env_url = os.getenv("MAYBE_DONT_GATEWAY_URL") + if env_url: + self.gateway_url = env_url + logger.debug( + "Gateway URL resolved from MAYBE_DONT_GATEWAY_URL " + "environment variable" + ) + else: + self.gateway_url = "http://localhost:8080" + + logger.info( + f"MaybeDontAnalyzer initialized with gateway_url={self.gateway_url}, " + f"timeout={self.timeout}s, client_id={self.client_id}" + ) + + def set_events(self, events: Any) -> None: + """Store events for future use. + + The Maybe Don't Gateway does not currently use conversation history + for action validation. Events are stored for interface compatibility + and future extensibility. 
+ + Args: + events: Sequence of events (stored but not used in v1) + """ + self._events = list(events) + + def _create_client(self) -> httpx.Client: + """Create a new HTTP client instance.""" + return httpx.Client( + timeout=self.timeout, + headers={ + "Content-Type": "application/json", + "X-Maybe-Dont-Client-ID": self.client_id, + }, + ) + + def _get_client(self) -> httpx.Client: + """Get or create HTTP client.""" + if self._client is None: + self._client = self._create_client() + elif self._client.is_closed: + self._client = self._create_client() + return self._client + + def _build_request(self, action: ActionEvent) -> dict[str, Any]: + """Build the action validation request body from an ActionEvent. + + Maps ActionEvent fields to the gateway's ActionValidationRequest format. + + Args: + action: The ActionEvent to convert + + Returns: + Dictionary matching the gateway's expected request format + """ + # Parse tool_call arguments from JSON string to dict + parameters: dict[str, Any] = {} + if action.tool_call and action.tool_call.arguments: + try: + parameters = json.loads(action.tool_call.arguments) + except (json.JSONDecodeError, TypeError): + logger.warning( + f"Failed to parse tool_call arguments for {action.tool_name}, " + "sending empty parameters" + ) + + # Build context from agent reasoning + context: dict[str, str] = {} + if action.thought: + thought_text = " ".join(t.text for t in action.thought if t.text) + if thought_text: + context["thought"] = thought_text + if action.summary: + context["summary"] = action.summary + + # v1: All actions are sent as "tool_call" — the gateway evaluates them + # uniformly via EvaluateToolCall(). A future version may route by + # action_type for type-specific evaluation. 
+ request: dict[str, Any] = { + "action_type": "tool_call", + "target": action.tool_name, + "parameters": parameters, + "actor": self.client_id, + } + if context: + request["context"] = context + + return request + + def _map_response_to_risk(self, response: dict[str, Any]) -> SecurityRisk: + """Map gateway response to SecurityRisk. + + Args: + response: Parsed JSON response from the gateway + + Returns: + SecurityRisk level based on the gateway's risk_level field + """ + risk_level = response.get("risk_level", "unknown") + return _RISK_MAP.get(risk_level, SecurityRisk.UNKNOWN) + + def _call_gateway(self, payload: dict[str, Any]) -> SecurityRisk: + """Call the Maybe Don't Gateway action validation endpoint. + + Args: + payload: Request body for the action validation endpoint + + Returns: + SecurityRisk level based on gateway response + """ + url = f"{self.gateway_url.rstrip('/')}/api/v1/action/validate" + + try: + client = self._get_client() + + logger.debug( + f"Sending action validation request to {url} " + f"for target: {payload.get('target')}" + ) + + response = client.post(url, json=payload) + + if response.status_code == 200: + try: + result = response.json() + except json.JSONDecodeError: + logger.error( + f"Invalid JSON from Maybe Don't Gateway: {response.text}" + ) + return SecurityRisk.UNKNOWN + + risk = self._map_response_to_risk(result) + allowed = result.get("allowed", True) + + logger.info( + f"Maybe Don't risk assessment: {risk.name} " + f"(allowed={allowed}, target={payload.get('target')})" + ) + return risk + + elif response.status_code == 400: + logger.error( + f"Maybe Don't Gateway rejected request (400): {response.text}" + ) + return SecurityRisk.UNKNOWN + + else: + logger.error( + f"Maybe Don't Gateway error {response.status_code}: {response.text}" + ) + return SecurityRisk.UNKNOWN + + except httpx.TimeoutException: + logger.error("Maybe Don't Gateway request timed out") + return SecurityRisk.UNKNOWN + except Exception as e: + 
logger.error(f"Maybe Don't Gateway request failed: {e}") + return SecurityRisk.UNKNOWN + + def security_risk(self, action: ActionEvent) -> SecurityRisk: + """Analyze action for security risks using the Maybe Don't Gateway. + + Converts the ActionEvent to the gateway's request format and calls the + action validation endpoint. The gateway evaluates the action against + configured CEL and AI-powered policy rules. + + Args: + action: The ActionEvent to analyze + + Returns: + SecurityRisk level based on gateway policy evaluation + """ + logger.debug( + f"Calling security_risk on MaybeDontAnalyzer for action: {action.tool_name}" + ) + + try: + payload = self._build_request(action) + return self._call_gateway(payload) + except Exception as e: + logger.error(f"Maybe Don't security analysis failed: {e}") + return SecurityRisk.UNKNOWN + + def close(self) -> None: + """Clean up resources.""" + if self._client is not None and not self._client.is_closed: + self._client.close() + self._client = None diff --git a/tests/sdk/security/maybedont/__init__.py b/tests/sdk/security/maybedont/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/sdk/security/maybedont/test_maybedont_analyzer.py b/tests/sdk/security/maybedont/test_maybedont_analyzer.py new file mode 100644 index 0000000000..99e19f74a9 --- /dev/null +++ b/tests/sdk/security/maybedont/test_maybedont_analyzer.py @@ -0,0 +1,638 @@ +"""Tests for the MaybeDontAnalyzer class.""" + +import json +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from openhands.sdk.event import ActionEvent +from openhands.sdk.llm import MessageToolCall, TextContent +from openhands.sdk.security.maybedont import MaybeDontAnalyzer +from openhands.sdk.security.risk import SecurityRisk +from openhands.sdk.tool import Action + + +class MaybeDontTestAction(Action): + """Mock action for MaybeDont analyzer testing.""" + + command: str = "test_command" + + +def create_mock_action_event( + tool_name: str = 
"execute_bash", + command: str = "ls -la", + thought_text: str = "I need to list files", + summary: str | None = "listing directory contents", + security_risk: SecurityRisk = SecurityRisk.UNKNOWN, +) -> ActionEvent: + """Helper to create ActionEvent for testing.""" + return ActionEvent( + thought=[TextContent(text=thought_text)], + action=MaybeDontTestAction(command=command), + tool_name=tool_name, + tool_call_id="test_call_id", + tool_call=MessageToolCall( + id="test_call_id", + name=tool_name, + arguments=json.dumps({"command": command}), + origin="completion", + ), + llm_response_id="test_response_id", + security_risk=security_risk, + summary=summary, + ) + + +class TestMaybeDontAnalyzerInit: + """Tests for MaybeDontAnalyzer initialization.""" + + def test_init_with_defaults(self): + """Test default initialization values.""" + with patch.dict("os.environ", {}, clear=True): + analyzer = MaybeDontAnalyzer() + assert analyzer.gateway_url == "http://localhost:8080" + assert analyzer.timeout == 30.0 + assert analyzer.client_id == "openhands" + + def test_init_with_gateway_url_from_env(self): + """Test that gateway URL is read from environment.""" + with patch.dict( + "os.environ", {"MAYBE_DONT_GATEWAY_URL": "http://gateway:9090"} + ): + analyzer = MaybeDontAnalyzer() + assert analyzer.gateway_url == "http://gateway:9090" + + def test_init_with_explicit_gateway_url(self): + """Test that explicit gateway URL takes precedence over default.""" + analyzer = MaybeDontAnalyzer(gateway_url="http://custom:8080") + assert analyzer.gateway_url == "http://custom:8080" + + def test_init_explicit_takes_precedence_over_env(self): + """Test that explicit param takes precedence over env var.""" + with patch.dict( + "os.environ", + {"MAYBE_DONT_GATEWAY_URL": "http://from-env:9090"}, + ): + analyzer = MaybeDontAnalyzer(gateway_url="http://explicit:8080") + assert analyzer.gateway_url == "http://explicit:8080" + + def test_init_with_custom_timeout(self): + """Test that custom timeout 
can be set.""" + analyzer = MaybeDontAnalyzer(timeout=10.0) + assert analyzer.timeout == 10.0 + + def test_init_with_custom_client_id(self): + """Test that custom client_id can be set.""" + analyzer = MaybeDontAnalyzer(client_id="my-agent") + assert analyzer.client_id == "my-agent" + + def test_init_logs_configuration(self, caplog: pytest.LogCaptureFixture): + """Test that initialization logs the configuration.""" + MaybeDontAnalyzer(gateway_url="http://test:8080") + assert "MaybeDontAnalyzer initialized" in caplog.text + assert "http://test:8080" in caplog.text + + +class TestMaybeDontAnalyzerBuildRequest: + """Tests for the _build_request method.""" + + @pytest.fixture + def analyzer(self) -> MaybeDontAnalyzer: + """Create analyzer with default config.""" + return MaybeDontAnalyzer() + + def test_build_request_basic(self, analyzer: MaybeDontAnalyzer): + """Test basic request building from ActionEvent.""" + action = create_mock_action_event( + tool_name="execute_bash", + command="ls -la", + ) + + request = analyzer._build_request(action) + + assert request["action_type"] == "tool_call" + assert request["target"] == "execute_bash" + assert request["parameters"] == {"command": "ls -la"} + assert request["actor"] == "openhands" + + def test_build_request_includes_thought(self, analyzer: MaybeDontAnalyzer): + """Test that agent thought is included in context.""" + action = create_mock_action_event( + thought_text="I need to clean up temporary files", + ) + + request = analyzer._build_request(action) + + assert "context" in request + assert request["context"]["thought"] == "I need to clean up temporary files" + + def test_build_request_includes_summary(self, analyzer: MaybeDontAnalyzer): + """Test that action summary is included in context.""" + action = create_mock_action_event( + summary="cleaning up temp files", + ) + + request = analyzer._build_request(action) + + assert request["context"]["summary"] == "cleaning up temp files" + + def 
test_build_request_no_summary(self, analyzer: MaybeDontAnalyzer): + """Test request building without summary.""" + action = create_mock_action_event(summary=None) + + request = analyzer._build_request(action) + + assert "summary" not in request.get("context", {}) + + def test_build_request_custom_client_id(self): + """Test that custom client_id is used as actor.""" + analyzer = MaybeDontAnalyzer(client_id="my-agent") + action = create_mock_action_event() + + request = analyzer._build_request(action) + + assert request["actor"] == "my-agent" + + def test_build_request_invalid_arguments_json(self, analyzer: MaybeDontAnalyzer): + """Test that invalid JSON in tool_call arguments sends empty parameters.""" + action = ActionEvent( + thought=[TextContent(text="test thought")], + action=MaybeDontTestAction(command="test"), + tool_name="execute_bash", + tool_call_id="test_call_id", + tool_call=MessageToolCall( + id="test_call_id", + name="execute_bash", + arguments="not valid json", + origin="completion", + ), + llm_response_id="test_response_id", + ) + + request = analyzer._build_request(action) + + assert request["parameters"] == {} + + def test_build_request_empty_thought(self, analyzer: MaybeDontAnalyzer): + """Test request building with empty thought text.""" + action = create_mock_action_event(thought_text="") + + request = analyzer._build_request(action) + + # Empty thought should not be included in context + assert "thought" not in request.get("context", {}) + + +class TestMaybeDontAnalyzerRiskMapping: + """Tests for risk_level response mapping.""" + + @pytest.fixture + def analyzer(self) -> MaybeDontAnalyzer: + """Create analyzer with default config.""" + return MaybeDontAnalyzer() + + def test_map_high_risk(self, analyzer: MaybeDontAnalyzer): + """Test that 'high' risk_level maps to HIGH.""" + assert ( + analyzer._map_response_to_risk({"risk_level": "high"}) == SecurityRisk.HIGH + ) + + def test_map_medium_risk(self, analyzer: MaybeDontAnalyzer): + """Test that 
'medium' risk_level maps to MEDIUM.""" + assert ( + analyzer._map_response_to_risk({"risk_level": "medium"}) + == SecurityRisk.MEDIUM + ) + + def test_map_low_risk(self, analyzer: MaybeDontAnalyzer): + """Test that 'low' risk_level maps to LOW.""" + assert analyzer._map_response_to_risk({"risk_level": "low"}) == SecurityRisk.LOW + + def test_map_unknown_risk(self, analyzer: MaybeDontAnalyzer): + """Test that 'unknown' risk_level maps to UNKNOWN.""" + assert ( + analyzer._map_response_to_risk({"risk_level": "unknown"}) + == SecurityRisk.UNKNOWN + ) + + def test_map_missing_risk_level(self, analyzer: MaybeDontAnalyzer): + """Test that missing risk_level defaults to UNKNOWN.""" + assert analyzer._map_response_to_risk({}) == SecurityRisk.UNKNOWN + + def test_map_unrecognized_risk_level(self, analyzer: MaybeDontAnalyzer): + """Test that unrecognized risk_level defaults to UNKNOWN.""" + assert ( + analyzer._map_response_to_risk({"risk_level": "critical"}) + == SecurityRisk.UNKNOWN + ) + + +class TestMaybeDontAnalyzerSecurityRisk: + """Tests for the security_risk method (end-to-end).""" + + @pytest.fixture + def analyzer(self) -> MaybeDontAnalyzer: + """Create analyzer with default config.""" + return MaybeDontAnalyzer() + + def test_security_risk_allowed_low(self, analyzer: MaybeDontAnalyzer): + """Test allowed action returns LOW risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "request_id": "abc123", + "allowed": True, + "risk_level": "low", + "message": "Action allowed", + } + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.LOW + mock_client.post.assert_called_once() + + def test_security_risk_denied_high(self, analyzer: MaybeDontAnalyzer): + """Test 
denied action returns HIGH risk.""" + action = create_mock_action_event( + tool_name="execute_bash", + command="rm -rf /", + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "request_id": "abc123", + "allowed": False, + "risk_level": "high", + "message": "Action denied by policy", + "results": [ + { + "policy_name": "no-destructive-ops", + "policy_type": "ai", + "action": "deny", + "message": "Destructive operation", + } + ], + } + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.HIGH + + def test_security_risk_audit_only_medium(self, analyzer: MaybeDontAnalyzer): + """Test audit_only deny returns MEDIUM risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "request_id": "abc123", + "allowed": True, + "risk_level": "medium", + "message": "Action allowed (audit_only bypass)", + } + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.MEDIUM + + def test_security_risk_no_policies_unknown(self, analyzer: MaybeDontAnalyzer): + """Test no policies evaluated returns UNKNOWN risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "request_id": "abc123", + "allowed": True, + "risk_level": "unknown", + "message": "No policies evaluated", + } + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + 
mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + def test_security_risk_gateway_unreachable(self, analyzer: MaybeDontAnalyzer): + """Test that unreachable gateway returns UNKNOWN risk.""" + action = create_mock_action_event() + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.side_effect = httpx.ConnectError("Connection refused") + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + def test_security_risk_gateway_500(self, analyzer: MaybeDontAnalyzer): + """Test that gateway 500 error returns UNKNOWN risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + def test_security_risk_gateway_400(self, analyzer: MaybeDontAnalyzer): + """Test that gateway 400 error returns UNKNOWN risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.text = '{"error": "missing target field"}' + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + def test_security_risk_timeout(self, analyzer: MaybeDontAnalyzer): + """Test that gateway timeout returns UNKNOWN risk.""" + action = create_mock_action_event() + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + 
mock_client.post.side_effect = httpx.TimeoutException("Timeout") + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + def test_security_risk_invalid_json_response(self, analyzer: MaybeDontAnalyzer): + """Test that invalid JSON response returns UNKNOWN risk.""" + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.side_effect = json.JSONDecodeError("", "", 0) + mock_response.text = "not valid json" + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + result = analyzer.security_risk(action) + + assert result == SecurityRisk.UNKNOWN + + +class TestMaybeDontAnalyzerHTTPHeaders: + """Tests for HTTP header handling.""" + + def test_client_sends_content_type_header(self): + """Test that HTTP client sends correct Content-Type header.""" + analyzer = MaybeDontAnalyzer() + client = analyzer._create_client() + try: + assert client.headers["content-type"] == "application/json" + finally: + client.close() + + def test_client_sends_client_id_header(self): + """Test that HTTP client sends X-Maybe-Dont-Client-ID header.""" + analyzer = MaybeDontAnalyzer(client_id="my-agent") + client = analyzer._create_client() + try: + assert client.headers["x-maybe-dont-client-id"] == "my-agent" + finally: + client.close() + + def test_request_url_construction(self, caplog: pytest.LogCaptureFixture): + """Test that the correct URL is called.""" + analyzer = MaybeDontAnalyzer(gateway_url="http://gateway:9090") + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"risk_level": "low", "allowed": True} + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value 
= mock_response + mock_get_client.return_value = mock_client + + analyzer.security_risk(action) + + call_args = mock_client.post.call_args + assert call_args[0][0] == "http://gateway:9090/api/v1/action/validate" + + def test_request_url_strips_trailing_slash(self): + """Test that trailing slash in gateway_url is handled.""" + analyzer = MaybeDontAnalyzer(gateway_url="http://gateway:9090/") + action = create_mock_action_event() + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"risk_level": "low", "allowed": True} + + with patch.object(analyzer, "_get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.post.return_value = mock_response + mock_get_client.return_value = mock_client + + analyzer.security_risk(action) + + call_args = mock_client.post.call_args + assert call_args[0][0] == "http://gateway:9090/api/v1/action/validate" + + +class TestMaybeDontAnalyzerLifecycle: + """Tests for analyzer lifecycle management.""" + + def test_close_cleans_up_client(self): + """Test that close cleans up the HTTP client.""" + analyzer = MaybeDontAnalyzer() + + mock_client = MagicMock() + mock_client.is_closed = False + analyzer._client = mock_client + + analyzer.close() + + mock_client.close.assert_called_once() + assert analyzer._client is None + + def test_close_handles_no_client(self): + """Test that close handles case when no client exists.""" + analyzer = MaybeDontAnalyzer() + # Should not raise + analyzer.close() + + def test_close_handles_already_closed_client(self): + """Test that close handles already-closed client gracefully.""" + analyzer = MaybeDontAnalyzer() + + mock_client = MagicMock() + mock_client.is_closed = True + analyzer._client = mock_client + + analyzer.close() + + mock_client.close.assert_not_called() + + def test_set_events_stores_events(self): + """Test that set_events stores events for future use.""" + analyzer = MaybeDontAnalyzer() + events = ["event1", "event2"] + 
analyzer.set_events(events) + assert analyzer._events == events + + def test_set_events_empty_list(self): + """Test that set_events handles empty list.""" + analyzer = MaybeDontAnalyzer() + analyzer.set_events([]) + assert analyzer._events == [] + + +class TestMaybeDontAnalyzerHTTPClientLifecycle: + """Integration tests for HTTP client lifecycle using MockTransport.""" + + def test_client_creation_and_reuse(self): + """Test that HTTP client is created and reused correctly.""" + + def mock_handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "request_id": "test", + "allowed": True, + "risk_level": "low", + }, + ) + + transport = httpx.MockTransport(mock_handler) + analyzer = MaybeDontAnalyzer() + analyzer._client = httpx.Client(transport=transport) + + action = create_mock_action_event() + + try: + result = analyzer.security_risk(action) + assert result == SecurityRisk.LOW + + # Second call should reuse the same client + result = analyzer.security_risk(action) + assert result == SecurityRisk.LOW + finally: + analyzer.close() + + def test_client_recreated_after_close(self): + """Test that client is recreated after close() is called.""" + call_count = 0 + + def mock_handler(request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response( + 200, + json={ + "request_id": "test", + "allowed": True, + "risk_level": "low", + }, + ) + + analyzer = MaybeDontAnalyzer() + transport = httpx.MockTransport(mock_handler) + analyzer._client = httpx.Client(transport=transport) + + action = create_mock_action_event() + + try: + result = analyzer.security_risk(action) + assert result == SecurityRisk.LOW + assert call_count == 1 + + analyzer.close() + assert analyzer._client is None + + # Next call should create a new client + with patch.object(analyzer, "_create_client") as mock_create: + new_transport = httpx.MockTransport(mock_handler) + mock_create.return_value = 
httpx.Client(transport=new_transport) + + result = analyzer.security_risk(action) + assert result == SecurityRisk.LOW + mock_create.assert_called_once() + finally: + analyzer.close() + + def test_request_body_sent_correctly(self): + """Test that the correct request body is sent to the gateway.""" + captured_request: httpx.Request | None = None + + def mock_handler(request: httpx.Request) -> httpx.Response: + nonlocal captured_request + captured_request = request + return httpx.Response( + 200, + json={ + "request_id": "test", + "allowed": True, + "risk_level": "low", + }, + ) + + transport = httpx.MockTransport(mock_handler) + analyzer = MaybeDontAnalyzer(client_id="test-agent") + analyzer._client = httpx.Client(transport=transport) + + action = create_mock_action_event( + tool_name="execute_bash", + command="rm -rf /tmp/old", + thought_text="I need to remove old temporary files", + summary="removing old temp files", + ) + + try: + analyzer.security_risk(action) + + assert captured_request is not None + body = json.loads(captured_request.content) + assert body["action_type"] == "tool_call" + assert body["target"] == "execute_bash" + assert body["parameters"] == {"command": "rm -rf /tmp/old"} + assert body["actor"] == "test-agent" + assert body["context"]["thought"] == "I need to remove old temporary files" + assert body["context"]["summary"] == "removing old temp files" + finally: + analyzer.close() From 868df36c0bfad6a8eaa460ae2839589d3ba3ddf3 Mon Sep 17 00:00:00 2001 From: Daniel DeGroff Date: Fri, 20 Feb 2026 10:27:39 -0700 Subject: [PATCH 2/2] fix: Docker example in docstring that won't start - Add MAYBE_DONT_SERVER_LISTEN_ADDR=0.0.0.0:8080 (127.0.0.1 unreachable from host) - Disable AI validation and audit report (require OpenAI API key) - Match Docker command to docs PR (OpenHands/docs#350) Co-Authored-By: Claude Opus 4.6 --- .../40_maybedont_security_analyzer.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git 
a/examples/01_standalone_sdk/40_maybedont_security_analyzer.py b/examples/01_standalone_sdk/40_maybedont_security_analyzer.py index c18755f2cf..8eb88dfc83 100644 --- a/examples/01_standalone_sdk/40_maybedont_security_analyzer.py +++ b/examples/01_standalone_sdk/40_maybedont_security_analyzer.py @@ -4,15 +4,19 @@ against policy rules configured in a Maybe Don't Gateway before execution. Prerequisites: - 1. A running Maybe Don't Gateway instance. Quick start with Docker: + 1. A running Maybe Don't instance. Quick start with Docker: - docker run -p 8080:8080 ghcr.io/maybedont/maybe-dont:latest + docker run -d --name maybe-dont -p 8080:8080 \ + -e MAYBE_DONT_SERVER_LISTEN_ADDR=0.0.0.0:8080 \ + -e MAYBE_DONT_REQUEST_VALIDATION_AI_ENABLED=false \ + -e MAYBE_DONT_NATIVE_TOOLS_AUDIT_REPORT_ENABLED=false \ + ghcr.io/maybedont/maybe-dont:latest For configuration, see: https://maybedont.ai/docs 2. Set environment variables: - LLM_API_KEY: Your LLM provider API key - - MAYBE_DONT_GATEWAY_URL: Gateway URL (default: http://localhost:8080) + - MAYBE_DONT_GATEWAY_URL: Maybe Don't URL (default: http://localhost:8080) The Maybe Don't Gateway supports two layers of protection: - Security Analyzer (this example): Pre-execution validation of ALL actions