From 93de1632cbc917ea38939fa804be089ee0edab98 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 18:17:36 -0800 Subject: [PATCH 1/9] feat: Add script for RSA and Ed25519 key generation --- tap-agent/genkeys.sh | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100755 tap-agent/genkeys.sh diff --git a/tap-agent/genkeys.sh b/tap-agent/genkeys.sh new file mode 100755 index 0000000..648ff70 --- /dev/null +++ b/tap-agent/genkeys.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# Generate RSA keys +openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out rsa_private.pem +openssl rsa -pubout -in rsa_private.pem -out rsa_public.pem + +# Format RSA keys for .env (single line with \n) +RSA_PRIVATE_KEY=$(awk '{printf "%s\\n", $0}' rsa_private.pem | sed 's/\\n$//') +RSA_PUBLIC_KEY=$(awk '{printf "%s\\n", $0}' rsa_public.pem | sed 's/\\n$//') + +# Generate Ed25519 keys +openssl genpkey -algorithm Ed25519 -out ed_private.pem +ED25519_PRIVATE_KEY=$(openssl pkey -in ed_private.pem -outform DER | tail -c 32 | base64) +ED25519_PUBLIC_KEY=$(openssl pkey -in ed_private.pem -pubout -outform DER | tail -c 32 | base64) + +# Append to .env +cat << EOF >> .env + +# RSA Keys (for RSA-PSS-SHA256 signatures) +RSA_PRIVATE_KEY="$RSA_PRIVATE_KEY" +RSA_PUBLIC_KEY="$RSA_PUBLIC_KEY" + +# Ed25519 Keys (for Ed25519 signatures) +ED25519_PRIVATE_KEY="$ED25519_PRIVATE_KEY" +ED25519_PUBLIC_KEY="$ED25519_PUBLIC_KEY" +EOF + +# Cleanup +rm rsa_private.pem rsa_public.pem ed_private.pem + +echo "Keys generated and appended to .env" From 78176204287be292062710c78148434079feba46 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 21:49:23 -0800 Subject: [PATCH 2/9] switch to PEM files instead of environment variables --- .../app/security/signature_verification.py | 18 +- tap-agent/agent_app.py | 267 +++++------------- tap-agent/genkeys.sh | 36 +-- 3 files changed, 94 insertions(+), 227 deletions(-) diff --git a/merchant-backend/app/security/signature_verification.py b/merchant-backend/app/security/signature_verification.py index acafe35..be58373 100644 --- a/merchant-backend/app/security/signature_verification.py +++ b/merchant-backend/app/security/signature_verification.py @@ -16,15 +16,11 @@ import base64 import hashlib -publicKey = """-----BEGIN PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAysHJFJ9uoVvU1sH2x3TV -bwW3nfyp34eOb8w177Ei/Bx8pk+8Ibu1yulV0nCBl/c9insg1k2x7dw1jRDZHJBG -wIpCdRL0GKm6qIdtsjeOcMnkI5ET0zGpxkhuRUwRblYW3LAdAq1Gja1WSPQKRT8r -EhUsmSlDWgAf0rFna15Ok6zOO3q21LtrEjnJSrgO+cr33YH0IAdALD7hqtPYK7+/ -7dD/XIgGW9cX0USMxdDBUt8TnN2TYar5YXetpMnFOPpQHiGpKTkDwrRggcthUyuC -e1CoL/9a/DWilJwd481QkurvZqGaKegX5DlI+jLNvfi8TWMS3jCjknyOLg54KU86 -LQIDAQAB ------END PUBLIC KEY-----""" +with open("rsa_public.pem", "r") as f: + publicKey_rsa = f.read() + +with open("ed25519_public.pem", "r") as f: + publicKey_ed25519 = f.read() class SignatureVerifier: def __init__(self): @@ -44,10 +40,10 @@ def _load_public_key(self, agent_name: str): """Load public key for the agent. In production, load from secure storage.""" if agent_name == "example": - return serialization.load_pem_public_key(publicKey.encode("utf-8")) + return serialization.load_pem_public_key(publicKey_rsa.encode("utf-8")) elif agent_name == "sample": # Replace with actual sample public key in production - return serialization.load_pem_public_key(publicKey.encode("utf-8")) + return serialization.load_pem_public_key(publicKey_rsa.encode("utf-8")) else: raise ValueError(f"Unknown agent name: {agent_name}") diff --git a/tap-agent/agent_app.py b/tap-agent/agent_app.py index 43b7862..e27805e 100644 --- a/tap-agent/agent_app.py +++ b/tap-agent/agent_app.py @@ -21,43 +21,43 @@ import threading import datetime import re +import typing as ty from urllib.parse import urlencode from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.asymmetric import rsa, ed25519, padding from cryptography.hazmat.backends import default_backend -# Get RSA keys from environment variables -def get_static_keys_from_env(): - """Get RSA keys from environment variables""" - private_key = os.getenv('RSA_PRIVATE_KEY') - public_key = os.getenv('RSA_PUBLIC_KEY') +# Global variable to store product extraction results across threads +_product_extraction_results = None + +# Global variable to store order completion results across threads +_order_completion_results = None + +def get_ed25519_keys(): + """Get Ed25519 keys from environment ed25519_private.pem and ed25519_public.pem""" + with open("./ed25519_private.pem", "rb") as f: + private_key = serialization.load_pem_private_key(f.read(), password=None) + assert isinstance(private_key, ed25519.Ed25519PrivateKey) - if not private_key or not public_key: - raise ValueError("RSA_PRIVATE_KEY and RSA_PUBLIC_KEY must be set in environment variables") + with open("./ed25519_public.pem", "rb") as f: + public_key = serialization.load_pem_public_key(f.read()) + assert isinstance(public_key, ed25519.Ed25519PublicKey) return private_key, public_key -def get_ed25519_keys_from_env(): - """Get Ed25519 keys from environment variables""" - private_key = os.getenv('ED25519_PRIVATE_KEY') - public_key = os.getenv('ED25519_PUBLIC_KEY') +def get_rsa_keys(): + """Get RSA keys from environment rsa_private.pem and rsa_public.pem""" + with open("./rsa_private.pem", "rb") as f: + private_key = serialization.load_pem_private_key(f.read(), password=None) + assert isinstance(private_key, rsa.RSAPrivateKey) - if not private_key or not public_key: - raise ValueError("ED25519_PRIVATE_KEY and ED25519_PUBLIC_KEY must be set in environment variables") + with open("./rsa_public.pem", "rb") as f: + public_key = serialization.load_pem_public_key(f.read()) + assert isinstance(public_key, rsa.RSAPublicKey) return private_key, public_key -# Global variable to store product extraction results across threads -_product_extraction_results = None - -# Global variable to store order completion results across threads -_order_completion_results = None - -def get_static_keys(): - """Return the static private and public keys from environment variables""" - return get_static_keys_from_env() - -def create_http_message_signature(private_key_pem: str, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: +def create_http_message_signature(private_key: ty.Union[rsa.RSAPrivateKey, ed25519.Ed25519PrivateKey], authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: """Create HTTP Message Signature following RFC 9421 syntax""" try: # Create signature parameters string @@ -75,13 +75,7 @@ def create_http_message_signature(private_key_pem: str, authority: str, path: st print(f"🌐 Authority: {authority}") print(f"📍 Path: {path}") print(f"📋 Signature Params: {signature_params}") - - # Load private key - private_key = serialization.load_pem_private_key( - private_key_pem.encode('utf-8'), - password=None, - backend=default_backend() - ) + # Sign the signature base string using RSA-PSS (matching the algorithm declared) signature = private_key.sign( @@ -111,6 +105,47 @@ def create_http_message_signature(private_key_pem: str, authority: str, path: st print(f"❌ Error creating HTTP message signature: {str(e)}") return "", "" +def create_ed25519_signature(private_key: ed25519.Ed25519PrivateKey, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: + """Create HTTP Message Signature using Ed25519 following RFC 9421""" + try: + from cryptography.hazmat.primitives.asymmetric import ed25519 + + print(f"🔐 Creating Ed25519 signature...") + print(f"🌐 Authority: {authority}") + print(f"📍 Path: {path}") + + # Create signature parameters string + signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' + + # Create the signature base string + signature_base_lines = [ + f'"@authority": {authority}', + f'"@path": {path}', + f'"@signature-params": {signature_params}' + ] + signature_base = '\n'.join(signature_base_lines) + + print(f"🔐 Ed25519 Signature Base String:\n{signature_base}") + + # Sign with Ed25519 (no padding needed) + signature = private_key.sign(signature_base.encode('utf-8')) + signature_b64 = base64.b64encode(signature).decode('utf-8') + + # Format headers + signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' + signature_header = f'sig2=:{signature_b64}:' + + print(f"✅ Created Ed25519 signature") + print(f"📤 Signature-Input: {signature_input_header}") + print(f"🔒 Signature: {signature_header}") + + return signature_input_header, signature_header + + except Exception as e: + print(f"❌ Error creating Ed25519 signature: {str(e)}") + st.error(f"Error creating Ed25519 signature: {str(e)}") + return "", "" + def parse_url_components(url: str) -> tuple[str, str]: """Parse URL to extract authority and path components for RFC 9421""" try: @@ -1229,152 +1264,6 @@ def run_full_checkout(): except Exception as e: return False, {'error': f'Checkout error: {str(e)}', 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')} -def create_signature(private_key_pem: str, json_data: str) -> str: - """Create a signature using the private key with JSON data and base64 encoding""" - try: - # Parse JSON to validate it - try: - parsed_json = json.loads(json_data) - except json.JSONDecodeError as e: - st.error(f"Invalid JSON format: {str(e)}") - return "" - - # Convert JSON to string (compact format) - json_string = json.dumps(parsed_json, separators=(',', ':'), sort_keys=True) - - # Base64 encode the JSON string - base64_data = base64.b64encode(json_string.encode('utf-8')).decode('utf-8') - print(f"Base64 Encoded Data: {base64_data}") - - # Load private key - private_key = serialization.load_pem_private_key( - private_key_pem.encode('utf-8'), - password=None, - backend=default_backend() - ) - - # Sign the base64 encoded data - signature = private_key.sign( - base64_data.encode('utf-8'), - padding.PSS( - mgf=padding.MGF1(hashes.SHA256()), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA256() - ) - - return base64.b64encode(signature).decode('utf-8') - except Exception as e: - st.error(f"Error creating signature: {str(e)}") - return "" - -def create_http_message_signature(private_key_pem: str, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: - """Create HTTP Message Signature following RFC 9421 syntax""" - try: - # Create signature parameters string - signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' - - # Create the signature base string following RFC 9421 format - signature_base_lines = [ - f'"@authority": {authority}', - f'"@path": {path}', - f'"@signature-params": {signature_params}' - ] - signature_base = '\n'.join(signature_base_lines) - - print(f"Signature Base String:\n{signature_base}") - print(f"Authority: {authority}") - print(f"Path: {path}") - print(f"Signature Params: {signature_params}") - - # Load private key - private_key = serialization.load_pem_private_key( - private_key_pem.encode('utf-8'), - password=None, - backend=default_backend() - ) - - # Sign the signature base string using RSA-PSS (matching the algorithm declared) - signature = private_key.sign( - signature_base.encode('utf-8'), - padding.PSS( - mgf=padding.MGF1(hashes.SHA256()), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA256() - ) - - signature_b64 = base64.b64encode(signature).decode('utf-8') - - # Format the signature-input header (RFC 9421 format) - signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' - - # Format the signature header (RFC 9421 format) - signature_header = f'sig2=:{signature_b64}:' - - return signature_input_header, signature_header - - except Exception as e: - st.error(f"Error creating HTTP message signature: {str(e)}") - return "", "" - -def create_ed25519_signature(private_key_pem: str, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: - """Create HTTP Message Signature using Ed25519 following RFC 9421""" - try: - from cryptography.hazmat.primitives.asymmetric import ed25519 - - print(f"🔐 Creating Ed25519 signature...") - print(f"🌐 Authority: {authority}") - print(f"📍 Path: {path}") - - # Create signature parameters string - signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' - - # Create the signature base string - signature_base_lines = [ - f'"@authority": {authority}', - f'"@path": {path}', - f'"@signature-params": {signature_params}' - ] - signature_base = '\n'.join(signature_base_lines) - - print(f"🔐 Ed25519 Signature Base String:\n{signature_base}") - - # Load Ed25519 keys from environment variables - try: - ed25519_private_b64, ed25519_public_b64 = get_ed25519_keys_from_env() - print(f"🔑 Using Ed25519 keys from environment variables") - except ValueError as e: - print(f"❌ Ed25519 keys not found in environment: {e}") - st.error(f"Ed25519 keys not configured. Please add ED25519_PRIVATE_KEY and ED25519_PUBLIC_KEY to your .env file.") - return "", "" - - # Load private key from base64 - private_bytes = base64.b64decode(ed25519_private_b64) - private_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_bytes) - - print(f"🔑 Using Ed25519 Private Key: {ed25519_private_b64[:20]}...") - print(f"🔑 Using Ed25519 Public Key: {ed25519_public_b64[:20]}...") - - # Sign with Ed25519 (no padding needed) - signature = private_key.sign(signature_base.encode('utf-8')) - signature_b64 = base64.b64encode(signature).decode('utf-8') - - # Format headers - signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' - signature_header = f'sig2=:{signature_b64}:' - - print(f"✅ Created Ed25519 signature") - print(f"📤 Signature-Input: {signature_input_header}") - print(f"🔒 Signature: {signature_header}") - - return signature_input_header, signature_header - - except Exception as e: - print(f"❌ Error creating Ed25519 signature: {str(e)}") - st.error(f"Error creating Ed25519 signature: {str(e)}") - return "", "" - def parse_url_components(url: str) -> tuple[str, str]: """Parse URL to extract authority and path components""" try: @@ -1424,9 +1313,9 @@ def main(): # Initialize session state with static keys if 'private_key' not in st.session_state: - st.session_state.private_key = "" + st.session_state.rsa_private_key = "" if 'public_key' not in st.session_state: - st.session_state.public_key = "" + st.session_state.rsa_public_key = "" if 'ed25519_private_key' not in st.session_state: st.session_state.ed25519_private_key = "" if 'ed25519_public_key' not in st.session_state: @@ -1457,15 +1346,15 @@ def main(): st.session_state.input_data = json.dumps(default_input, indent=2) # Load static keys if not already loaded - if not st.session_state.private_key or not st.session_state.public_key: - private_key, public_key = get_static_keys() - st.session_state.private_key = private_key - st.session_state.public_key = public_key + if not st.session_state.rsa_private_key or not st.session_state.rsa_public_key: + private_key, public_key = get_rsa_keys() + st.session_state.rsa_private_key = private_key + st.session_state.rsa_public_key = public_key # Load Ed25519 keys if not already loaded if not st.session_state.ed25519_private_key or not st.session_state.ed25519_public_key: try: - ed25519_private_key, ed25519_public_key = get_ed25519_keys_from_env() + ed25519_private_key, ed25519_public_key = get_ed25519_keys() st.session_state.ed25519_private_key = ed25519_private_key st.session_state.ed25519_public_key = ed25519_public_key except ValueError: @@ -1611,7 +1500,7 @@ def update_input_data_with_action(): else: launch_disabled = False else: # rsa-pss-sha256 - if not st.session_state.private_key: + if not st.session_state.rsa_private_key: st.warning("Please configure RSA keys in your .env file first") launch_disabled = True else: @@ -1629,7 +1518,7 @@ def update_input_data_with_action(): # Single launch button that adapts to the selected action if st.button(button_text, type="primary", disabled=launch_disabled, help=button_help): - if st.session_state.private_key: + if st.session_state.rsa_private_key: import time spinner_text = f"Creating RFC 9421 signature and {'fetching product details' if action_choice == 'Product Details' else 'completing checkout'}..." @@ -1657,7 +1546,7 @@ def update_input_data_with_action(): # Create RFC 9421 compliant signature using selected algorithm if signature_algorithm == "ed25519": signature_input_header, signature_header = create_ed25519_signature( - private_key_pem="", # Ed25519 function will load from environment + private_key=st.session_state.ed25519_private_key, authority=authority, path=path, keyid="primary-ed25519", @@ -1668,7 +1557,7 @@ def update_input_data_with_action(): ) else: # rsa-pss-sha256 signature_input_header, signature_header = create_http_message_signature( - private_key_pem=st.session_state.private_key, + private_key=st.session_state.rsa_private_key, authority=authority, path=path, keyid="primary", diff --git a/tap-agent/genkeys.sh b/tap-agent/genkeys.sh index 648ff70..210ef34 100755 --- a/tap-agent/genkeys.sh +++ b/tap-agent/genkeys.sh @@ -1,31 +1,13 @@ #!/bin/bash -# Generate RSA keys -openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out rsa_private.pem -openssl rsa -pubout -in rsa_private.pem -out rsa_public.pem +# Generate RSA keys (optional - for RSA-PSS-SHA256) +openssl genrsa -out rsa_private.pem 2048 +openssl rsa -in rsa_private.pem -pubout -out rsa_public.pem -# Format RSA keys for .env (single line with \n) -RSA_PRIVATE_KEY=$(awk '{printf "%s\\n", $0}' rsa_private.pem | sed 's/\\n$//') -RSA_PUBLIC_KEY=$(awk '{printf "%s\\n", $0}' rsa_public.pem | sed 's/\\n$//') +# Generate Ed25519 keys (optional - for Ed25519) +openssl genpkey -algorithm Ed25519 -out ed25519_private.pem +openssl pkey -in ed25519_private.pem -pubout -out ed25519_public.pem -# Generate Ed25519 keys -openssl genpkey -algorithm Ed25519 -out ed_private.pem -ED25519_PRIVATE_KEY=$(openssl pkey -in ed_private.pem -outform DER | tail -c 32 | base64) -ED25519_PUBLIC_KEY=$(openssl pkey -in ed_private.pem -pubout -outform DER | tail -c 32 | base64) - -# Append to .env -cat << EOF >> .env - -# RSA Keys (for RSA-PSS-SHA256 signatures) -RSA_PRIVATE_KEY="$RSA_PRIVATE_KEY" -RSA_PUBLIC_KEY="$RSA_PUBLIC_KEY" - -# Ed25519 Keys (for Ed25519 signatures) -ED25519_PRIVATE_KEY="$ED25519_PRIVATE_KEY" -ED25519_PUBLIC_KEY="$ED25519_PUBLIC_KEY" -EOF - -# Cleanup -rm rsa_private.pem rsa_public.pem ed_private.pem - -echo "Keys generated and appended to .env" +# Copy public keys to merchant-backend/ +cp ./ed25519_public.pem ../merchant-backend/ed25519_public.pem +cp ./rsa_public.pem ../merchant-backend/rsa_public.pem From b7c9aa7dfbf5e97a580793c99942d4d3a97c60f5 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 22:19:31 -0800 Subject: [PATCH 3/9] split up agent_app components --- tap-agent/README.md | 4 +- tap-agent/agent.py | 292 ++++++++ tap-agent/agent_app.py | 1622 +++------------------------------------- tap-agent/crypto.py | 140 ++++ 4 files changed, 536 insertions(+), 1522 deletions(-) create mode 100644 tap-agent/agent.py create mode 100644 tap-agent/crypto.py diff --git a/tap-agent/README.md b/tap-agent/README.md index 0e316ec..6a0fadf 100644 --- a/tap-agent/README.md +++ b/tap-agent/README.md @@ -155,8 +155,8 @@ streamlit run agent_app.py --server.runOnSave true ### Environment Setup ```bash # Generate RSA keys (optional - for RSA-PSS-SHA256) -openssl genrsa -out private_key.pem 2048 -openssl rsa -in private_key.pem -pubout -out public_key.pem +openssl genrsa -out rsa_private_key.pem 2048 +openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem # Generate Ed25519 keys (optional - for Ed25519) openssl genpkey -algorithm Ed25519 -out ed25519_private.pem diff --git a/tap-agent/agent.py b/tap-agent/agent.py new file mode 100644 index 0000000..81aa65f --- /dev/null +++ b/tap-agent/agent.py @@ -0,0 +1,292 @@ +import time +import threading +import datetime +import re +import streamlit as st +from typing import Tuple, Dict, Any, Optional + +# Global variables to store results across threads +_product_extraction_results = None +_order_completion_results = None + +def get_product_extraction_results(): + global _product_extraction_results + return _product_extraction_results + +def get_order_completion_results(): + global _order_completion_results + return _order_completion_results + +def set_product_extraction_results(results): + global _product_extraction_results + _product_extraction_results = results + +def set_order_completion_results(results): + global _order_completion_results + _order_completion_results = results + +def launch_with_playwright(url: str, headers: dict) -> bool: + """Launch browser with headers using Playwright""" + try: + # Check if playwright is installed + from playwright.sync_api import sync_playwright + import threading + import time + + def run_browser(): + """Run browser in a separate thread to keep it alive""" + with sync_playwright() as p: + # Launch browser with additional options to handle network issues + browser = p.chromium.launch( + headless=False, + args=[ + '--disable-web-security', + '--disable-features=VizDisplayCompositor', + '--ignore-certificate-errors', + '--ignore-ssl-errors', + '--ignore-certificate-errors-spki-list' + ] + ) + + # Create context with signature headers applied to all requests + context = browser.new_context( + extra_http_headers=headers, + ignore_https_errors=True, + viewport={'width': 1280, 'height': 720} + ) + + page = context.new_page() + + print(f"🔧 Browser context created with signature headers") + print("📨 Signature Headers:") + for key, value in headers.items(): + if key == 'signature': + print(f" {key}: {value[:20]}..." if len(value) > 20 else f" {key}: {value}") + else: + print(f" {key}: {value}") + + # Add request/response interceptors to handle failed API calls + def handle_request(request): + if 'api' in request.url.lower() or request.method == 'OPTIONS': + print(f"API Request: {request.method} {request.url}") + + def handle_response(response): + if response.status >= 400: + print(f"Failed Request: {response.status} {response.request.method} {response.url}") + + page.on('request', handle_request) + page.on('response', handle_response) + + def handle_console(msg): + if msg.type == 'error': + print(f"Console Error: {msg.text}") + + page.on('console', handle_console) + + try: + page.goto(url, wait_until='domcontentloaded', timeout=30000) + print(f"✅ Successfully navigated to: {url}") + time.sleep(3) + + product_info = {} + + # Common selectors + title_selectors = ['h1', '[data-testid="product-title"]', '.product-title', '.product-name', '[class*="title"]', '[class*="product"]', 'title'] + price_selectors = ['[data-testid="price"]', '.price', '.product-price', '[class*="price"]', '[class*="cost"]', '[class*="amount"]', 'span:has-text("$")', 'span:has-text("€")', 'span:has-text("£")'] + + for selector in title_selectors: + try: + title_element = page.query_selector(selector) + if title_element: + title_text = title_element.inner_text().strip() + if title_text and len(title_text) > 3: + product_info['title'] = title_text + break + except: continue + + for selector in price_selectors: + try: + price_element = page.query_selector(selector) + if price_element: + price_text = price_element.inner_text().strip() + if price_text and (any(char in price_text for char in ['$', '€', '£', '¥']) or any(char.isdigit() for char in price_text)): + product_info['price'] = price_text + break + except: continue + + if not product_info.get('title'): + try: product_info['title'] = page.title() + except: pass + + if not product_info.get('price'): + try: + all_text = page.content() + price_pattern = r'[\$€£¥]\s*\d+(?:[.,]\d{2})?|\d+(?:[.,]\d{2})?\s*[\$€£¥]' + prices = re.findall(price_pattern, all_text) + if prices: product_info['price'] = prices[0] + except: pass + + extraction_log = ["🛍️ PRODUCT EXTRACTION RESULTS", "="*50] + extraction_log.append(f"📦 Title: {product_info.get('title', 'Not found')}") + extraction_log.append(f"💰 Price: {product_info.get('price', 'Not found')}") + extraction_log.append("="*50) + + set_product_extraction_results({ + 'title': product_info.get('title'), + 'price': product_info.get('price'), + 'url': url, + 'extraction_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + 'extraction_log': '\n'.join(extraction_log) + }) + + print("\n" + '\n'.join(extraction_log)) + time.sleep(3) + except Exception as e: + print(f"❌ Navigation or extraction error: {e}") + finally: + try: browser.close() + except: pass + + browser_thread = threading.Thread(target=run_browser, daemon=True) + browser_thread.start() + time.sleep(2) + st.success("✅ Browser launched with headers!") + return True + except Exception as e: + st.error(f"Error launching browser: {str(e)}") + return False + +def complete_checkout_with_playwright(product_url: str, cart_url: str, checkout_url: str, headers: dict = None) -> Tuple[bool, dict]: + """Complete full checkout process""" + try: + from playwright.sync_api import sync_playwright + import threading + import time + import re + + set_order_completion_results(None) + if headers is None: headers = {} + + def run_full_checkout(): + with sync_playwright() as p: + browser = p.chromium.launch(headless=False, args=['--disable-web-security', '--disable-features=VizDisplayCompositor', '--ignore-certificate-errors', '--ignore-ssl-errors']) + context = browser.new_context(extra_http_headers=headers, ignore_https_errors=True, viewport={'width': 1280, 'height': 720}) + page = context.new_page() + + try: + # STEP 1: Product Page + print(f"🛍️ STEP 1: {product_url}") + page.goto(product_url, wait_until='domcontentloaded', timeout=30000) + time.sleep(3) + + # STEP 2: Add to Cart + print(f"🛒 STEP 2: Add to Cart") + add_to_cart_selectors = ['button:has-text("Add to Cart")', 'button:has-text("Add To Cart")', '[data-testid="add-to-cart"]', '.add-cart', '#addToCart'] + cart_added = False + for selector in add_to_cart_selectors: + try: + btn = page.query_selector(selector) + if btn and btn.is_visible(): + btn.click() + cart_added = True + break + except: continue + + if not cart_added: + try: + btns = page.query_selector_all('button') + for btn in btns[:10]: + if any(phrase in btn.inner_text().lower() for phrase in ['add', 'cart', 'buy']): + btn.click() + cart_added = True + break + except: pass + + time.sleep(2) + + # STEP 3: Cart Page + print(f"🛒 STEP 3: {cart_url}") + page.goto(cart_url, wait_until='domcontentloaded', timeout=30000) + time.sleep(3) + + # STEP 4: Proceed to Checkout + print(f"➡️ STEP 4: Proceed to Checkout") + proceed_selectors = ['button:has-text("Proceed to Checkout")', 'button:has-text("Checkout")', 'a:has-text("Checkout")', '[data-testid="checkout"]', '#checkout'] + proceeded = False + for selector in proceed_selectors: + try: + btn = page.query_selector(selector) + if btn and btn.is_visible(): + btn.click() + proceeded = True + break + except: continue + + if not proceeded: + page.goto(checkout_url, wait_until='domcontentloaded', timeout=30000) + + time.sleep(3) + + # STEP 5: Form + print(f"📝 STEP 5: Fill Form") + checkout_info = { + 'email': 'john.doe@example.com', 'phone': '+1-555-0123', 'firstName': 'John', 'lastName': 'Doe', + 'company': 'Example Company', 'address1': '123 Main St', 'city': 'New York', 'state': 'NY', 'zipCode': '10001', + 'cardNumber': '4111111111111111', 'expiryDate': '12/25', 'cvv': '123', 'nameOnCard': 'John Doe' + } + + for field, value in checkout_info.items(): + try: + elem = page.query_selector(f'[name="{field}"], #{field}, [placeholder*="{field}"]') + if elem and elem.is_visible(): elem.fill(value) + except: continue + + time.sleep(2) + + # Submit + submit_selectors = ['button[type="submit"]', 'button:has-text("Complete Order")', 'button:has-text("Place Order")'] + submitted = False + for selector in submit_selectors: + try: + btn = page.query_selector(selector) + if btn and btn.is_visible(): + btn.click() + submitted = True + break + except: continue + + if submitted: + print("✅ Submitted") + success_reached = False + for _ in range(15): + if any(p in page.url.lower() for p in ['success', 'confirmation', 'thank-you']): + success_reached = True; break + time.sleep(1) + + order_id = None + if success_reached: + time.sleep(2) + match = re.search(r'Order\s*#\s*([A-Z0-9-]+)', page.content(), re.IGNORECASE) + if match: order_id = match.group(1) + + set_order_completion_results({ + 'order_id': order_id, + 'success_page_url': page.url, + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') + }) + + time.sleep(3) + except Exception as e: + print(f"❌ Checkout error: {e}") + finally: + browser.close() + + t = threading.Thread(target=run_full_checkout, daemon=True) + t.start() + t.join(timeout=120) + + res = get_order_completion_results() + if res: return True, res + return False, {'error': 'Timed out'} + except Exception as e: + return False, {'error': str(e)} diff --git a/tap-agent/agent_app.py b/tap-agent/agent_app.py index e27805e..5a810b4 100644 --- a/tap-agent/agent_app.py +++ b/tap-agent/agent_app.py @@ -1,1287 +1,26 @@ -# © 2025 Visa. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - +import streamlit as st import uuid +import json +import time import os -import streamlit as st from dotenv import load_dotenv # Load environment variables load_dotenv() -import base64 -import json -import time -import webbrowser -import requests -import threading -import datetime -import re -import typing as ty -from urllib.parse import urlencode -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import rsa, ed25519, padding -from cryptography.hazmat.backends import default_backend - -# Global variable to store product extraction results across threads -_product_extraction_results = None - -# Global variable to store order completion results across threads -_order_completion_results = None - -def get_ed25519_keys(): - """Get Ed25519 keys from environment ed25519_private.pem and ed25519_public.pem""" - with open("./ed25519_private.pem", "rb") as f: - private_key = serialization.load_pem_private_key(f.read(), password=None) - assert isinstance(private_key, ed25519.Ed25519PrivateKey) - - with open("./ed25519_public.pem", "rb") as f: - public_key = serialization.load_pem_public_key(f.read()) - assert isinstance(public_key, ed25519.Ed25519PublicKey) - - return private_key, public_key - -def get_rsa_keys(): - """Get RSA keys from environment rsa_private.pem and rsa_public.pem""" - with open("./rsa_private.pem", "rb") as f: - private_key = serialization.load_pem_private_key(f.read(), password=None) - assert isinstance(private_key, rsa.RSAPrivateKey) - - with open("./rsa_public.pem", "rb") as f: - public_key = serialization.load_pem_public_key(f.read()) - assert isinstance(public_key, rsa.RSAPublicKey) - - return private_key, public_key - -def create_http_message_signature(private_key: ty.Union[rsa.RSAPrivateKey, ed25519.Ed25519PrivateKey], authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: - """Create HTTP Message Signature following RFC 9421 syntax""" - try: - # Create signature parameters string - signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' - - # Create the signature base string following RFC 9421 format - signature_base_lines = [ - f'"@authority": {authority}', - f'"@path": {path}', - f'"@signature-params": {signature_params}' - ] - signature_base = '\n'.join(signature_base_lines) - - print(f"🔐 RFC 9421 Signature Base String:\n{signature_base}") - print(f"🌐 Authority: {authority}") - print(f"📍 Path: {path}") - print(f"📋 Signature Params: {signature_params}") - - - # Sign the signature base string using RSA-PSS (matching the algorithm declared) - signature = private_key.sign( - signature_base.encode('utf-8'), - padding.PSS( - mgf=padding.MGF1(hashes.SHA256()), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA256() - ) - - signature_b64 = base64.b64encode(signature).decode('utf-8') - - # Format the signature-input header (RFC 9421 format) - signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' - - # Format the signature header (RFC 9421 format) - signature_header = f'sig2=:{signature_b64}:' - - print(f"✅ Created RFC 9421 compliant signature") - print(f"📤 Signature-Input: {signature_input_header}") - print(f"🔒 Signature: {signature_header}") - - return signature_input_header, signature_header - - except Exception as e: - print(f"❌ Error creating HTTP message signature: {str(e)}") - return "", "" - -def create_ed25519_signature(private_key: ed25519.Ed25519PrivateKey, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> tuple[str, str]: - """Create HTTP Message Signature using Ed25519 following RFC 9421""" - try: - from cryptography.hazmat.primitives.asymmetric import ed25519 - - print(f"🔐 Creating Ed25519 signature...") - print(f"🌐 Authority: {authority}") - print(f"📍 Path: {path}") - - # Create signature parameters string - signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' - - # Create the signature base string - signature_base_lines = [ - f'"@authority": {authority}', - f'"@path": {path}', - f'"@signature-params": {signature_params}' - ] - signature_base = '\n'.join(signature_base_lines) - - print(f"🔐 Ed25519 Signature Base String:\n{signature_base}") - - # Sign with Ed25519 (no padding needed) - signature = private_key.sign(signature_base.encode('utf-8')) - signature_b64 = base64.b64encode(signature).decode('utf-8') - - # Format headers - signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' - signature_header = f'sig2=:{signature_b64}:' - - print(f"✅ Created Ed25519 signature") - print(f"📤 Signature-Input: {signature_input_header}") - print(f"🔒 Signature: {signature_header}") - - return signature_input_header, signature_header - - except Exception as e: - print(f"❌ Error creating Ed25519 signature: {str(e)}") - st.error(f"Error creating Ed25519 signature: {str(e)}") - return "", "" - -def parse_url_components(url: str) -> tuple[str, str]: - """Parse URL to extract authority and path components for RFC 9421""" - try: - from urllib.parse import urlparse - parsed = urlparse(url) - - # Authority is the host (and port if not default) - authority = parsed.netloc - - # Path includes the path and query parameters - path = parsed.path - if parsed.query: - path += f"?{parsed.query}" - - print(f"🔍 Parsed URL: {url}") - print(f"🌐 Authority: {authority}") - print(f"📍 Path: {path}") - - return authority, path - except Exception as e: - print(f"❌ Error parsing URL: {str(e)}") - return "", "" - -def launch_with_playwright(url: str, headers: dict) -> bool: - """Launch browser with headers using Playwright""" - try: - # Check if playwright is installed - from playwright.sync_api import sync_playwright - import threading - import time - - def run_browser(): - """Run browser in a separate thread to keep it alive""" - with sync_playwright() as p: - # Launch browser with additional options to handle network issues - browser = p.chromium.launch( - headless=False, - args=[ - '--disable-web-security', - '--disable-features=VizDisplayCompositor', - '--ignore-certificate-errors', - '--ignore-ssl-errors', - '--ignore-certificate-errors-spki-list' - ] - ) - - # Create context with signature headers applied to all requests - context = browser.new_context( - extra_http_headers=headers, - ignore_https_errors=True, - viewport={'width': 1280, 'height': 720} - ) - - page = context.new_page() - - print(f"🔧 Browser context created with signature headers") - print("📨 Signature Headers:") - for key, value in headers.items(): - if key == 'signature': - print(f" {key}: {value[:20]}..." if len(value) > 20 else f" {key}: {value}") - else: - print(f" {key}: {value}") - - # Add request/response interceptors to handle failed API calls - def handle_request(request): - # Log API calls - if 'api' in request.url.lower() or request.method == 'OPTIONS': - print(f"API Request: {request.method} {request.url}") - - def handle_response(response): - # Handle failed OPTIONS and API requests - if response.status >= 400: - print(f"Failed Request: {response.status} {response.request.method} {response.url}") - # Don't let failed API calls crash the browser - return - - # Set up request/response listeners - page.on('request', handle_request) - page.on('response', handle_response) - - # Handle console errors from the website - def handle_console(msg): - if msg.type == 'error': - print(f"Console Error: {msg.text}") - - page.on('console', handle_console) - - # Navigate to the URL with error handling - try: - page.goto(url, wait_until='domcontentloaded', timeout=30000) - print(f"✅ Successfully navigated to: {url}") - - # Wait a bit for the page to fully load - time.sleep(3) - - # Try to extract product information - product_info = {} - - # Common selectors for product title - title_selectors = [ - 'h1', - '[data-testid="product-title"]', - '.product-title', - '.product-name', - '[class*="title"]', - '[class*="product"]', - 'title' - ] - - # Common selectors for product price - price_selectors = [ - '[data-testid="price"]', - '.price', - '.product-price', - '[class*="price"]', - '[class*="cost"]', - '[class*="amount"]', - 'span:has-text("$")', - 'span:has-text("€")', - 'span:has-text("£")' - ] - - # Extract product title - for selector in title_selectors: - try: - title_element = page.query_selector(selector) - if title_element: - title_text = title_element.inner_text().strip() - if title_text and len(title_text) > 3: # Valid title - product_info['title'] = title_text - print(f"📦 Product Title: {title_text}") - break - except: - continue - - # Extract product price - for selector in price_selectors: - try: - price_element = page.query_selector(selector) - if price_element: - price_text = price_element.inner_text().strip() - # Check if it looks like a price (contains currency symbols or numbers) - if price_text and any(char in price_text for char in ['$', '€', '£', '¥']) or any(char.isdigit() for char in price_text): - product_info['price'] = price_text - print(f"💰 Product Price: {price_text}") - break - except: - continue - - # If we couldn't find specific elements, try generic extraction - if not product_info.get('title'): - try: - page_title = page.title() - if page_title: - product_info['title'] = page_title - print(f"📦 Page Title: {page_title}") - except: - pass - - if not product_info.get('price'): - try: - # Look for any text that contains currency symbols - all_text = page.content() - import re - price_pattern = r'[\$€£¥]\s*\d+(?:[.,]\d{2})?|\d+(?:[.,]\d{2})?\s*[\$€£¥]' - prices = re.findall(price_pattern, all_text) - if prices: - product_info['price'] = prices[0] - print(f"💰 Found Price: {prices[0]}") - except: - pass - - # Log the results and store globally - global _product_extraction_results - import datetime - - extraction_log = [] - extraction_log.append("🛍️ PRODUCT EXTRACTION RESULTS") - extraction_log.append("="*50) - - if product_info.get('title'): - extraction_log.append(f"📦 Title: {product_info['title']}") - print(f"📦 Title: {product_info['title']}") - else: - extraction_log.append("❌ Title: Not found") - print("❌ Title: Not found") - - if product_info.get('price'): - extraction_log.append(f"💰 Price: {product_info['price']}") - print(f"💰 Price: {product_info['price']}") - else: - extraction_log.append("❌ Price: Not found") - print("❌ Price: Not found") - - extraction_log.append("="*50) - - # Store results globally - _product_extraction_results = { - 'title': product_info.get('title'), - 'price': product_info.get('price'), - 'url': url, - 'extraction_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - 'extraction_log': '\n'.join(extraction_log) - } - - print("\n" + '\n'.join(extraction_log)) - - # Wait a moment before closing - print("⏳ Closing browser in 3 seconds...") - time.sleep(3) - - except Exception as e: - print(f"❌ Navigation or extraction error: {e}") - - # Close the browser automatically - try: - print("🔒 Closing browser...") - browser.close() - except Exception as e: - print(f"Error closing browser: {e}") - - # Start browser in a separate thread so it doesn't block Streamlit - browser_thread = threading.Thread(target=run_browser, daemon=True) - browser_thread.start() - - # Give it a moment to start - time.sleep(2) - - st.success("✅ Browser launched with headers!") - st.info("🤖 Browser will automatically extract product info and close.") - st.warning("� Check the terminal/console for extraction results.") - return True - - except ImportError: - return False - except Exception as e: - st.error(f"Error launching browser: {str(e)}") - return False - -def complete_checkout_with_playwright(product_url: str, cart_url: str, checkout_url: str, headers: dict = None) -> tuple[bool, dict]: - """Complete full checkout process: product page → add to cart → cart page → proceed to checkout → complete order""" - try: - # Check if playwright is installed - from playwright.sync_api import sync_playwright - import threading - import time - import re - - # Reset order completion results - global _order_completion_results - _order_completion_results = None - - # Ensure headers are provided - if headers is None: - headers = {} - - def run_full_checkout(): - """Run complete checkout process in a separate thread""" - with sync_playwright() as p: - # Launch browser with additional options - browser = p.chromium.launch( - headless=False, - args=[ - '--disable-web-security', - '--disable-features=VizDisplayCompositor', - '--ignore-certificate-errors', - '--ignore-ssl-errors' - ] - ) - - # Create context with signature headers applied to all requests - print(f"🔧 Setting up browser context with signature headers") - context = browser.new_context( - extra_http_headers=headers, - ignore_https_errors=True, - viewport={'width': 1280, 'height': 720} - ) - - page = context.new_page() - - # Log headers being sent - print("🛒 STARTING COMPLETE CHECKOUT PROCESS") - print("="*50) - print(f"📦 Product URL: {product_url}") - print(f"🛒 Cart URL: {cart_url}") - print(f"💳 Checkout URL: {checkout_url}") - print("📨 Signature Headers:") - for key, value in headers.items(): - if key == 'signature': - print(f" {key}: {value[:20]}..." if len(value) > 20 else f" {key}: {value}") - else: - print(f" {key}: {value}") - print("="*50) - - try: - # STEP 1: Navigate to product page - print(f"🛍️ STEP 1: Navigating to product page: {product_url}") - page.goto(product_url, wait_until='domcontentloaded', timeout=30000) - print(f"✅ Successfully navigated to product page") - - # Wait for page to load - time.sleep(3) - - # STEP 2: Find and click "Add to Cart" button - print(f"🛒 STEP 2: Looking for 'Add to Cart' button...") - - # Common selectors for "Add to Cart" buttons - add_to_cart_selectors = [ - 'button:has-text("Add to Cart")', - 'button:has-text("Add To Cart")', - 'button:has-text("ADD TO CART")', - '[data-testid="add-to-cart"]', - '[id*="add-to-cart"]', - '[class*="add-to-cart"]', - '.add-cart', - '.addToCart', - '#addToCart', - 'input[value*="Add to Cart"]', - 'button[title*="Add to Cart"]', - '.btn-add-cart', - '.cart-add' - ] - - cart_added = False - for selector in add_to_cart_selectors: - try: - add_button = page.query_selector(selector) - if add_button and add_button.is_visible(): - print(f"🎯 Found 'Add to Cart' button: {selector}") - add_button.click() - print(f"✅ Successfully clicked 'Add to Cart'") - cart_added = True - break - except Exception as e: - continue - - if not cart_added: - print("❌ Could not find 'Add to Cart' button") - # Try to find any button that might be add to cart - try: - all_buttons = page.query_selector_all('button') - for button in all_buttons[:10]: # Check first 10 buttons - text = button.inner_text().lower() - if any(phrase in text for phrase in ['add', 'cart', 'buy', 'purchase']): - print(f"🔄 Trying button with text: {text}") - button.click() - cart_added = True - break - except: - pass - - if cart_added: - # Wait a moment for cart update - time.sleep(2) - print("✅ Product added to cart successfully") - else: - print("⚠️ Could not add product to cart, proceeding anyway") - - # STEP 3: Navigate to cart page - print(f"🛒 STEP 3: Navigating to cart page: {cart_url}") - page.goto(cart_url, wait_until='domcontentloaded', timeout=30000) - print(f"✅ Successfully navigated to cart page") - - # Wait for cart page to load - time.sleep(3) - - # STEP 4: Find and click "Proceed to Checkout" button - print(f"➡️ STEP 4: Looking for 'Proceed to Checkout' button...") - - # Common selectors for "Proceed to Checkout" buttons - proceed_checkout_selectors = [ - 'button:has-text("Proceed to Checkout")', - 'button:has-text("Proceed To Checkout")', - 'button:has-text("PROCEED TO CHECKOUT")', - 'button:has-text("Checkout")', - 'button:has-text("CHECKOUT")', - 'a:has-text("Proceed to Checkout")', - 'a:has-text("Checkout")', - '[data-testid="proceed-to-checkout"]', - '[data-testid="checkout"]', - '[id*="proceed-checkout"]', - '[id*="checkout"]', - '[class*="proceed-checkout"]', - '[class*="checkout-btn"]', - '.proceed-checkout', - '.checkout-proceed', - '#proceedToCheckout', - '#checkout', - '.btn-checkout', - 'input[value*="Checkout"]', - 'button[title*="Checkout"]' - ] - - checkout_proceeded = False - for selector in proceed_checkout_selectors: - try: - proceed_button = page.query_selector(selector) - if proceed_button and proceed_button.is_visible(): - print(f"🎯 Found 'Proceed to Checkout' button: {selector}") - proceed_button.click() - print(f"✅ Successfully clicked 'Proceed to Checkout'") - checkout_proceeded = True - break - except Exception as e: - continue - - if not checkout_proceeded: - print("❌ Could not find 'Proceed to Checkout' button") - # Try to find any button that might be proceed to checkout - try: - all_buttons = page.query_selector_all('button, a') - for button in all_buttons[:15]: # Check first 15 buttons/links - text = button.inner_text().lower() - if any(phrase in text for phrase in ['proceed', 'checkout', 'continue', 'next']): - print(f"🔄 Trying button with text: {text}") - button.click() - checkout_proceeded = True - break - except: - pass - - if checkout_proceeded: - # Wait for navigation to checkout page - time.sleep(3) - print("✅ Successfully proceeded to checkout") - else: - print("⚠️ Could not proceed to checkout, trying direct navigation") - # Fallback: navigate directly to checkout page - print(f"🛒 STEP 4b: Direct navigation to checkout: {checkout_url}") - page.goto(checkout_url, wait_until='domcontentloaded', timeout=30000) - - # STEP 5: We should now be on the checkout page - print(f"✅ Now on checkout page (current URL: {page.url})") - - # Wait for checkout page to load - time.sleep(3) - - # STEP 5: Fill out checkout form - print(f"📝 STEP 5: Filling out comprehensive checkout form...") - - # First, let's scan the page to see what form elements are available - print("🔍 Scanning page for form elements...") - try: - all_inputs = page.query_selector_all('input, select, textarea') - print(f"📋 Found {len(all_inputs)} form elements on the page:") - - for i, input_elem in enumerate(all_inputs[:20]): # Show first 20 elements - try: - try: - tag = input_elem.evaluate('el => el.tagName') or 'unknown' - except: - tag = input_elem.get_attribute('tagName') or 'unknown' - - input_type = input_elem.get_attribute('type') or 'text' - name = input_elem.get_attribute('name') or 'no-name' - input_id = input_elem.get_attribute('id') or 'no-id' - placeholder = input_elem.get_attribute('placeholder') or 'no-placeholder' - visible = input_elem.is_visible() - enabled = input_elem.is_enabled() - - print(f" {i+1}. {tag.lower()}[{input_type}] name='{name}' id='{input_id}' placeholder='{placeholder}' visible={visible} enabled={enabled}") - except: - print(f" {i+1}. ") - - if len(all_inputs) > 20: - print(f" ... and {len(all_inputs) - 20} more elements") - - except Exception as e: - print(f"⚠️ Error scanning form elements: {e}") - - print("="*50) - - # Comprehensive checkout form data matching the React form structure - checkout_info = { - # Contact Information - 'email': 'john.doe@example.com', - 'phone': '+1-555-0123', - - # Shipping Address - 'firstName': 'John', - 'lastName': 'Doe', - 'company': 'Example Company Inc.', - 'address1': '123 Main Street', - 'address2': 'Suite 456', - 'city': 'New York', - 'state': 'NY', - 'zipCode': '10001', - 'country': 'United States', - - # Billing Address (initially same as shipping) - 'billingFirstName': 'John', - 'billingLastName': 'Doe', - 'billingCompany': 'Example Company Inc.', - 'billingAddress1': '123 Main Street', - 'billingAddress2': 'Suite 456', - 'billingCity': 'New York', - 'billingState': 'NY', - 'billingZipCode': '10001', - 'billingCountry': 'United States', - - # Payment Information - 'cardNumber': '4111111111111111', - 'expiryDate': '12/25', - 'cvv': '123', - 'nameOnCard': 'John Doe', - - # Additional Options - 'specialInstructions': 'Please handle with care - signature authentication sample order' - } - - # Comprehensive form field selectors matching React form structure - form_selectors = { - # Contact Information - 'email': ['#email', '[name="email"]', '[type="email"]', '[placeholder*="email"]', '[data-testid="email"]'], - 'phone': ['#phone', '[name="phone"]', '[type="tel"]', '[placeholder*="phone"]', '[data-testid="phone"]'], - - # Shipping Address - 'firstName': ['#firstName', '[name="firstName"]', '[placeholder*="first"]', '[data-testid="firstName"]', '[id*="first"]'], - 'lastName': ['#lastName', '[name="lastName"]', '[placeholder*="last"]', '[data-testid="lastName"]', '[id*="last"]'], - 'company': ['#company', '[name="company"]', '[placeholder*="company"]', '[data-testid="company"]'], - 'address1': ['#address1', '[name="address1"]', '[placeholder*="address"]', '[data-testid="address1"]', '#address'], - 'address2': ['#address2', '[name="address2"]', '[placeholder*="address2"]', '[data-testid="address2"]', '[placeholder*="apt"]'], - 'city': ['#city', '[name="city"]', '[placeholder*="city"]', '[data-testid="city"]'], - 'state': ['#state', '[name="state"]', '[placeholder*="state"]', '[data-testid="state"]'], - 'zipCode': ['#zipCode', '#zip', '[name="zip"]', '[name="zipCode"]', '[placeholder*="zip"]', '[data-testid="zipCode"]'], - 'country': ['#country', '[name="country"]', '[data-testid="country"]'], - - # Billing Address - 'billingFirstName': ['#billingFirstName', '[name="billingFirstName"]', '[data-testid="billingFirstName"]'], - 'billingLastName': ['#billingLastName', '[name="billingLastName"]', '[data-testid="billingLastName"]'], - 'billingCompany': ['#billingCompany', '[name="billingCompany"]', '[data-testid="billingCompany"]'], - 'billingAddress1': ['#billingAddress1', '[name="billingAddress1"]', '[data-testid="billingAddress1"]'], - 'billingAddress2': ['#billingAddress2', '[name="billingAddress2"]', '[data-testid="billingAddress2"]'], - 'billingCity': ['#billingCity', '[name="billingCity"]', '[data-testid="billingCity"]'], - 'billingState': ['#billingState', '[name="billingState"]', '[data-testid="billingState"]'], - 'billingZipCode': ['#billingZipCode', '[name="billingZipCode"]', '[data-testid="billingZipCode"]'], - 'billingCountry': ['#billingCountry', '[name="billingCountry"]', '[data-testid="billingCountry"]'], - - # Payment Information - 'cardNumber': ['#cardNumber', '[name="cardNumber"]', '[placeholder*="card"]', '[data-testid="cardNumber"]', '[id*="card"]'], - 'expiryDate': ['#expiryDate', '[name="expiryDate"]', '[placeholder*="expiry"]', '[data-testid="expiryDate"]', '[placeholder*="mm/yy"]'], - 'cvv': ['#cvv', '[name="cvv"]', '[placeholder*="cvv"]', '[data-testid="cvv"]', '[placeholder*="security"]'], - 'nameOnCard': ['#nameOnCard', '[name="nameOnCard"]', '[placeholder*="name on card"]', '[data-testid="nameOnCard"]'], - - # Additional Options - 'specialInstructions': ['#specialInstructions', '[name="specialInstructions"]', '[placeholder*="instructions"]', '[data-testid="specialInstructions"]', 'textarea[name*="instructions"]'] - } - - # Fill form fields with enhanced logic and better error handling - fields_filled = 0 - for field, value in checkout_info.items(): - field_filled = False - print(f"🔍 Attempting to fill field: {field}") - - for selector in form_selectors.get(field, []): - try: - element = page.query_selector(selector) - if element: - # Check if element is visible and enabled - is_visible = element.is_visible() - is_enabled = element.is_enabled() - - print(f" 🎯 Found element with selector: {selector}") - print(f" Visible: {is_visible}, Enabled: {is_enabled}") - - if is_visible and is_enabled: - # Get element tag and type safely - try: - tag_name = element.evaluate('el => el.tagName') - except: - tag_name = element.get_attribute('tagName') - - element_type = element.get_attribute('type') - - print(f" Tag: {tag_name}, Type: {element_type}") - - # Check if it's a select dropdown - if tag_name and tag_name.lower() == 'select': - element.select_option(value) - print(f"✅ Selected {field}: {value}") - field_filled = True - fields_filled += 1 - break - # Check if it's a checkbox - elif element_type and element_type.lower() == 'checkbox': - # For now, we'll leave checkboxes unchecked (billingDifferent=false, newsletter=false) - print(f"📋 Skipped checkbox {field} (keeping default state)") - field_filled = True - break - # Check if it's a radio button - elif element_type and element_type.lower() == 'radio': - element.check() - print(f"✅ Selected radio {field}: {value}") - field_filled = True - fields_filled += 1 - break - # Regular input field (text, email, tel, etc.) - else: - # Fill the field (this automatically clears and replaces content) - element.fill(value) - - # Verify the value was set - try: - filled_value = element.input_value() - if filled_value == value: - print(f"✅ Filled {field}: {value}") - field_filled = True - fields_filled += 1 - break - else: - print(f"⚠️ Value mismatch for {field}. Expected: {value}, Got: {filled_value}") - except: - # Some elements don't support input_value(), just assume it worked - print(f"✅ Filled {field}: {value} (verification skipped)") - field_filled = True - fields_filled += 1 - break - else: - print(f" Element not visible or enabled, skipping") - else: - print(f" ❌ Element not found for selector: {selector}") - - except Exception as e: - print(f" ⚠️ Error with selector {selector}: {str(e)}") - continue - - if not field_filled: - print(f"⚠️ Could not fill {field}: {value}") - # Try a more generic approach for this field - try: - # Look for any input with name, id, or placeholder containing the field name - generic_selectors = [ - f'[name*="{field.lower()}"]', - f'[id*="{field.lower()}"]', - f'[placeholder*="{field.lower()}"]', - f'input[name*="{field.lower()}"]', - f'input[id*="{field.lower()}"]' - ] - - for generic_selector in generic_selectors: - try: - generic_element = page.query_selector(generic_selector) - if generic_element and generic_element.is_visible() and generic_element.is_enabled(): - generic_element.fill(value) - print(f"✅ Filled {field} using generic selector: {generic_selector}") - field_filled = True - fields_filled += 1 - break - except: - continue - - except Exception as e: - print(f" ⚠️ Generic selector approach failed: {str(e)}") - - print(f"📊 Successfully filled {fields_filled} out of {len(checkout_info)} fields") - - # Handle special cases: Payment method selection - try: - # Try to select credit card as payment method - payment_selectors = [ - 'input[value="credit_card"]', - 'input[name="paymentMethod"][value="credit_card"]', - '[data-testid="credit-card-option"]', - 'input[type="radio"][value*="credit"]' - ] - - for selector in payment_selectors: - try: - payment_element = page.query_selector(selector) - if payment_element: - payment_element.check() - print("✅ Selected credit card payment method") - break - except: - continue - except: - print("⚠️ Could not select payment method") - - # Wait for form validation - time.sleep(3) - - - - # Look for and click submit/complete order button with comprehensive selectors - print(f"🔄 Looking for submit/complete order button...") - - submit_selectors = [ - # Primary submit buttons - 'button[type="submit"]', - 'input[type="submit"]', - - # Text-based button selectors - 'button:has-text("Complete Order")', - 'button:has-text("Place Order")', - 'button:has-text("Submit Order")', - 'button:has-text("Complete")', - 'button:has-text("Submit")', - 'button:has-text("Order")', - 'button:has-text("Purchase")', - 'button:has-text("Checkout")', - 'button:has-text("Buy Now")', - 'button:has-text("Confirm")', - - # Case-insensitive variations - 'button:has-text("COMPLETE ORDER")', - 'button:has-text("PLACE ORDER")', - 'button:has-text("SUBMIT ORDER")', - - # Data attributes and IDs - '[data-testid="submit-order"]', - '[data-testid="complete-order"]', - '[data-testid="place-order"]', - '[data-testid="checkout-submit"]', - - # Class-based selectors - '.submit-btn', - '.complete-order', - '.place-order', - '.checkout-submit', - '.order-submit', - - # ID selectors - '#submit', - '#complete-order', - '#place-order', - '#checkout-submit', - '#order-submit' - ] - - submit_clicked = False - for selector in submit_selectors: - try: - button = page.query_selector(selector) - if button and button.is_visible() and button.is_enabled(): - print(f"🎯 Found submit button: {selector}") - button.click() - print(f"✅ Successfully clicked submit button") - submit_clicked = True - break - except Exception as e: - continue - - if not submit_clicked: - print("❌ Could not find submit button, trying fallback approach...") - # Fallback: look for any button that might be a submit button - try: - all_buttons = page.query_selector_all('button, input[type="button"], input[type="submit"]') - for button in all_buttons: - try: - text = button.inner_text().lower() if button.inner_text() else '' - value = button.get_attribute('value') or '' - if any(phrase in text or phrase in value.lower() for phrase in ['submit', 'complete', 'order', 'place', 'confirm', 'buy']): - print(f"🔄 Trying fallback button: {text or value}") - button.click() - submit_clicked = True - break - except: - continue - except: - pass - - if submit_clicked: - print("✅ Order submission initiated") - - # STEP 6: Wait for redirect to order success page - print(f"⏳ Waiting for redirect to order success page...") - - success_page_reached = False - current_url = page.url - print(f"📍 Starting URL: {current_url}") - - # Strategy 1: Wait for navigation event (works for full page redirects) - try: - print("🔄 Attempting to wait for navigation event...") - page.wait_for_navigation(timeout=15000) # 15 seconds - new_url = page.url - print(f"✅ Navigation detected! New URL: {new_url}") - - # Check if the new URL is a success page - success_patterns = ['/order-success', '/success', '/confirmation', '/thank-you', '/order-complete', '/order-confirmed'] - if any(pattern in new_url.lower() for pattern in success_patterns): - success_page_reached = True - print(f"✅ Successfully redirected to order success page via navigation: {new_url}") - - except Exception as e: - print(f"⚠️ No navigation event detected: {e}") - - # Strategy 2: Wait for URL pattern match (works for client-side routing) - if not success_page_reached: - print("🔄 Attempting to wait for URL pattern match...") - success_patterns = ['**/order-success*', '**/success*', '**/confirmation*', '**/thank-you*'] - - for pattern in success_patterns: - try: - print(f" Trying pattern: {pattern}") - page.wait_for_url(pattern, timeout=10000) # 10 seconds per pattern - new_url = page.url - print(f"✅ URL pattern matched! New URL: {new_url}") - success_page_reached = True - break - except Exception as e: - print(f" Pattern {pattern} not matched: {e}") - continue - - # Strategy 3: Manual polling with DOM change detection - if not success_page_reached: - print("🔄 Falling back to manual URL polling with DOM monitoring...") - max_wait_time = 20 - wait_interval = 1 - waited_time = 0 - last_url = page.url - - while waited_time < max_wait_time and not success_page_reached: - try: - current_url = page.url - - # Check if URL changed - if current_url != last_url: - print(f"🔍 URL changed from {last_url} to {current_url}") - last_url = current_url - - # Check for success page patterns - success_patterns = ['/order-success', '/success', '/confirmation', '/thank-you', '/order-complete', '/order-confirmed'] - if any(pattern in current_url.lower() for pattern in success_patterns): - success_page_reached = True - print(f"✅ Success page detected via polling: {current_url}") - break - - # Also check DOM content for success indicators - try: - # Look for success-related text in the page - success_text_indicators = [ - 'text="Order placed successfully"', - 'text="Thank you for your order"', - 'text="Order confirmation"', - 'text="Your order has been placed"', - ':has-text("Order #")', - ':has-text("Order Number")' - ] - - for indicator in success_text_indicators: - try: - element = page.query_selector(indicator) - if element and element.is_visible(): - print(f"✅ Success page detected via DOM content: found '{indicator}'") - success_page_reached = True - break - except: - continue - - if success_page_reached: - break - - except: - pass - - print(f"🔍 Current URL (polling {waited_time}s): {current_url}") - time.sleep(wait_interval) - waited_time += wait_interval - - except Exception as e: - print(f"⚠️ Error during polling: {e}") - time.sleep(wait_interval) - waited_time += wait_interval - - if not success_page_reached: - print(f"⚠️ Could not detect order success page after all strategies") - print(f"📍 Final URL: {page.url}") - - # Still try to extract order info from current page in case it's there - print("🔄 Attempting order extraction from current page anyway...") - else: - print(f"✅ Order success page reached: {page.url}") - - # STEP 7: Extract order number from success page - print(f"🔍 Extracting order number from success page...") - - # Wait for success page to fully load - time.sleep(3) - - order_id = None - order_info = {} - - # Comprehensive order number extraction strategies - order_extraction_strategies = [ - # Strategy 1: Look for "Order #" or "Order Number" text patterns - { - 'name': 'Order # Text Pattern', - 'selectors': [ - 'span:has-text("Order #")', - 'div:has-text("Order #")', - 'p:has-text("Order #")', - 'span:has-text("Order Number")', - 'div:has-text("Order Number")', - 'p:has-text("Order Number")', - 'h1:has-text("Order")', - 'h2:has-text("Order")', - 'h3:has-text("Order")' - ], - 'regex': r'Order\s*#\s*([A-Z0-9][A-Za-z0-9-]{5,})' - }, - - # Strategy 2: Look for data attributes and IDs - { - 'name': 'Data Attributes', - 'selectors': [ - '[data-testid*="order"]', - '[data-testid*="orderNumber"]', - '[data-testid*="order-id"]', - '[id*="order-number"]', - '[id*="orderNumber"]', - '[id*="order-id"]', - '[class*="order-number"]', - '[class*="orderNumber"]', - '[class*="order-id"]' - ], - 'regex': r'([A-Za-z0-9-]+)' - }, - - # Strategy 3: Look for confirmation/success specific elements - { - 'name': 'Confirmation Elements', - 'selectors': [ - '.confirmation .order-number', - '.success .order-number', - '.order-confirmation', - '.order-summary', - '.thank-you .order-number' - ], - 'regex': r'([A-Za-z0-9-]+)' - } - ] - - # Try each extraction strategy - for strategy in order_extraction_strategies: - if order_id: - break - - print(f"🎯 Trying strategy: {strategy['name']}") - - for selector in strategy['selectors']: - try: - element = page.query_selector(selector) - if element and element.is_visible(): - text = element.inner_text().strip() - print(f"🔍 Found element with text: {text}") - - # Extract order ID using strategy's regex - order_match = re.search(strategy['regex'], text, re.IGNORECASE) - if order_match: - order_id = order_match.group(1) - print(f"✅ Extracted order ID: {order_id} (using {strategy['name']})") - - # Store additional order info - order_info = { - 'order_id': order_id, - 'extraction_method': strategy['name'], - 'selector_used': selector, - 'full_text': text, - 'success_page_url': page.url, - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') - } - break - except Exception as e: - continue - - # Fallback: Extract from URL if still not found - if not order_id: - print("🔄 Trying URL extraction as fallback...") - try: - current_url = page.url - url_patterns = [ - r'order-success/([A-Za-z0-9-]+)', - r'order/([A-Za-z0-9-]+)', - r'success/([A-Za-z0-9-]+)', - r'confirmation/([A-Za-z0-9-]+)', - r'[?&]order[=:]([A-Za-z0-9-]+)', - r'[?&]id[=:]([A-Za-z0-9-]+)', - r'[?&]order_id[=:]([A-Za-z0-9-]+)' - ] - - for pattern in url_patterns: - url_match = re.search(pattern, current_url, re.IGNORECASE) - if url_match: - order_id = url_match.group(1) - order_info = { - 'order_id': order_id, - 'extraction_method': 'URL Pattern', - 'pattern_used': pattern, - 'success_page_url': current_url, - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') - } - print(f"✅ Extracted order ID from URL: {order_id}") - break - except Exception as e: - print(f"⚠️ URL extraction error: {e}") - - # Final fallback: Get any text that looks like an order ID - if not order_id: - print("🔄 Final fallback: scanning entire page content...") - try: - page_content = page.content() - # Look for patterns like "ORD-20251013090947-B12E70F6", "12345", etc. - fallback_patterns = [ - r'Order\s*#\s*([A-Z0-9][A-Za-z0-9-]{5,})', - r'\b(ORD-[A-Z0-9-]+)\b', - r'\b([A-Z]{3}-[0-9]{14}-[A-Z0-9]{8})\b', - r'order[#\s:]*([A-Z0-9-]{6,})', - r'confirmation[#\s:]*([A-Z0-9-]{6,})', - r'\b([A-Z]{2,}[0-9]{4,})\b', - r'\b([0-9]{6,})\b' - ] - - for pattern in fallback_patterns: - matches = re.findall(pattern, page_content, re.IGNORECASE) - if matches: - order_id = matches[0] - order_info = { - 'order_id': order_id, - 'extraction_method': 'Page Content Scan', - 'pattern_used': pattern, - 'success_page_url': page.url, - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') - } - print(f"✅ Extracted order ID from page content: {order_id}") - break - except Exception as e: - print(f"⚠️ Page content scan error: {e}") - - else: - print("❌ Could not submit order - no submit button found") - order_info = { - 'error': 'Could not submit order - no submit button found', - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') - } - - # Log comprehensive results - print("\n" + "="*60) - print("🛒 CHECKOUT COMPLETION RESULTS") - print("="*60) - - if order_id: - print(f"✅ ORDER SUCCESSFULLY PLACED!") - print(f"📋 Order ID: {order_id}") - print(f"🔍 Extraction Method: {order_info.get('extraction_method', 'Unknown')}") - print(f"🌐 Success Page URL: {order_info.get('success_page_url', page.url)}") - print(f"🕒 Completed At: {order_info.get('timestamp', 'Unknown')}") - - if order_info.get('full_text'): - print(f"📝 Full Text Found: {order_info['full_text']}") - - # Store order info globally for UI display - global _order_completion_results - _order_completion_results = order_info - - else: - print("❌ Order ID: Not found") - print("🔍 Attempted all extraction strategies without success") - - # Try to get page content for debugging - try: - print("📄 Success page content (first 1000 chars):") - page_content = page.content() - print(page_content[:1000] + "..." if len(page_content) > 1000 else page_content) - except Exception as e: - print(f"⚠️ Could not retrieve page content: {e}") - - # Store error info - _order_completion_results = { - 'error': 'Order ID not found after successful form submission', - 'final_url': page.url, - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') - } - - print("="*60) - - # Wait before closing to allow user to see results - print("⏳ Closing browser in 3 seconds...") - time.sleep(3) - - return order_id is not None, order_info - - except Exception as e: - print(f"❌ Checkout error: {e}") - - # Close the browser - try: - print("🔒 Closing browser...") - browser.close() - except Exception as e: - print(f"Error closing browser: {e}") - - # Start checkout in a separate thread - checkout_thread = threading.Thread(target=run_full_checkout, daemon=True) - checkout_thread.start() - - # Wait for the thread to complete (with timeout) - checkout_thread.join(timeout=120) # 2 minute timeout - - # Check if thread is still alive (timed out) - if checkout_thread.is_alive(): - print("⚠️ Checkout process timed out after 2 minutes") - return False, {'error': 'Checkout process timed out after 2 minutes', 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')} - - # Return results - if _order_completion_results: - if 'error' in _order_completion_results: - return False, _order_completion_results - else: - return True, _order_completion_results - else: - return False, {'error': 'Checkout process completed but no results found', 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')} - - except ImportError: - return False, {'error': 'Playwright not installed', 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')} - except Exception as e: - return False, {'error': f'Checkout error: {str(e)}', 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')} -def parse_url_components(url: str) -> tuple[str, str]: - """Parse URL to extract authority and path components""" - try: - from urllib.parse import urlparse - parsed = urlparse(url) - - # Authority is the host (and port if not default) - authority = parsed.netloc - - # Path includes the path and query parameters - path = parsed.path - if parsed.query: - path += f"?{parsed.query}" - - return authority, path - except Exception as e: - st.error(f"Error parsing URL: {str(e)}") - return "", "" +# Import from our new modules +from crypto import ( + get_ed25519_keys, + get_rsa_keys, + create_http_message_signature, + create_ed25519_signature, + parse_url_components +) +from agent import ( + launch_with_playwright, + complete_checkout_with_playwright, + get_product_extraction_results +) def main(): st.set_page_config( @@ -1312,9 +51,9 @@ def main(): ) # Initialize session state with static keys - if 'private_key' not in st.session_state: + if 'rsa_private_key' not in st.session_state: st.session_state.rsa_private_key = "" - if 'public_key' not in st.session_state: + if 'rsa_public_key' not in st.session_state: st.session_state.rsa_public_key = "" if 'ed25519_private_key' not in st.session_state: st.session_state.ed25519_private_key = "" @@ -1324,11 +63,10 @@ def main(): st.session_state.product_details = None if 'input_data' not in st.session_state: # Generate default values only once - use Ed25519 as default - import time nonce = str(uuid.uuid4()) created = int(time.time()) expires = created + 8 * 60 # 8 minutes from now - keyId = "primary-ed25519" # Default to Ed25519 since it's the default algorithm + keyId = "primary-ed25519" tag = "agent-browser-auth" # Parse URL into authority and path components @@ -1347,9 +85,12 @@ def main(): # Load static keys if not already loaded if not st.session_state.rsa_private_key or not st.session_state.rsa_public_key: - private_key, public_key = get_rsa_keys() - st.session_state.rsa_private_key = private_key - st.session_state.rsa_public_key = public_key + try: + private_key, public_key = get_rsa_keys() + st.session_state.rsa_private_key = private_key + st.session_state.rsa_public_key = public_key + except Exception as e: + st.warning(f"Could not load RSA keys: {e}") # Load Ed25519 keys if not already loaded if not st.session_state.ed25519_private_key or not st.session_state.ed25519_public_key: @@ -1357,7 +98,7 @@ def main(): ed25519_private_key, ed25519_public_key = get_ed25519_keys() st.session_state.ed25519_private_key = ed25519_private_key st.session_state.ed25519_public_key = ed25519_public_key - except ValueError: + except Exception: # Ed25519 keys not configured - will show error when trying to use them pass @@ -1384,318 +125,159 @@ def main(): # Reset button if st.button("🔄 Reset to Default JSON"): - import time nonce = str(uuid.uuid4()) created = int(time.time()) - expires = created + 8 * 60 # 8 minutes from now + expires = created + 8 * 60 - # Use algorithm-appropriate keyId if signature_algorithm == "ed25519": keyId = "primary-ed25519" else: keyId = "primary" tag = "agent-browser-auth" - - # Parse URL into authority and path components authority, path = parse_url_components(reference_url) default_input = { - "nonce": nonce, - "created": created, - "expires": expires, - "keyId": keyId, - "tag": tag, - "authority": authority, - "path": path + "nonce": nonce, "created": created, "expires": expires, + "keyId": keyId, "tag": tag, "authority": authority, "path": path } st.session_state.input_data = json.dumps(default_input, indent=2) st.rerun() - - # Action Selection st.subheader("🎯 Action Selection") action_choice = st.radio( "Choose an action:", options=["Product Details", "Checkout"], - index=0, # Default to Product Details + index=0, help="Select whether to fetch product details or complete a checkout process.", horizontal=True ) - # Update input data function that considers action choice + # Update input data function def update_input_data_with_action(): - """Update input data when signature algorithm, merchant URL, or action changes""" - import time - - # Determine keyId based on signature algorithm - if signature_algorithm == "ed25519": - keyId = "primary-ed25519" - else: - keyId = "primary" - - # Determine tag based on action choice - if action_choice == "Product Details": - tag = "agent-browser-auth" - else: # Checkout - tag = "agent-payer-auth" + if signature_algorithm == "ed25519": keyId = "primary-ed25519" + else: keyId = "primary" - # Parse URL into authority and path components + tag = "agent-browser-auth" if action_choice == "Product Details" else "agent-payer-auth" authority, path = parse_url_components(reference_url) - # Parse current input data to preserve nonce, created, expires if they exist try: current_data = json.loads(st.session_state.input_data) nonce = current_data.get('nonce', str(uuid.uuid4())) created = current_data.get('created', int(time.time())) expires = current_data.get('expires', created + 8 * 60) - except (json.JSONDecodeError, KeyError): - # If parsing fails, use defaults + except: nonce = str(uuid.uuid4()) created = int(time.time()) expires = created + 8 * 60 - # Create updated input data updated_input = { - "nonce": nonce, - "created": created, - "expires": expires, - "keyId": keyId, - "tag": tag, - "authority": authority, - "path": path + "nonce": nonce, "created": created, "expires": expires, + "keyId": keyId, "tag": tag, "authority": authority, "path": path } - return json.dumps(updated_input, indent=2) - # Check if we need to update input data expected_input_data = update_input_data_with_action() if st.session_state.input_data != expected_input_data: - # Parse both to compare just the structure, ignoring whitespace differences try: current_parsed = json.loads(st.session_state.input_data) expected_parsed = json.loads(expected_input_data) - - # Update if keyId, authority, path, or tag have changed if (current_parsed.get('keyId') != expected_parsed.get('keyId') or current_parsed.get('authority') != expected_parsed.get('authority') or current_parsed.get('path') != expected_parsed.get('path') or current_parsed.get('tag') != expected_parsed.get('tag')): st.session_state.input_data = expected_input_data st.rerun() - except json.JSONDecodeError: - # If parsing fails, update with the expected data + except: st.session_state.input_data = expected_input_data st.rerun() # Launch Section st.header("🚀 Launch") - # Check if the required keys are available for the selected algorithm + launch_disabled = False if signature_algorithm == "ed25519": if not st.session_state.ed25519_private_key: - st.warning("Please configure Ed25519 keys in your .env file first") + st.warning("Please configure Ed25519 keys in your environment first") launch_disabled = True - else: - launch_disabled = False - else: # rsa-pss-sha256 + else: if not st.session_state.rsa_private_key: - st.warning("Please configure RSA keys in your .env file first") + st.warning("Please configure RSA keys in your environment first") launch_disabled = True - else: - launch_disabled = False - # Dynamic button based on selection - if action_choice == "Product Details": - button_text = "📦 Fetch Product Details" - button_help = "Create RFC 9421 signature and fetch product details from the merchant" - tag_value = "agent-browser-auth" - else: # Checkout - button_text = "🛒 Complete Checkout" - button_help = "Create RFC 9421 signature and complete the checkout process" - tag_value = "agent-payer-auth" + button_text = "📦 Fetch Product Details" if action_choice == "Product Details" else "🛒 Complete Checkout" + tag_value = "agent-browser-auth" if action_choice == "Product Details" else "agent-payer-auth" - # Single launch button that adapts to the selected action - if st.button(button_text, type="primary", disabled=launch_disabled, help=button_help): - if st.session_state.rsa_private_key: - import time - spinner_text = f"Creating RFC 9421 signature and {'fetching product details' if action_choice == 'Product Details' else 'completing checkout'}..." + if st.button(button_text, type="primary", disabled=launch_disabled): + with st.spinner(f"Processing {action_choice}..."): + try: + parsed_json = json.loads(st.session_state.input_data) + nonce = parsed_json.get('nonce', str(uuid.uuid4())) + created = parsed_json.get('created', int(time.time())) + expires = parsed_json.get('expires', created + 8 * 60) + tag = parsed_json.get('tag', tag_value) + except: + nonce, created, expires, tag = str(uuid.uuid4()), int(time.time()), int(time.time()) + 480, tag_value - with st.spinner(spinner_text): - # Parse current input data to get signature parameters - # Parse current input data to get signature parameters - try: - parsed_json = json.loads(st.session_state.input_data) - nonce = parsed_json.get('nonce', str(uuid.uuid4())) - created = parsed_json.get('created', int(time.time())) - expires = parsed_json.get('expires', created + 8 * 60) - tag = parsed_json.get('tag', tag_value) - except json.JSONDecodeError: - st.error("Invalid JSON format in input data") - nonce = str(uuid.uuid4()) - created = int(time.time()) - expires = created + 8 * 60 - tag = tag_value - # Parse URL components for RFC 9421 - print(f"🔍 Attempting to parse URL: '{reference_url}'") - authority, path = parse_url_components(reference_url) - print(f"🔍 Parse result - Authority: '{authority}', Path: '{path}'") - - if authority and path: - # Create RFC 9421 compliant signature using selected algorithm - if signature_algorithm == "ed25519": - signature_input_header, signature_header = create_ed25519_signature( - private_key=st.session_state.ed25519_private_key, - authority=authority, - path=path, - keyid="primary-ed25519", - nonce=nonce, - created=created, - expires=expires, - tag=tag - ) - else: # rsa-pss-sha256 - signature_input_header, signature_header = create_http_message_signature( - private_key=st.session_state.rsa_private_key, - authority=authority, - path=path, - keyid="primary", - nonce=nonce, - created=created, - expires=expires, - tag=tag - ) - - print(f"Signature Algorithm: {signature_algorithm}") - print(f"Signature input String:\n{signature_input_header}") - print(f"Signature String:\n{signature_header}") + authority, path = parse_url_components(reference_url) + + if authority and path: + if signature_algorithm == "ed25519": + sig_in, sig = create_ed25519_signature( + st.session_state.ed25519_private_key, authority, path, "primary-ed25519", nonce, created, expires, tag + ) + else: + sig_in, sig = create_http_message_signature( + st.session_state.rsa_private_key, authority, path, "primary", nonce, created, expires, tag + ) - if signature_input_header and signature_header: - # Create headers for the request - headers = { - 'Signature-Input': signature_input_header, - 'Signature': signature_header - } - - if action_choice == "Product Details": - # Fetch product details - if launch_with_playwright(reference_url, headers): - st.success("✅ Product extraction started!") - st.info("🔄 Please wait a few seconds for extraction to complete, then check the Product Details section below.") - - # Start checking for results - import time - max_wait_time = 10 # seconds - check_interval = 1 # second - waited = 0 - - while waited < max_wait_time: - time.sleep(check_interval) - waited += check_interval - - global _product_extraction_results - if _product_extraction_results: - st.session_state.product_details = _product_extraction_results - st.rerun() - break - - # If we get here, extraction didn't complete in time - if not _product_extraction_results: - st.warning("⏳ Product extraction is taking longer than expected. Check the console for updates.") - else: - st.error("❌ Failed to launch browser for product extraction") - else: # Checkout - # Complete checkout process - product_url = reference_url # Use the same reference URL for the product - - # Construct cart and checkout URLs based on merchant URL - from urllib.parse import urlparse - parsed = urlparse(reference_url) - base_url = f"{parsed.scheme}://{parsed.netloc}" - cart_url = f"{base_url}/cart" - checkout_url = f"{base_url}/checkout" - - print(f"🛒 Attempting checkout with RFC 9421 signature...") - print(f"📄 Product URL: {product_url}") - print(f"🛒 Cart URL: {cart_url}") - print(f"💳 Checkout URL: {checkout_url}") - - checkout_result = complete_checkout_with_playwright(product_url, cart_url, checkout_url, headers) - success, order_info = checkout_result - - if success and order_info: - st.success("🎉 Checkout completed successfully!") - - if order_info.get('order_id'): - st.markdown("### 📋 Order Confirmation") - - col1, col2 = st.columns(2) - with col1: - st.metric("Order ID", order_info['order_id']) - st.write(f"**Completed At:** {order_info.get('timestamp', 'Unknown')}") - - with col2: - st.write(f"**Extraction Method:** {order_info.get('extraction_method', 'Unknown')}") - st.write(f"**Success Page:** [View]({order_info.get('success_page_url', '#')})") - - # Show full order details in expander - with st.expander("🔍 Full Order Details"): - st.json(order_info) - - else: - st.warning("Order was placed but order ID could not be extracted.") - if order_info: - with st.expander("Debug Information"): - st.json(order_info) - else: - st.error("❌ Checkout failed.") - if order_info and 'error' in order_info: - st.error(f"Error: {order_info['error']}") - with st.expander("Error Details"): - st.json(order_info) - elif not success: - st.error("Checkout process failed to complete successfully.") + if sig_in and sig: + headers = {'Signature-Input': sig_in, 'Signature': sig} + + if action_choice == "Product Details": + if launch_with_playwright(reference_url, headers): + st.success("✅ Product extraction started!") + # Polling for results + for _ in range(10): + time.sleep(1) + results = get_product_extraction_results() + if results: + st.session_state.product_details = results + st.rerun() else: - st.error("❌ Failed to create RFC 9421 signature") + st.error("❌ Failed to launch browser") + else: # Checkout + parsed = parse_url_components(reference_url) + # We need base URL for cart/checkout + from urllib.parse import urlparse + p = urlparse(reference_url) + base = f"{p.scheme}://{p.netloc}" + success, order_info = complete_checkout_with_playwright(reference_url, f"{base}/cart", f"{base}/checkout", headers) + + if success: + st.success("🎉 Checkout completed!") + if order_info.get('order_id'): + st.metric("Order ID", order_info['order_id']) + with st.expander("🔍 Details"): st.json(order_info) + else: + st.error(f"❌ Checkout failed: {order_info.get('error', 'Unknown error')}") else: - st.error("❌ Failed to parse URL components for signature") - + st.error("❌ Failed to create signature") + else: + st.error("❌ Failed to parse URL") # Product Details Section if st.session_state.product_details: st.header("📦 Product Details") - - # Clear button - if st.button("🗑️ Clear Product Details"): + if st.button("🗑️ Clear Details"): st.session_state.product_details = None st.rerun() - col1, col2 = st.columns(2) - - with col1: - if st.session_state.product_details.get('title'): - st.subheader("📦 Product Title") - st.write(st.session_state.product_details['title']) - else: - st.subheader("📦 Product Title") - st.write("❌ Not found") - - with col2: - if st.session_state.product_details.get('price'): - st.subheader("💰 Product Price") - st.write(st.session_state.product_details['price']) - else: - st.subheader("💰 Product Price") - st.write("❌ Not found") - - # Additional extraction details - with st.expander("🔍 Extraction Details"): - extraction_time = st.session_state.product_details.get('extraction_time', 'Unknown') - st.write(f"**Extraction Time:** {extraction_time}") - st.write(f"**URL:** {st.session_state.product_details.get('url', 'Unknown')}") - if st.session_state.product_details.get('extraction_log'): - st.text_area("Extraction Log", value=st.session_state.product_details['extraction_log'], height=150) + c1, c2 = st.columns(2) + with c1: st.subheader("Title"); st.write(st.session_state.product_details.get('title', 'Not found')) + with c2: st.subheader("Price"); st.write(st.session_state.product_details.get('price', 'Not found')) + with st.expander("🔍 Log"): st.text(st.session_state.product_details.get('extraction_log', '')) if __name__ == "__main__": main() diff --git a/tap-agent/crypto.py b/tap-agent/crypto.py new file mode 100644 index 0000000..abe4474 --- /dev/null +++ b/tap-agent/crypto.py @@ -0,0 +1,140 @@ +import base64 +import json +import typing as ty +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa, ed25519, padding + +def get_ed25519_keys(): + """Get Ed25519 keys from environment ed25519_private.pem and ed25519_public.pem""" + with open("./ed25519_private.pem", "rb") as f: + private_key = serialization.load_pem_private_key(f.read(), password=None) + assert isinstance(private_key, ed25519.Ed25519PrivateKey) + + with open("./ed25519_public.pem", "rb") as f: + public_key = serialization.load_pem_public_key(f.read()) + assert isinstance(public_key, ed25519.Ed25519PublicKey) + + return private_key, public_key + +def get_rsa_keys(): + """Get RSA keys from environment rsa_private.pem and rsa_public.pem""" + with open("./rsa_private.pem", "rb") as f: + private_key = serialization.load_pem_private_key(f.read(), password=None) + assert isinstance(private_key, rsa.RSAPrivateKey) + + with open("./rsa_public.pem", "rb") as f: + public_key = serialization.load_pem_public_key(f.read()) + assert isinstance(public_key, rsa.RSAPublicKey) + + return private_key, public_key + +def create_http_message_signature(private_key: ty.Union[rsa.RSAPrivateKey, ed25519.Ed25519PrivateKey], authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> ty.Tuple[str, str]: + """Create HTTP Message Signature following RFC 9421 syntax""" + try: + # Create signature parameters string + signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' + + # Create the signature base string following RFC 9421 format + signature_base_lines = [ + f'"@authority": {authority}', + f'"@path": {path}', + f'"@signature-params": {signature_params}' + ] + signature_base = '\n'.join(signature_base_lines) + + print(f"🔐 RFC 9421 Signature Base String:\n{signature_base}") + print(f"🌐 Authority: {authority}") + print(f"📍 Path: {path}") + print(f"📋 Signature Params: {signature_params}") + + + # Sign the signature base string using RSA-PSS (matching the algorithm declared) + signature = private_key.sign( + signature_base.encode('utf-8'), + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), + salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256() + ) + + signature_b64 = base64.b64encode(signature).decode('utf-8') + + # Format the signature-input header (RFC 9421 format) + signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="rsa-pss-sha256"; nonce="{nonce}"; tag="{tag}"' + + # Format the signature header (RFC 9421 format) + signature_header = f'sig2=:{signature_b64}:' + + print(f"✅ Created RFC 9421 compliant signature") + print(f"📤 Signature-Input: {signature_input_header}") + print(f"🔒 Signature: {signature_header}") + + return signature_input_header, signature_header + + except Exception as e: + print(f"❌ Error creating HTTP message signature: {str(e)}") + return "", "" + +def create_ed25519_signature(private_key: ed25519.Ed25519PrivateKey, authority: str, path: str, keyid: str, nonce: str, created: int, expires: int, tag: str) -> ty.Tuple[str, str]: + """Create HTTP Message Signature using Ed25519 following RFC 9421""" + try: + from cryptography.hazmat.primitives.asymmetric import ed25519 + + print(f"🔐 Creating Ed25519 signature...") + print(f"🌐 Authority: {authority}") + print(f"📍 Path: {path}") + + # Create signature parameters string + signature_params = f'("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' + + # Create the signature base string + signature_base_lines = [ + f'"@authority": {authority}', + f'"@path": {path}', + f'"@signature-params": {signature_params}' + ] + signature_base = '\n'.join(signature_base_lines) + + print(f"🔐 Ed25519 Signature Base String:\n{signature_base}") + + # Sign with Ed25519 (no padding needed) + signature = private_key.sign(signature_base.encode('utf-8')) + signature_b64 = base64.b64encode(signature).decode('utf-8') + + # Format headers + signature_input_header = f'sig2=("@authority" "@path"); created={created}; expires={expires}; keyId="{keyid}"; alg="ed25519"; nonce="{nonce}"; tag="{tag}"' + signature_header = f'sig2=:{signature_b64}:' + + print(f"✅ Created Ed25519 signature") + print(f"📤 Signature-Input: {signature_input_header}") + print(f"🔒 Signature: {signature_header}") + + return signature_input_header, signature_header + + except Exception as e: + print(f"❌ Error creating Ed25519 signature: {str(e)}") + return "", "" + +def parse_url_components(url: str) -> ty.Tuple[str, str]: + """Parse URL to extract authority and path components for RFC 9421""" + try: + from urllib.parse import urlparse + parsed = urlparse(url) + + # Authority is the host (and port if not default) + authority = parsed.netloc + + # Path includes the path and query parameters + path = parsed.path + if parsed.query: + path += f"?{parsed.query}" + + print(f"🔍 Parsed URL: {url}") + print(f"🌐 Authority: {authority}") + print(f"📍 Path: {path}") + + return authority, path + except Exception as e: + print(f"❌ Error parsing URL: {str(e)}") + return "", "" From f7658825b1b5288348f1afb890f45745e52a429f Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 22:32:18 -0800 Subject: [PATCH 4/9] moved key generation script to top-level --- README.md | 3 +++ tap-agent/genkeys.sh => generate_keys.sh | 9 +++++++-- tap-agent/README.md | 12 +----------- 3 files changed, 11 insertions(+), 13 deletions(-) rename tap-agent/genkeys.sh => generate_keys.sh (55%) diff --git a/README.md b/README.md index 7430be2..fab10f3 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,9 @@ This repository contains a complete sample implementation demonstrating the Trus 2. **Start All Services**: ```bash + # Generate keys + ./generate_keys.sh + # Terminal 1: Agent Registry (port 8001) cd agent-registry && python main.py diff --git a/tap-agent/genkeys.sh b/generate_keys.sh similarity index 55% rename from tap-agent/genkeys.sh rename to generate_keys.sh index 210ef34..e091f37 100755 --- a/tap-agent/genkeys.sh +++ b/generate_keys.sh @@ -9,5 +9,10 @@ openssl genpkey -algorithm Ed25519 -out ed25519_private.pem openssl pkey -in ed25519_private.pem -pubout -out ed25519_public.pem # Copy public keys to merchant-backend/ -cp ./ed25519_public.pem ../merchant-backend/ed25519_public.pem -cp ./rsa_public.pem ../merchant-backend/rsa_public.pem +cp ed25519_public.pem rsa_public.pem merchant-backend/ + +# Copy public and private keys to agent-app/ +cp ed25519_public.pem ed25519_private.pem rsa_public.pem rsa_private.pem tap-agent/ + +# Remove private keys from top-level directory +rm rsa_private.pem rsa_public.pem ed25519_private.pem ed25519_public.pem \ No newline at end of file diff --git a/tap-agent/README.md b/tap-agent/README.md index 6a0fadf..298842d 100644 --- a/tap-agent/README.md +++ b/tap-agent/README.md @@ -4,17 +4,7 @@ A sophisticated Streamlit web application for testing RFC 9421 HTTP Message Sign ## Environment Configuration -Create a `.env` file in the root directory with the following variables: - -```bash -# RSA Keys (for RSA-PSS-SHA256 signatures) -RSA_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----" -RSA_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\n...\n-----END PUBLIC KEY-----" - -# Ed25519 Keys (for Ed25519 signatures) -ED25519_PRIVATE_KEY="base64_encoded_private_key" -ED25519_PUBLIC_KEY="base64_encoded_public_key" -``` +The `generate_keys.sh` script will generate a set of keys for the agent. ## Features From cccecedd67644b1f69a823bcf8a394a42644f1e7 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 22:50:39 -0800 Subject: [PATCH 5/9] remove keys from .env example --- tap-agent/.env.example | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tap-agent/.env.example b/tap-agent/.env.example index b4e48ed..5ce18d6 100644 --- a/tap-agent/.env.example +++ b/tap-agent/.env.example @@ -9,13 +9,3 @@ AGENT_PORT=8501 # Debug Configuration DEBUG=true - -# RSA Key Pair for HTTP Message Signatures (RFC 9421) -# Generate your own RSA key pair and replace these values -RSA_PRIVATE_KEY= -RSA_PUBLIC_KEY = "" - -# Ed25519 Key Pair for HTTP Message Signatures (RFC 9421) -# Static keys for Ed25519 algorithm -ED25519_PRIVATE_KEY = "" -ED25519_PUBLIC_KEY = "" From c6a1facb774543197f3c4af67ff9317b1ca9c13f Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 23:47:06 -0800 Subject: [PATCH 6/9] added generated keys to registry sample data populator --- agent-registry/populate_sample_data.py | 48 ++++++++++++++++++++++---- generate_keys.sh | 5 ++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/agent-registry/populate_sample_data.py b/agent-registry/populate_sample_data.py index 4934f47..ec28a7d 100644 --- a/agent-registry/populate_sample_data.py +++ b/agent-registry/populate_sample_data.py @@ -13,6 +13,8 @@ import requests import json +import base64 +from cryptography.hazmat.primitives import serialization # Agent Registry Service URL BASE_URL = "http://localhost:9002" @@ -29,13 +31,13 @@ { "key_id": "primary", "public_key": """-----BEGIN PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoG2JyN6sWH0BSze3C8iK -6u6q7+0wo5ybcFX1kKquBDCLIKqY1hqvtVmj9wTGpCXQ2Jt8PtXXnSOhj69ng3mc -ypJjf72GyKrgHX+nYxcQrnrPXNDaDrhLVtxDsoGIwyVTiUGH5bX2qlIerwlfG9Jz -24HabfGSs6wpxXlfSt29giljSbX78g+Rb9TEV3joZjSQIn68iaKU147uVpv2JhCA -88X9l7fKMUSKDbiyhLRpDjutHrns8NYALPSyRLN645+Hcl7so+AWb9CR8+bdgBUq -GHYlyRsMdsQENFDDFS35M4oz/5MeXj+sIAWrq2ceI0LBttCH6cOcX/r1VpSqoUc1 -dQIDAQAB +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx4rNYC4ChLr3xvR9K5a9 +JGFksUYbVqB2XfVF6QHlU9W0vONUjunKo1V56WkJWxY/diolG0847NTj2y0ksnZv +18e/AuPex4blylGuRXSBOoxsTkN6szMeFB1vsrKqrDqCGxOP0cHeW6biCZbOhQpE +tjWVwEXsJMzbKIYwhx35UrD1RjfRCdsFP6ery/tBu0jnTN47pWQWBq200JXZRpSR +0MfPDeIdQzvTUrHnxIbfGdsBixHvZXnw8Fa3lG9ol5l9QYpi8MN3qoVM4rT4ANcW +FkVaOOsJYd863Bqy3X9vaIUpDqJS0HoOq8l8X26HGtX2LinFi7hcPGZWiznj03v2 +ywIDAQAB -----END PUBLIC KEY-----""", "algorithm": "RSA-SHA256", "description": "Primary signing key for Test Agent 1", @@ -91,6 +93,35 @@ } ] +def make_agent_from_PEM_files(): + with open("ed25519_public.pem", "rb") as f: + ed25519_public_key= serialization.load_pem_public_key(f.read()) + ed25519_public_key_b64 = base64.b64encode(ed25519_public_key.public_bytes_raw()).decode("utf-8") + rsa_public_key = open("rsa_public.pem", "r").read() + return { + "name": "My TAP Agent", + "domain": "https://tapagent.com", + "description": "Official Tap Agent 1 service for merchant verification", + "contact_email": "support@tapagent.com", + "is_active": "true", + "keys": [ + { + "key_id": "primary", + "public_key": rsa_public_key, + "algorithm": "RSA-SHA256", + "description": "Primary signing key for Test Agent 1", + "is_active": "true" + }, + { + "key_id": "primary-ed25519", + "public_key": ed25519_public_key_b64, + "algorithm": "ed25519", + "description": "Primary Ed25519 signing key for modern crypto", + "is_active": "true" + } + ] + } + def register_agent(agent_data): """Register a single agent""" try: @@ -130,6 +161,9 @@ def main(): # Register each agent for agent in sample_agents: register_agent(agent) + # Register the agent from the PEM files + register_agent(make_agent_from_PEM_files()) + print("🏁 Sample data population completed!") diff --git a/generate_keys.sh b/generate_keys.sh index e091f37..00af06b 100755 --- a/generate_keys.sh +++ b/generate_keys.sh @@ -11,7 +11,10 @@ openssl pkey -in ed25519_private.pem -pubout -out ed25519_public.pem # Copy public keys to merchant-backend/ cp ed25519_public.pem rsa_public.pem merchant-backend/ -# Copy public and private keys to agent-app/ +# Copy public keys to agent-registry/ +cp ed25519_public.pem rsa_public.pem agent-registry/ + +# Copy public and private keys to tap-agent/ cp ed25519_public.pem ed25519_private.pem rsa_public.pem rsa_private.pem tap-agent/ # Remove private keys from top-level directory From 45e08f42b9be3a5269db65577e01bc60ceb7fde9 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 23:53:30 -0800 Subject: [PATCH 7/9] tell users to populate registry --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fab10f3..646c756 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ This repository contains a complete sample implementation demonstrating the Trus ./generate_keys.sh # Terminal 1: Agent Registry (port 8001) - cd agent-registry && python main.py + cd agent-registry && python main.py && python populate_sample_data.py # Terminal 2: Merchant Backend (port 8000) cd merchant-backend && python -m uvicorn app.main:app --reload From fffc37df0c1c15ab7c26554d9470b4f22a0f950c Mon Sep 17 00:00:00 2001 From: jL2718 Date: Wed, 17 Dec 2025 23:56:17 -0800 Subject: [PATCH 8/9] ignore keys --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index f9b1a91..53e8720 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# project keys +*.pem + # IDE specific files: .idea *.iml From f3d4f478e931043cba1f9fce3037d04ebbae1205 Mon Sep 17 00:00:00 2001 From: jL2718 Date: Thu, 18 Dec 2025 00:12:49 -0800 Subject: [PATCH 9/9] fix agent-registry start command --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 646c756..f4cfc1a 100644 --- a/README.md +++ b/README.md @@ -60,8 +60,8 @@ This repository contains a complete sample implementation demonstrating the Trus # Generate keys ./generate_keys.sh - # Terminal 1: Agent Registry (port 8001) - cd agent-registry && python main.py && python populate_sample_data.py + # Terminal 1: Agent Registry (port 9002) + cd agent-registry && (sleep 10 && python populate_sample_data.py) & python main.py # Terminal 2: Merchant Backend (port 8000) cd merchant-backend && python -m uvicorn app.main:app --reload