diff --git a/python/packages/codex/AGENTS.md b/python/packages/codex/AGENTS.md new file mode 100644 index 0000000000..ca15e6e7db --- /dev/null +++ b/python/packages/codex/AGENTS.md @@ -0,0 +1,28 @@ +# Codex Package (agent-framework-codex) + +Integration with OpenAI Codex as a managed agent (Codex SDK). + +## Main Classes + +- **`CodexAgent`** - Agent using Codex's native agent capabilities +- **`CodexAgentOptions`** - Options for Codex agent configuration +- **`CodexAgentSettings`** - TypedDict-based settings populated via the framework's `load_settings()` helper + +## Usage + +```python +from agent_framework_codex import CodexAgent + +agent = CodexAgent(...) +response = await agent.run("Hello") +``` + +## Import Path + +```python +from agent_framework_codex import CodexAgent +``` + +## Note + +This package is for Codex's managed agent functionality. For basic OpenAI chat, use `agent-framework-openai` instead. diff --git a/python/packages/codex/LICENSE b/python/packages/codex/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/codex/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/codex/README.md b/python/packages/codex/README.md new file mode 100644 index 0000000000..02452c0ad1 --- /dev/null +++ b/python/packages/codex/README.md @@ -0,0 +1,11 @@ +# Get Started with Microsoft Agent Framework Codex + +Please install this package via pip: + +```bash +pip install agent-framework-codex --pre +``` + +## Codex Agent + +The Codex agent enables integration with OpenAI Codex SDK, allowing you to interact with Codex's agentic coding capabilities through the Agent Framework. diff --git a/python/packages/codex/agent_framework_codex/__init__.py b/python/packages/codex/agent_framework_codex/__init__.py new file mode 100644 index 0000000000..eac7165458 --- /dev/null +++ b/python/packages/codex/agent_framework_codex/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) Microsoft. All rights reserved. + +import importlib.metadata + +from ._agent import CodexAgent, CodexAgentOptions, CodexAgentSettings, RawCodexAgent + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" # Fallback for development mode + +__all__ = [ + "CodexAgent", + "CodexAgentOptions", + "CodexAgentSettings", + "RawCodexAgent", + "__version__", +] diff --git a/python/packages/codex/agent_framework_codex/_agent.py b/python/packages/codex/agent_framework_codex/_agent.py new file mode 100644 index 0000000000..6402ae47b5 --- /dev/null +++ b/python/packages/codex/agent_framework_codex/_agent.py @@ -0,0 +1,735 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +from __future__ import annotations + +import contextlib +import logging +import sys +from collections.abc import AsyncIterable, Awaitable, Callable, Sequence +from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, overload + +from agent_framework import ( + AgentMiddlewareTypes, + AgentResponse, + AgentResponseUpdate, + AgentRunInputs, + AgentSession, + BaseAgent, + BaseContextProvider, + Content, + FunctionTool, + Message, + ResponseStream, + ToolTypes, + load_settings, + normalize_messages, + normalize_tools, +) +from agent_framework.observability import AgentTelemetryLayer +from agent_framework.exceptions import AgentException +from codex_sdk import ( + AssistantMessage, + CodexSDKClient, + ResultMessage, + SdkMcpTool, + create_sdk_mcp_server, +) +from codex_sdk import ( + CodexAgentOptions as SDKOptions, +) +from codex_sdk.types import StreamEvent, TextBlock + +if sys.version_info >= (3, 13): + from typing import TypeVar # type: ignore # pragma: no cover +else: + from typing_extensions import TypeVar # type: ignore # pragma: no cover +if sys.version_info >= (3, 11): + from typing import TypedDict # pragma: no cover +else: + from typing_extensions import TypedDict # pragma: no cover + +if TYPE_CHECKING: + from codex_sdk import ( + AgentDefinition, + CanUseTool, + HookMatcher, + McpServerConfig, + PermissionMode, + SandboxSettings, + SdkBeta, + ) + + +logger = logging.getLogger("agent_framework.codex") + + +# Name of the in-process MCP server that hosts Agent Framework tools. +# FunctionTool instances are converted to SDK MCP tools and served +# through this server, as Codex CLI only supports tools via MCP. +TOOLS_MCP_SERVER_NAME = "_agent_framework_tools" + + +class CodexAgentSettings(TypedDict, total=False): + """Codex Agent settings. + + Settings are resolved in this order: explicit keyword arguments, values from an + explicitly provided .env file, then environment variables with the prefix + 'CODEX_AGENT_'. 
+ + Keys: + cli_path: The path to Codex CLI executable. + model: The model to use (codex-mini-latest, gpt-5.1-codex). + cwd: The working directory for Codex CLI. + permission_mode: Permission mode (default, acceptEdits, plan, bypassPermissions). + max_turns: Maximum number of conversation turns. + max_budget_usd: Maximum budget in USD. + """ + + cli_path: str | None + model: str | None + cwd: str | None + permission_mode: str | None + max_turns: int | None + max_budget_usd: float | None + + +class CodexAgentOptions(TypedDict, total=False): + """Codex Agent-specific options.""" + + system_prompt: str + """System prompt for the agent.""" + + cli_path: str + """Path to Codex CLI executable. Default: auto-detected.""" + + cwd: str + """Working directory for Codex CLI. Default: current working directory.""" + + env: dict[str, str] + """Environment variables to pass to CLI.""" + + model: str + """Model to use ("codex-mini-latest", "gpt-5.1-codex"). Default: "codex-mini-latest".""" + + fallback_model: str + """Fallback model if primary fails.""" + + max_thinking_tokens: int + """Maximum tokens for thinking blocks.""" + + allowed_tools: list[str] + """Allowlist of tools. If set, Codex can ONLY use tools in this list.""" + + disallowed_tools: list[str] + """Blocklist of tools. 
Codex cannot use these tools.""" + + mcp_servers: dict[str, McpServerConfig] + """MCP server configurations for external tools.""" + + permission_mode: PermissionMode + """Permission handling mode ("default", "acceptEdits", "plan", "bypassPermissions").""" + + can_use_tool: CanUseTool + """Permission callback for tool use.""" + + max_turns: int + """Maximum conversation turns.""" + + max_budget_usd: float + """Budget limit in USD.""" + + hooks: dict[str, list[HookMatcher]] + """Pre/post tool hooks.""" + + add_dirs: list[str] + """Additional directories to add to context.""" + + sandbox: SandboxSettings + """Sandbox configuration for execution isolation.""" + + agents: dict[str, AgentDefinition] + """Custom agent definitions.""" + + output_format: dict[str, Any] + """Structured output format (JSON schema).""" + + enable_file_checkpointing: bool + """Enable file checkpointing for rewind.""" + + betas: list[SdkBeta] + """Beta features to enable.""" + + +OptionsT = TypeVar( + "OptionsT", + bound=TypedDict, # type: ignore[valid-type] + default="CodexAgentOptions", + covariant=True, +) + + +class RawCodexAgent(BaseAgent, Generic[OptionsT]): + """OpenAI Codex Agent using Codex CLI. + + Wraps the Codex SDK to provide agentic coding capabilities including + tool use, session management, and streaming responses. + + This agent communicates with Codex through the Codex CLI, + enabling access to Codex's full agentic capabilities like file + editing, code execution, and tool use. + + The agent can be used as an async context manager to ensure proper cleanup: + + Examples: + Basic usage with context manager: + + .. code-block:: python + + from agent_framework_codex import CodexAgent + + async with CodexAgent( + instructions="You are a helpful coding assistant.", + ) as agent: + response = await agent.run("Hello!") + print(response.text) + + With streaming: + + .. 
code-block:: python + + async with CodexAgent() as agent: + async for update in agent.run("Write a poem", stream=True): + print(update.text, end="", flush=True) + + With session management: + + .. code-block:: python + + async with CodexAgent() as agent: + session = agent.create_session() + await agent.run("Remember my name is Alice", session=session) + response = await agent.run("What's my name?", session=session) + # Codex will remember "Alice" from the same session + + With Agent Framework tools: + + .. code-block:: python + + from agent_framework import tool + + @tool + def greet(name: str) -> str: + \"\"\"Greet someone by name.\"\"\" + return f"Hello, {name}!" + + async with CodexAgent(tools=[greet]) as agent: + response = await agent.run("Greet Alice") + """ + + AGENT_PROVIDER_NAME: ClassVar[str] = "openai.codex" + + def __init__( + self, + instructions: str | None = None, + *, + client: CodexSDKClient | None = None, + id: str | None = None, + name: str | None = None, + description: str | None = None, + context_providers: Sequence[BaseContextProvider] | None = None, + middleware: Sequence[AgentMiddlewareTypes] | None = None, + tools: ToolTypes | Callable[..., Any] | str | Sequence[ToolTypes | Callable[..., Any] | str] | None = None, + default_options: OptionsT | None = None, + env_file_path: str | None = None, + env_file_encoding: str | None = None, + ) -> None: + """Initialize a CodexAgent instance. + + Args: + instructions: System prompt for the agent. + + Keyword Args: + client: Optional pre-configured CodexSDKClient instance. If not provided, + a new client will be created using the other parameters. + id: Unique identifier for the agent. + name: Name of the agent. + description: Description of the agent. + context_providers: Context providers for the agent. + middleware: List of middleware. + tools: Tools for the agent. 
Can be: + - Strings for built-in tools (e.g., "Read", "Write", "Bash", "Glob") + - Functions for custom tools + default_options: Default CodexAgentOptions including system_prompt, model, etc. + env_file_path: Path to .env file. + env_file_encoding: Encoding of .env file. + """ + super().__init__( + id=id, + name=name, + description=description, + context_providers=context_providers, + middleware=middleware, + ) + + self._client = client + self._owns_client = client is None + + # Parse options + opts: dict[str, Any] = dict(default_options) if default_options else {} + + # Handle instructions parameter - set as system_prompt in options + if instructions is not None: + opts["system_prompt"] = instructions + + cli_path = opts.pop("cli_path", None) + model = opts.pop("model", None) + cwd = opts.pop("cwd", None) + permission_mode = opts.pop("permission_mode", None) + max_turns = opts.pop("max_turns", None) + max_budget_usd = opts.pop("max_budget_usd", None) + self._mcp_servers: dict[str, Any] = opts.pop("mcp_servers", None) or {} + + # Load settings from environment and options + self._settings = load_settings( + CodexAgentSettings, + env_prefix="CODEX_AGENT_", + cli_path=cli_path, + model=model, + cwd=cwd, + permission_mode=permission_mode, + max_turns=max_turns, + max_budget_usd=max_budget_usd, + env_file_path=env_file_path, + env_file_encoding=env_file_encoding, + ) + + # Separate built-in tools (strings) from custom tools (callables/FunctionTool) + self._builtin_tools: list[str] = [] + self._custom_tools: list[ToolTypes] = [] + self._normalize_tools(tools) + + self._default_options = opts + self._started = False + self._current_session_id: str | None = None + + def _normalize_tools( + self, + tools: ToolTypes | Callable[..., Any] | str | Sequence[ToolTypes | Callable[..., Any] | str] | None, + ) -> None: + """Separate built-in tools (strings) from custom tools. + + Args: + tools: Mixed list of tool names and custom tools. 
async def _ensure_session(self, session_id: str | None = None) -> None:
    """Ensure a connected client exists for the requested session.

    Nothing happens when the agent is already started, has a client, and
    ``session_id`` equals the session the client was connected for. ``None``
    is compared like any other value, so switching from a resumed session
    back to "no session" is treated as a change.

    Otherwise one of two things happens:

    * A client that was supplied by the caller (``_owns_client`` is False),
      has not been started yet, and is asked for no resume id is connected
      and used as-is. Previously such a client was silently replaced by a
      newly built one and therefore never used.
    * In every other case any client owned by this agent is disconnected
      (best effort) and a new ``CodexSDKClient`` is built from
      ``_prepare_client_options`` with ``session_id`` as the resume id. A
      caller-supplied client cannot be rebuilt with a resume option, so a
      resume request always gets an agent-owned client.

    Args:
        session_id: The session ID to use, or None for a new session.

    Raises:
        AgentException: If connecting the client fails.
    """
    if self._started and self._client is not None and session_id == self._current_session_id:
        return

    # NOTE(review): assumes an injected client has not been connected by the
    # caller yet, i.e. that calling connect() on it here is expected - confirm.
    use_injected_client = (
        self._client is not None
        and not self._owns_client
        and not self._started
        and session_id is None
    )

    if not use_injected_client:
        # Only a client created by this agent is ours to tear down.
        if self._client is not None and self._owns_client:
            with contextlib.suppress(Exception):
                await self._client.disconnect()
        self._started = False

        # The resume id can only be supplied through the construction options.
        opts = self._prepare_client_options(resume_session_id=session_id)
        self._client = CodexSDKClient(options=opts)
        self._owns_client = True

    try:
        await self._client.connect()
    except Exception as ex:
        self._started = False
        if self._owns_client:
            # Drop the half-initialised client so the next call rebuilds it.
            self._client = None
        raise AgentException(f"Failed to start Codex SDK client: {ex}") from ex

    self._started = True
    self._current_session_id = session_id
def _prepare_tools(
    self,
    tools: Sequence[ToolTypes],
) -> tuple[Any, list[str]]:
    """Convert Agent Framework tools into an in-process SDK MCP server.

    Only ``FunctionTool`` instances are converted. Any other entry is skipped
    and reported with a warning, so that a tool the caller configured does
    not disappear silently.

    Args:
        tools: List of Agent Framework tools.

    Returns:
        Tuple of (MCP server config, list of allowed tool names). Returns
        ``(None, [])`` when no tool could be converted.
    """
    sdk_tools: list[SdkMcpTool[Any]] = []
    tool_names: list[str] = []

    for tool in tools:
        if not isinstance(tool, FunctionTool):
            # Lazy %-style arguments: the message is only built if it is emitted.
            logger.warning(
                "Ignoring unsupported tool of type %s: only FunctionTool instances "
                "can be converted to SDK MCP tools.",
                type(tool).__name__,
            )
            continue

        sdk_tools.append(self._function_tool_to_sdk_mcp_tool(tool))
        # Codex SDK convention: MCP tools use format "mcp__{server}__{tool}"
        tool_names.append(f"mcp__{TOOLS_MCP_SERVER_NAME}__{tool.name}")

    if not sdk_tools:
        return None, []

    return create_sdk_mcp_server(name=TOOLS_MCP_SERVER_NAME, tools=sdk_tools), tool_names
def _format_prompt(self, messages: list[Message] | None) -> str:
    """Flatten chat messages into one newline-separated prompt string.

    Args:
        messages: Messages whose ``text`` values are concatenated. May be
            None or empty.

    Returns:
        The message texts joined by a newline character, where a message
        with no text contributes an empty line. An empty string is returned
        when there are no messages.
    """
    lines: list[str] = []
    for message in messages or ():
        text = message.text
        lines.append(text if text else "")
    return "\n".join(lines)
async def _get_stream(
    self,
    messages: AgentRunInputs | None = None,
    *,
    session: AgentSession | None = None,
    options: OptionsT | None = None,
    **kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
    """Send the prompt to the Codex client and yield streamed updates.

    Text and thinking deltas arriving as ``StreamEvent`` messages are yielded
    as ``AgentResponseUpdate`` objects. ``AssistantMessage`` objects are only
    inspected for errors, because their content has already been streamed.
    The final ``ResultMessage`` supplies the session id and the structured
    output.

    Args:
        messages: The messages to process.

    Keyword Args:
        session: The conversation session; a new one is created when omitted.
            Its ``service_session_id`` selects the session to connect to and
            is updated with the id reported by the SDK.
        options: Runtime options forwarded to ``_apply_runtime_options``.
        kwargs: Additional keyword arguments (unused here).

    Yields:
        One ``AgentResponseUpdate`` per non-empty text or thinking delta.

    Raises:
        RuntimeError: If no client is available after connecting.
        AgentException: If an assistant message or the result message
            reports an error.
    """
    session = session or self.create_session()

    # Ensure we're connected to the right session
    await self._ensure_session(session.service_session_id)

    if not self._client:
        raise RuntimeError("Codex SDK client not initialized.")

    prompt = self._format_prompt(normalize_messages(messages))

    # Apply runtime options (model, permission_mode)
    await self._apply_runtime_options(dict(options) if options else None)

    session_id: str | None = None
    structured_output: Any = None

    await self._client.query(prompt)
    async for message in self._client.receive_response():
        if isinstance(message, StreamEvent):
            # Handle streaming events - extract text/thinking deltas
            event = message.event
            if event.get("type") != "content_block_delta":
                continue
            delta = event.get("delta", {})
            delta_type = delta.get("type")
            if delta_type == "text_delta":
                text = delta.get("text", "")
                if text:
                    yield AgentResponseUpdate(
                        role="assistant",
                        contents=[Content.from_text(text=text, raw_representation=message)],
                        raw_representation=message,
                    )
            elif delta_type == "thinking_delta":
                thinking = delta.get("thinking", "")
                if thinking:
                    yield AgentResponseUpdate(
                        role="assistant",
                        contents=[Content.from_text_reasoning(text=thinking, raw_representation=message)],
                        raw_representation=message,
                    )
        elif isinstance(message, AssistantMessage):
            # In streaming mode the content was already yielded via StreamEvent,
            # so only errors are handled here; content is not re-emitted.
            if message.error:
                # Map error types to descriptive messages
                error_messages = {
                    "authentication_failed": "Authentication failed with Codex API",
                    "billing_error": "Billing error with Codex API",
                    "rate_limit": "Rate limit exceeded for Codex API",
                    "invalid_request": "Invalid request to Codex API",
                    "server_error": "Codex API server error",
                    "unknown": "Unknown error from Codex API",
                }
                error_msg = error_messages.get(message.error, f"Codex API error: {message.error}")
                # Append the first text block, if any, as error detail
                if message.content:
                    for block in message.content:
                        if isinstance(block, TextBlock):
                            error_msg = f"{error_msg}: {block.text}"
                            break
                raise AgentException(error_msg)
        elif isinstance(message, ResultMessage):
            # Check for errors in result message
            if message.is_error:
                error_msg = message.result or "Unknown error from Codex API"
                raise AgentException(f"Codex API error: {error_msg}")
            session_id = message.session_id
            structured_output = message.structured_output

    if session_id:
        # Update session with session ID
        session.service_session_id = session_id
        # Also record which session the live client now belongs to. Without
        # this, _ensure_session still sees the pre-run value: a different
        # fresh session (id None) would reuse this connection, and the same
        # session would reconnect with `resume` on its next run.
        # NOTE(review): assumes the SDK client keeps conversation state per
        # connection - confirm against the SDK.
        self._current_session_id = session_id

    # Store structured output for the finalizer
    self._structured_output = structured_output
+ + Extends :class:`RawCodexAgent` with automatic telemetry spans via + :class:`AgentTelemetryLayer`. Use ``RawCodexAgent`` directly if you + need the agent without telemetry overhead. + """ + + pass diff --git a/python/packages/codex/pyproject.toml b/python/packages/codex/pyproject.toml new file mode 100644 index 0000000000..78cbfe5ced --- /dev/null +++ b/python/packages/codex/pyproject.toml @@ -0,0 +1,94 @@ +[project] +name = "agent-framework-codex" +description = "OpenAI Codex SDK integration for Microsoft Agent Framework." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0b260225" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.0.0rc2", + "codex-sdk>=0.1.0", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [ + "ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*" +] 
+timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_codex"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks] +mypy = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_codex" +test = "pytest --cov=agent_framework_codex --cov-report=term-missing:skip-covered tests" + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/codex/tests/test_codex_agent.py b/python/packages/codex/tests/test_codex_agent.py new file mode 100644 index 0000000000..7ae7feba65 --- /dev/null +++ b/python/packages/codex/tests/test_codex_agent.py @@ -0,0 +1,911 @@ +# Copyright (c) Microsoft. All rights reserved. 
class TestCodexAgentSettings:
    """Tests for CodexAgentSettings."""

    # Suffixes of the CODEX_AGENT_* variables, one per settings field. The
    # naming follows the pattern used in test_env_variable_loading below.
    _ENV_SUFFIXES = (
        "CLI_PATH",
        "MODEL",
        "CWD",
        "PERMISSION_MODE",
        "MAX_TURNS",
        "MAX_BUDGET_USD",
    )

    def test_default_values(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Test default values are None."""
        # Remove any CODEX_AGENT_* variables set on the host so the defaults
        # are what is actually being checked.
        for suffix in self._ENV_SUFFIXES:
            monkeypatch.delenv(f"CODEX_AGENT_{suffix}", raising=False)

        settings = load_settings(CodexAgentSettings, env_prefix="CODEX_AGENT_")
        assert settings["cli_path"] is None
        assert settings["model"] is None
        assert settings["cwd"] is None
        assert settings["permission_mode"] is None
        assert settings["max_turns"] is None
        assert settings["max_budget_usd"] is None

    def test_explicit_values(self) -> None:
        """Test explicit values override defaults."""
        settings = load_settings(
            CodexAgentSettings,
            env_prefix="CODEX_AGENT_",
            cli_path="/usr/local/bin/codex",
            model="codex-mini-latest",
            cwd="/home/user/project",
            permission_mode="default",
            max_turns=10,
            max_budget_usd=5.0,
        )
        assert settings["cli_path"] == "/usr/local/bin/codex"
        assert settings["model"] == "codex-mini-latest"
        assert settings["cwd"] == "/home/user/project"
        assert settings["permission_mode"] == "default"
        assert settings["max_turns"] == 10
        assert settings["max_budget_usd"] == 5.0

    def test_env_variable_loading(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Test loading from environment variables."""
        monkeypatch.setenv("CODEX_AGENT_MODEL", "gpt-5.1-codex")
        monkeypatch.setenv("CODEX_AGENT_MAX_TURNS", "20")
        settings = load_settings(CodexAgentSettings, env_prefix="CODEX_AGENT_")
        assert settings["model"] == "gpt-5.1-codex"
        assert settings["max_turns"] == 20
CodexAgent initialization.""" + + def test_default_initialization(self) -> None: + """Test agent initializes with defaults.""" + agent = CodexAgent() + assert agent.id is not None + assert agent.name is None + assert agent.description is None + + def test_with_name_and_description(self) -> None: + """Test agent with name and description.""" + agent = CodexAgent(name="test-agent", description="A test agent") + assert agent.name == "test-agent" + assert agent.description == "A test agent" + + def test_with_instructions_parameter(self) -> None: + """Test agent with instructions parameter.""" + agent = CodexAgent(instructions="You are a helpful assistant.") + assert agent._default_options.get("system_prompt") == "You are a helpful assistant." # type: ignore[reportPrivateUsage] + + def test_with_system_prompt_in_options(self) -> None: + """Test agent with system_prompt in options.""" + options: CodexAgentOptions = { + "system_prompt": "You are a helpful assistant.", + } + agent = CodexAgent(default_options=options) + assert agent._default_options.get("system_prompt") == "You are a helpful assistant." # type: ignore[reportPrivateUsage] + + def test_with_default_options(self) -> None: + """Test agent with default options.""" + options: CodexAgentOptions = { + "model": "codex-mini-latest", + "permission_mode": "default", + "max_turns": 10, + } + agent = CodexAgent(default_options=options) + assert agent._settings["model"] == "codex-mini-latest" # type: ignore[reportPrivateUsage] + assert agent._settings["permission_mode"] == "default" # type: ignore[reportPrivateUsage] + assert agent._settings["max_turns"] == 10 # type: ignore[reportPrivateUsage] + + def test_with_function_tool(self) -> None: + """Test agent with function tool.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" 
+ + agent = CodexAgent(tools=[greet]) + assert len(agent._custom_tools) == 1 # type: ignore[reportPrivateUsage] + + def test_with_single_tool(self) -> None: + """Test agent with single tool (not in list).""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + agent = CodexAgent(tools=greet) + assert len(agent._custom_tools) == 1 # type: ignore[reportPrivateUsage] + + def test_with_builtin_tools(self) -> None: + """Test agent with built-in tool names.""" + agent = CodexAgent(tools=["Read", "Write", "Bash"]) + assert agent._builtin_tools == ["Read", "Write", "Bash"] # type: ignore[reportPrivateUsage] + assert agent._custom_tools == [] # type: ignore[reportPrivateUsage] + + def test_with_mixed_tools(self) -> None: + """Test agent with both built-in and custom tools.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + agent = CodexAgent(tools=["Read", greet, "Bash"]) + assert agent._builtin_tools == ["Read", "Bash"] # type: ignore[reportPrivateUsage] + assert len(agent._custom_tools) == 1 # type: ignore[reportPrivateUsage] + + +# region Test CodexAgent Lifecycle + + +class TestCodexAgentLifecycle: + """Tests for CodexAgent tool initialization.""" + + def test_custom_tools_stored_from_constructor(self) -> None: + """Test that custom tools from constructor are stored.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + agent = CodexAgent(tools=[greet]) + assert len(agent._custom_tools) == 1 # type: ignore[reportPrivateUsage] + + def test_multiple_custom_tools(self) -> None: + """Test agent with multiple custom tools.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + @tool + def farewell(name: str) -> str: + """Say goodbye.""" + return f"Goodbye, {name}!" 
+ + agent = CodexAgent(tools=[greet, farewell]) + assert len(agent._custom_tools) == 2 # type: ignore[reportPrivateUsage] + + def test_no_tools(self) -> None: + """Test agent without tools.""" + agent = CodexAgent() + assert agent._custom_tools == [] # type: ignore[reportPrivateUsage] + assert agent._builtin_tools == [] # type: ignore[reportPrivateUsage] + + +# region Test CodexAgent Run + + +class TestCodexAgentRun: + """Tests for CodexAgent run method.""" + + @staticmethod + async def _create_async_generator(items: list[Any]) -> Any: + """Helper to create async generator from list.""" + for item in items: + yield item + + def _create_mock_client(self, messages: list[Any]) -> MagicMock: + """Create a mock CodexSDKClient that yields given messages.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages)) + return mock_client + + async def test_run_with_string_message(self) -> None: + """Test run with string message.""" + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + from codex_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Hello!"}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text="Hello!")], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + response = await agent.run("Hello") + assert response.text == "Hello!" 
+ + async def test_run_captures_session_id(self) -> None: + """Test that session ID is captured from ResultMessage.""" + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + from codex_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Response"}, + }, + uuid="event-1", + session_id="test-session-id", + ), + AssistantMessage( + content=[TextBlock(text="Response")], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="test-session-id", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + session = agent.create_session() + await agent.run("Hello", session=session) + assert session.service_session_id == "test-session-id" + + async def test_run_with_session(self) -> None: + """Test run with existing session.""" + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + from codex_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Response"}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text="Response")], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + session = agent.create_session() + session.service_session_id = "existing-session" + await agent.run("Hello", session=session) + + +# region Test CodexAgent Run Stream + + +class TestCodexAgentRunStream: + """Tests for CodexAgent streaming 
run method.""" + + @staticmethod + async def _create_async_generator(items: list[Any]) -> Any: + """Helper to create async generator from list.""" + for item in items: + yield item + + def _create_mock_client(self, messages: list[Any]) -> MagicMock: + """Create a mock CodexSDKClient that yields given messages.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages)) + return mock_client + + async def test_run_stream_yields_updates(self) -> None: + """Test run(stream=True) yields AgentResponseUpdate objects.""" + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + from codex_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Streaming "}, + }, + uuid="event-1", + session_id="stream-session", + ), + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "response"}, + }, + uuid="event-2", + session_id="stream-session", + ), + AssistantMessage( + content=[TextBlock(text="Streaming response")], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="stream-session", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("Hello", stream=True): + updates.append(update) + # StreamEvent yields text deltas (2 events) + assert len(updates) == 2 + assert updates[0].role == "assistant" + assert updates[0].text == "Streaming " + assert updates[1].text == "response" + + async 
def test_run_stream_raises_on_assistant_message_error(self) -> None: + """Test run raises AgentException when AssistantMessage has an error.""" + from agent_framework.exceptions import AgentException + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + + messages = [ + AssistantMessage( + content=[TextBlock(text="Error details from API")], + model="codex-mini-latest", + error="invalid_request", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="error-session", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + with pytest.raises(AgentException) as exc_info: + async for _ in agent.run("Hello", stream=True): + pass + assert "Invalid request to Codex API" in str(exc_info.value) + assert "Error details from API" in str(exc_info.value) + + async def test_run_stream_raises_on_result_message_error(self) -> None: + """Test run raises AgentException when ResultMessage.is_error is True.""" + from agent_framework.exceptions import AgentException + from codex_sdk import ResultMessage + + messages = [ + ResultMessage( + subtype="error", + duration_ms=100, + duration_api_ms=50, + is_error=True, + num_turns=0, + session_id="error-session", + result="Model 'codex-mini-latest' not found", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + with pytest.raises(AgentException) as exc_info: + async for _ in agent.run("Hello", stream=True): + pass + assert "Model 'codex-mini-latest' not found" in str(exc_info.value) + + +# region Test CodexAgent Session Management + + +class TestCodexAgentSessionManagement: + """Tests for CodexAgent session management.""" + + def test_create_session(self) -> None: + """Test create_session creates a new session.""" + 
agent = CodexAgent() + session = agent.create_session() + assert isinstance(session, AgentSession) + assert session.service_session_id is None + + def test_create_session_with_service_session_id(self) -> None: + """Test create_session with existing service_session_id.""" + agent = CodexAgent() + session = agent.create_session(session_id="existing-session-123") + assert isinstance(session, AgentSession) + + async def test_ensure_session_creates_client(self) -> None: + """Test _ensure_session creates client when not started.""" + with patch("agent_framework_codex._agent.CodexSDKClient") as mock_client_class: + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client_class.return_value = mock_client + + agent = CodexAgent() + await agent._ensure_session(None) # type: ignore[reportPrivateUsage] + + assert agent._started # type: ignore[reportPrivateUsage] + mock_client.connect.assert_called_once() + + async def test_ensure_session_recreates_for_different_session(self) -> None: + """Test _ensure_session recreates client for different session ID.""" + with patch("agent_framework_codex._agent.CodexSDKClient") as mock_client_class: + mock_client1 = MagicMock() + mock_client1.connect = AsyncMock() + mock_client1.disconnect = AsyncMock() + + mock_client2 = MagicMock() + mock_client2.connect = AsyncMock() + + mock_client_class.side_effect = [mock_client1, mock_client2] + + agent = CodexAgent() + + # First session + await agent._ensure_session(None) # type: ignore[reportPrivateUsage] + assert agent._started # type: ignore[reportPrivateUsage] + + # Different session should recreate client + await agent._ensure_session("new-session-id") # type: ignore[reportPrivateUsage] + assert agent._current_session_id == "new-session-id" # type: ignore[reportPrivateUsage] + mock_client1.disconnect.assert_called_once() + + async def test_ensure_session_reuses_for_same_session(self) -> None: + """Test _ensure_session reuses client for same session ID.""" + with 
patch("agent_framework_codex._agent.CodexSDKClient") as mock_client_class: + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client_class.return_value = mock_client + + agent = CodexAgent() + + # First call + await agent._ensure_session("session-123") # type: ignore[reportPrivateUsage] + + # Same session should not recreate + await agent._ensure_session("session-123") # type: ignore[reportPrivateUsage] + + # Only called once + assert mock_client_class.call_count == 1 + + async def test_ensure_session_recreates_when_resumed_then_fresh(self) -> None: + """Test _ensure_session creates a new client when switching from a resumed session to a fresh one (None). + + Regression test: previously, _ensure_session only recreated the client when + (session_id and session_id != self._current_session_id), so a transition from + a named session back to None would silently reuse the old client. + """ + with patch("agent_framework_codex._agent.CodexSDKClient") as mock_client_class: + mock_client1 = MagicMock() + mock_client1.connect = AsyncMock() + mock_client1.disconnect = AsyncMock() + + mock_client2 = MagicMock() + mock_client2.connect = AsyncMock() + + mock_client_class.side_effect = [mock_client1, mock_client2] + + agent = CodexAgent() + + # Start with a resumed session + await agent._ensure_session("resumed-session-id") # type: ignore[reportPrivateUsage] + assert agent._current_session_id == "resumed-session-id" # type: ignore[reportPrivateUsage] + assert mock_client_class.call_count == 1 + + # Switch to a fresh session (None) — must create a new client + await agent._ensure_session(None) # type: ignore[reportPrivateUsage] + assert agent._current_session_id is None # type: ignore[reportPrivateUsage] + assert mock_client_class.call_count == 2 + mock_client1.disconnect.assert_called_once() + + +# region Test CodexAgent Tool Conversion + + +class TestCodexAgentToolConversion: + """Tests for CodexAgent tool conversion.""" + + def 
test_prepare_tools_creates_mcp_server(self) -> None: + """Test _prepare_tools creates MCP server for AF tools.""" + + @tool + def add(a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + agent = CodexAgent(tools=[add]) + server, tool_names = agent._prepare_tools(agent._custom_tools) # type: ignore[reportPrivateUsage] + + assert server is not None + assert len(tool_names) == 1 + assert tool_names[0] == f"mcp__{TOOLS_MCP_SERVER_NAME}__add" + + def test_function_tool_to_sdk_mcp_tool(self) -> None: + """Test converting FunctionTool to SDK MCP tool.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + agent = CodexAgent() + sdk_tool = agent._function_tool_to_sdk_mcp_tool(greet) # type: ignore[reportPrivateUsage] + + assert sdk_tool.name == "greet" + assert sdk_tool.description == "Greet someone." + assert sdk_tool.input_schema is not None + assert "properties" in sdk_tool.input_schema # type: ignore[operator] + + def test_function_tool_to_sdk_mcp_tool_preserves_defs_for_nested_types(self) -> None: + """Test that $defs is preserved for tools with nested Pydantic models.""" + from pydantic import BaseModel + + class Address(BaseModel): + street: str + city: str + + class Person(BaseModel): + name: str + address: Address + + @tool + def create_person(person: Person) -> str: + """Create a person with address.""" + return f"{person.name} lives at {person.address.street}, {person.address.city}" + + agent = CodexAgent() + sdk_tool = agent._function_tool_to_sdk_mcp_tool(create_person) # type: ignore[reportPrivateUsage] + + # Verify $defs is preserved in the schema + assert sdk_tool.input_schema is not None + assert "$defs" in sdk_tool.input_schema # type: ignore[operator] + assert "Address" in sdk_tool.input_schema["$defs"] # type: ignore[index] + # Verify the nested reference exists in properties + assert "person" in sdk_tool.input_schema["properties"] # type: ignore[index] + + async def 
test_tool_handler_success(self) -> None: + """Test tool handler executes successfully.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" + + agent = CodexAgent() + sdk_tool = agent._function_tool_to_sdk_mcp_tool(greet) # type: ignore[reportPrivateUsage] + + result = await sdk_tool.handler({"name": "World"}) + assert result["content"][0]["text"] == "Hello, World!" + + async def test_tool_handler_error(self) -> None: + """Test tool handler handles errors.""" + + @tool + def failing_tool() -> str: + """A tool that fails.""" + raise ValueError("Something went wrong") + + agent = CodexAgent() + sdk_tool = agent._function_tool_to_sdk_mcp_tool(failing_tool) # type: ignore[reportPrivateUsage] + + result = await sdk_tool.handler({}) + assert "Error:" in result["content"][0]["text"] + assert "Something went wrong" in result["content"][0]["text"] + + +# region Test CodexAgent Permissions + + +class TestCodexAgentPermissions: + """Tests for CodexAgent permission handling.""" + + def test_default_permission_mode(self) -> None: + """Test default permission mode.""" + agent = CodexAgent() + assert agent._settings["permission_mode"] is None # type: ignore[reportPrivateUsage] + + def test_permission_mode_from_settings(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test permission mode from environment settings.""" + monkeypatch.setenv("CODEX_AGENT_PERMISSION_MODE", "acceptEdits") + settings = load_settings(CodexAgentSettings, env_prefix="CODEX_AGENT_") + assert settings["permission_mode"] == "acceptEdits" + + def test_permission_mode_in_options(self) -> None: + """Test permission mode in options.""" + options: CodexAgentOptions = { + "permission_mode": "bypassPermissions", + } + agent = CodexAgent(default_options=options) + assert agent._settings["permission_mode"] == "bypassPermissions" # type: ignore[reportPrivateUsage] + + +# region Test CodexAgent Error Handling + + +class TestCodexAgentErrorHandling: + """Tests for CodexAgent 
error handling.""" + + @staticmethod + async def _empty_gen() -> Any: + """Empty async generator.""" + if False: + yield + + async def test_handles_empty_response(self) -> None: + """Test handling of empty response.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._empty_gen()) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + response = await agent.run("Hello") + assert response.messages == [] + + +# region Test Format Prompt + + +class TestFormatPrompt: + """Tests for _format_prompt method.""" + + def test_format_empty_messages(self) -> None: + """Test formatting empty messages.""" + agent = CodexAgent() + result = agent._format_prompt([]) # type: ignore[reportPrivateUsage] + assert result == "" + + def test_format_none_messages(self) -> None: + """Test formatting None messages.""" + agent = CodexAgent() + result = agent._format_prompt(None) # type: ignore[reportPrivateUsage] + assert result == "" + + def test_format_user_message(self) -> None: + """Test formatting user message.""" + agent = CodexAgent() + msg = Message( + role="user", + contents=[Content.from_text(text="Hello")], + ) + result = agent._format_prompt([msg]) # type: ignore[reportPrivateUsage] + assert "Hello" in result + + def test_format_multiple_messages(self) -> None: + """Test formatting multiple messages.""" + agent = CodexAgent() + messages = [ + Message(role="user", contents=[Content.from_text(text="Hi")]), + Message(role="assistant", contents=[Content.from_text(text="Hello!")]), + Message(role="user", contents=[Content.from_text(text="How are you?")]), + ] + result = agent._format_prompt(messages) # type: ignore[reportPrivateUsage] + assert "Hi" in result + assert "Hello!" 
in result + assert "How are you?" in result + + +# region Test Build Options + + +class TestPrepareClientOptions: + """Tests for _prepare_client_options method.""" + + def test_prepare_client_options_with_settings(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test preparing options with settings.""" + monkeypatch.setenv("CODEX_AGENT_MODEL", "gpt-5.1-codex") + monkeypatch.setenv("CODEX_AGENT_MAX_TURNS", "15") + + agent = CodexAgent() + + with patch("agent_framework_codex._agent.SDKOptions") as mock_opts: + mock_opts.return_value = MagicMock() + agent._prepare_client_options() # type: ignore[reportPrivateUsage] + call_kwargs = mock_opts.call_args[1] + assert call_kwargs.get("model") == "gpt-5.1-codex" + assert call_kwargs.get("max_turns") == 15 + + def test_prepare_client_options_with_instructions(self) -> None: + """Test building options with instructions parameter.""" + agent = CodexAgent(instructions="Be helpful") + + with patch("agent_framework_codex._agent.SDKOptions") as mock_opts: + mock_opts.return_value = MagicMock() + agent._prepare_client_options() # type: ignore[reportPrivateUsage] + call_kwargs = mock_opts.call_args[1] + assert call_kwargs.get("system_prompt") == "Be helpful" + + def test_prepare_client_options_includes_custom_tools(self) -> None: + """Test that _prepare_client_options includes custom tools MCP server.""" + + @tool + def greet(name: str) -> str: + """Greet someone.""" + return f"Hello, {name}!" 
+ + agent = CodexAgent(tools=[greet]) + + with patch("agent_framework_codex._agent.SDKOptions") as mock_opts: + mock_opts.return_value = MagicMock() + agent._prepare_client_options() # type: ignore[reportPrivateUsage] + call_kwargs = mock_opts.call_args[1] + assert "mcp_servers" in call_kwargs + assert TOOLS_MCP_SERVER_NAME in call_kwargs["mcp_servers"] + + +class TestApplyRuntimeOptions: + """Tests for _apply_runtime_options method.""" + + async def test_apply_runtime_model(self) -> None: + """Test applying runtime model option.""" + mock_client = MagicMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + + agent = CodexAgent() + agent._client = mock_client # type: ignore[reportPrivateUsage] + + await agent._apply_runtime_options({"model": "gpt-5.1-codex"}) # type: ignore[reportPrivateUsage] + mock_client.set_model.assert_called_once_with("gpt-5.1-codex") + + async def test_apply_runtime_permission_mode(self) -> None: + """Test applying runtime permission_mode option.""" + mock_client = MagicMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + + agent = CodexAgent() + agent._client = mock_client # type: ignore[reportPrivateUsage] + + await agent._apply_runtime_options({"permission_mode": "acceptEdits"}) # type: ignore[reportPrivateUsage] + mock_client.set_permission_mode.assert_called_once_with("acceptEdits") + + async def test_apply_runtime_options_none(self) -> None: + """Test applying None options does nothing.""" + mock_client = MagicMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + + agent = CodexAgent() + agent._client = mock_client # type: ignore[reportPrivateUsage] + + await agent._apply_runtime_options(None) # type: ignore[reportPrivateUsage] + mock_client.set_model.assert_not_called() + mock_client.set_permission_mode.assert_not_called() + + +# region Test CodexAgent Structured Output + + +class TestCodexAgentStructuredOutput: + 
"""Tests for CodexAgent structured output propagation.""" + + @staticmethod + async def _create_async_generator(items: list[Any]) -> Any: + """Helper to create async generator from list.""" + for item in items: + yield item + + def _create_mock_client(self, messages: list[Any]) -> MagicMock: + """Create a mock CodexSDKClient that yields given messages.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages)) + return mock_client + + async def test_structured_output_propagated_to_response(self) -> None: + """Test that structured_output from ResultMessage is propagated to response.value.""" + from codex_sdk import AssistantMessage, ResultMessage, TextBlock + from codex_sdk.types import StreamEvent + + structured_data = {"name": "Alice", "age": 30} + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": '{"name": "Alice", "age": 30}'}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text='{"name": "Alice", "age": 30}')], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + structured_output=structured_data, + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + response = await agent.run("Return structured data") + assert response.value == structured_data + + async def test_structured_output_none_when_not_present(self) -> None: + """Test that response.value is None when structured_output is not present.""" + from codex_sdk import AssistantMessage, ResultMessage, 
TextBlock + from codex_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Hello!"}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text="Hello!")], + model="codex-mini-latest", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_codex._agent.CodexSDKClient", return_value=mock_client): + agent = CodexAgent() + response = await agent.run("Hello") + assert response.value is None