diff --git a/examples/agent_control_demo/demo_agent.py b/examples/agent_control_demo/demo_agent.py index 1857d3b7..b6cb38c7 100644 --- a/examples/agent_control_demo/demo_agent.py +++ b/examples/agent_control_demo/demo_agent.py @@ -146,17 +146,8 @@ async def test_scenario(name: str, func, input_text: str): print(f"\nāŒ ERROR: {e}") -async def run_demo(): - """Run the demo scenarios.""" - logger.info("Starting agent control demo") - print("\n" + "=" * 60) - print("AGENT CONTROL DEMO: Running Agent") - print("=" * 60) - - # Initialize the agent - print(f"\nšŸ¤– Initializing agent: {AGENT_NAME}") - print(f" Server: {SERVER_URL}") - +def initialize_demo_agent() -> bool: + """Initialize the demo agent and print actionable failure guidance.""" try: logger.info(f"Initializing agent: {AGENT_NAME}") agent_control.init( @@ -165,14 +156,18 @@ async def run_demo(): server_url=SERVER_URL, ) logger.info("Agent initialized successfully") + return True except Exception as e: logger.error(f"Failed to initialize agent: {e}") print(f"\nāŒ Failed to initialize agent: {e}") print("\nMake sure:") print(" 1. Server is running: cd server && make run") print(" 2. Controls are created: python setup_controls.py") - return + return False + +async def run_demo_scenarios() -> None: + """Run the scripted demo scenarios.""" # ========================================================================== # Test 1: Safe chat message # ========================================================================== @@ -236,7 +231,9 @@ async def run_demo(): "Get user info" # Will generate response with SSN ) - # Summary + +def print_demo_summary() -> None: + """Print the final demo summary.""" logger.info("All demo scenarios completed") print("\n" + "=" * 60) print("DEMO COMPLETE!") @@ -254,5 +251,31 @@ async def run_demo(): """) +async def run_demo_session() -> None: + """Initialize the agent and run the scripted demo.""" + if not initialize_demo_agent(): + return + + await run_demo_scenarios() + print_demo_summary() + + +async def run_demo() -> None: + """Run the demo scenarios.""" + logger.info("Starting agent control demo") + print("\n" + "=" * 60) + print("AGENT CONTROL DEMO: Running Agent") + print("=" * 60) + + # Initialize the agent + print(f"\nšŸ¤– Initializing agent: {AGENT_NAME}") + print(f" Server: {SERVER_URL}") + + try: + await run_demo_session() + finally: + await agent_control.ashutdown() + + if __name__ == "__main__": asyncio.run(run_demo()) diff --git a/examples/crewai/content_agent_protection.py b/examples/crewai/content_agent_protection.py index 43be4bb6..1c61ddae 100644 --- a/examples/crewai/content_agent_protection.py +++ b/examples/crewai/content_agent_protection.py @@ -48,14 +48,20 @@ AGENT_NAME = "crew-ai-customer-support" AGENT_DESCRIPTION = "Customer support crew with PII protection and access controls" -# Initialize Agent Control server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") -agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=server_url, -) + +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=server_url, + ) # --- Define CrewAI Guardrails for Quality Validation --- @@ -511,7 +517,8 @@ def verify_setup(): return False -def main(): +def print_demo_intro() -> None: + """Print the demo header and description.""" print("=" * 60) print("CrewAI Customer Support: Agent Control + Guardrails") print("=" * 60) @@ -520,29 +527,34 @@ def main(): print("work together to ensure both safe AND high-quality responses.") print() - # Verify setup + +def verify_prerequisites() -> bool: + """Validate server setup and required environment variables.""" print("\n" + "="*60) print("SETUP VERIFICATION") print("="*60) if not verify_setup(): print("\nāŒ Setup verification failed. Please fix the issues above.") - return + return False - # Check API key if not os.getenv("OPENAI_API_KEY"): print("\nāŒ Error: OPENAI_API_KEY not set") print("Please set: export OPENAI_API_KEY='your-key-here'") - return + return False - print("\nāœ… Setup verified! Starting demos...\n") + return True - # Create crew and final output validator + +def run_demo_session() -> None: + """Initialize the SDK and run the CrewAI demo scenarios.""" + initialize_agent_control() + + print("\nāœ… Setup verified! Starting demos...\n") print("Creating customer support crew with multi-layer protection...") crew = create_support_crew() validate_final_output = create_final_output_validator() - # --- Scenario 1: Unauthorized Access Attempt (Agent Control PRE-execution) --- print("\n" + "="*50) print("SCENARIO 1: Unauthorized Access (Agent Control PRE)") print("="*50) @@ -559,7 +571,6 @@ def main(): print("\nšŸ’” Explanation: Agent Control blocks security violations immediately.") print(" No retries because unauthorized access is non-negotiable.") - # --- Scenario 2: PII Leakage in Response (Agent Control POST-execution) --- print("\n" + "="*50) print("SCENARIO 2: PII Leakage (Agent Control POST)") print("="*50) @@ -576,7 +587,6 @@ def main(): print("\nšŸ’” Explanation: Agent Control blocks PII violations immediately.") print(" No retries because PII leakage is a compliance violation.") - # --- Scenario 2.5: Poor Quality Response (CrewAI Guardrails with Retry) --- print("\n" + "="*50) print("SCENARIO 2.5: Quality Issues (CrewAI Guardrails)") print("="*50) @@ -601,7 +611,6 @@ def main(): print(" - Friendly tone (LLM-based)") print(" - Useful information (LLM-based)") - # --- Scenario 3: Final Output Validation (Catches Unprotected Tool PII) --- print("\n" + "="*50) print("SCENARIO 3: Final Output Validation - Catches PII from Unprotected Tool") print("="*50) @@ -616,7 +625,6 @@ def main(): print("- Final output validation catches the PII and blocks it") print() - # Create separate crew for Scenario 3 with unprotected tool print("Creating Scenario 3 crew with unprotected customer info tool...") scenario3_crew = create_scenario3_crew() @@ -631,7 +639,6 @@ def main(): print("\n[Validating Final Output]") try: - # Validate the final crew output for PII validated_output = validate_final_output(str(result3)) print("\nšŸ“ Result (Validated - No PII):") print(validated_output) @@ -677,5 +684,17 @@ def main(): """) +def main(): + print_demo_intro() + + if not verify_prerequisites(): + return + + try: + run_demo_session() + finally: + agent_control.shutdown() + + if __name__ == "__main__": main() diff --git a/examples/customer_support_agent/run_demo.py b/examples/customer_support_agent/run_demo.py index a0d7ca41..c4ee09eb 100644 --- a/examples/customer_support_agent/run_demo.py +++ b/examples/customer_support_agent/run_demo.py @@ -30,9 +30,14 @@ import logging import os import sys +from typing import TYPE_CHECKING +import agent_control from agent_control import AgentControlClient, agents, controls +if TYPE_CHECKING: + from support_agent import CustomerSupportAgent + # Configure logging to see SDK debug output logging.basicConfig( level=logging.INFO, @@ -68,7 +73,7 @@ async def reset_agent(): async with AgentControlClient(base_url=server_url) as client: # Check if agent exists try: - await agents.get_agent(client, agent_name) + await agents.get_agent(client, AGENT_NAME) logger.debug("Agent exists, proceeding with reset") except Exception as e: if "404" in str(e): @@ -93,7 +98,7 @@ async def reset_agent(): try: remove_result = await agents.remove_agent_control( client, - agent_name, + AGENT_NAME, control_id, ) if remove_result.get("removed_direct_association"): @@ -104,7 +109,7 @@ async def reset_agent(): logger.debug( "Error removing direct control %s from %s: %s", control_id, - agent_name, + AGENT_NAME, e, ) @@ -244,7 +249,7 @@ async def run_interactive(agent: CustomerSupportAgent): print("Usage: /comprehensive [customer_id] ") print("Example: /comprehensive C001 I need help with a refund") else: - print(f"Running comprehensive support flow (multi-span)...") + print("Running comprehensive support flow (multi-span)...") if customer_id: print(f" Customer: {customer_id}") print(f" Message: {message}") @@ -292,7 +297,13 @@ async def run_interactive(agent: CustomerSupportAgent): response = await agent.chat(user_input) print(f"Agent: {response}") - print() + +async def run_demo_mode(agent: CustomerSupportAgent, automated: bool) -> None: + """Run the selected demo mode.""" + if automated: + await run_automated_tests(agent) + else: + await run_interactive(agent) async def run_safe_tests(agent: CustomerSupportAgent): @@ -609,19 +620,20 @@ def main(): asyncio.run(reset_agent()) return - # Create agent instance (this triggers SDK initialization) - logger.info("Initializing customer support agent") - agent = get_agent() - logger.info("Agent initialized successfully") + try: + # Create agent instance (this triggers SDK initialization) + logger.info("Initializing customer support agent") + agent = get_agent() + logger.info("Agent initialized successfully") - # Run appropriate mode - mode = "automated" if (args.automated or args.mode == "automated") else "interactive" - logger.info(f"Starting demo in {mode} mode") + # Run appropriate mode + automated = args.automated or args.mode == "automated" + mode = "automated" if automated else "interactive" + logger.info(f"Starting demo in {mode} mode") - if args.automated or args.mode == "automated": - asyncio.run(run_automated_tests(agent)) - else: - asyncio.run(run_interactive(agent)) + asyncio.run(run_demo_mode(agent, automated=automated)) + finally: + agent_control.shutdown() if __name__ == "__main__": diff --git a/examples/customer_support_agent/support_agent.py b/examples/customer_support_agent/support_agent.py index f7027110..74745199 100644 --- a/examples/customer_support_agent/support_agent.py +++ b/examples/customer_support_agent/support_agent.py @@ -22,19 +22,32 @@ from agent_control import ControlViolationError, control from agent_control.tracing import with_trace +AGENT_NAME = "customer-support-agent" +AGENT_DESCRIPTION = ( + "AI-powered customer support assistant that helps with inquiries, " + "searches knowledge bases, and creates support tickets." +) + + # ============================================================================= # SDK INITIALIZATION # ============================================================================= # Call this once at the start of your application. # The agent registers with the server and loads associated controls. -agent_control.init( - agent_name="customer-support-agent", - agent_description="AI-powered customer support assistant that helps with inquiries, " - "searches knowledge bases, and creates support tickets.", - agent_version="1.0.0", - observability_enabled=True, -) + +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + agent_version="1.0.0", + observability_enabled=True, + ) # ============================================================================= @@ -268,6 +281,7 @@ class CustomerSupportAgent: """ def __init__(self): + initialize_agent_control() self.conversation_history: list[dict[str, str]] = [] async def chat(self, user_message: str) -> str: @@ -410,42 +424,50 @@ async def handle_comprehensive_support( # DIRECT EXECUTION # ============================================================================= +async def run_smoke_tests() -> None: + """Run the direct-execution smoke tests.""" + agent = CustomerSupportAgent() + + print("\n--- Test: Normal chat (1 span) ---") + response = await agent.chat("Hello, I need help with a refund") + print(f"Agent: {response}") + + print("\n--- Test: Customer lookup (1 span) ---") + response = await agent.lookup("C001") + print(f"Agent: {response}") + + print("\n--- Test: Knowledge base search (1 span) ---") + response = await agent.search("refund") + print(f"Agent: {response}") + + print("\n--- Test: Create ticket (1 span) ---") + response = await agent.create_support_ticket( + subject="Refund request", + description="I would like to return my order", + priority="medium" + ) + print(f"Agent: {response}") + + print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") + response = await agent.handle_comprehensive_support( + user_message="I need help with a refund for my recent order", + customer_id="C001" + ) + print(f"Agent: {response}") + + print("\n--- Test: Comprehensive support without customer (2 spans) ---") + response = await agent.handle_comprehensive_support( + user_message="How do I reset my password?" + ) + print(f"Agent: {response}") + + if __name__ == "__main__": # Quick test async def main(): - agent = CustomerSupportAgent() - - print("\n--- Test: Normal chat (1 span) ---") - response = await agent.chat("Hello, I need help with a refund") - print(f"Agent: {response}") - - print("\n--- Test: Customer lookup (1 span) ---") - response = await agent.lookup("C001") - print(f"Agent: {response}") - - print("\n--- Test: Knowledge base search (1 span) ---") - response = await agent.search("refund") - print(f"Agent: {response}") - - print("\n--- Test: Create ticket (1 span) ---") - response = await agent.create_support_ticket( - subject="Refund request", - description="I would like to return my order", - priority="medium" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") - response = await agent.handle_comprehensive_support( - user_message="I need help with a refund for my recent order", - customer_id="C001" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support without customer (2 spans) ---") - response = await agent.handle_comprehensive_support( - user_message="How do I reset my password?" - ) - print(f"Agent: {response}") + try: + await run_smoke_tests() + finally: + await agent_control.ashutdown() asyncio.run(main()) diff --git a/examples/deepeval/qa_agent.py b/examples/deepeval/qa_agent.py index cfb5cc23..328c6181 100755 --- a/examples/deepeval/qa_agent.py +++ b/examples/deepeval/qa_agent.py @@ -29,11 +29,13 @@ import logging import os import sys -from uuid import UUID import agent_control from agent_control import ControlViolationError, control +AGENT_NAME = "qa-agent-with-deepeval" +AGENT_DESCRIPTION = "Q&A Agent with DeepEval" + # Enable DEBUG logging for agent_control to see what's happening logging.basicConfig(level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s') logging.getLogger('agent_control').setLevel(logging.DEBUG) @@ -42,22 +44,27 @@ # SDK INITIALIZATION # ============================================================================= -agent_control.init( - agent_name="qa-agent-with-deepeval", - agent_description="Q&A Agent with DeepEval", - agent_version="1.0.0", -) - -# Debug: Check if controls were loaded -controls = agent_control.get_server_controls() -print(f"DEBUG: Loaded {len(controls) if controls else 0} controls from server") -if controls: - for c in controls: - ctrl_def = c.get('control', {}) - print(f" - {c['name']}:") - print(f" enabled: {ctrl_def.get('enabled', False)}") - print(f" execution: {ctrl_def.get('execution', 'NOT SET')}") - print(f" scope: {ctrl_def.get('scope', {})}") +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + agent_version="1.0.0", + ) + + controls = agent_control.get_server_controls() + print(f"DEBUG: Loaded {len(controls) if controls else 0} controls from server") + if controls: + for c in controls: + ctrl_def = c.get('control', {}) + print(f" - {c['name']}:") + print(f" enabled: {ctrl_def.get('enabled', False)}") + print(f" execution: {ctrl_def.get('execution', 'NOT SET')}") + print(f" scope: {ctrl_def.get('scope', {})}") # ============================================================================= @@ -190,6 +197,7 @@ class QAAgent: """ def __init__(self): + initialize_agent_control() self.conversation_history: list[dict[str, str]] = [] async def ask(self, question: str) -> str: @@ -202,9 +210,9 @@ async def ask(self, question: str) -> str: self.conversation_history.append({"role": "user", "content": question}) # Debug: Check if agent is still initialized - import agent_control - current_agent = agent_control._current_agent if hasattr(agent_control, '_current_agent') else None - print(f"DEBUG: Current agent before call: {current_agent.agent_name if current_agent else 'NONE'}") + current_agent = agent_control.current_agent() + current_agent_name = current_agent.agent_name if current_agent else "NONE" + print(f"DEBUG: Current agent before call: {current_agent_name}") try: # Get answer - protected by DeepEval controls @@ -358,6 +366,11 @@ async def run_interactive(agent: QAAgent): # MAIN # ============================================================================= +async def run_demo_session() -> None: + """Create the agent and enter interactive mode.""" + agent = QAAgent() + await run_interactive(agent) + async def main(): """Run the Q&A agent.""" @@ -389,9 +402,10 @@ async def main(): print(" Run setup_controls.py first to configure the agent.") sys.exit(1) - # Create and run agent - agent = QAAgent() - await run_interactive(agent) + try: + await run_demo_session() + finally: + await agent_control.ashutdown() if __name__ == "__main__": diff --git a/examples/langchain/langgraph_auto_schema_agent.py b/examples/langchain/langgraph_auto_schema_agent.py index b61c1e79..6e685cc5 100644 --- a/examples/langchain/langgraph_auto_schema_agent.py +++ b/examples/langchain/langgraph_auto_schema_agent.py @@ -237,6 +237,14 @@ def _print_auto_derived_steps() -> None: async def main() -> None: """Run the demo end-to-end.""" + try: + await run_demo_session() + finally: + await agent_control.ashutdown() + + +async def run_demo_session() -> None: + """Initialize the SDK and run the LangGraph scenarios.""" print("Initializing Agent Control (no explicit steps passed)...") agent_control.init( agent_name=AGENT_NAME, @@ -247,7 +255,6 @@ async def main() -> None: _print_auto_derived_steps() app = _build_graph() - scenarios = [ "Track order 1001 and include its history", "Issue a refund for order 2048 because it was late (49 dollars)", diff --git a/examples/langchain/sql_agent_protection.py b/examples/langchain/sql_agent_protection.py index 5e2bee0e..67790217 100644 --- a/examples/langchain/sql_agent_protection.py +++ b/examples/langchain/sql_agent_protection.py @@ -198,33 +198,22 @@ def should_continue(state: AgentState) -> Literal["tools", END]: return workflow.compile() # --- Main Execution --- -async def main(): - print("=" * 60) - print("SQL Agent with Server-Side Controls") - print("=" * 60) - print() - print("NOTE: Make sure you've run setup_sql_controls.py first!") - print(" $ uv run setup_sql_controls.py") - print() - print("Initializing SQL Agent...") - +async def run_demo_session() -> None: + """Initialize the SDK and run the SQL demo scenarios.""" agent_control.init( agent_name=AGENT_NAME, agent_description=AGENT_DESCRIPTION, server_url=os.getenv("AGENT_CONTROL_URL"), ) - # 1. Setup DB db = setup_database() - # 2. Setup LLM & Tools if not os.getenv("OPENAI_API_KEY"): print("Error: OPENAI_API_KEY not set") return llm = ChatOpenAI(model="gpt-4o-mini") - # Register agent and fetch controls if local evaluation is enabled local_controls: list[dict] | None = None if USE_LOCAL_CONTROLS: agent = agent_control.current_agent() @@ -241,39 +230,52 @@ async def main(): use_local_controls=USE_LOCAL_CONTROLS, local_controls=local_controls, ) - - # 4. Create Agent agent = create_agent(llm, tools) - - # 5. Run Scenarios - - # Scenario A: Safe Query + print("\n" + "="*50) print("SCENARIO 1: Safe Query") print("User: List top 3 tracks by duration") print("="*50) - + async for event in agent.astream( - {"messages": [HumanMessage(content="List the top 3 tracks by duration")]}, + {"messages": [HumanMessage(content="List top 3 tracks by duration")]}, stream_mode="values" ): event["messages"][-1].pretty_print() - # Scenario B: Unsafe Query (Drop Table) print("\n" + "="*50) print("SCENARIO 2: Unsafe Query (Attempting DROP)") print("User: Delete the Artist table") print("="*50) - # Note: We rely on the LLM generating a DROP statement. + # Note: We rely on the LLM generating a DROP statement. # To force it, we might need a stronger prompt or a direct injection test. # But let's see if the LLM complies with the user's malicious request. - async for event in agent.astream( - {"messages": [HumanMessage(content="Please DROP the Artist table. I need to clear space.")]}, + { + "messages": [ + HumanMessage(content="Please DROP the Artist table. I need to clear space.") + ] + }, stream_mode="values" ): event["messages"][-1].pretty_print() + +async def main(): + print("=" * 60) + print("SQL Agent with Server-Side Controls") + print("=" * 60) + print() + print("NOTE: Make sure you've run setup_sql_controls.py first!") + print(" $ uv run setup_sql_controls.py") + print() + print("Initializing SQL Agent...") + + try: + await run_demo_session() + finally: + await agent_control.ashutdown() + if __name__ == "__main__": asyncio.run(main()) diff --git a/examples/steer_action_demo/autonomous_agent_demo.py b/examples/steer_action_demo/autonomous_agent_demo.py index a2078f68..6df15091 100644 --- a/examples/steer_action_demo/autonomous_agent_demo.py +++ b/examples/steer_action_demo/autonomous_agent_demo.py @@ -598,16 +598,13 @@ def create_banking_graph() -> StateGraph: # MAIN # ============================================================================= -async def run_banking_agent(): - """Run the conversational banking agent.""" - - # Initialize +async def run_banking_session() -> None: + """Initialize the SDK and run the interactive banking session.""" agent_control.init( agent_name="banking-transaction-agent", server_url=SERVER_URL ) - # DEBUG: Check if controls were loaded controls = agent_control.get_server_controls() print(f"\nšŸ” DEBUG: Loaded {len(controls) if controls else 0} controls from server") if controls: @@ -621,7 +618,6 @@ async def run_banking_agent(): else: print(" āš ļø WARNING: No controls loaded! Agent will allow all operations.") - # DEBUG: Check registered steps registered_steps = agent_control.get_registered_steps() print(f"\nšŸ” DEBUG: Registered steps: {len(registered_steps)}") for step in registered_steps: @@ -639,7 +635,6 @@ async def run_banking_agent(): print("\n" + "-"*80) try: - # Get user request request = user_input("Describe the wire transfer you'd like to make (or 'quit'): ") if request.lower() in ['quit', 'exit', 'q']: @@ -649,10 +644,7 @@ async def run_banking_agent(): if not request: continue - # Parse request transfer_data = await parse_transfer_request(request) - - # Confirm understanding agent_say(f"I understand you want to send ${transfer_data['amount']:,.2f} to {transfer_data['recipient_name']} in {transfer_data['destination_country']}.") confirm = user_input("Is this correct? (yes/no): ") @@ -660,9 +652,7 @@ async def run_banking_agent(): agent_say("Let's try again.") continue - # Process transfer agent_say("Let me process this transfer for you...") - initial_state: AgentState = { "messages": [], "transfer_request": transfer_data, @@ -675,8 +665,6 @@ async def run_banking_agent(): } await app.ainvoke(initial_state) - - # Done print("\n" + "-"*80) except KeyboardInterrupt: @@ -691,6 +679,14 @@ async def run_banking_agent(): print("="*80) +async def run_banking_agent(): + """Run the conversational banking agent.""" + try: + await run_banking_session() + finally: + await agent_control.ashutdown() + + if __name__ == "__main__": if not os.getenv("OPENAI_API_KEY"): print("āŒ Error: OPENAI_API_KEY environment variable not set") diff --git a/examples/strands_agents/README.md b/examples/strands_agents/README.md index 8f042ede..d5c57fbc 100644 --- a/examples/strands_agents/README.md +++ b/examples/strands_agents/README.md @@ -26,7 +26,6 @@ uv run streamlit run interactive_demo/interactive_support_demo.py # steering demo uv run steering_demo/setup_email_controls.py uv run streamlit run steering_demo/email_safety_demo.py - ``` Full walkthrough: https://docs.agentcontrol.dev/examples/aws-strands