Skip to content
140 changes: 140 additions & 0 deletions examples/agent_patterns/human_in_the_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Human-in-the-loop example with tool approval.

This example demonstrates how to:
1. Define tools that require approval before execution
2. Handle interruptions when tool approval is needed
3. Serialize/deserialize run state to continue execution later
4. Approve or reject tool calls based on user input
"""

import asyncio
import json

from agents import Agent, Runner, RunState, ToolApprovalItem, function_tool


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny"


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


# Main agent with tool that requires approval
agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_weather, get_temperature],
)


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
# Note: In a real application, you would use proper async input
# For now, using synchronous input with run_in_executor
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
normalized = answer.strip().lower()
return normalized in ("y", "yes")


async def main():
"""Run the human-in-the-loop example."""
result = await Runner.run(
agent,
"What is the weather and temperature in Oakland?",
)

has_interruptions = len(result.interruptions) > 0

while has_interruptions:
print("\n" + "=" * 80)
print("Run interrupted - tool approval required")
print("=" * 80)

# Storing state to file (demonstrating serialization)
state = result.to_state()
state_json = state.to_json()
with open("result.json", "w") as f:
json.dump(state_json, f, indent=2)

print("State saved to result.json")

# From here on you could run things on a different thread/process

# Reading state from file (demonstrating deserialization)
print("Loading state from result.json")
with open("result.json") as f:
stored_state_json = json.load(f)

state = RunState.from_json(agent, stored_state_json)

# Process each interruption
for interruption in result.interruptions:
if not isinstance(interruption, ToolApprovalItem):
continue

print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.raw_item.name}")
print(f" Arguments: {interruption.raw_item.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.raw_item.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.raw_item.name}")
state.reject(interruption)

# Resume execution with the updated state
print("\nResuming agent execution...")
result = await Runner.run(agent, state)
has_interruptions = len(result.interruptions) > 0

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)


if __name__ == "__main__":
asyncio.run(main())
123 changes: 123 additions & 0 deletions examples/agent_patterns/human_in_the_loop_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Human-in-the-loop example with streaming.

This example demonstrates the human-in-the-loop (HITL) pattern with streaming.
The agent will pause execution when a tool requiring approval is called,
allowing you to approve or reject the tool call before continuing.

The streaming version provides real-time feedback as the agent processes
the request, then pauses for approval when needed.
"""

import asyncio

from agents import Agent, Runner, ToolApprovalItem, function_tool


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny."


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
return answer.strip().lower() in ["y", "yes"]


async def main():
"""Run the human-in-the-loop example."""
main_agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_temperature, get_weather],
)

# Run the agent with streaming
result = Runner.run_streamed(
main_agent,
"What is the weather and temperature in Oakland?",
)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

# Handle interruptions
while len(result.interruptions) > 0:
print("\n" + "=" * 80)
print("Human-in-the-loop: approval required for the following tool calls:")
print("=" * 80)

state = result.to_state()

for interruption in result.interruptions:
if not isinstance(interruption, ToolApprovalItem):
continue

print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.raw_item.name}")
print(f" Arguments: {interruption.raw_item.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.raw_item.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.raw_item.name}")
state.reject(interruption)

# Resume execution with streaming
print("\nResuming agent execution...")
result = Runner.run_streamed(main_agent, state)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)
print("\nDone!")


if __name__ == "__main__":
asyncio.run(main())
117 changes: 117 additions & 0 deletions examples/memory/memory_session_hitl_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
Example demonstrating SQLite in-memory session with human-in-the-loop (HITL) tool approval.

This example shows how to use SQLite in-memory session memory combined with
human-in-the-loop tool approval. The session maintains conversation history while
requiring approval for specific tool calls.
"""

import asyncio

from agents import Agent, Runner, SQLiteSession, function_tool


async def _needs_approval(_ctx, _params, _call_id) -> bool:
"""Always require approval for weather tool."""
return True


@function_tool(needs_approval=_needs_approval)
def get_weather(location: str) -> str:
"""Get weather for a location.

Args:
location: The location to get weather for

Returns:
Weather information as a string
"""
# Simulated weather data
weather_data = {
"san francisco": "Foggy, 58°F",
"oakland": "Sunny, 72°F",
"new york": "Rainy, 65°F",
}
# Check if any city name is in the provided location string
location_lower = location.lower()
for city, weather in weather_data.items():
if city in location_lower:
return weather
return f"Weather data not available for {location}"


async def prompt_yes_no(question: str) -> bool:
"""Prompt user for yes/no answer.

Args:
question: The question to ask

Returns:
True if user answered yes, False otherwise
"""
print(f"\n{question} (y/n): ", end="", flush=True)
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input)
normalized = answer.strip().lower()
return normalized in ("y", "yes")


async def main():
# Create an agent with a tool that requires approval
agent = Agent(
name="HITL Assistant",
instructions="You help users with information. Always use available tools when appropriate. Keep responses concise.",
tools=[get_weather],
)

# Create an in-memory SQLite session instance that will persist across runs
session = SQLiteSession(":memory:")
session_id = session.session_id

print("=== Memory Session + HITL Example ===")
print(f"Session id: {session_id}")
print("Enter a message to chat with the agent. Submit an empty line to exit.")
print("The agent will ask for approval before using tools.\n")

while True:
# Get user input
print("You: ", end="", flush=True)
loop = asyncio.get_event_loop()
user_message = await loop.run_in_executor(None, input)

if not user_message.strip():
break

# Run the agent
result = await Runner.run(agent, user_message, session=session)

# Handle interruptions (tool approvals)
while result.interruptions:
# Get the run state
state = result.to_state()

for interruption in result.interruptions:
tool_name = interruption.raw_item.name # type: ignore[union-attr]
args = interruption.raw_item.arguments or "(no arguments)" # type: ignore[union-attr]

approved = await prompt_yes_no(
f"Agent {interruption.agent.name} wants to call '{tool_name}' with {args}. Approve?"
)

if approved:
state.approve(interruption)
print("Approved tool call.")
else:
state.reject(interruption)
print("Rejected tool call.")

# Resume the run with the updated state
result = await Runner.run(agent, state, session=session)

# Display the response
reply = result.final_output or "[No final output produced]"
print(f"Assistant: {reply}\n")


if __name__ == "__main__":
asyncio.run(main())
Loading