From 420471088e3f8928dab0d8e20e9bf79b856fffbf Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Tue, 10 Mar 2026 22:21:21 +0530 Subject: [PATCH 1/4] fix(examples): shut down SDK in short-lived demos --- examples/agent_control_demo/demo_agent.py | 169 +++++----- examples/crewai/content_agent_protection.py | 297 +++++++++--------- examples/customer_support_agent/run_demo.py | 44 ++- .../customer_support_agent/support_agent.py | 94 +++--- examples/deepeval/qa_agent.py | 109 ++++--- .../langchain/langgraph_auto_schema_agent.py | 65 ++-- examples/langchain/sql_agent_protection.py | 138 ++++---- .../autonomous_agent_demo.py | 148 ++++----- examples/strands_agents/README.md | 1 - 9 files changed, 565 insertions(+), 500 deletions(-) diff --git a/examples/agent_control_demo/demo_agent.py b/examples/agent_control_demo/demo_agent.py index 1857d3b7..196343e7 100644 --- a/examples/agent_control_demo/demo_agent.py +++ b/examples/agent_control_demo/demo_agent.py @@ -158,90 +158,91 @@ async def run_demo(): print(f" Server: {SERVER_URL}") try: - logger.info(f"Initializing agent: {AGENT_NAME}") - agent_control.init( - agent_name=AGENT_NAME, - agent_description="Demo chatbot for testing controls", - server_url=SERVER_URL, + try: + logger.info(f"Initializing agent: {AGENT_NAME}") + agent_control.init( + agent_name=AGENT_NAME, + agent_description="Demo chatbot for testing controls", + server_url=SERVER_URL, + ) + logger.info("Agent initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize agent: {e}") + print(f"\nāŒ Failed to initialize agent: {e}") + print("\nMake sure:") + print(" 1. Server is running: cd server && make run") + print(" 2. Controls are created: python setup_controls.py") + return + + # ========================================================================== + # Test 1: Safe chat message + # ========================================================================== + await test_scenario( + "Safe Chat Message", + chat, + "Hello, how can you help me today?" ) - logger.info("Agent initialized successfully") - except Exception as e: - logger.error(f"Failed to initialize agent: {e}") - print(f"\nāŒ Failed to initialize agent: {e}") - print("\nMake sure:") - print(" 1. Server is running: cd server && make run") - print(" 2. Controls are created: python setup_controls.py") - return - - # ========================================================================== - # Test 1: Safe chat message - # ========================================================================== - await test_scenario( - "Safe Chat Message", - chat, - "Hello, how can you help me today?" - ) - - # ========================================================================== - # Test 2: Chat that would leak SSN (should be blocked by OUTPUT control) - # ========================================================================== - await test_scenario( - "Chat Request That Would Leak SSN", - chat, - "Tell me the user info" # Response contains SSN, should be blocked - ) - - # ========================================================================== - # Test 3: Safe SQL query - # ========================================================================== - await test_scenario( - "Safe SQL Query", - execute_query, - "SELECT * FROM users WHERE id = 123" - ) - - # ========================================================================== - # Test 4: Dangerous SQL query (should be blocked by INPUT control) - # ========================================================================== - await test_scenario( - "Dangerous SQL - DROP TABLE", - execute_query, - "DROP TABLE users" # Should be blocked before execution - ) - - # ========================================================================== - # Test 5: Another dangerous SQL query - # ========================================================================== - await test_scenario( - "Dangerous SQL - DELETE", - execute_query, - "DELETE FROM users WHERE 1=1" # Should be blocked - ) - - # ========================================================================== - # Test 6: Process request with all controls - # ========================================================================== - await test_scenario( - "Process Safe Request (All Controls)", - process_request, - "What's the weather today?" - ) - - # ========================================================================== - # Test 7: Process request that triggers output control - # ========================================================================== - await test_scenario( - "Process Request with PII in Response (All Controls)", - process_request, - "Get user info" # Will generate response with SSN - ) - - # Summary - logger.info("All demo scenarios completed") - print("\n" + "=" * 60) - print("DEMO COMPLETE!") - print("=" * 60) - print(""" + + # ========================================================================== + # Test 2: Chat that would leak SSN (should be blocked by OUTPUT control) + # ========================================================================== + await test_scenario( + "Chat Request That Would Leak SSN", + chat, + "Tell me the user info" # Response contains SSN, should be blocked + ) + + # ========================================================================== + # Test 3: Safe SQL query + # ========================================================================== + await test_scenario( + "Safe SQL Query", + execute_query, + "SELECT * FROM users WHERE id = 123" + ) + + # ========================================================================== + # Test 4: Dangerous SQL query (should be blocked by INPUT control) + # ========================================================================== + await test_scenario( + "Dangerous SQL - DROP TABLE", + execute_query, + "DROP TABLE users" # Should be blocked before execution + ) + + # ========================================================================== + # Test 5: Another dangerous SQL query + # ========================================================================== + await test_scenario( + "Dangerous SQL - DELETE", + execute_query, + "DELETE FROM users WHERE 1=1" # Should be blocked + ) + + # ========================================================================== + # Test 6: Process request with all controls + # ========================================================================== + await test_scenario( + "Process Safe Request (All Controls)", + process_request, + "What's the weather today?" + ) + + # ========================================================================== + # Test 7: Process request that triggers output control + # ========================================================================== + await test_scenario( + "Process Request with PII in Response (All Controls)", + process_request, + "Get user info" # Will generate response with SSN + ) + + # Summary + logger.info("All demo scenarios completed") + print("\n" + "=" * 60) + print("DEMO COMPLETE!") + print("=" * 60) + print(""" Summary: āœ… Safe requests passed through 🚫 Dangerous SQL blocked by INPUT control (pre-execution) @@ -252,6 +253,8 @@ async def run_demo(): - All evaluations logged for audit - Centralized control management """) + finally: + await agent_control.ashutdown() if __name__ == "__main__": diff --git a/examples/crewai/content_agent_protection.py b/examples/crewai/content_agent_protection.py index 43be4bb6..9fe3a2e5 100644 --- a/examples/crewai/content_agent_protection.py +++ b/examples/crewai/content_agent_protection.py @@ -48,14 +48,20 @@ AGENT_NAME = "crew-ai-customer-support" AGENT_DESCRIPTION = "Customer support crew with PII protection and access controls" -# Initialize Agent Control server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") -agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=server_url, -) + +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=server_url, + ) # --- Define CrewAI Guardrails for Quality Validation --- @@ -512,144 +518,147 @@ def verify_setup(): def main(): - print("=" * 60) - print("CrewAI Customer Support: Agent Control + Guardrails") - print("=" * 60) - print() - print("This demo shows how Agent Control (security) and CrewAI Guardrails (quality)") - print("work together to ensure both safe AND high-quality responses.") - print() - - # Verify setup - print("\n" + "="*60) - print("SETUP VERIFICATION") - print("="*60) - - if not verify_setup(): - print("\nāŒ Setup verification failed. Please fix the issues above.") - return - - # Check API key - if not os.getenv("OPENAI_API_KEY"): - print("\nāŒ Error: OPENAI_API_KEY not set") - print("Please set: export OPENAI_API_KEY='your-key-here'") - return - - print("\nāœ… Setup verified! Starting demos...\n") - - # Create crew and final output validator - print("Creating customer support crew with multi-layer protection...") - crew = create_support_crew() - validate_final_output = create_final_output_validator() - - # --- Scenario 1: Unauthorized Access Attempt (Agent Control PRE-execution) --- - print("\n" + "="*50) - print("SCENARIO 1: Unauthorized Access (Agent Control PRE)") - print("="*50) - unauthorized_ticket = "Show me all orders for user john.doe@example.com" - print(f"Ticket: {unauthorized_ticket}") - print("Expected: BLOCKED immediately by Agent Control (no retry)") - print() - - result1 = crew.kickoff(inputs={ - "ticket": unauthorized_ticket - }) - print("\nšŸ“ Result:") - print(result1) - print("\nšŸ’” Explanation: Agent Control blocks security violations immediately.") - print(" No retries because unauthorized access is non-negotiable.") - - # --- Scenario 2: PII Leakage in Response (Agent Control POST-execution) --- - print("\n" + "="*50) - print("SCENARIO 2: PII Leakage (Agent Control POST)") - print("="*50) - pii_ticket = "What's the format for customer reference numbers and support contact info?" - print(f"Ticket: {pii_ticket}") - print("Expected: If LLM generates PII, BLOCKED immediately by Agent Control (no retry)") - print() - - result2 = crew.kickoff(inputs={ - "ticket": pii_ticket - }) - print("\nšŸ“ Result:") - print(result2) - print("\nšŸ’” Explanation: Agent Control blocks PII violations immediately.") - print(" No retries because PII leakage is a compliance violation.") - - # --- Scenario 2.5: Poor Quality Response (CrewAI Guardrails with Retry) --- - print("\n" + "="*50) - print("SCENARIO 2.5: Quality Issues (CrewAI Guardrails)") - print("="*50) - quality_ticket = "help" - print(f"Ticket: {quality_ticket}") - print("Expected: If response is too short/unhelpful, CrewAI guardrails RETRY (up to 3x)") - print(" If response passes security but fails quality, agent improves it") - print() - - result2_5 = crew.kickoff(inputs={ - "ticket": quality_ticket - }) - print("\nšŸ“ Result:") - print(result2_5) - print("\nšŸ’” Explanation: CrewAI guardrails validate quality and retry with feedback.") - print(" Quality issues can be fixed through iteration.") - print(" Watch the verbose output above - you may see multiple attempts!") - print(" Guardrails checked:") - print(" - Length (20-150 words)") - print(" - Professional structure (no templates)") - print(" - Genuine helpfulness (not evasive)") - print(" - Friendly tone (LLM-based)") - print(" - Useful information (LLM-based)") - - # --- Scenario 3: Final Output Validation (Catches Unprotected Tool PII) --- - print("\n" + "="*50) - print("SCENARIO 3: Final Output Validation - Catches PII from Unprotected Tool") - print("="*50) - print("This demonstrates validating the FINAL crew output for PII,") - print("catching cases where a legacy/unprotected tool returns PII") - print("and the agent relays it to the user.") - print() - print("In this scenario:") - print("- Agent calls get_customer_info (unprotected legacy tool)") - print("- Tool returns customer data with email and phone") - print("- Agent relays this info in its final response") - print("- Final output validation catches the PII and blocks it") - print() - - # Create separate crew for Scenario 3 with unprotected tool - print("Creating Scenario 3 crew with unprotected customer info tool...") - scenario3_crew = create_scenario3_crew() - - final_output_ticket = "I'm customer CUST-12345. Can you look up my contact information?" - print(f"Ticket: {final_output_ticket}") - print("Expected: Agent retrieves PII from tool, final validation BLOCKS the output") - print() - - result3 = scenario3_crew.kickoff(inputs={ - "ticket": final_output_ticket - }) - - print("\n[Validating Final Output]") try: - # Validate the final crew output for PII - validated_output = validate_final_output(str(result3)) - print("\nšŸ“ Result (Validated - No PII):") - print(validated_output) - except ControlViolationError as e: - print("\n🚫 FINAL OUTPUT BLOCKED - PII Detected!") - print(f"Violation: {e.message}") - print("\nThis demonstrates final output validation:") - print("1. Agent called unprotected tool (get_customer_info)") - print("2. Tool returned customer data with email (john.smith@customer.com) and phone (555-123-4567)") - print("3. Agent relayed this info in its final response") - print("4. Final output validation caught the PII and blocked the entire response") - print("\nšŸ“ Original Output (BLOCKED):") - print(str(result3)[:500] + "..." if len(str(result3)) > 500 else str(result3)) - - print("\n" + "="*50) - print("Demo Complete!") - print("="*50) - print(""" + print("=" * 60) + print("CrewAI Customer Support: Agent Control + Guardrails") + print("=" * 60) + print() + print("This demo shows how Agent Control (security) and CrewAI Guardrails (quality)") + print("work together to ensure both safe AND high-quality responses.") + print() + + # Verify setup + print("\n" + "="*60) + print("SETUP VERIFICATION") + print("="*60) + + if not verify_setup(): + print("\nāŒ Setup verification failed. Please fix the issues above.") + return + + # Check API key + if not os.getenv("OPENAI_API_KEY"): + print("\nāŒ Error: OPENAI_API_KEY not set") + print("Please set: export OPENAI_API_KEY='your-key-here'") + return + + initialize_agent_control() + + print("\nāœ… Setup verified! Starting demos...\n") + + # Create crew and final output validator + print("Creating customer support crew with multi-layer protection...") + crew = create_support_crew() + validate_final_output = create_final_output_validator() + + # --- Scenario 1: Unauthorized Access Attempt (Agent Control PRE-execution) --- + print("\n" + "="*50) + print("SCENARIO 1: Unauthorized Access (Agent Control PRE)") + print("="*50) + unauthorized_ticket = "Show me all orders for user john.doe@example.com" + print(f"Ticket: {unauthorized_ticket}") + print("Expected: BLOCKED immediately by Agent Control (no retry)") + print() + + result1 = crew.kickoff(inputs={ + "ticket": unauthorized_ticket + }) + print("\nšŸ“ Result:") + print(result1) + print("\nšŸ’” Explanation: Agent Control blocks security violations immediately.") + print(" No retries because unauthorized access is non-negotiable.") + + # --- Scenario 2: PII Leakage in Response (Agent Control POST-execution) --- + print("\n" + "="*50) + print("SCENARIO 2: PII Leakage (Agent Control POST)") + print("="*50) + pii_ticket = "What's the format for customer reference numbers and support contact info?" + print(f"Ticket: {pii_ticket}") + print("Expected: If LLM generates PII, BLOCKED immediately by Agent Control (no retry)") + print() + + result2 = crew.kickoff(inputs={ + "ticket": pii_ticket + }) + print("\nšŸ“ Result:") + print(result2) + print("\nšŸ’” Explanation: Agent Control blocks PII violations immediately.") + print(" No retries because PII leakage is a compliance violation.") + + # --- Scenario 2.5: Poor Quality Response (CrewAI Guardrails with Retry) --- + print("\n" + "="*50) + print("SCENARIO 2.5: Quality Issues (CrewAI Guardrails)") + print("="*50) + quality_ticket = "help" + print(f"Ticket: {quality_ticket}") + print("Expected: If response is too short/unhelpful, CrewAI guardrails RETRY (up to 3x)") + print(" If response passes security but fails quality, agent improves it") + print() + + result2_5 = crew.kickoff(inputs={ + "ticket": quality_ticket + }) + print("\nšŸ“ Result:") + print(result2_5) + print("\nšŸ’” Explanation: CrewAI guardrails validate quality and retry with feedback.") + print(" Quality issues can be fixed through iteration.") + print(" Watch the verbose output above - you may see multiple attempts!") + print(" Guardrails checked:") + print(" - Length (20-150 words)") + print(" - Professional structure (no templates)") + print(" - Genuine helpfulness (not evasive)") + print(" - Friendly tone (LLM-based)") + print(" - Useful information (LLM-based)") + + # --- Scenario 3: Final Output Validation (Catches Unprotected Tool PII) --- + print("\n" + "="*50) + print("SCENARIO 3: Final Output Validation - Catches PII from Unprotected Tool") + print("="*50) + print("This demonstrates validating the FINAL crew output for PII,") + print("catching cases where a legacy/unprotected tool returns PII") + print("and the agent relays it to the user.") + print() + print("In this scenario:") + print("- Agent calls get_customer_info (unprotected legacy tool)") + print("- Tool returns customer data with email and phone") + print("- Agent relays this info in its final response") + print("- Final output validation catches the PII and blocks it") + print() + + # Create separate crew for Scenario 3 with unprotected tool + print("Creating Scenario 3 crew with unprotected customer info tool...") + scenario3_crew = create_scenario3_crew() + + final_output_ticket = "I'm customer CUST-12345. Can you look up my contact information?" + print(f"Ticket: {final_output_ticket}") + print("Expected: Agent retrieves PII from tool, final validation BLOCKS the output") + print() + + result3 = scenario3_crew.kickoff(inputs={ + "ticket": final_output_ticket + }) + + print("\n[Validating Final Output]") + try: + # Validate the final crew output for PII + validated_output = validate_final_output(str(result3)) + print("\nšŸ“ Result (Validated - No PII):") + print(validated_output) + except ControlViolationError as e: + print("\n🚫 FINAL OUTPUT BLOCKED - PII Detected!") + print(f"Violation: {e.message}") + print("\nThis demonstrates final output validation:") + print("1. Agent called unprotected tool (get_customer_info)") + print("2. Tool returned customer data with email (john.smith@customer.com) and phone (555-123-4567)") + print("3. Agent relayed this info in its final response") + print("4. Final output validation caught the PII and blocked the entire response") + print("\nšŸ“ Original Output (BLOCKED):") + print(str(result3)[:500] + "..." if len(str(result3)) > 500 else str(result3)) + + print("\n" + "="*50) + print("Demo Complete!") + print("="*50) + print(""" Summary - Multi-Layer Protection Architecture: AGENT CONTROL (Security/Compliance - Immediate Blocking): @@ -675,6 +684,8 @@ def main(): This gives you BOTH security AND quality in production! """) + finally: + agent_control.shutdown() if __name__ == "__main__": diff --git a/examples/customer_support_agent/run_demo.py b/examples/customer_support_agent/run_demo.py index a0d7ca41..596051f2 100644 --- a/examples/customer_support_agent/run_demo.py +++ b/examples/customer_support_agent/run_demo.py @@ -30,9 +30,14 @@ import logging import os import sys +from typing import TYPE_CHECKING +import agent_control from agent_control import AgentControlClient, agents, controls +if TYPE_CHECKING: + from support_agent import CustomerSupportAgent + # Configure logging to see SDK debug output logging.basicConfig( level=logging.INFO, @@ -68,7 +73,7 @@ async def reset_agent(): async with AgentControlClient(base_url=server_url) as client: # Check if agent exists try: - await agents.get_agent(client, agent_name) + await agents.get_agent(client, AGENT_NAME) logger.debug("Agent exists, proceeding with reset") except Exception as e: if "404" in str(e): @@ -93,7 +98,7 @@ async def reset_agent(): try: remove_result = await agents.remove_agent_control( client, - agent_name, + AGENT_NAME, control_id, ) if remove_result.get("removed_direct_association"): @@ -104,7 +109,7 @@ async def reset_agent(): logger.debug( "Error removing direct control %s from %s: %s", control_id, - agent_name, + AGENT_NAME, e, ) @@ -244,7 +249,7 @@ async def run_interactive(agent: CustomerSupportAgent): print("Usage: /comprehensive [customer_id] ") print("Example: /comprehensive C001 I need help with a refund") else: - print(f"Running comprehensive support flow (multi-span)...") + print("Running comprehensive support flow (multi-span)...") if customer_id: print(f" Customer: {customer_id}") print(f" Message: {message}") @@ -292,6 +297,14 @@ async def run_interactive(agent: CustomerSupportAgent): response = await agent.chat(user_input) print(f"Agent: {response}") + +async def run_demo_mode(agent: CustomerSupportAgent, automated: bool) -> None: + """Run the selected demo mode.""" + if automated: + await run_automated_tests(agent) + else: + await run_interactive(agent) + print() @@ -609,19 +622,20 @@ def main(): asyncio.run(reset_agent()) return - # Create agent instance (this triggers SDK initialization) - logger.info("Initializing customer support agent") - agent = get_agent() - logger.info("Agent initialized successfully") + try: + # Create agent instance (this triggers SDK initialization) + logger.info("Initializing customer support agent") + agent = get_agent() + logger.info("Agent initialized successfully") - # Run appropriate mode - mode = "automated" if (args.automated or args.mode == "automated") else "interactive" - logger.info(f"Starting demo in {mode} mode") + # Run appropriate mode + automated = args.automated or args.mode == "automated" + mode = "automated" if automated else "interactive" + logger.info(f"Starting demo in {mode} mode") - if args.automated or args.mode == "automated": - asyncio.run(run_automated_tests(agent)) - else: - asyncio.run(run_interactive(agent)) + asyncio.run(run_demo_mode(agent, automated=automated)) + finally: + asyncio.run(agent_control.ashutdown()) if __name__ == "__main__": diff --git a/examples/customer_support_agent/support_agent.py b/examples/customer_support_agent/support_agent.py index f7027110..199532aa 100644 --- a/examples/customer_support_agent/support_agent.py +++ b/examples/customer_support_agent/support_agent.py @@ -22,19 +22,32 @@ from agent_control import ControlViolationError, control from agent_control.tracing import with_trace +AGENT_NAME = "customer-support-agent" +AGENT_DESCRIPTION = ( + "AI-powered customer support assistant that helps with inquiries, " + "searches knowledge bases, and creates support tickets." +) + + # ============================================================================= # SDK INITIALIZATION # ============================================================================= # Call this once at the start of your application. # The agent registers with the server and loads associated controls. -agent_control.init( - agent_name="customer-support-agent", - agent_description="AI-powered customer support assistant that helps with inquiries, " - "searches knowledge bases, and creates support tickets.", - agent_version="1.0.0", - observability_enabled=True, -) + +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + agent_version="1.0.0", + observability_enabled=True, + ) # ============================================================================= @@ -268,6 +281,7 @@ class CustomerSupportAgent: """ def __init__(self): + initialize_agent_control() self.conversation_history: list[dict[str, str]] = [] async def chat(self, user_message: str) -> str: @@ -414,38 +428,40 @@ async def handle_comprehensive_support( # Quick test async def main(): agent = CustomerSupportAgent() + try: + print("\n--- Test: Normal chat (1 span) ---") + response = await agent.chat("Hello, I need help with a refund") + print(f"Agent: {response}") + + print("\n--- Test: Customer lookup (1 span) ---") + response = await agent.lookup("C001") + print(f"Agent: {response}") + + print("\n--- Test: Knowledge base search (1 span) ---") + response = await agent.search("refund") + print(f"Agent: {response}") + + print("\n--- Test: Create ticket (1 span) ---") + response = await agent.create_support_ticket( + subject="Refund request", + description="I would like to return my order", + priority="medium" + ) + print(f"Agent: {response}") - print("\n--- Test: Normal chat (1 span) ---") - response = await agent.chat("Hello, I need help with a refund") - print(f"Agent: {response}") - - print("\n--- Test: Customer lookup (1 span) ---") - response = await agent.lookup("C001") - print(f"Agent: {response}") - - print("\n--- Test: Knowledge base search (1 span) ---") - response = await agent.search("refund") - print(f"Agent: {response}") - - print("\n--- Test: Create ticket (1 span) ---") - response = await agent.create_support_ticket( - subject="Refund request", - description="I would like to return my order", - priority="medium" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") - response = await agent.handle_comprehensive_support( - user_message="I need help with a refund for my recent order", - customer_id="C001" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support without customer (2 spans) ---") - response = await agent.handle_comprehensive_support( - user_message="How do I reset my password?" - ) - print(f"Agent: {response}") + print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") + response = await agent.handle_comprehensive_support( + user_message="I need help with a refund for my recent order", + customer_id="C001" + ) + print(f"Agent: {response}") + + print("\n--- Test: Comprehensive support without customer (2 spans) ---") + response = await agent.handle_comprehensive_support( + user_message="How do I reset my password?" + ) + print(f"Agent: {response}") + finally: + await agent_control.ashutdown() asyncio.run(main()) diff --git a/examples/deepeval/qa_agent.py b/examples/deepeval/qa_agent.py index cfb5cc23..e64beca5 100755 --- a/examples/deepeval/qa_agent.py +++ b/examples/deepeval/qa_agent.py @@ -29,11 +29,13 @@ import logging import os import sys -from uuid import UUID import agent_control from agent_control import ControlViolationError, control +AGENT_NAME = "qa-agent-with-deepeval" +AGENT_DESCRIPTION = "Q&A Agent with DeepEval" + # Enable DEBUG logging for agent_control to see what's happening logging.basicConfig(level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s') logging.getLogger('agent_control').setLevel(logging.DEBUG) @@ -42,22 +44,27 @@ # SDK INITIALIZATION # ============================================================================= -agent_control.init( - agent_name="qa-agent-with-deepeval", - agent_description="Q&A Agent with DeepEval", - agent_version="1.0.0", -) - -# Debug: Check if controls were loaded -controls = agent_control.get_server_controls() -print(f"DEBUG: Loaded {len(controls) if controls else 0} controls from server") -if controls: - for c in controls: - ctrl_def = c.get('control', {}) - print(f" - {c['name']}:") - print(f" enabled: {ctrl_def.get('enabled', False)}") - print(f" execution: {ctrl_def.get('execution', 'NOT SET')}") - print(f" scope: {ctrl_def.get('scope', {})}") +def initialize_agent_control() -> None: + """Initialize the SDK once for this example process.""" + current_agent = agent_control.current_agent() + if current_agent is not None and current_agent.agent_name == AGENT_NAME: + return + + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + agent_version="1.0.0", + ) + + controls = agent_control.get_server_controls() + print(f"DEBUG: Loaded {len(controls) if controls else 0} controls from server") + if controls: + for c in controls: + ctrl_def = c.get('control', {}) + print(f" - {c['name']}:") + print(f" enabled: {ctrl_def.get('enabled', False)}") + print(f" execution: {ctrl_def.get('execution', 'NOT SET')}") + print(f" scope: {ctrl_def.get('scope', {})}") # ============================================================================= @@ -190,6 +197,7 @@ class QAAgent: """ def __init__(self): + initialize_agent_control() self.conversation_history: list[dict[str, str]] = [] async def ask(self, question: str) -> str: @@ -202,9 +210,9 @@ async def ask(self, question: str) -> str: self.conversation_history.append({"role": "user", "content": question}) # Debug: Check if agent is still initialized - import agent_control - current_agent = agent_control._current_agent if hasattr(agent_control, '_current_agent') else None - print(f"DEBUG: Current agent before call: {current_agent.agent_name if current_agent else 'NONE'}") + current_agent = agent_control.current_agent() + current_agent_name = current_agent.agent_name if current_agent else "NONE" + print(f"DEBUG: Current agent before call: {current_agent_name}") try: # Get answer - protected by DeepEval controls @@ -361,37 +369,40 @@ async def run_interactive(agent: QAAgent): async def main(): """Run the Q&A agent.""" - # Check for OPENAI_API_KEY - if not os.getenv("OPENAI_API_KEY"): - print("\nāš ļø Warning: OPENAI_API_KEY not set!") - print(" DeepEval requires OpenAI API access for GEval.") - print(" Set it with: export OPENAI_API_KEY='your-key'") - print() - response = input("Continue anyway? (y/N): ").strip().lower() - if response != "y": - print("Exiting. Set OPENAI_API_KEY and try again.") - sys.exit(1) - - # Check server connection - server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") - print(f"\nConnecting to agent-control server at {server_url}...") + try: + # Check for OPENAI_API_KEY + if not os.getenv("OPENAI_API_KEY"): + print("\nāš ļø Warning: OPENAI_API_KEY not set!") + print(" DeepEval requires OpenAI API access for GEval.") + print(" Set it with: export OPENAI_API_KEY='your-key'") + print() + response = input("Continue anyway? (y/N): ").strip().lower() + if response != "y": + print("Exiting. Set OPENAI_API_KEY and try again.") + sys.exit(1) + + # Check server connection + server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") + print(f"\nConnecting to agent-control server at {server_url}...") + + import httpx - import httpx + try: + async with httpx.AsyncClient() as client: + resp = await client.get(f"{server_url}/health", timeout=5.0) + resp.raise_for_status() + print("āœ“ Connected to server") + except Exception as e: + print(f"\nāŒ Cannot connect to server: {e}") + print(" Make sure the agent-control server is running.") + print(" Run setup_controls.py first to configure the agent.") + sys.exit(1) - try: - async with httpx.AsyncClient() as client: - resp = await client.get(f"{server_url}/health", timeout=5.0) - resp.raise_for_status() - print("āœ“ Connected to server") - except Exception as e: - print(f"\nāŒ Cannot connect to server: {e}") - print(" Make sure the agent-control server is running.") - print(" Run setup_controls.py first to configure the agent.") - sys.exit(1) - - # Create and run agent - agent = QAAgent() - await run_interactive(agent) + # Create and run agent + agent = QAAgent() + await run_interactive(agent) + finally: + await agent_control.ashutdown() if __name__ == "__main__": diff --git a/examples/langchain/langgraph_auto_schema_agent.py b/examples/langchain/langgraph_auto_schema_agent.py index b61c1e79..7eb85f46 100644 --- a/examples/langchain/langgraph_auto_schema_agent.py +++ b/examples/langchain/langgraph_auto_schema_agent.py @@ -237,38 +237,41 @@ def _print_auto_derived_steps() -> None: async def main() -> None: """Run the demo end-to-end.""" - print("Initializing Agent Control (no explicit steps passed)...") - agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=os.getenv("AGENT_CONTROL_URL"), - ) + try: + print("Initializing Agent Control (no explicit steps passed)...") + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=os.getenv("AGENT_CONTROL_URL"), + ) + + _print_auto_derived_steps() + + app = _build_graph() + + scenarios = [ + "Track order 1001 and include its history", + "Issue a refund for order 2048 because it was late (49 dollars)", + ] - _print_auto_derived_steps() - - app = _build_graph() - - scenarios = [ - "Track order 1001 and include its history", - "Issue a refund for order 2048 because it was late (49 dollars)", - ] - - print("\nRunning LangGraph scenarios...") - for prompt in scenarios: - print("=" * 80) - print(f"User: {prompt}") - try: - result = await app.ainvoke({"messages": [HumanMessage(content=prompt)]}) - final_message = result["messages"][-1] - print(f"Assistant: {final_message.content}") - except ControlViolationError as exc: - print(f"Assistant: Request blocked by control rules: {exc.message}") - except RuntimeError as exc: - print( - "Assistant: Control evaluation is unavailable. " - f"Start the Agent Control server and retry. Details: {exc}" - ) - break + print("\nRunning LangGraph scenarios...") + for prompt in scenarios: + print("=" * 80) + print(f"User: {prompt}") + try: + result = await app.ainvoke({"messages": [HumanMessage(content=prompt)]}) + final_message = result["messages"][-1] + print(f"Assistant: {final_message.content}") + except ControlViolationError as exc: + print(f"Assistant: Request blocked by control rules: {exc.message}") + except RuntimeError as exc: + print( + "Assistant: Control evaluation is unavailable. " + f"Start the Agent Control server and retry. Details: {exc}" + ) + break + finally: + await agent_control.ashutdown() if __name__ == "__main__": diff --git a/examples/langchain/sql_agent_protection.py b/examples/langchain/sql_agent_protection.py index 5e2bee0e..f14df769 100644 --- a/examples/langchain/sql_agent_protection.py +++ b/examples/langchain/sql_agent_protection.py @@ -208,72 +208,78 @@ async def main(): print() print("Initializing SQL Agent...") - agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=os.getenv("AGENT_CONTROL_URL"), - ) - - # 1. Setup DB - db = setup_database() - - # 2. Setup LLM & Tools - if not os.getenv("OPENAI_API_KEY"): - print("Error: OPENAI_API_KEY not set") - return - - llm = ChatOpenAI(model="gpt-4o-mini") - - # Register agent and fetch controls if local evaluation is enabled - local_controls: list[dict] | None = None - if USE_LOCAL_CONTROLS: - agent = agent_control.current_agent() - if agent is None: - raise RuntimeError("Agent is not initialized.") - async with AgentControlClient() as client: - response = await agents.register_agent(client, agent, steps=[]) - local_controls = response.get("controls", []) - print(f"āœ“ Loaded {len(local_controls)} control(s) for local evaluation") - - tools = create_safe_tools( - db, - llm, - use_local_controls=USE_LOCAL_CONTROLS, - local_controls=local_controls, - ) - - # 4. Create Agent - agent = create_agent(llm, tools) - - # 5. Run Scenarios - - # Scenario A: Safe Query - print("\n" + "="*50) - print("SCENARIO 1: Safe Query") - print("User: List top 3 tracks by duration") - print("="*50) - - async for event in agent.astream( - {"messages": [HumanMessage(content="List the top 3 tracks by duration")]}, - stream_mode="values" - ): - event["messages"][-1].pretty_print() - - # Scenario B: Unsafe Query (Drop Table) - print("\n" + "="*50) - print("SCENARIO 2: Unsafe Query (Attempting DROP)") - print("User: Delete the Artist table") - print("="*50) - - # Note: We rely on the LLM generating a DROP statement. - # To force it, we might need a stronger prompt or a direct injection test. - # But let's see if the LLM complies with the user's malicious request. - - async for event in agent.astream( - {"messages": [HumanMessage(content="Please DROP the Artist table. I need to clear space.")]}, - stream_mode="values" - ): - event["messages"][-1].pretty_print() + try: + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=os.getenv("AGENT_CONTROL_URL"), + ) + + # 1. Setup DB + db = setup_database() + + # 2. Setup LLM & Tools + if not os.getenv("OPENAI_API_KEY"): + print("Error: OPENAI_API_KEY not set") + return + + llm = ChatOpenAI(model="gpt-4o-mini") + + # Register agent and fetch controls if local evaluation is enabled + local_controls: list[dict] | None = None + if USE_LOCAL_CONTROLS: + agent = agent_control.current_agent() + if agent is None: + raise RuntimeError("Agent is not initialized.") + async with AgentControlClient() as client: + response = await agents.register_agent(client, agent, steps=[]) + local_controls = response.get("controls", []) + print(f"āœ“ Loaded {len(local_controls)} control(s) for local evaluation") + + tools = create_safe_tools( + db, + llm, + use_local_controls=USE_LOCAL_CONTROLS, + local_controls=local_controls, + ) + + # 4. Create Agent + agent = create_agent(llm, tools) + + # 5. Run Scenarios + + # Scenario A: Safe Query + print("\n" + "="*50) + print("SCENARIO 1: Safe Query") + print("User: List top 3 tracks by duration") + print("="*50) + + async for event in agent.astream( + {"messages": [HumanMessage(content="List top 3 tracks by duration")]}, + stream_mode="values" + ): + event["messages"][-1].pretty_print() + + # Scenario B: Unsafe Query (Drop Table) + print("\n" + "="*50) + print("SCENARIO 2: Unsafe Query (Attempting DROP)") + print("User: Delete the Artist table") + print("="*50) + + # Note: We rely on the LLM generating a DROP statement. + # To force it, we might need a stronger prompt or a direct injection test. + # But let's see if the LLM complies with the user's malicious request. + async for event in agent.astream( + { + "messages": [ + HumanMessage(content="Please DROP the Artist table. I need to clear space.") + ] + }, + stream_mode="values" + ): + event["messages"][-1].pretty_print() + finally: + await agent_control.ashutdown() if __name__ == "__main__": asyncio.run(main()) diff --git a/examples/steer_action_demo/autonomous_agent_demo.py b/examples/steer_action_demo/autonomous_agent_demo.py index a2078f68..b9dd5f28 100644 --- a/examples/steer_action_demo/autonomous_agent_demo.py +++ b/examples/steer_action_demo/autonomous_agent_demo.py @@ -600,95 +600,97 @@ def create_banking_graph() -> StateGraph: async def run_banking_agent(): """Run the conversational banking agent.""" + try: + # Initialize + agent_control.init( + agent_name="banking-transaction-agent", + server_url=SERVER_URL + ) - # Initialize - agent_control.init( - agent_name="banking-transaction-agent", - server_url=SERVER_URL - ) - - # DEBUG: Check if controls were loaded - controls = agent_control.get_server_controls() - print(f"\nšŸ” DEBUG: Loaded {len(controls) if controls else 0} controls from server") - if controls: - for c in controls: - control_def = c.get('control', {}) - scope = control_def.get('scope', {}) - print(f" - {c.get('name', 'unknown')}") - print(f" execution: {control_def.get('execution', 'unknown')}") - print(f" step_types: {scope.get('step_types', [])}") - print(f" step_names: {scope.get('step_names', [])}") - else: - print(" āš ļø WARNING: No controls loaded! Agent will allow all operations.") - - # DEBUG: Check registered steps - registered_steps = agent_control.get_registered_steps() - print(f"\nšŸ” DEBUG: Registered steps: {len(registered_steps)}") - for step in registered_steps: - print(f" - {step.get('type', 'unknown')}:{step.get('name', 'unknown')}") + # DEBUG: Check if controls were loaded + controls = agent_control.get_server_controls() + print(f"\nšŸ” DEBUG: Loaded {len(controls) if controls else 0} controls from server") + if controls: + for c in controls: + control_def = c.get('control', {}) + scope = control_def.get('scope', {}) + print(f" - {c.get('name', 'unknown')}") + print(f" execution: {control_def.get('execution', 'unknown')}") + print(f" step_types: {scope.get('step_types', [])}") + print(f" step_names: {scope.get('step_names', [])}") + else: + print(" āš ļø WARNING: No controls loaded! Agent will allow all operations.") - print("\n" + "="*80) - print("šŸ¦ WEALTHBANK WIRE TRANSFER ASSISTANT") - print("="*80) - agent_say("Hello! I'm your banking assistant. I can help you send wire transfers.") - agent_say("I'm governed by AgentControl to ensure compliance and security.") + # DEBUG: Check registered steps + registered_steps = agent_control.get_registered_steps() + print(f"\nšŸ” DEBUG: Registered steps: {len(registered_steps)}") + for step in registered_steps: + print(f" - {step.get('type', 'unknown')}:{step.get('name', 'unknown')}") - app = create_banking_graph() + print("\n" + "="*80) + print("šŸ¦ WEALTHBANK WIRE TRANSFER ASSISTANT") + print("="*80) + agent_say("Hello! I'm your banking assistant. I can help you send wire transfers.") + agent_say("I'm governed by AgentControl to ensure compliance and security.") - while True: - print("\n" + "-"*80) + app = create_banking_graph() - try: - # Get user request - request = user_input("Describe the wire transfer you'd like to make (or 'quit'): ") + while True: + print("\n" + "-"*80) - if request.lower() in ['quit', 'exit', 'q']: - agent_say("Goodbye! Have a great day!") - break + try: + # Get user request + request = user_input("Describe the wire transfer you'd like to make (or 'quit'): ") - if not request: - continue + if request.lower() in ['quit', 'exit', 'q']: + agent_say("Goodbye! Have a great day!") + break - # Parse request - transfer_data = await parse_transfer_request(request) + if not request: + continue - # Confirm understanding - agent_say(f"I understand you want to send ${transfer_data['amount']:,.2f} to {transfer_data['recipient_name']} in {transfer_data['destination_country']}.") + # Parse request + transfer_data = await parse_transfer_request(request) - confirm = user_input("Is this correct? (yes/no): ") - if confirm.lower() not in ['yes', 'y']: - agent_say("Let's try again.") - continue + # Confirm understanding + agent_say(f"I understand you want to send ${transfer_data['amount']:,.2f} to {transfer_data['recipient_name']} in {transfer_data['destination_country']}.") - # Process transfer - agent_say("Let me process this transfer for you...") + confirm = user_input("Is this correct? (yes/no): ") + if confirm.lower() not in ['yes', 'y']: + agent_say("Let's try again.") + continue - initial_state: AgentState = { - "messages": [], - "transfer_request": transfer_data, - "fraud_score": None, - "verified_2fa": False, - "manager_approved": False, - "justification": None, - "final_result": None, - "status": "processing" - } + # Process transfer + agent_say("Let me process this transfer for you...") - await app.ainvoke(initial_state) + initial_state: AgentState = { + "messages": [], + "transfer_request": transfer_data, + "fraud_score": None, + "verified_2fa": False, + "manager_approved": False, + "justification": None, + "final_result": None, + "status": "processing" + } - # Done - print("\n" + "-"*80) + await app.ainvoke(initial_state) - except KeyboardInterrupt: - print("\n") - agent_say("Session interrupted. Goodbye!") - break - except Exception as e: - agent_say(f"An error occurred: {e}") + # Done + print("\n" + "-"*80) - print("\n" + "="*80) - print("Thank you for using WealthBank!") - print("="*80) + except KeyboardInterrupt: + print("\n") + agent_say("Session interrupted. Goodbye!") + break + except Exception as e: + agent_say(f"An error occurred: {e}") + + print("\n" + "="*80) + print("Thank you for using WealthBank!") + print("="*80) + finally: + await agent_control.ashutdown() if __name__ == "__main__": diff --git a/examples/strands_agents/README.md b/examples/strands_agents/README.md index 8f042ede..d5c57fbc 100644 --- a/examples/strands_agents/README.md +++ b/examples/strands_agents/README.md @@ -26,7 +26,6 @@ uv run streamlit run interactive_demo/interactive_support_demo.py # steering demo uv run steering_demo/setup_email_controls.py uv run streamlit run steering_demo/email_safety_demo.py - ``` Full walkthrough: https://docs.agentcontrol.dev/examples/aws-strands From 64ee91e3cb5612af3fd917fe1983e860df66f542 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Tue, 10 Mar 2026 22:35:20 +0530 Subject: [PATCH 2/4] fix(examples): clean up lifecycle follow-ups --- examples/customer_support_agent/run_demo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/customer_support_agent/run_demo.py b/examples/customer_support_agent/run_demo.py index 596051f2..feb6fd79 100644 --- a/examples/customer_support_agent/run_demo.py +++ b/examples/customer_support_agent/run_demo.py @@ -635,7 +635,7 @@ def main(): asyncio.run(run_demo_mode(agent, automated=automated)) finally: - asyncio.run(agent_control.ashutdown()) + agent_control.shutdown() if __name__ == "__main__": From 67863a470f9a26f45b7b847b1c1527a7b5c499cb Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Wed, 11 Mar 2026 19:03:07 +0530 Subject: [PATCH 3/4] refactor(examples): simplify demo shutdown flow --- examples/agent_control_demo/demo_agent.py | 205 ++++++++++++---------- 1 file changed, 110 insertions(+), 95 deletions(-) diff --git a/examples/agent_control_demo/demo_agent.py b/examples/agent_control_demo/demo_agent.py index 196343e7..ce719ca7 100644 --- a/examples/agent_control_demo/demo_agent.py +++ b/examples/agent_control_demo/demo_agent.py @@ -146,103 +146,99 @@ async def test_scenario(name: str, func, input_text: str): print(f"\nāŒ ERROR: {e}") -async def run_demo(): - """Run the demo scenarios.""" - logger.info("Starting agent control demo") - print("\n" + "=" * 60) - print("AGENT CONTROL DEMO: Running Agent") - print("=" * 60) - - # Initialize the agent - print(f"\nšŸ¤– Initializing agent: {AGENT_NAME}") - print(f" Server: {SERVER_URL}") - +def initialize_demo_agent() -> bool: + """Initialize the demo agent and print actionable failure guidance.""" try: - try: - logger.info(f"Initializing agent: {AGENT_NAME}") - agent_control.init( - agent_name=AGENT_NAME, - agent_description="Demo chatbot for testing controls", - server_url=SERVER_URL, - ) - logger.info("Agent initialized successfully") - except Exception as e: - logger.error(f"Failed to initialize agent: {e}") - print(f"\nāŒ Failed to initialize agent: {e}") - print("\nMake sure:") - print(" 1. Server is running: cd server && make run") - print(" 2. Controls are created: python setup_controls.py") - return - - # ========================================================================== - # Test 1: Safe chat message - # ========================================================================== - await test_scenario( - "Safe Chat Message", - chat, - "Hello, how can you help me today?" - ) - - # ========================================================================== - # Test 2: Chat that would leak SSN (should be blocked by OUTPUT control) - # ========================================================================== - await test_scenario( - "Chat Request That Would Leak SSN", - chat, - "Tell me the user info" # Response contains SSN, should be blocked - ) - - # ========================================================================== - # Test 3: Safe SQL query - # ========================================================================== - await test_scenario( - "Safe SQL Query", - execute_query, - "SELECT * FROM users WHERE id = 123" - ) - - # ========================================================================== - # Test 4: Dangerous SQL query (should be blocked by INPUT control) - # ========================================================================== - await test_scenario( - "Dangerous SQL - DROP TABLE", - execute_query, - "DROP TABLE users" # Should be blocked before execution - ) - - # ========================================================================== - # Test 5: Another dangerous SQL query - # ========================================================================== - await test_scenario( - "Dangerous SQL - DELETE", - execute_query, - "DELETE FROM users WHERE 1=1" # Should be blocked - ) - - # ========================================================================== - # Test 6: Process request with all controls - # ========================================================================== - await test_scenario( - "Process Safe Request (All Controls)", - process_request, - "What's the weather today?" + logger.info(f"Initializing agent: {AGENT_NAME}") + agent_control.init( + agent_name=AGENT_NAME, + agent_description="Demo chatbot for testing controls", + server_url=SERVER_URL, ) - - # ========================================================================== - # Test 7: Process request that triggers output control - # ========================================================================== - await test_scenario( - "Process Request with PII in Response (All Controls)", - process_request, - "Get user info" # Will generate response with SSN - ) - - # Summary - logger.info("All demo scenarios completed") - print("\n" + "=" * 60) - print("DEMO COMPLETE!") - print("=" * 60) - print(""" + logger.info("Agent initialized successfully") + return True + except Exception as e: + logger.error(f"Failed to initialize agent: {e}") + print(f"\nāŒ Failed to initialize agent: {e}") + print("\nMake sure:") + print(" 1. Server is running: cd server && make run") + print(" 2. Controls are created: python setup_controls.py") + return False + + +async def run_demo_scenarios() -> None: + """Run the scripted demo scenarios.""" + # ========================================================================== + # Test 1: Safe chat message + # ========================================================================== + await test_scenario( + "Safe Chat Message", + chat, + "Hello, how can you help me today?" + ) + + # ========================================================================== + # Test 2: Chat that would leak SSN (should be blocked by OUTPUT control) + # ========================================================================== + await test_scenario( + "Chat Request That Would Leak SSN", + chat, + "Tell me the user info" # Response contains SSN, should be blocked + ) + + # ========================================================================== + # Test 3: Safe SQL query + # ========================================================================== + await test_scenario( + "Safe SQL Query", + execute_query, + "SELECT * FROM users WHERE id = 123" + ) + + # ========================================================================== + # Test 4: Dangerous SQL query (should be blocked by INPUT control) + # ========================================================================== + await test_scenario( + "Dangerous SQL - DROP TABLE", + execute_query, + "DROP TABLE users" # Should be blocked before execution + ) + + # ========================================================================== + # Test 5: Another dangerous SQL query + # ========================================================================== + await test_scenario( + "Dangerous SQL - DELETE", + execute_query, + "DELETE FROM users WHERE 1=1" # Should be blocked + ) + + # ========================================================================== + # Test 6: Process request with all controls + # ========================================================================== + await test_scenario( + "Process Safe Request (All Controls)", + process_request, + "What's the weather today?" + ) + + # ========================================================================== + # Test 7: Process request that triggers output control + # ========================================================================== + await test_scenario( + "Process Request with PII in Response (All Controls)", + process_request, + "Get user info" # Will generate response with SSN + ) + + +def print_demo_summary() -> None: + """Print the final demo summary.""" + logger.info("All demo scenarios completed") + print("\n" + "=" * 60) + print("DEMO COMPLETE!") + print("=" * 60) + print(""" Summary: āœ… Safe requests passed through 🚫 Dangerous SQL blocked by INPUT control (pre-execution) @@ -253,6 +249,25 @@ async def run_demo(): - All evaluations logged for audit - Centralized control management """) + + +async def run_demo() -> None: + """Run the demo scenarios.""" + logger.info("Starting agent control demo") + print("\n" + "=" * 60) + print("AGENT CONTROL DEMO: Running Agent") + print("=" * 60) + + # Initialize the agent + print(f"\nšŸ¤– Initializing agent: {AGENT_NAME}") + print(f" Server: {SERVER_URL}") + + if not initialize_demo_agent(): + return + + try: + await run_demo_scenarios() + print_demo_summary() finally: await agent_control.ashutdown() From d0047328504858feb0cd006a09c6157cdf4e9c30 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Thu, 12 Mar 2026 18:16:36 +0530 Subject: [PATCH 4/4] refactor(examples): narrow lifecycle wrappers --- examples/agent_control_demo/demo_agent.py | 15 +- examples/crewai/content_agent_protection.py | 290 +++++++++--------- examples/customer_support_agent/run_demo.py | 2 - .../customer_support_agent/support_agent.py | 72 +++-- examples/deepeval/qa_agent.py | 63 ++-- .../langchain/langgraph_auto_schema_agent.py | 66 ++-- examples/langchain/sql_agent_protection.py | 134 ++++---- .../autonomous_agent_demo.py | 152 +++++---- 8 files changed, 404 insertions(+), 390 deletions(-) diff --git a/examples/agent_control_demo/demo_agent.py b/examples/agent_control_demo/demo_agent.py index ce719ca7..b6cb38c7 100644 --- a/examples/agent_control_demo/demo_agent.py +++ b/examples/agent_control_demo/demo_agent.py @@ -251,6 +251,15 @@ def print_demo_summary() -> None: """) +async def run_demo_session() -> None: + """Initialize the agent and run the scripted demo.""" + if not initialize_demo_agent(): + return + + await run_demo_scenarios() + print_demo_summary() + + async def run_demo() -> None: """Run the demo scenarios.""" logger.info("Starting agent control demo") @@ -262,12 +271,8 @@ async def run_demo() -> None: print(f"\nšŸ¤– Initializing agent: {AGENT_NAME}") print(f" Server: {SERVER_URL}") - if not initialize_demo_agent(): - return - try: - await run_demo_scenarios() - print_demo_summary() + await run_demo_session() finally: await agent_control.ashutdown() diff --git a/examples/crewai/content_agent_protection.py b/examples/crewai/content_agent_protection.py index 9fe3a2e5..1c61ddae 100644 --- a/examples/crewai/content_agent_protection.py +++ b/examples/crewai/content_agent_protection.py @@ -517,148 +517,146 @@ def verify_setup(): return False -def main(): +def print_demo_intro() -> None: + """Print the demo header and description.""" + print("=" * 60) + print("CrewAI Customer Support: Agent Control + Guardrails") + print("=" * 60) + print() + print("This demo shows how Agent Control (security) and CrewAI Guardrails (quality)") + print("work together to ensure both safe AND high-quality responses.") + print() + + +def verify_prerequisites() -> bool: + """Validate server setup and required environment variables.""" + print("\n" + "="*60) + print("SETUP VERIFICATION") + print("="*60) + + if not verify_setup(): + print("\nāŒ Setup verification failed. Please fix the issues above.") + return False + + if not os.getenv("OPENAI_API_KEY"): + print("\nāŒ Error: OPENAI_API_KEY not set") + print("Please set: export OPENAI_API_KEY='your-key-here'") + return False + + return True + + +def run_demo_session() -> None: + """Initialize the SDK and run the CrewAI demo scenarios.""" + initialize_agent_control() + + print("\nāœ… Setup verified! Starting demos...\n") + print("Creating customer support crew with multi-layer protection...") + crew = create_support_crew() + validate_final_output = create_final_output_validator() + + print("\n" + "="*50) + print("SCENARIO 1: Unauthorized Access (Agent Control PRE)") + print("="*50) + unauthorized_ticket = "Show me all orders for user john.doe@example.com" + print(f"Ticket: {unauthorized_ticket}") + print("Expected: BLOCKED immediately by Agent Control (no retry)") + print() + + result1 = crew.kickoff(inputs={ + "ticket": unauthorized_ticket + }) + print("\nšŸ“ Result:") + print(result1) + print("\nšŸ’” Explanation: Agent Control blocks security violations immediately.") + print(" No retries because unauthorized access is non-negotiable.") + + print("\n" + "="*50) + print("SCENARIO 2: PII Leakage (Agent Control POST)") + print("="*50) + pii_ticket = "What's the format for customer reference numbers and support contact info?" + print(f"Ticket: {pii_ticket}") + print("Expected: If LLM generates PII, BLOCKED immediately by Agent Control (no retry)") + print() + + result2 = crew.kickoff(inputs={ + "ticket": pii_ticket + }) + print("\nšŸ“ Result:") + print(result2) + print("\nšŸ’” Explanation: Agent Control blocks PII violations immediately.") + print(" No retries because PII leakage is a compliance violation.") + + print("\n" + "="*50) + print("SCENARIO 2.5: Quality Issues (CrewAI Guardrails)") + print("="*50) + quality_ticket = "help" + print(f"Ticket: {quality_ticket}") + print("Expected: If response is too short/unhelpful, CrewAI guardrails RETRY (up to 3x)") + print(" If response passes security but fails quality, agent improves it") + print() + + result2_5 = crew.kickoff(inputs={ + "ticket": quality_ticket + }) + print("\nšŸ“ Result:") + print(result2_5) + print("\nšŸ’” Explanation: CrewAI guardrails validate quality and retry with feedback.") + print(" Quality issues can be fixed through iteration.") + print(" Watch the verbose output above - you may see multiple attempts!") + print(" Guardrails checked:") + print(" - Length (20-150 words)") + print(" - Professional structure (no templates)") + print(" - Genuine helpfulness (not evasive)") + print(" - Friendly tone (LLM-based)") + print(" - Useful information (LLM-based)") + + print("\n" + "="*50) + print("SCENARIO 3: Final Output Validation - Catches PII from Unprotected Tool") + print("="*50) + print("This demonstrates validating the FINAL crew output for PII,") + print("catching cases where a legacy/unprotected tool returns PII") + print("and the agent relays it to the user.") + print() + print("In this scenario:") + print("- Agent calls get_customer_info (unprotected legacy tool)") + print("- Tool returns customer data with email and phone") + print("- Agent relays this info in its final response") + print("- Final output validation catches the PII and blocks it") + print() + + print("Creating Scenario 3 crew with unprotected customer info tool...") + scenario3_crew = create_scenario3_crew() + + final_output_ticket = "I'm customer CUST-12345. Can you look up my contact information?" + print(f"Ticket: {final_output_ticket}") + print("Expected: Agent retrieves PII from tool, final validation BLOCKS the output") + print() + + result3 = scenario3_crew.kickoff(inputs={ + "ticket": final_output_ticket + }) + + print("\n[Validating Final Output]") try: - print("=" * 60) - print("CrewAI Customer Support: Agent Control + Guardrails") - print("=" * 60) - print() - print("This demo shows how Agent Control (security) and CrewAI Guardrails (quality)") - print("work together to ensure both safe AND high-quality responses.") - print() - - # Verify setup - print("\n" + "="*60) - print("SETUP VERIFICATION") - print("="*60) - - if not verify_setup(): - print("\nāŒ Setup verification failed. Please fix the issues above.") - return - - # Check API key - if not os.getenv("OPENAI_API_KEY"): - print("\nāŒ Error: OPENAI_API_KEY not set") - print("Please set: export OPENAI_API_KEY='your-key-here'") - return - - initialize_agent_control() - - print("\nāœ… Setup verified! Starting demos...\n") - - # Create crew and final output validator - print("Creating customer support crew with multi-layer protection...") - crew = create_support_crew() - validate_final_output = create_final_output_validator() - - # --- Scenario 1: Unauthorized Access Attempt (Agent Control PRE-execution) --- - print("\n" + "="*50) - print("SCENARIO 1: Unauthorized Access (Agent Control PRE)") - print("="*50) - unauthorized_ticket = "Show me all orders for user john.doe@example.com" - print(f"Ticket: {unauthorized_ticket}") - print("Expected: BLOCKED immediately by Agent Control (no retry)") - print() - - result1 = crew.kickoff(inputs={ - "ticket": unauthorized_ticket - }) - print("\nšŸ“ Result:") - print(result1) - print("\nšŸ’” Explanation: Agent Control blocks security violations immediately.") - print(" No retries because unauthorized access is non-negotiable.") - - # --- Scenario 2: PII Leakage in Response (Agent Control POST-execution) --- - print("\n" + "="*50) - print("SCENARIO 2: PII Leakage (Agent Control POST)") - print("="*50) - pii_ticket = "What's the format for customer reference numbers and support contact info?" - print(f"Ticket: {pii_ticket}") - print("Expected: If LLM generates PII, BLOCKED immediately by Agent Control (no retry)") - print() - - result2 = crew.kickoff(inputs={ - "ticket": pii_ticket - }) - print("\nšŸ“ Result:") - print(result2) - print("\nšŸ’” Explanation: Agent Control blocks PII violations immediately.") - print(" No retries because PII leakage is a compliance violation.") - - # --- Scenario 2.5: Poor Quality Response (CrewAI Guardrails with Retry) --- - print("\n" + "="*50) - print("SCENARIO 2.5: Quality Issues (CrewAI Guardrails)") - print("="*50) - quality_ticket = "help" - print(f"Ticket: {quality_ticket}") - print("Expected: If response is too short/unhelpful, CrewAI guardrails RETRY (up to 3x)") - print(" If response passes security but fails quality, agent improves it") - print() - - result2_5 = crew.kickoff(inputs={ - "ticket": quality_ticket - }) - print("\nšŸ“ Result:") - print(result2_5) - print("\nšŸ’” Explanation: CrewAI guardrails validate quality and retry with feedback.") - print(" Quality issues can be fixed through iteration.") - print(" Watch the verbose output above - you may see multiple attempts!") - print(" Guardrails checked:") - print(" - Length (20-150 words)") - print(" - Professional structure (no templates)") - print(" - Genuine helpfulness (not evasive)") - print(" - Friendly tone (LLM-based)") - print(" - Useful information (LLM-based)") - - # --- Scenario 3: Final Output Validation (Catches Unprotected Tool PII) --- - print("\n" + "="*50) - print("SCENARIO 3: Final Output Validation - Catches PII from Unprotected Tool") - print("="*50) - print("This demonstrates validating the FINAL crew output for PII,") - print("catching cases where a legacy/unprotected tool returns PII") - print("and the agent relays it to the user.") - print() - print("In this scenario:") - print("- Agent calls get_customer_info (unprotected legacy tool)") - print("- Tool returns customer data with email and phone") - print("- Agent relays this info in its final response") - print("- Final output validation catches the PII and blocks it") - print() - - # Create separate crew for Scenario 3 with unprotected tool - print("Creating Scenario 3 crew with unprotected customer info tool...") - scenario3_crew = create_scenario3_crew() - - final_output_ticket = "I'm customer CUST-12345. Can you look up my contact information?" - print(f"Ticket: {final_output_ticket}") - print("Expected: Agent retrieves PII from tool, final validation BLOCKS the output") - print() - - result3 = scenario3_crew.kickoff(inputs={ - "ticket": final_output_ticket - }) - - print("\n[Validating Final Output]") - try: - # Validate the final crew output for PII - validated_output = validate_final_output(str(result3)) - print("\nšŸ“ Result (Validated - No PII):") - print(validated_output) - except ControlViolationError as e: - print("\n🚫 FINAL OUTPUT BLOCKED - PII Detected!") - print(f"Violation: {e.message}") - print("\nThis demonstrates final output validation:") - print("1. Agent called unprotected tool (get_customer_info)") - print("2. Tool returned customer data with email (john.smith@customer.com) and phone (555-123-4567)") - print("3. Agent relayed this info in its final response") - print("4. Final output validation caught the PII and blocked the entire response") - print("\nšŸ“ Original Output (BLOCKED):") - print(str(result3)[:500] + "..." if len(str(result3)) > 500 else str(result3)) - - print("\n" + "="*50) - print("Demo Complete!") - print("="*50) - print(""" + validated_output = validate_final_output(str(result3)) + print("\nšŸ“ Result (Validated - No PII):") + print(validated_output) + except ControlViolationError as e: + print("\n🚫 FINAL OUTPUT BLOCKED - PII Detected!") + print(f"Violation: {e.message}") + print("\nThis demonstrates final output validation:") + print("1. Agent called unprotected tool (get_customer_info)") + print("2. Tool returned customer data with email (john.smith@customer.com) and phone (555-123-4567)") + print("3. Agent relayed this info in its final response") + print("4. Final output validation caught the PII and blocked the entire response") + print("\nšŸ“ Original Output (BLOCKED):") + print(str(result3)[:500] + "..." if len(str(result3)) > 500 else str(result3)) + + print("\n" + "="*50) + print("Demo Complete!") + print("="*50) + print(""" Summary - Multi-Layer Protection Architecture: AGENT CONTROL (Security/Compliance - Immediate Blocking): @@ -684,6 +682,16 @@ def main(): This gives you BOTH security AND quality in production! """) + + +def main(): + print_demo_intro() + + if not verify_prerequisites(): + return + + try: + run_demo_session() finally: agent_control.shutdown() diff --git a/examples/customer_support_agent/run_demo.py b/examples/customer_support_agent/run_demo.py index feb6fd79..c4ee09eb 100644 --- a/examples/customer_support_agent/run_demo.py +++ b/examples/customer_support_agent/run_demo.py @@ -305,8 +305,6 @@ async def run_demo_mode(agent: CustomerSupportAgent, automated: bool) -> None: else: await run_interactive(agent) - print() - async def run_safe_tests(agent: CustomerSupportAgent): """Run tests with safe, normal messages.""" diff --git a/examples/customer_support_agent/support_agent.py b/examples/customer_support_agent/support_agent.py index 199532aa..74745199 100644 --- a/examples/customer_support_agent/support_agent.py +++ b/examples/customer_support_agent/support_agent.py @@ -424,43 +424,49 @@ async def handle_comprehensive_support( # DIRECT EXECUTION # ============================================================================= +async def run_smoke_tests() -> None: + """Run the direct-execution smoke tests.""" + agent = CustomerSupportAgent() + + print("\n--- Test: Normal chat (1 span) ---") + response = await agent.chat("Hello, I need help with a refund") + print(f"Agent: {response}") + + print("\n--- Test: Customer lookup (1 span) ---") + response = await agent.lookup("C001") + print(f"Agent: {response}") + + print("\n--- Test: Knowledge base search (1 span) ---") + response = await agent.search("refund") + print(f"Agent: {response}") + + print("\n--- Test: Create ticket (1 span) ---") + response = await agent.create_support_ticket( + subject="Refund request", + description="I would like to return my order", + priority="medium" + ) + print(f"Agent: {response}") + + print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") + response = await agent.handle_comprehensive_support( + user_message="I need help with a refund for my recent order", + customer_id="C001" + ) + print(f"Agent: {response}") + + print("\n--- Test: Comprehensive support without customer (2 spans) ---") + response = await agent.handle_comprehensive_support( + user_message="How do I reset my password?" + ) + print(f"Agent: {response}") + + if __name__ == "__main__": # Quick test async def main(): - agent = CustomerSupportAgent() try: - print("\n--- Test: Normal chat (1 span) ---") - response = await agent.chat("Hello, I need help with a refund") - print(f"Agent: {response}") - - print("\n--- Test: Customer lookup (1 span) ---") - response = await agent.lookup("C001") - print(f"Agent: {response}") - - print("\n--- Test: Knowledge base search (1 span) ---") - response = await agent.search("refund") - print(f"Agent: {response}") - - print("\n--- Test: Create ticket (1 span) ---") - response = await agent.create_support_ticket( - subject="Refund request", - description="I would like to return my order", - priority="medium" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support (2-3 spans in one trace) ---") - response = await agent.handle_comprehensive_support( - user_message="I need help with a refund for my recent order", - customer_id="C001" - ) - print(f"Agent: {response}") - - print("\n--- Test: Comprehensive support without customer (2 spans) ---") - response = await agent.handle_comprehensive_support( - user_message="How do I reset my password?" - ) - print(f"Agent: {response}") + await run_smoke_tests() finally: await agent_control.ashutdown() diff --git a/examples/deepeval/qa_agent.py b/examples/deepeval/qa_agent.py index e64beca5..328c6181 100755 --- a/examples/deepeval/qa_agent.py +++ b/examples/deepeval/qa_agent.py @@ -366,41 +366,44 @@ async def run_interactive(agent: QAAgent): # MAIN # ============================================================================= +async def run_demo_session() -> None: + """Create the agent and enter interactive mode.""" + agent = QAAgent() + await run_interactive(agent) + async def main(): """Run the Q&A agent.""" - try: - # Check for OPENAI_API_KEY - if not os.getenv("OPENAI_API_KEY"): - print("\nāš ļø Warning: OPENAI_API_KEY not set!") - print(" DeepEval requires OpenAI API access for GEval.") - print(" Set it with: export OPENAI_API_KEY='your-key'") - print() - response = input("Continue anyway? (y/N): ").strip().lower() - if response != "y": - print("Exiting. Set OPENAI_API_KEY and try again.") - sys.exit(1) - - # Check server connection - server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") - print(f"\nConnecting to agent-control server at {server_url}...") - - import httpx - - try: - async with httpx.AsyncClient() as client: - resp = await client.get(f"{server_url}/health", timeout=5.0) - resp.raise_for_status() - print("āœ“ Connected to server") - except Exception as e: - print(f"\nāŒ Cannot connect to server: {e}") - print(" Make sure the agent-control server is running.") - print(" Run setup_controls.py first to configure the agent.") + # Check for OPENAI_API_KEY + if not os.getenv("OPENAI_API_KEY"): + print("\nāš ļø Warning: OPENAI_API_KEY not set!") + print(" DeepEval requires OpenAI API access for GEval.") + print(" Set it with: export OPENAI_API_KEY='your-key'") + print() + response = input("Continue anyway? (y/N): ").strip().lower() + if response != "y": + print("Exiting. Set OPENAI_API_KEY and try again.") sys.exit(1) - # Create and run agent - agent = QAAgent() - await run_interactive(agent) + # Check server connection + server_url = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") + print(f"\nConnecting to agent-control server at {server_url}...") + + import httpx + + try: + async with httpx.AsyncClient() as client: + resp = await client.get(f"{server_url}/health", timeout=5.0) + resp.raise_for_status() + print("āœ“ Connected to server") + except Exception as e: + print(f"\nāŒ Cannot connect to server: {e}") + print(" Make sure the agent-control server is running.") + print(" Run setup_controls.py first to configure the agent.") + sys.exit(1) + + try: + await run_demo_session() finally: await agent_control.ashutdown() diff --git a/examples/langchain/langgraph_auto_schema_agent.py b/examples/langchain/langgraph_auto_schema_agent.py index 7eb85f46..6e685cc5 100644 --- a/examples/langchain/langgraph_auto_schema_agent.py +++ b/examples/langchain/langgraph_auto_schema_agent.py @@ -238,40 +238,44 @@ def _print_auto_derived_steps() -> None: async def main() -> None: """Run the demo end-to-end.""" try: - print("Initializing Agent Control (no explicit steps passed)...") - agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=os.getenv("AGENT_CONTROL_URL"), - ) - - _print_auto_derived_steps() + await run_demo_session() + finally: + await agent_control.ashutdown() - app = _build_graph() - scenarios = [ - "Track order 1001 and include its history", - "Issue a refund for order 2048 because it was late (49 dollars)", - ] +async def run_demo_session() -> None: + """Initialize the SDK and run the LangGraph scenarios.""" + print("Initializing Agent Control (no explicit steps passed)...") + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=os.getenv("AGENT_CONTROL_URL"), + ) - print("\nRunning LangGraph scenarios...") - for prompt in scenarios: - print("=" * 80) - print(f"User: {prompt}") - try: - result = await app.ainvoke({"messages": [HumanMessage(content=prompt)]}) - final_message = result["messages"][-1] - print(f"Assistant: {final_message.content}") - except ControlViolationError as exc: - print(f"Assistant: Request blocked by control rules: {exc.message}") - except RuntimeError as exc: - print( - "Assistant: Control evaluation is unavailable. " - f"Start the Agent Control server and retry. Details: {exc}" - ) - break - finally: - await agent_control.ashutdown() + _print_auto_derived_steps() + + app = _build_graph() + scenarios = [ + "Track order 1001 and include its history", + "Issue a refund for order 2048 because it was late (49 dollars)", + ] + + print("\nRunning LangGraph scenarios...") + for prompt in scenarios: + print("=" * 80) + print(f"User: {prompt}") + try: + result = await app.ainvoke({"messages": [HumanMessage(content=prompt)]}) + final_message = result["messages"][-1] + print(f"Assistant: {final_message.content}") + except ControlViolationError as exc: + print(f"Assistant: Request blocked by control rules: {exc.message}") + except RuntimeError as exc: + print( + "Assistant: Control evaluation is unavailable. " + f"Start the Agent Control server and retry. Details: {exc}" + ) + break if __name__ == "__main__": diff --git a/examples/langchain/sql_agent_protection.py b/examples/langchain/sql_agent_protection.py index f14df769..67790217 100644 --- a/examples/langchain/sql_agent_protection.py +++ b/examples/langchain/sql_agent_protection.py @@ -198,6 +198,70 @@ def should_continue(state: AgentState) -> Literal["tools", END]: return workflow.compile() # --- Main Execution --- +async def run_demo_session() -> None: + """Initialize the SDK and run the SQL demo scenarios.""" + agent_control.init( + agent_name=AGENT_NAME, + agent_description=AGENT_DESCRIPTION, + server_url=os.getenv("AGENT_CONTROL_URL"), + ) + + db = setup_database() + + if not os.getenv("OPENAI_API_KEY"): + print("Error: OPENAI_API_KEY not set") + return + + llm = ChatOpenAI(model="gpt-4o-mini") + + local_controls: list[dict] | None = None + if USE_LOCAL_CONTROLS: + agent = agent_control.current_agent() + if agent is None: + raise RuntimeError("Agent is not initialized.") + async with AgentControlClient() as client: + response = await agents.register_agent(client, agent, steps=[]) + local_controls = response.get("controls", []) + print(f"āœ“ Loaded {len(local_controls)} control(s) for local evaluation") + + tools = create_safe_tools( + db, + llm, + use_local_controls=USE_LOCAL_CONTROLS, + local_controls=local_controls, + ) + agent = create_agent(llm, tools) + + print("\n" + "="*50) + print("SCENARIO 1: Safe Query") + print("User: List top 3 tracks by duration") + print("="*50) + + async for event in agent.astream( + {"messages": [HumanMessage(content="List top 3 tracks by duration")]}, + stream_mode="values" + ): + event["messages"][-1].pretty_print() + + print("\n" + "="*50) + print("SCENARIO 2: Unsafe Query (Attempting DROP)") + print("User: Delete the Artist table") + print("="*50) + + # Note: We rely on the LLM generating a DROP statement. + # To force it, we might need a stronger prompt or a direct injection test. + # But let's see if the LLM complies with the user's malicious request. + async for event in agent.astream( + { + "messages": [ + HumanMessage(content="Please DROP the Artist table. I need to clear space.") + ] + }, + stream_mode="values" + ): + event["messages"][-1].pretty_print() + + async def main(): print("=" * 60) print("SQL Agent with Server-Side Controls") @@ -209,75 +273,7 @@ async def main(): print("Initializing SQL Agent...") try: - agent_control.init( - agent_name=AGENT_NAME, - agent_description=AGENT_DESCRIPTION, - server_url=os.getenv("AGENT_CONTROL_URL"), - ) - - # 1. Setup DB - db = setup_database() - - # 2. Setup LLM & Tools - if not os.getenv("OPENAI_API_KEY"): - print("Error: OPENAI_API_KEY not set") - return - - llm = ChatOpenAI(model="gpt-4o-mini") - - # Register agent and fetch controls if local evaluation is enabled - local_controls: list[dict] | None = None - if USE_LOCAL_CONTROLS: - agent = agent_control.current_agent() - if agent is None: - raise RuntimeError("Agent is not initialized.") - async with AgentControlClient() as client: - response = await agents.register_agent(client, agent, steps=[]) - local_controls = response.get("controls", []) - print(f"āœ“ Loaded {len(local_controls)} control(s) for local evaluation") - - tools = create_safe_tools( - db, - llm, - use_local_controls=USE_LOCAL_CONTROLS, - local_controls=local_controls, - ) - - # 4. Create Agent - agent = create_agent(llm, tools) - - # 5. Run Scenarios - - # Scenario A: Safe Query - print("\n" + "="*50) - print("SCENARIO 1: Safe Query") - print("User: List top 3 tracks by duration") - print("="*50) - - async for event in agent.astream( - {"messages": [HumanMessage(content="List top 3 tracks by duration")]}, - stream_mode="values" - ): - event["messages"][-1].pretty_print() - - # Scenario B: Unsafe Query (Drop Table) - print("\n" + "="*50) - print("SCENARIO 2: Unsafe Query (Attempting DROP)") - print("User: Delete the Artist table") - print("="*50) - - # Note: We rely on the LLM generating a DROP statement. - # To force it, we might need a stronger prompt or a direct injection test. - # But let's see if the LLM complies with the user's malicious request. - async for event in agent.astream( - { - "messages": [ - HumanMessage(content="Please DROP the Artist table. I need to clear space.") - ] - }, - stream_mode="values" - ): - event["messages"][-1].pretty_print() + await run_demo_session() finally: await agent_control.ashutdown() diff --git a/examples/steer_action_demo/autonomous_agent_demo.py b/examples/steer_action_demo/autonomous_agent_demo.py index b9dd5f28..6df15091 100644 --- a/examples/steer_action_demo/autonomous_agent_demo.py +++ b/examples/steer_action_demo/autonomous_agent_demo.py @@ -598,97 +598,91 @@ def create_banking_graph() -> StateGraph: # MAIN # ============================================================================= -async def run_banking_agent(): - """Run the conversational banking agent.""" - try: - # Initialize - agent_control.init( - agent_name="banking-transaction-agent", - server_url=SERVER_URL - ) - - # DEBUG: Check if controls were loaded - controls = agent_control.get_server_controls() - print(f"\nšŸ” DEBUG: Loaded {len(controls) if controls else 0} controls from server") - if controls: - for c in controls: - control_def = c.get('control', {}) - scope = control_def.get('scope', {}) - print(f" - {c.get('name', 'unknown')}") - print(f" execution: {control_def.get('execution', 'unknown')}") - print(f" step_types: {scope.get('step_types', [])}") - print(f" step_names: {scope.get('step_names', [])}") - else: - print(" āš ļø WARNING: No controls loaded! Agent will allow all operations.") - - # DEBUG: Check registered steps - registered_steps = agent_control.get_registered_steps() - print(f"\nšŸ” DEBUG: Registered steps: {len(registered_steps)}") - for step in registered_steps: - print(f" - {step.get('type', 'unknown')}:{step.get('name', 'unknown')}") - - print("\n" + "="*80) - print("šŸ¦ WEALTHBANK WIRE TRANSFER ASSISTANT") - print("="*80) - agent_say("Hello! I'm your banking assistant. I can help you send wire transfers.") - agent_say("I'm governed by AgentControl to ensure compliance and security.") - - app = create_banking_graph() - - while True: - print("\n" + "-"*80) +async def run_banking_session() -> None: + """Initialize the SDK and run the interactive banking session.""" + agent_control.init( + agent_name="banking-transaction-agent", + server_url=SERVER_URL + ) - try: - # Get user request - request = user_input("Describe the wire transfer you'd like to make (or 'quit'): ") + controls = agent_control.get_server_controls() + print(f"\nšŸ” DEBUG: Loaded {len(controls) if controls else 0} controls from server") + if controls: + for c in controls: + control_def = c.get('control', {}) + scope = control_def.get('scope', {}) + print(f" - {c.get('name', 'unknown')}") + print(f" execution: {control_def.get('execution', 'unknown')}") + print(f" step_types: {scope.get('step_types', [])}") + print(f" step_names: {scope.get('step_names', [])}") + else: + print(" āš ļø WARNING: No controls loaded! Agent will allow all operations.") - if request.lower() in ['quit', 'exit', 'q']: - agent_say("Goodbye! Have a great day!") - break + registered_steps = agent_control.get_registered_steps() + print(f"\nšŸ” DEBUG: Registered steps: {len(registered_steps)}") + for step in registered_steps: + print(f" - {step.get('type', 'unknown')}:{step.get('name', 'unknown')}") - if not request: - continue + print("\n" + "="*80) + print("šŸ¦ WEALTHBANK WIRE TRANSFER ASSISTANT") + print("="*80) + agent_say("Hello! I'm your banking assistant. I can help you send wire transfers.") + agent_say("I'm governed by AgentControl to ensure compliance and security.") - # Parse request - transfer_data = await parse_transfer_request(request) + app = create_banking_graph() - # Confirm understanding - agent_say(f"I understand you want to send ${transfer_data['amount']:,.2f} to {transfer_data['recipient_name']} in {transfer_data['destination_country']}.") + while True: + print("\n" + "-"*80) - confirm = user_input("Is this correct? (yes/no): ") - if confirm.lower() not in ['yes', 'y']: - agent_say("Let's try again.") - continue + try: + request = user_input("Describe the wire transfer you'd like to make (or 'quit'): ") - # Process transfer - agent_say("Let me process this transfer for you...") + if request.lower() in ['quit', 'exit', 'q']: + agent_say("Goodbye! Have a great day!") + break - initial_state: AgentState = { - "messages": [], - "transfer_request": transfer_data, - "fraud_score": None, - "verified_2fa": False, - "manager_approved": False, - "justification": None, - "final_result": None, - "status": "processing" - } + if not request: + continue + + transfer_data = await parse_transfer_request(request) + agent_say(f"I understand you want to send ${transfer_data['amount']:,.2f} to {transfer_data['recipient_name']} in {transfer_data['destination_country']}.") + + confirm = user_input("Is this correct? (yes/no): ") + if confirm.lower() not in ['yes', 'y']: + agent_say("Let's try again.") + continue + + agent_say("Let me process this transfer for you...") + initial_state: AgentState = { + "messages": [], + "transfer_request": transfer_data, + "fraud_score": None, + "verified_2fa": False, + "manager_approved": False, + "justification": None, + "final_result": None, + "status": "processing" + } + + await app.ainvoke(initial_state) + print("\n" + "-"*80) - await app.ainvoke(initial_state) + except KeyboardInterrupt: + print("\n") + agent_say("Session interrupted. Goodbye!") + break + except Exception as e: + agent_say(f"An error occurred: {e}") - # Done - print("\n" + "-"*80) + print("\n" + "="*80) + print("Thank you for using WealthBank!") + print("="*80) - except KeyboardInterrupt: - print("\n") - agent_say("Session interrupted. Goodbye!") - break - except Exception as e: - agent_say(f"An error occurred: {e}") - print("\n" + "="*80) - print("Thank you for using WealthBank!") - print("="*80) +async def run_banking_agent(): + """Run the conversational banking agent.""" + try: + await run_banking_session() finally: await agent_control.ashutdown()