diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e11937a --- /dev/null +++ b/.env.example @@ -0,0 +1,19 @@ +# Open WebUI Benchmark Configuration +# Copy this file to .env and configure for your environment + +# Base URL of the Open WebUI instance to benchmark +OPEN_WEBUI_URL=http://localhost:8080 + +# Admin user credentials (REQUIRED - must exist in Open WebUI) +# The admin account is used to: +# - Create test channels +# - Create temporary benchmark users dynamically +# - Clean up benchmark users after tests +ADMIN_USER_EMAIL=admin@example.com +ADMIN_USER_PASSWORD=adminpassword123 + +# Benchmark settings (optional - defaults shown) +MAX_CONCURRENT_USERS=50 +USER_STEP_SIZE=10 +SUSTAIN_TIME_SECONDS=30 +MESSAGE_FREQUENCY=0.5 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..10706ae --- /dev/null +++ b/.gitignore @@ -0,0 +1,50 @@ +# Benchmark results directory +results/ + +# Python cache +__pycache__/ +*.py[cod] +*$py.class +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ +.venv/ +**/.env + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Logs +logs/ +*.log + +# Docker +.docker/ + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..9406672 --- /dev/null +++ b/README.md @@ -0,0 +1,300 @@ +# Open WebUI Benchmark Suite + +A comprehensive benchmarking framework for testing Open WebUI performance under various load conditions. + +## Overview + +This benchmark suite is designed to: + +1. **Measure concurrent user capacity** - Test how many users can simultaneously use features like Channels +2. **Identify performance limits** - Find the point where response times degrade +3. **Compare compute profiles** - Test performance across different resource configurations +4. **Generate actionable reports** - Provide detailed metrics and recommendations + +## Quick Start + +### Prerequisites + +- Python 3.11+ +- Docker and Docker Compose +- A running Open WebUI instance (or use the provided Docker setup) + +### Installation + +```bash +cd benchmark +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +pip install -e . +``` + +### Configuration + +Copy the example environment file and configure your admin credentials: + +```bash +cp .env.example .env +``` + +Edit `.env` with your Open WebUI admin credentials: + +```dotenv +OPEN_WEBUI_URL=http://localhost:8080 +ADMIN_USER_EMAIL=your-admin@example.com +ADMIN_USER_PASSWORD=your-password +``` + +### Running Benchmarks + +1. **Start Open WebUI with benchmark configuration:** + +```bash +cd docker +./run.sh default # Use the default compute profile (2 CPU, 8GB RAM) +``` + +2. **Run the benchmark:** + +```bash +# Run all benchmarks +owb run all + +# Run only channel concurrency benchmark +owb run channels -m 50 # Test up to 50 concurrent users + +# Run with a specific target URL +owb run channels -u http://localhost:3000 + +# Run with a specific compute profile +owb run channels -p cloud_medium +``` + +3. **View results:** + +Results are saved to the `results/` directory in JSON and CSV formats. 
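+
+For a quick look at a run without opening the CSV, a result file can be inspected with a few lines of Python. This is only a sketch: the glob pattern below is an assumption about the output filenames, while the field names come from `BenchmarkResult.to_dict()` in `benchmark/core/metrics.py`.
+
+```python
+import json
+from pathlib import Path
+
+# Pick the most recent result file (filename pattern is an assumption)
+result_file = sorted(Path("results").glob("*.json"))[-1]
+result = json.loads(result_file.read_text())
+
+print(result["benchmark_name"], "passed" if result["passed"] else "failed")
+print(f"p95: {result['p95_response_time_ms']} ms, "
+      f"errors: {result['error_rate_percent']}%")
+```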
+ +## Compute Profiles + +Compute profiles define the resource constraints for the Open WebUI container: + +| Profile | CPUs | Memory | Use Case | +|---------|------|--------|----------| +| `default` | 2 | 8GB | Local MacBook testing | +| `minimal` | 1 | 4GB | Testing lower bounds | +| `cloud_small` | 2 | 4GB | Small cloud VM | +| `cloud_medium` | 4 | 8GB | Medium cloud VM | +| `cloud_large` | 8 | 16GB | Large cloud VM | + +List available profiles: + +```bash +owb profiles +``` + +## Available Benchmarks + +### Channel Concurrency (`channels`) + +Tests concurrent user capacity in Open WebUI Channels: + +- Creates a test channel +- Progressively adds users (10, 20, 30, ... up to max) +- Each user sends messages at a configured rate +- Measures response times and error rates +- Identifies the maximum sustainable user count + +**Configuration options:** + +```yaml +channels: + max_concurrent_users: 100 # Maximum users to test + user_step_size: 10 # Increment users by this amount + sustain_time: 30 # Seconds to run at each level + message_frequency: 0.5 # Messages per second per user +``` + +### Channel WebSocket (`channels-ws`) + +Tests WebSocket scalability for real-time message delivery. + +## Configuration + +Configuration files are located in `config/`: + +- `benchmark_config.yaml` - Main benchmark settings +- `compute_profiles.yaml` - Resource profiles for Docker containers + +### Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `OPEN_WEBUI_URL` | Open WebUI URL for benchmarking | `http://localhost:8080` | +| `ADMIN_USER_EMAIL` | Admin user email (required) | - | +| `ADMIN_USER_PASSWORD` | Admin user password (required) | - | +| `OPEN_WEBUI_PORT` | Port for Docker container | `8080` | +| `CPU_LIMIT` | CPU limit for container | `2.0` | +| `MEMORY_LIMIT` | Memory limit for container | `8g` | + +## Extending the Benchmark Suite + +### Adding a New Benchmark + +1. Create a new file in `benchmark/scenarios/`: + +```python +from benchmark.core.base import BaseBenchmark +from benchmark.core.metrics import BenchmarkResult + +class MyNewBenchmark(BaseBenchmark): + name = "My New Benchmark" + description = "Tests something new" + version = "1.0.0" + + async def setup(self) -> None: + # Set up test environment + pass + + async def run(self) -> BenchmarkResult: + # Execute the benchmark + # Use self.metrics to record timings + return self.metrics.get_result(self.name) + + async def teardown(self) -> None: + # Clean up + pass +``` + +2. Register the benchmark in `benchmark/cli.py` + +3. 
Add configuration options if needed in `config/benchmark_config.yaml` + +### Custom Metrics Collection + +```python +from benchmark.core.metrics import MetricsCollector + +metrics = MetricsCollector() +metrics.start() + +# Time individual operations +with metrics.time_operation("my_operation"): + await do_something() + +# Or record manually +metrics.record_timing( + operation="api_call", + duration_ms=150.5, + success=True, +) + +metrics.stop() +result = metrics.get_result("My Benchmark") +``` + +## Understanding Results + +### Key Metrics + +| Metric | Description | Good Threshold | +|--------|-------------|----------------| +| `avg_response_time_ms` | Average response time | < 2000ms | +| `p95_response_time_ms` | 95th percentile response time | < 3000ms | +| `error_rate_percent` | Percentage of failed requests | < 1% | +| `requests_per_second` | Throughput | > 10 | + +### Result Files + +- `*.json` - Detailed results for each benchmark run +- `benchmark_results_*.csv` - Combined results in CSV format +- `summary_*.txt` - Human-readable summary + +### Interpreting Channel Benchmark Results + +The channel benchmark reports: + +- **max_sustainable_users**: Maximum users where performance thresholds are met +- **results_by_level**: Performance at each user count level +- **tested_levels**: All user counts that were tested + +Example result analysis: + +``` +Users: 10 | P95: 150ms | Errors: 0% | ✓ PASS +Users: 20 | P95: 280ms | Errors: 0.1% | ✓ PASS +Users: 30 | P95: 520ms | Errors: 0.3% | ✓ PASS +Users: 40 | P95: 1200ms | Errors: 0.8% | ✓ PASS +Users: 50 | P95: 3500ms | Errors: 2.1% | ✗ FAIL + +Maximum sustainable users: 40 +``` + +## Architecture + +``` +benchmark/ +├── benchmark/ +│ ├── core/ # Core framework +│ │ ├── base.py # Base benchmark class +│ │ ├── config.py # Configuration management +│ │ ├── metrics.py # Metrics collection +│ │ └── runner.py # Benchmark orchestration +│ ├── clients/ # API clients +│ │ ├── http_client.py # HTTP/REST client +│ │ └── websocket_client.py # WebSocket client +│ ├── scenarios/ # Benchmark implementations +│ │ └── channels.py # Channel benchmarks +│ ├── utils/ # Utilities +│ │ └── docker.py # Docker management +│ └── cli.py # Command-line interface +├── config/ # Configuration files +├── docker/ # Docker Compose for benchmarking +└── results/ # Benchmark output (gitignored) +``` + +## Dependencies + +The benchmark suite reuses Open WebUI dependencies where possible: + +**From Open WebUI:** +- `httpx` - HTTP client +- `aiohttp` - Async HTTP +- `python-socketio` - WebSocket client +- `pydantic` - Data validation +- `pandas` - Data analysis + +**Benchmark-specific:** +- `locust` - Load testing (optional, for advanced scenarios) +- `rich` - Terminal output +- `docker` - Docker SDK +- `matplotlib` - Plotting results + +## Troubleshooting + +### Common Issues + +1. **Connection refused**: Ensure Open WebUI is running and accessible +2. **Authentication errors**: Check admin credentials in config +3. **Docker resource errors**: Ensure Docker has enough resources allocated +4. **WebSocket timeout**: Increase `websocket_timeout` in config + +### Debug Mode + +Set logging level to DEBUG: + +```bash +export BENCHMARK_LOG_LEVEL=DEBUG +owb run channels +``` + +## Contributing + +When adding new benchmarks: + +1. Follow the `BaseBenchmark` interface +2. Add tests for the new benchmark +3. Update configuration schema if needed +4. 
Add documentation to this README + +## License + +MIT License - See LICENSE file diff --git a/benchmark/__init__.py b/benchmark/__init__.py new file mode 100644 index 0000000..c724748 --- /dev/null +++ b/benchmark/__init__.py @@ -0,0 +1,19 @@ +""" +Open WebUI Benchmark Suite + +A comprehensive benchmarking framework for testing Open WebUI performance +under various load conditions. +""" + +__version__ = "0.1.0" +__author__ = "Open WebUI Benchmark Team" + +from benchmark.core.runner import BenchmarkRunner +from benchmark.core.config import BenchmarkConfig +from benchmark.core.metrics import MetricsCollector + +__all__ = [ + "BenchmarkRunner", + "BenchmarkConfig", + "MetricsCollector", +] diff --git a/benchmark/cli.py b/benchmark/cli.py new file mode 100644 index 0000000..894de70 --- /dev/null +++ b/benchmark/cli.py @@ -0,0 +1,231 @@ +""" +Command-line interface for Open WebUI Benchmark. + +Provides commands for running benchmarks, managing compute profiles, +and analyzing results. +""" + +import asyncio +import sys +from pathlib import Path +from typing import Optional + +from rich.console import Console +from rich.panel import Panel + +from benchmark.core.config import load_config, ConfigLoader +from benchmark.core.runner import BenchmarkRunner +from benchmark.scenarios.channels import ChannelConcurrencyBenchmark, ChannelWebSocketBenchmark + + +console = Console() + + +def print_banner(): + """Print the benchmark suite banner.""" + banner = """ +╔═══════════════════════════════════════════════════════════╗ +║ Open WebUI Benchmark Suite v0.1.0 ║ +║ Testing performance at scale ║ +╚═══════════════════════════════════════════════════════════╝ + """ + console.print(banner, style="bold blue") + + +def list_profiles(): + """List available compute profiles.""" + loader = ConfigLoader() + profiles = loader.load_compute_profiles() + + console.print("\n[bold]Available Compute Profiles:[/bold]\n") + + for profile_id, profile in profiles.items(): + console.print(f" [cyan]{profile_id}[/cyan]") + console.print(f" Name: {profile.name}") + console.print(f" Description: {profile.description}") + console.print(f" Resources: {profile.resources.cpus} CPUs, {profile.resources.memory} RAM") + console.print() + + +def list_benchmarks(): + """List available benchmarks.""" + console.print("\n[bold]Available Benchmarks:[/bold]\n") + + benchmarks = [ + ("channels", ChannelConcurrencyBenchmark), + ("channels-ws", ChannelWebSocketBenchmark), + ] + + for cmd, benchmark_class in benchmarks: + console.print(f" [cyan]{cmd}[/cyan]") + console.print(f" Name: {benchmark_class.name}") + console.print(f" Description: {benchmark_class.description}") + console.print() + + +async def run_channel_benchmark( + profile_id: str = "default", + target_url: Optional[str] = None, + max_users: Optional[int] = None, + step_size: Optional[int] = None, + output_dir: Optional[str] = None, +): + """Run the channel concurrency benchmark.""" + # Load config with overrides + overrides = {} + if target_url: + overrides["target_url"] = target_url + + config = load_config(profile_id, overrides=overrides) + + if max_users: + config.channels.max_concurrent_users = max_users + + if step_size: + config.channels.user_step_size = step_size + + # Auto-adjust step size if larger than max users + if config.channels.user_step_size > config.channels.max_concurrent_users: + config.channels.user_step_size = config.channels.max_concurrent_users + + # Create runner + runner = BenchmarkRunner( + config=config, + profile_id=profile_id, + output_dir=Path(output_dir) 
if output_dir else None, + ) + + # Run benchmark + result = await runner.run_benchmark(ChannelConcurrencyBenchmark) + runner.display_final_summary() + + return result + + +async def run_all_benchmarks( + profile_id: str = "default", + target_url: Optional[str] = None, + output_dir: Optional[str] = None, +): + """Run all available benchmarks.""" + # Load config with overrides + overrides = {} + if target_url: + overrides["target_url"] = target_url + + config = load_config(profile_id, overrides=overrides) + + # Create runner and register all benchmarks + runner = BenchmarkRunner( + config=config, + profile_id=profile_id, + output_dir=Path(output_dir) if output_dir else None, + ) + + runner.register_benchmark(ChannelConcurrencyBenchmark) + + # Run all benchmarks + results = await runner.run_all() + runner.display_final_summary() + + return results + + +def main(): + """Main entry point for CLI.""" + import argparse + + print_banner() + + parser = argparse.ArgumentParser( + description="Open WebUI Benchmark Suite", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # List profiles command + list_profiles_parser = subparsers.add_parser( + "profiles", + help="List available compute profiles", + ) + + # List benchmarks command + list_benchmarks_parser = subparsers.add_parser( + "list", + help="List available benchmarks", + ) + + # Run command + run_parser = subparsers.add_parser( + "run", + help="Run benchmarks", + ) + run_parser.add_argument( + "benchmark", + nargs="?", + default="all", + choices=["all", "channels", "channels-ws"], + help="Benchmark to run (default: all)", + ) + run_parser.add_argument( + "-p", "--profile", + default="default", + help="Compute profile to use (default: default)", + ) + run_parser.add_argument( + "-u", "--url", + help="Target Open WebUI URL (default: from config)", + ) + run_parser.add_argument( + "-m", "--max-users", + type=int, + help="Maximum concurrent users to test", + ) + run_parser.add_argument( + "-s", "--step-size", + type=int, + help="User increment step size (default: 10)", + ) + run_parser.add_argument( + "-o", "--output", + help="Output directory for results", + ) + + args = parser.parse_args() + + if args.command == "profiles": + list_profiles() + elif args.command == "list": + list_benchmarks() + elif args.command == "run": + try: + if args.benchmark == "all": + asyncio.run(run_all_benchmarks( + profile_id=args.profile, + target_url=args.url, + output_dir=args.output, + )) + elif args.benchmark == "channels": + asyncio.run(run_channel_benchmark( + profile_id=args.profile, + target_url=args.url, + max_users=args.max_users, + step_size=args.step_size, + output_dir=args.output, + )) + else: + console.print(f"[red]Unknown benchmark: {args.benchmark}[/red]") + sys.exit(1) + except KeyboardInterrupt: + console.print("\n[yellow]Benchmark interrupted[/yellow]") + sys.exit(1) + except Exception as e: + console.print(f"\n[red]Error: {e}[/red]") + sys.exit(1) + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/benchmark/clients/__init__.py b/benchmark/clients/__init__.py new file mode 100644 index 0000000..fa0d230 --- /dev/null +++ b/benchmark/clients/__init__.py @@ -0,0 +1,13 @@ +""" +Client utilities for benchmarking. + +Provides HTTP and WebSocket client wrappers for interacting with Open WebUI. 
+""" + +from benchmark.clients.http_client import OpenWebUIClient +from benchmark.clients.websocket_client import WebSocketClient + +__all__ = [ + "OpenWebUIClient", + "WebSocketClient", +] diff --git a/benchmark/clients/http_client.py b/benchmark/clients/http_client.py new file mode 100644 index 0000000..559169f --- /dev/null +++ b/benchmark/clients/http_client.py @@ -0,0 +1,709 @@ +""" +HTTP client for interacting with Open WebUI API. + +Provides async HTTP methods for authentication, channel management, and messaging. +""" + +import asyncio +from typing import Optional, Dict, Any, List, Callable +from dataclasses import dataclass +import httpx + + +@dataclass +class User: + """Represents an authenticated user.""" + id: str + email: str + name: str + role: str + token: str + + +class OpenWebUIClient: + """ + Async HTTP client for Open WebUI API. + + Handles authentication and provides methods for common API operations. + """ + + def __init__( + self, + base_url: str, + timeout: float = 30.0, + ): + """ + Initialize the client. + + Args: + base_url: Base URL of the Open WebUI instance + timeout: Request timeout in seconds + """ + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self._client: Optional[httpx.AsyncClient] = None + self._token: Optional[str] = None + self._user: Optional[User] = None + + async def __aenter__(self): + """Async context manager entry.""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + async def connect(self) -> None: + """Initialize the HTTP client.""" + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=self.timeout, + follow_redirects=True, + ) + + async def close(self) -> None: + """Close the HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + @property + def client(self) -> httpx.AsyncClient: + """Get the HTTP client, ensuring it's initialized.""" + if self._client is None: + raise RuntimeError("Client not connected. Call connect() first or use async context manager.") + return self._client + + @property + def headers(self) -> Dict[str, str]: + """Get headers including authorization if authenticated.""" + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if self._token: + headers["Authorization"] = f"Bearer {self._token}" + return headers + + @property + def user(self) -> Optional[User]: + """Get the current authenticated user.""" + return self._user + + @property + def token(self) -> Optional[str]: + """Get the current authentication token.""" + return self._token + + # ==================== Authentication ==================== + + async def signup( + self, + email: str, + password: str, + name: str, + ) -> User: + """ + Create a new user account. + + Args: + email: User email + password: User password + name: User display name + + Returns: + User object with authentication token + """ + response = await self.client.post( + "/api/v1/auths/signup", + json={ + "email": email, + "password": password, + "name": name, + }, + headers={"Content-Type": "application/json"}, + ) + response.raise_for_status() + + data = response.json() + self._token = data.get("token") + self._user = User( + id=data.get("id"), + email=email, + name=name, + role=data.get("role", "user"), + token=self._token, + ) + + return self._user + + async def signin(self, email: str, password: str) -> User: + """ + Authenticate an existing user. 
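+
+        Illustrative usage; the URL and credentials are placeholders taken
+        from .env.example and must match an existing account:
+
+            from benchmark.clients.http_client import OpenWebUIClient
+
+            async with OpenWebUIClient("http://localhost:8080") as client:
+                user = await client.signin("admin@example.com", "adminpassword123")
+                print(user.role)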
+
+        Args:
+            email: User email
+            password: User password
+
+        Returns:
+            User object with authentication token
+        """
+        response = await self.client.post(
+            "/api/v1/auths/signin",
+            json={
+                "email": email,
+                "password": password,
+            },
+            headers={"Content-Type": "application/json"},
+        )
+        response.raise_for_status()
+
+        data = response.json()
+        self._token = data.get("token")
+        self._user = User(
+            id=data.get("id"),
+            email=email,
+            name=data.get("name", ""),
+            role=data.get("role", "user"),
+            token=self._token,
+        )
+
+        return self._user
+
+    async def get_current_user(self) -> Dict[str, Any]:
+        """Get the current authenticated user's information."""
+        response = await self.client.get(
+            "/api/v1/auths/",
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    # ==================== Admin User Management ====================
+
+    async def admin_create_user(
+        self,
+        email: str,
+        password: str,
+        name: str,
+        role: str = "user",
+    ) -> Dict[str, Any]:
+        """
+        Create a new user as admin.
+
+        Requires the client to be authenticated as an admin user.
+
+        Args:
+            email: User email
+            password: User password
+            name: User display name
+            role: User role (default: "user")
+
+        Returns:
+            Created user data including token
+        """
+        response = await self.client.post(
+            "/api/v1/auths/add",
+            json={
+                "email": email,
+                "password": password,
+                "name": name,
+                "role": role,
+            },
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    async def admin_delete_user(self, user_id: str) -> bool:
+        """
+        Delete a user as admin.
+
+        Requires the client to be authenticated as an admin user.
+
+        Args:
+            user_id: ID of the user to delete
+
+        Returns:
+            True if successful
+        """
+        response = await self.client.delete(
+            f"/api/v1/users/{user_id}",
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return True
+
+    # ==================== Channels ====================
+
+    async def get_channels(self) -> List[Dict[str, Any]]:
+        """
+        Get list of channels accessible to the current user.
+
+        Returns:
+            List of channel dictionaries
+        """
+        response = await self.client.get(
+            "/api/v1/channels/",
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    async def create_channel(
+        self,
+        name: str,
+        description: Optional[str] = None,
+        access_control: Optional[Dict] = None,
+    ) -> Dict[str, Any]:
+        """
+        Create a new channel (admin only).
+
+        Args:
+            name: Channel name
+            description: Optional channel description
+            access_control: Optional access control settings
+
+        Returns:
+            Created channel dictionary
+        """
+        payload = {
+            "name": name,
+        }
+        if description:
+            payload["description"] = description
+        if access_control:
+            payload["access_control"] = access_control
+
+        response = await self.client.post(
+            "/api/v1/channels/create",
+            json=payload,
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    async def get_channel(self, channel_id: str) -> Dict[str, Any]:
+        """
+        Get a specific channel by ID.
+
+        Args:
+            channel_id: Channel ID
+
+        Returns:
+            Channel dictionary
+        """
+        response = await self.client.get(
+            f"/api/v1/channels/{channel_id}",
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    async def delete_channel(self, channel_id: str) -> bool:
+        """
+        Delete a channel (admin only).
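+
+        Illustrative lifecycle, assuming admin_client is an OpenWebUIClient
+        signed in as an admin and the channel name is arbitrary:
+
+            channel = await admin_client.create_channel("benchmark-test")
+            try:
+                await admin_client.post_message(channel["id"], "hello")
+            finally:
+                await admin_client.delete_channel(channel["id"])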
+ + Args: + channel_id: Channel ID + + Returns: + True if successful + """ + response = await self.client.delete( + f"/api/v1/channels/{channel_id}/delete", + headers=self.headers, + ) + response.raise_for_status() + return True + + # ==================== Messages ==================== + + async def get_channel_messages( + self, + channel_id: str, + skip: int = 0, + limit: int = 50, + ) -> List[Dict[str, Any]]: + """ + Get messages from a channel. + + Args: + channel_id: Channel ID + skip: Number of messages to skip + limit: Maximum number of messages to return + + Returns: + List of message dictionaries + """ + response = await self.client.get( + f"/api/v1/channels/{channel_id}/messages", + params={"skip": skip, "limit": limit}, + headers=self.headers, + ) + response.raise_for_status() + return response.json() + + async def post_message( + self, + channel_id: str, + content: str, + parent_id: Optional[str] = None, + data: Optional[Dict] = None, + ) -> Dict[str, Any]: + """ + Post a message to a channel. + + Args: + channel_id: Channel ID + content: Message content + parent_id: Optional parent message ID for threads + data: Optional additional message data + + Returns: + Created message dictionary + """ + payload = { + "content": content, + } + if parent_id: + payload["parent_id"] = parent_id + if data: + payload["data"] = data + + response = await self.client.post( + f"/api/v1/channels/{channel_id}/messages/post", + json=payload, + headers=self.headers, + ) + response.raise_for_status() + return response.json() + + # ==================== Health Check ==================== + + async def health_check(self) -> bool: + """ + Check if the Open WebUI instance is healthy. + + Returns: + True if healthy + """ + try: + response = await self.client.get("/health") + return response.status_code == 200 + except Exception: + return False + + async def wait_for_ready(self, timeout: float = 60.0, interval: float = 2.0) -> bool: + """ + Wait for the Open WebUI instance to become ready. + + Args: + timeout: Maximum time to wait in seconds + interval: Time between checks in seconds + + Returns: + True if ready, False if timeout + """ + elapsed = 0.0 + while elapsed < timeout: + if await self.health_check(): + return True + await asyncio.sleep(interval) + elapsed += interval + return False + + +class ClientPool: + """ + Pool of HTTP clients for concurrent benchmark operations. + + Manages multiple authenticated clients for simulating concurrent users. + """ + + def __init__( + self, + base_url: str, + timeout: float = 30.0, + ): + """ + Initialize the client pool. + + Args: + base_url: Base URL of the Open WebUI instance + timeout: Request timeout in seconds + """ + self.base_url = base_url + self.timeout = timeout + self._clients: List[OpenWebUIClient] = [] + + async def create_clients( + self, + count: int, + email_pattern: str = "user{n}@benchmark.local", + password: str = "benchmark_password_123", + name_pattern: str = "Test User {n}", + ) -> List[OpenWebUIClient]: + """ + Create and authenticate multiple clients. 
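+
+        Illustrative usage; the default email/name patterns above are used for
+        the generated accounts, and self-signup is assumed to be allowed on
+        the target instance:
+
+            pool = ClientPool("http://localhost:8080")
+            clients = await pool.create_clients(count=10)
+            try:
+                await clients[0].get_channels()
+            finally:
+                await pool.close_all()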
+ + Args: + count: Number of clients to create + email_pattern: Email pattern with {n} placeholder + password: Password for all users + name_pattern: Name pattern with {n} placeholder + + Returns: + List of authenticated clients + """ + clients = [] + + for i in range(count): + client = OpenWebUIClient(self.base_url, self.timeout) + await client.connect() + + email = email_pattern.format(n=i + 1) + name = name_pattern.format(n=i + 1) + + try: + # Try to sign in first (user might already exist) + await client.signin(email, password) + except httpx.HTTPStatusError: + # User doesn't exist, create them + await client.signup(email, password, name) + + clients.append(client) + + self._clients = clients + return clients + + async def create_single_user_clients( + self, + count: int, + email: str, + password: str, + ) -> List[OpenWebUIClient]: + """ + Create multiple clients all authenticated as the same user. + + This is useful when you want to simulate concurrent connections + from a single user account (e.g., same user on multiple devices). + + Args: + count: Number of clients to create + email: Email of the existing user + password: Password of the existing user + + Returns: + List of authenticated clients (all same user) + + Raises: + httpx.HTTPStatusError: If authentication fails + """ + clients = [] + + for i in range(count): + client = OpenWebUIClient(self.base_url, self.timeout) + await client.connect() + + # Sign in with the same credentials for each client + await client.signin(email, password) + clients.append(client) + + self._clients = clients + return clients + + async def create_clients_with_existing_users( + self, + credentials: List[tuple], + ) -> List[OpenWebUIClient]: + """ + Create clients using a list of existing user credentials. + + Args: + credentials: List of (email, password) tuples + + Returns: + List of authenticated clients + + Raises: + httpx.HTTPStatusError: If authentication fails + """ + clients = [] + + for email, password in credentials: + client = OpenWebUIClient(self.base_url, self.timeout) + await client.connect() + await client.signin(email, password) + clients.append(client) + + self._clients = clients + return clients + + async def create_benchmark_users( + self, + admin_client: OpenWebUIClient, + count: int, + email_pattern: str = "benchmark_user_{n}@test.local", + password: str = "benchmark_pass_123", + name_pattern: str = "Benchmark User {n}", + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> List[OpenWebUIClient]: + """ + Create temporary benchmark users via admin API and authenticate them. + + Uses parallel creation for better performance. + + This method: + 1. Uses the admin client to create N new users (in parallel batches) + 2. Creates and authenticates a client for each user (in parallel) + 3. 
Stores user IDs for cleanup later + + Args: + admin_client: Authenticated admin client + count: Number of users to create + email_pattern: Email pattern with {n} placeholder + password: Password for all benchmark users + name_pattern: Name pattern with {n} placeholder + progress_callback: Optional callback(current, total) for progress updates + + Returns: + List of authenticated clients for the new users + """ + self._benchmark_user_ids: List[str] = [] + + # Create all users in parallel via admin API + async def create_single_user(index: int) -> Optional[Dict[str, Any]]: + email = email_pattern.format(n=index + 1) + name = name_pattern.format(n=index + 1) + + try: + user_data = await admin_client.admin_create_user( + email=email, + password=password, + name=name, + role="user", + ) + return {"index": index, "user_data": user_data, "email": email, "existing": False} + except httpx.HTTPStatusError as e: + if e.response.status_code == 400: + # User already exists + return {"index": index, "user_data": None, "email": email, "existing": True} + raise + + # Create users in parallel (batch to avoid overwhelming the server) + batch_size = 10 + user_infos = [] + + for batch_start in range(0, count, batch_size): + batch_end = min(batch_start + batch_size, count) + batch_tasks = [create_single_user(i) for i in range(batch_start, batch_end)] + batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) + + for result in batch_results: + if isinstance(result, Exception): + raise result + if result: + user_infos.append(result) + + if progress_callback: + progress_callback(len(user_infos), count * 2) # *2 because we also need to auth + + # Store user IDs for cleanup + for info in user_infos: + if info["user_data"]: + self._benchmark_user_ids.append(info["user_data"].get("id")) + + # Authenticate all users in parallel + async def auth_user(info: Dict) -> OpenWebUIClient: + email = info["email"] + client = OpenWebUIClient(self.base_url, self.timeout) + await client.connect() + await client.signin(email, password) + + # If user already existed, get their ID for cleanup tracking + if info["existing"]: + user_data = await client.get_current_user() + self._benchmark_user_ids.append(user_data.get("id")) + + return client + + clients = [] + for batch_start in range(0, len(user_infos), batch_size): + batch_end = min(batch_start + batch_size, len(user_infos)) + batch_tasks = [auth_user(user_infos[i]) for i in range(batch_start, batch_end)] + batch_clients = await asyncio.gather(*batch_tasks, return_exceptions=True) + + for client in batch_clients: + if isinstance(client, Exception): + raise client + clients.append(client) + + if progress_callback: + progress_callback(count + len(clients), count * 2) + + self._clients = clients + return clients + + async def cleanup_benchmark_users( + self, + admin_client: OpenWebUIClient, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> int: + """ + Delete all benchmark users created by create_benchmark_users. + + Uses parallel deletion for better performance. 
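+
+        Illustrative pairing with create_benchmark_users; admin_client is
+        assumed to be signed in with the admin credentials from .env:
+
+            clients = await pool.create_benchmark_users(admin_client, count=20)
+            try:
+                ...  # run the benchmark workload
+            finally:
+                await pool.close_all()
+                deleted = await pool.cleanup_benchmark_users(admin_client)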
+ + Args: + admin_client: Authenticated admin client + progress_callback: Optional callback(current, total) for progress updates + + Returns: + Number of users successfully deleted + """ + user_ids = getattr(self, '_benchmark_user_ids', []) + if not user_ids: + return 0 + + deleted_count = 0 + batch_size = 10 + + async def delete_user(user_id: str) -> bool: + try: + await admin_client.admin_delete_user(user_id) + return True + except httpx.HTTPStatusError: + return False + + for batch_start in range(0, len(user_ids), batch_size): + batch_end = min(batch_start + batch_size, len(user_ids)) + batch_tasks = [delete_user(user_ids[i]) for i in range(batch_start, batch_end)] + results = await asyncio.gather(*batch_tasks) + deleted_count += sum(results) + + if progress_callback: + progress_callback(batch_end, len(user_ids)) + + self._benchmark_user_ids = [] + return deleted_count + + async def close_all(self) -> None: + """Close all clients in the pool.""" + for client in self._clients: + await client.close() + self._clients.clear() + + def __len__(self) -> int: + """Get number of clients in pool.""" + return len(self._clients) + + def __iter__(self): + """Iterate over clients.""" + return iter(self._clients) + + def __getitem__(self, index: int) -> OpenWebUIClient: + """Get client by index.""" + return self._clients[index] diff --git a/benchmark/clients/websocket_client.py b/benchmark/clients/websocket_client.py new file mode 100644 index 0000000..1627738 --- /dev/null +++ b/benchmark/clients/websocket_client.py @@ -0,0 +1,289 @@ +""" +WebSocket client for real-time communication with Open WebUI. + +Provides async WebSocket connections for channel events and real-time messaging. +""" + +import asyncio +import json +from typing import Optional, Dict, Any, Callable, List +from dataclasses import dataclass, field +import socketio + + +@dataclass +class WebSocketMessage: + """Represents a received WebSocket message.""" + event: str + data: Dict[str, Any] + timestamp: float = 0.0 + + +class WebSocketClient: + """ + WebSocket client for Open WebUI real-time features. + + Uses Socket.IO to connect to Open WebUI's WebSocket endpoint. + """ + + def __init__( + self, + base_url: str, + token: str, + timeout: float = 60.0, + ): + """ + Initialize the WebSocket client. + + Args: + base_url: Base URL of the Open WebUI instance + token: Authentication token + timeout: Connection timeout in seconds + """ + self.base_url = base_url.rstrip('/') + self.token = token + self.timeout = timeout + + self._sio: Optional[socketio.AsyncClient] = None + self._connected = False + self._messages: List[WebSocketMessage] = [] + self._event_handlers: Dict[str, List[Callable]] = {} + self._user_id: Optional[str] = None + + @property + def connected(self) -> bool: + """Check if WebSocket is connected.""" + return self._connected and self._sio is not None + + @property + def messages(self) -> List[WebSocketMessage]: + """Get all received messages.""" + return self._messages + + async def connect(self) -> bool: + """ + Establish WebSocket connection. 
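+
+        Illustrative usage; the token would normally come from an already
+        authenticated OpenWebUIClient:
+
+            ws = WebSocketClient("http://localhost:8080", token=http_client.token)
+            if await ws.connect():
+                await ws.join_channels()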
+ + Returns: + True if connected successfully + """ + if self._connected: + return True + + self._sio = socketio.AsyncClient( + reconnection=True, + reconnection_delay=1, + reconnection_delay_max=5, + ) + + # Set up event handlers + @self._sio.event + async def connect(): + self._connected = True + # Emit user-join event + await self._sio.emit('user-join', {'auth': {'token': self.token}}) + + @self._sio.event + async def disconnect(): + self._connected = False + + @self._sio.event + async def connect_error(data): + self._connected = False + + @self._sio.on('events:channel') + async def on_channel_event(data): + import time + message = WebSocketMessage( + event='events:channel', + data=data, + timestamp=time.time(), + ) + self._messages.append(message) + + # Call registered handlers + for handler in self._event_handlers.get('events:channel', []): + try: + if asyncio.iscoroutinefunction(handler): + await handler(data) + else: + handler(data) + except Exception: + pass + + @self._sio.on('*') + async def on_any_event(event, data): + import time + message = WebSocketMessage( + event=event, + data=data if isinstance(data, dict) else {'data': data}, + timestamp=time.time(), + ) + self._messages.append(message) + + try: + # Connect with authentication + await self._sio.connect( + self.base_url, + socketio_path='/ws/socket.io', + transports=['websocket'], + auth={'token': self.token}, + wait_timeout=self.timeout, + ) + + # Wait for connection to be established + await asyncio.sleep(0.5) + return self._connected + + except Exception as e: + self._connected = False + raise ConnectionError(f"Failed to connect WebSocket: {e}") + + async def disconnect(self) -> None: + """Disconnect the WebSocket.""" + if self._sio: + await self._sio.disconnect() + self._connected = False + self._sio = None + + async def join_channels(self) -> None: + """Join all channels the user has access to.""" + if not self._connected: + raise RuntimeError("Not connected") + + await self._sio.emit('join-channels', {'auth': {'token': self.token}}) + + async def emit_typing(self, channel_id: str, message_id: Optional[str] = None) -> None: + """ + Emit a typing indicator event. + + Args: + channel_id: Channel ID + message_id: Optional message ID if typing in a thread + """ + if not self._connected: + raise RuntimeError("Not connected") + + data = { + 'channel_id': channel_id, + 'data': {'type': 'typing'}, + } + if message_id: + data['message_id'] = message_id + + await self._sio.emit('events:channel', data) + + def on_event(self, event: str, handler: Callable) -> None: + """ + Register an event handler. + + Args: + event: Event name to handle + handler: Callback function + """ + if event not in self._event_handlers: + self._event_handlers[event] = [] + self._event_handlers[event].append(handler) + + def clear_messages(self) -> None: + """Clear all stored messages.""" + self._messages.clear() + + async def wait_for_event( + self, + event: str, + timeout: float = 10.0, + condition: Optional[Callable[[Dict], bool]] = None, + ) -> Optional[WebSocketMessage]: + """ + Wait for a specific event. 
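+
+        Illustrative usage; channel_id and the payload key are assumptions
+        about the event data delivered by the server:
+
+            msg = await ws.wait_for_event(
+                "events:channel",
+                timeout=5.0,
+                condition=lambda data: data.get("channel_id") == channel_id,
+            )
+            if msg is None:
+                print("timed out waiting for a channel event")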
+ + Args: + event: Event name to wait for + timeout: Maximum time to wait + condition: Optional condition function + + Returns: + The matching message, or None if timeout + """ + start_count = len(self._messages) + elapsed = 0.0 + interval = 0.1 + + while elapsed < timeout: + # Check new messages + for msg in self._messages[start_count:]: + if msg.event == event: + if condition is None or condition(msg.data): + return msg + + await asyncio.sleep(interval) + elapsed += interval + + return None + + +class WebSocketPool: + """ + Pool of WebSocket clients for concurrent benchmark operations. + + Manages multiple WebSocket connections for simulating concurrent users. + """ + + def __init__(self, base_url: str, timeout: float = 60.0): + """ + Initialize the WebSocket pool. + + Args: + base_url: Base URL of the Open WebUI instance + timeout: Connection timeout in seconds + """ + self.base_url = base_url + self.timeout = timeout + self._clients: List[WebSocketClient] = [] + + async def create_connections(self, tokens: List[str]) -> List[WebSocketClient]: + """ + Create WebSocket connections for multiple users. + + Args: + tokens: List of authentication tokens + + Returns: + List of connected WebSocket clients + """ + clients = [] + + for token in tokens: + client = WebSocketClient(self.base_url, token, self.timeout) + try: + await client.connect() + await client.join_channels() + clients.append(client) + except Exception as e: + # Continue with other clients even if one fails + pass + + self._clients = clients + return clients + + async def close_all(self) -> None: + """Close all WebSocket connections.""" + for client in self._clients: + try: + await client.disconnect() + except Exception: + pass + self._clients.clear() + + def __len__(self) -> int: + """Get number of connected clients.""" + return len(self._clients) + + def __iter__(self): + """Iterate over clients.""" + return iter(self._clients) + + def __getitem__(self, index: int) -> WebSocketClient: + """Get client by index.""" + return self._clients[index] diff --git a/benchmark/core/__init__.py b/benchmark/core/__init__.py new file mode 100644 index 0000000..9217467 --- /dev/null +++ b/benchmark/core/__init__.py @@ -0,0 +1,15 @@ +""" +Core benchmarking module - contains the main benchmarking infrastructure. +""" + +from benchmark.core.config import BenchmarkConfig +from benchmark.core.runner import BenchmarkRunner +from benchmark.core.metrics import MetricsCollector +from benchmark.core.base import BaseBenchmark + +__all__ = [ + "BenchmarkConfig", + "BenchmarkRunner", + "MetricsCollector", + "BaseBenchmark", +] diff --git a/benchmark/core/base.py b/benchmark/core/base.py new file mode 100644 index 0000000..1aa1bbe --- /dev/null +++ b/benchmark/core/base.py @@ -0,0 +1,277 @@ +""" +Base benchmark class that all benchmark implementations should extend. + +Provides common functionality for benchmark setup, execution, and teardown. 
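+
+Illustrative end-to-end flow, where MyBenchmark stands in for any concrete
+subclass from the scenarios package:
+
+    import asyncio
+
+    from benchmark.core.config import load_config
+
+    config = load_config("default")
+    result = asyncio.run(MyBenchmark(config).execute())
+    print(result.passed, result.p95_response_time_ms)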
+""" + +import asyncio +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List, Callable +from datetime import datetime + +from benchmark.core.config import BenchmarkConfig +from benchmark.core.metrics import MetricsCollector, BenchmarkResult + + +@dataclass +class BenchmarkContext: + """Context object passed to benchmark methods.""" + config: BenchmarkConfig + metrics: MetricsCollector + iteration: int = 0 + start_time: Optional[float] = None + end_time: Optional[float] = None + user_data: Dict[str, Any] = field(default_factory=dict) + + @property + def elapsed_time(self) -> float: + """Get elapsed time in seconds.""" + if self.start_time is None: + return 0.0 + end = self.end_time or time.time() + return end - self.start_time + + +class BaseBenchmark(ABC): + """ + Base class for all benchmarks. + + Subclasses should implement: + - setup(): Prepare the benchmark environment + - run(): Execute the actual benchmark + - teardown(): Clean up after the benchmark + - validate_result(): Validate the benchmark passed thresholds + """ + + name: str = "Base Benchmark" + description: str = "Base benchmark class" + version: str = "1.0.0" + + def __init__(self, config: BenchmarkConfig): + """ + Initialize the benchmark. + + Args: + config: Benchmark configuration + """ + self.config = config + self.metrics = MetricsCollector() + self._context: Optional[BenchmarkContext] = None + self._setup_complete = False + self._teardown_complete = False + + @property + def context(self) -> BenchmarkContext: + """Get the current benchmark context.""" + if self._context is None: + self._context = BenchmarkContext( + config=self.config, + metrics=self.metrics, + ) + return self._context + + @abstractmethod + async def setup(self) -> None: + """ + Set up the benchmark environment. + + This method should: + - Create necessary test data (users, channels, etc.) + - Establish connections + - Verify the target system is ready + + Raises: + Exception: If setup fails + """ + pass + + @abstractmethod + async def run(self) -> BenchmarkResult: + """ + Execute the benchmark. + + This method should: + - Execute the benchmark workload + - Record metrics using self.metrics + - Return the benchmark result + + Returns: + BenchmarkResult with collected metrics + """ + pass + + @abstractmethod + async def teardown(self) -> None: + """ + Clean up after the benchmark. + + This method should: + - Clean up test data + - Close connections + - Release resources + """ + pass + + def validate_result(self, result: BenchmarkResult) -> bool: + """ + Validate that the benchmark result meets thresholds. + + Args: + result: The benchmark result to validate + + Returns: + True if all thresholds are met, False otherwise + """ + thresholds = self.config.thresholds + + # Check response time threshold + if result.avg_response_time_ms > thresholds.max_response_time_ms: + return False + + # Check P95 response time threshold + if result.p95_response_time_ms > thresholds.max_p95_response_time_ms: + return False + + # Check error rate threshold + if result.error_rate_percent > thresholds.max_error_rate_percent: + return False + + # Check requests per second threshold + if result.requests_per_second < thresholds.min_requests_per_second: + return False + + return True + + async def warmup(self) -> None: + """ + Execute warmup requests before the main benchmark. + + Override this method to customize warmup behavior. 
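+
+        A minimal override might issue a few cheap requests; admin_client here
+        is a hypothetical attribute created by the subclass's setup():
+
+            async def warmup(self) -> None:
+                for _ in range(self.config.warmup_requests):
+                    await self.admin_client.health_check()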
+ """ + pass + + async def cooldown(self) -> None: + """ + Execute cooldown period after the benchmark. + + Default implementation waits for configured cooldown time. + """ + await asyncio.sleep(self.config.cooldown_seconds) + + async def execute(self) -> BenchmarkResult: + """ + Execute the full benchmark lifecycle. + + This method orchestrates: + 1. Setup + 2. Warmup + 3. Run (multiple iterations if configured) + 4. Cooldown + 5. Teardown + + Returns: + Combined BenchmarkResult from all iterations + """ + try: + # Setup + await self.setup() + self._setup_complete = True + + # Warmup + if self.config.warmup_requests > 0: + await self.warmup() + + # Run benchmark iterations + results: List[BenchmarkResult] = [] + + for iteration in range(self.config.iterations): + self.context.iteration = iteration + 1 + self.context.start_time = time.time() + + result = await self.run() + + self.context.end_time = time.time() + results.append(result) + + # Cooldown between iterations (except last) + if iteration < self.config.iterations - 1: + await self.cooldown() + + # Combine results from all iterations + combined_result = self._combine_results(results) + combined_result.passed = self.validate_result(combined_result) + + return combined_result + + finally: + # Always attempt teardown + if self._setup_complete and not self._teardown_complete: + await self.teardown() + self._teardown_complete = True + + def _combine_results(self, results: List[BenchmarkResult]) -> BenchmarkResult: + """ + Combine multiple iteration results into a single result. + + Args: + results: List of results from each iteration + + Returns: + Combined BenchmarkResult + """ + if not results: + return BenchmarkResult( + benchmark_name=self.name, + timestamp=datetime.utcnow(), + ) + + # Use first result as base + combined = results[0] + + if len(results) == 1: + return combined + + # Average numeric metrics across iterations + combined.total_requests = sum(r.total_requests for r in results) + combined.successful_requests = sum(r.successful_requests for r in results) + combined.failed_requests = sum(r.failed_requests for r in results) + combined.total_duration_seconds = sum(r.total_duration_seconds for r in results) + + # Average response times + combined.avg_response_time_ms = sum(r.avg_response_time_ms for r in results) / len(results) + combined.min_response_time_ms = min(r.min_response_time_ms for r in results) + combined.max_response_time_ms = max(r.max_response_time_ms for r in results) + combined.p50_response_time_ms = sum(r.p50_response_time_ms for r in results) / len(results) + combined.p95_response_time_ms = sum(r.p95_response_time_ms for r in results) / len(results) + combined.p99_response_time_ms = sum(r.p99_response_time_ms for r in results) / len(results) + + # Recalculate derived metrics + if combined.total_requests > 0: + combined.error_rate_percent = (combined.failed_requests / combined.total_requests) * 100 + + if combined.total_duration_seconds > 0: + combined.requests_per_second = combined.total_requests / combined.total_duration_seconds + + combined.iterations = len(results) + + return combined + + def get_metadata(self) -> Dict[str, Any]: + """ + Get benchmark metadata. 
+ + Returns: + Dictionary containing benchmark metadata + """ + return { + "name": self.name, + "description": self.description, + "version": self.version, + "config": { + "target_url": self.config.target_url, + "iterations": self.config.iterations, + "compute_profile": self.config.compute_profile.name if self.config.compute_profile else None, + } + } diff --git a/benchmark/core/config.py b/benchmark/core/config.py new file mode 100644 index 0000000..765dde1 --- /dev/null +++ b/benchmark/core/config.py @@ -0,0 +1,303 @@ +""" +Configuration management for the benchmark suite. + +Handles loading and validating configuration from YAML files and environment variables. +""" + +import os +from pathlib import Path +from typing import Optional, Dict, Any, List +from dataclasses import dataclass, field + +import yaml +from pydantic import BaseModel, Field +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + + +class ResourceConfig(BaseModel): + """Docker resource constraints configuration.""" + cpus: float = 2.0 + memory: str = "8g" + memory_swap: str = "8g" + memory_reservation: str = "4g" + + +class DockerConfig(BaseModel): + """Docker-specific configuration.""" + cpu_shares: int = 1024 + cpu_period: int = 100000 + cpu_quota: int = 200000 + + +class ComputeProfile(BaseModel): + """A compute profile defining resource constraints.""" + name: str + description: str + resources: ResourceConfig + docker: DockerConfig + + +class TestUserConfig(BaseModel): + """Configuration for test users.""" + email: str + password: str + name: str + role: str = "user" + + +class TestUserTemplateConfig(BaseModel): + """Template for generating test users.""" + email_pattern: str = "user{n}@benchmark.local" + password: str = "benchmark_user_password_123" + name_pattern: str = "Test User {n}" + role: str = "user" + + +class ThresholdsConfig(BaseModel): + """Performance thresholds for pass/fail criteria.""" + max_response_time_ms: int = 2000 + max_p95_response_time_ms: int = 3000 + max_error_rate_percent: float = 1.0 + min_requests_per_second: float = 10.0 + + +class ChannelBenchmarkConfig(BaseModel): + """Channel-specific benchmark configuration.""" + max_concurrent_users: int = 100 + user_step_size: int = 10 + ramp_up_time: int = 5 + sustain_time: int = 30 + message_frequency: float = 0.5 + message_size: Dict[str, int] = Field(default_factory=lambda: {"min": 50, "max": 500, "avg": 200}) + + +class OutputConfig(BaseModel): + """Output configuration for benchmark results.""" + results_dir: str = "results" + formats: List[str] = Field(default_factory=lambda: ["json", "csv"]) + include_timing_details: bool = True + include_resource_metrics: bool = True + + +class BenchmarkConfig(BaseModel): + """Main benchmark configuration.""" + target_url: str = "http://localhost:3000" + request_timeout: int = 30 + websocket_timeout: int = 60 + iterations: int = 3 + warmup_requests: int = 10 + cooldown_seconds: int = 5 + + # Sub-configurations + output: OutputConfig = Field(default_factory=OutputConfig) + thresholds: ThresholdsConfig = Field(default_factory=ThresholdsConfig) + channels: ChannelBenchmarkConfig = Field(default_factory=ChannelBenchmarkConfig) + + # Compute profile + compute_profile: Optional[ComputeProfile] = None + + # Test users - loaded from environment variables + admin_user: Optional[TestUserConfig] = None + test_user: Optional[TestUserConfig] = None # Single test user for benchmarks + user_template: TestUserTemplateConfig = Field(default_factory=TestUserTemplateConfig) + + # 
Use single user for all concurrent connections (simpler setup) + use_single_user: bool = True + + +class ConfigLoader: + """Loads and manages benchmark configuration.""" + + def __init__(self, config_dir: Optional[Path] = None): + """ + Initialize the config loader. + + Args: + config_dir: Path to configuration directory. If None, uses default. + """ + if config_dir is None: + # Default to benchmark/config directory + config_dir = Path(__file__).parent.parent.parent / "config" + self.config_dir = Path(config_dir) + + self._compute_profiles: Dict[str, ComputeProfile] = {} + self._benchmark_config: Optional[BenchmarkConfig] = None + + def load_compute_profiles(self) -> Dict[str, ComputeProfile]: + """Load compute profiles from YAML file.""" + profiles_file = self.config_dir / "compute_profiles.yaml" + + if not profiles_file.exists(): + raise FileNotFoundError(f"Compute profiles file not found: {profiles_file}") + + with open(profiles_file, 'r') as f: + data = yaml.safe_load(f) + + profiles = {} + for profile_id, profile_data in data.get("profiles", {}).items(): + profiles[profile_id] = ComputeProfile( + name=profile_data["name"], + description=profile_data["description"], + resources=ResourceConfig(**profile_data["resources"]), + docker=DockerConfig(**profile_data["docker"]), + ) + + self._compute_profiles = profiles + return profiles + + def get_compute_profile(self, profile_id: str) -> ComputeProfile: + """Get a specific compute profile by ID.""" + if not self._compute_profiles: + self.load_compute_profiles() + + if profile_id not in self._compute_profiles: + raise ValueError(f"Unknown compute profile: {profile_id}") + + return self._compute_profiles[profile_id] + + def load_benchmark_config( + self, + profile_id: str = "default", + overrides: Optional[Dict[str, Any]] = None + ) -> BenchmarkConfig: + """ + Load benchmark configuration with optional overrides. 
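+
+        Illustrative usage; the profile name and URL are examples only:
+
+            loader = ConfigLoader()
+            config = loader.load_benchmark_config(
+                "cloud_medium",
+                overrides={"target_url": "http://localhost:3000"},
+            )
+            print(config.channels.max_concurrent_users)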
+ + Args: + profile_id: Compute profile to use + overrides: Dictionary of configuration overrides + + Returns: + BenchmarkConfig instance + """ + config_file = self.config_dir / "benchmark_config.yaml" + + if not config_file.exists(): + raise FileNotFoundError(f"Benchmark config file not found: {config_file}") + + with open(config_file, 'r') as f: + data = yaml.safe_load(f) + + # Build configuration from YAML + benchmark_data = data.get("benchmark", {}) + output_data = data.get("output", {}) + thresholds_data = data.get("thresholds", {}) + channels_data = data.get("channels", {}) + test_users_data = data.get("test_users", {}) + + # Apply environment variable overrides for target URL + target_url = os.environ.get("OPEN_WEBUI_URL", + os.environ.get("BENCHMARK_TARGET_URL", + benchmark_data.get("target_url"))) + + # Load user credentials from environment variables (like the tests framework) + admin_email = os.environ.get("ADMIN_USER_EMAIL") + admin_password = os.environ.get("ADMIN_USER_PASSWORD") + test_email = os.environ.get("TEST_USER_EMAIL") + test_password = os.environ.get("TEST_USER_PASSWORD") + use_single_user = os.environ.get("USE_SINGLE_USER", "true").lower() == "true" + + # Override channel settings from environment + max_users = os.environ.get("MAX_CONCURRENT_USERS") + user_step = os.environ.get("USER_STEP_SIZE") + sustain_time = os.environ.get("SUSTAIN_TIME_SECONDS") + msg_freq = os.environ.get("MESSAGE_FREQUENCY") + + channels_config = ChannelBenchmarkConfig(**channels_data) if channels_data else ChannelBenchmarkConfig() + if max_users: + channels_config.max_concurrent_users = int(max_users) + if user_step: + channels_config.user_step_size = int(user_step) + if sustain_time: + channels_config.sustain_time = int(sustain_time) + if msg_freq: + channels_config.message_frequency = float(msg_freq) + + config = BenchmarkConfig( + target_url=target_url, + request_timeout=benchmark_data.get("request_timeout", 30), + websocket_timeout=benchmark_data.get("websocket_timeout", 60), + iterations=benchmark_data.get("iterations", 3), + warmup_requests=benchmark_data.get("warmup_requests", 10), + cooldown_seconds=benchmark_data.get("cooldown_seconds", 5), + output=OutputConfig(**output_data) if output_data else OutputConfig(), + thresholds=ThresholdsConfig(**thresholds_data) if thresholds_data else ThresholdsConfig(), + channels=channels_config, + compute_profile=self.get_compute_profile(profile_id), + use_single_user=use_single_user, + ) + + # Set admin user from environment variables (preferred) or YAML config + if admin_email and admin_password: + config.admin_user = TestUserConfig( + email=admin_email, + password=admin_password, + name="Admin User", + role="admin", + ) + elif "admin" in test_users_data: + config.admin_user = TestUserConfig(**test_users_data["admin"]) + + # Set test user from environment variables + if test_email and test_password: + config.test_user = TestUserConfig( + email=test_email, + password=test_password, + name="Test User", + role="user", + ) + + # Set user template if configured + if "user_template" in test_users_data: + config.user_template = TestUserTemplateConfig(**test_users_data["user_template"]) + + # Apply any additional overrides + if overrides: + for key, value in overrides.items(): + if hasattr(config, key): + setattr(config, key, value) + + self._benchmark_config = config + return config + + @property + def config(self) -> BenchmarkConfig: + """Get the current benchmark configuration.""" + if self._benchmark_config is None: + self._benchmark_config = 
self.load_benchmark_config() + return self._benchmark_config + + +# Global config loader instance +_config_loader: Optional[ConfigLoader] = None + + +def get_config_loader() -> ConfigLoader: + """Get the global config loader instance.""" + global _config_loader + if _config_loader is None: + _config_loader = ConfigLoader() + return _config_loader + + +def load_config( + profile_id: str = "default", + config_dir: Optional[Path] = None, + overrides: Optional[Dict[str, Any]] = None +) -> BenchmarkConfig: + """ + Convenience function to load benchmark configuration. + + Args: + profile_id: Compute profile to use + config_dir: Optional custom config directory + overrides: Optional configuration overrides + + Returns: + BenchmarkConfig instance + """ + loader = ConfigLoader(config_dir) if config_dir else get_config_loader() + return loader.load_benchmark_config(profile_id, overrides) diff --git a/benchmark/core/metrics.py b/benchmark/core/metrics.py new file mode 100644 index 0000000..69ff28c --- /dev/null +++ b/benchmark/core/metrics.py @@ -0,0 +1,445 @@ +""" +Metrics collection and result management for benchmarks. + +Provides utilities for recording timing information, calculating statistics, +and generating benchmark reports. +""" + +import statistics +import time +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional, Dict, Any, List +from contextlib import contextmanager +import json +import csv +from pathlib import Path + + +@dataclass +class TimingRecord: + """A single timing record for a request or operation.""" + operation: str + start_time: float + end_time: float + success: bool + error: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def duration_ms(self) -> float: + """Get duration in milliseconds.""" + return (self.end_time - self.start_time) * 1000 + + @property + def duration_seconds(self) -> float: + """Get duration in seconds.""" + return self.end_time - self.start_time + + +@dataclass +class BenchmarkResult: + """Results from a benchmark run.""" + benchmark_name: str + timestamp: datetime = field(default_factory=datetime.utcnow) + + # Request counts + total_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + + # Timing statistics (milliseconds) + avg_response_time_ms: float = 0.0 + min_response_time_ms: float = 0.0 + max_response_time_ms: float = 0.0 + p50_response_time_ms: float = 0.0 + p95_response_time_ms: float = 0.0 + p99_response_time_ms: float = 0.0 + + # Throughput + requests_per_second: float = 0.0 + total_duration_seconds: float = 0.0 + + # Error information + error_rate_percent: float = 0.0 + errors: List[str] = field(default_factory=list) + + # Validation + passed: bool = False + iterations: int = 1 + + # Resource metrics + peak_cpu_percent: float = 0.0 + peak_memory_mb: float = 0.0 + avg_cpu_percent: float = 0.0 + avg_memory_mb: float = 0.0 + + # Additional data + metadata: Dict[str, Any] = field(default_factory=dict) + detailed_timings: List[TimingRecord] = field(default_factory=list) + + # Concurrent users (for channel benchmarks) + concurrent_users: int = 0 + + def to_dict(self) -> Dict[str, Any]: + """Convert result to dictionary.""" + return { + "benchmark_name": self.benchmark_name, + "timestamp": self.timestamp.isoformat(), + "total_requests": self.total_requests, + "successful_requests": self.successful_requests, + "failed_requests": self.failed_requests, + "avg_response_time_ms": round(self.avg_response_time_ms, 2), + 
"min_response_time_ms": round(self.min_response_time_ms, 2), + "max_response_time_ms": round(self.max_response_time_ms, 2), + "p50_response_time_ms": round(self.p50_response_time_ms, 2), + "p95_response_time_ms": round(self.p95_response_time_ms, 2), + "p99_response_time_ms": round(self.p99_response_time_ms, 2), + "requests_per_second": round(self.requests_per_second, 2), + "total_duration_seconds": round(self.total_duration_seconds, 2), + "error_rate_percent": round(self.error_rate_percent, 2), + "errors": self.errors[:10], # Limit to first 10 errors + "passed": self.passed, + "iterations": self.iterations, + "concurrent_users": self.concurrent_users, + "peak_cpu_percent": round(self.peak_cpu_percent, 2), + "peak_memory_mb": round(self.peak_memory_mb, 2), + "avg_cpu_percent": round(self.avg_cpu_percent, 2), + "avg_memory_mb": round(self.avg_memory_mb, 2), + "metadata": self.metadata, + } + + def to_json(self) -> str: + """Convert result to JSON string.""" + return json.dumps(self.to_dict(), indent=2) + + +class MetricsCollector: + """ + Collects and calculates metrics during benchmark execution. + + Usage: + collector = MetricsCollector() + + # Record individual timings + with collector.time_operation("api_call"): + response = await make_api_call() + + # Get results + result = collector.get_result("my_benchmark") + """ + + def __init__(self): + """Initialize the metrics collector.""" + self._timings: List[TimingRecord] = [] + self._start_time: Optional[float] = None + self._end_time: Optional[float] = None + self._resource_samples: List[Dict[str, float]] = [] + + def start(self) -> None: + """Start the metrics collection timer.""" + self._start_time = time.time() + + def stop(self) -> None: + """Stop the metrics collection timer.""" + self._end_time = time.time() + + @contextmanager + def time_operation(self, operation: str, metadata: Optional[Dict[str, Any]] = None): + """ + Context manager to time an operation. + + Args: + operation: Name of the operation being timed + metadata: Optional metadata to attach to the timing record + + Yields: + The timing record (can be modified to set success/error) + """ + record = TimingRecord( + operation=operation, + start_time=time.time(), + end_time=0, + success=True, + metadata=metadata or {}, + ) + + try: + yield record + record.success = True + except Exception as e: + record.success = False + record.error = str(e) + raise + finally: + record.end_time = time.time() + self._timings.append(record) + + def record_timing( + self, + operation: str, + duration_ms: float, + success: bool = True, + error: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Manually record a timing. + + Args: + operation: Name of the operation + duration_ms: Duration in milliseconds + success: Whether the operation was successful + error: Error message if failed + metadata: Optional metadata + """ + now = time.time() + duration_seconds = duration_ms / 1000 + + record = TimingRecord( + operation=operation, + start_time=now - duration_seconds, + end_time=now, + success=success, + error=error, + metadata=metadata or {}, + ) + self._timings.append(record) + + def record_resource_sample(self, cpu_percent: float, memory_mb: float) -> None: + """ + Record a resource usage sample. 
+ + Args: + cpu_percent: CPU usage percentage + memory_mb: Memory usage in MB + """ + self._resource_samples.append({ + "timestamp": time.time(), + "cpu_percent": cpu_percent, + "memory_mb": memory_mb, + }) + + def get_result( + self, + benchmark_name: str, + concurrent_users: int = 0, + include_detailed_timings: bool = False, + metadata: Optional[Dict[str, Any]] = None, + ) -> BenchmarkResult: + """ + Calculate and return benchmark results. + + Args: + benchmark_name: Name of the benchmark + concurrent_users: Number of concurrent users tested + include_detailed_timings: Whether to include all timing records + metadata: Additional metadata to include + + Returns: + BenchmarkResult with calculated statistics + """ + result = BenchmarkResult( + benchmark_name=benchmark_name, + concurrent_users=concurrent_users, + metadata=metadata or {}, + ) + + if not self._timings: + return result + + # Calculate request counts + result.total_requests = len(self._timings) + result.successful_requests = sum(1 for t in self._timings if t.success) + result.failed_requests = result.total_requests - result.successful_requests + + # Get durations in milliseconds + durations = [t.duration_ms for t in self._timings] + successful_durations = [t.duration_ms for t in self._timings if t.success] + + if successful_durations: + # Calculate timing statistics on successful requests + result.avg_response_time_ms = statistics.mean(successful_durations) + result.min_response_time_ms = min(successful_durations) + result.max_response_time_ms = max(successful_durations) + + sorted_durations = sorted(successful_durations) + result.p50_response_time_ms = self._percentile(sorted_durations, 50) + result.p95_response_time_ms = self._percentile(sorted_durations, 95) + result.p99_response_time_ms = self._percentile(sorted_durations, 99) + + # Calculate duration + if self._start_time and self._end_time: + result.total_duration_seconds = self._end_time - self._start_time + else: + # Fall back to timing record range + result.total_duration_seconds = ( + max(t.end_time for t in self._timings) - + min(t.start_time for t in self._timings) + ) + + # Calculate throughput + if result.total_duration_seconds > 0: + result.requests_per_second = result.total_requests / result.total_duration_seconds + + # Calculate error rate + if result.total_requests > 0: + result.error_rate_percent = (result.failed_requests / result.total_requests) * 100 + + # Collect unique errors + result.errors = list(set( + t.error for t in self._timings + if t.error is not None + )) + + # Calculate resource metrics + if self._resource_samples: + cpu_values = [s["cpu_percent"] for s in self._resource_samples] + memory_values = [s["memory_mb"] for s in self._resource_samples] + + result.peak_cpu_percent = max(cpu_values) + result.peak_memory_mb = max(memory_values) + result.avg_cpu_percent = statistics.mean(cpu_values) + result.avg_memory_mb = statistics.mean(memory_values) + + # Include detailed timings if requested + if include_detailed_timings: + result.detailed_timings = self._timings.copy() + + return result + + def _percentile(self, sorted_data: List[float], percentile: float) -> float: + """Calculate percentile value from sorted data.""" + if not sorted_data: + return 0.0 + + k = (len(sorted_data) - 1) * (percentile / 100) + f = int(k) + c = f + 1 if f + 1 < len(sorted_data) else f + + if f == c: + return sorted_data[f] + + return sorted_data[f] * (c - k) + sorted_data[c] * (k - f) + + def reset(self) -> None: + """Reset all collected metrics.""" + 
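
As a quick illustration of the linear interpolation used in `_percentile` above, the following standalone sketch applies the same formula to a small, made-up dataset (the durations are invented for the example):

```python
from typing import List


def percentile(sorted_data: List[float], pct: float) -> float:
    """Linear-interpolation percentile, mirroring MetricsCollector._percentile."""
    if not sorted_data:
        return 0.0
    k = (len(sorted_data) - 1) * (pct / 100)
    f = int(k)
    c = f + 1 if f + 1 < len(sorted_data) else f
    if f == c:
        return sorted_data[f]
    return sorted_data[f] * (c - k) + sorted_data[c] * (k - f)


durations_ms = [10.0, 20.0, 30.0, 40.0, 50.0]  # already sorted
print(percentile(durations_ms, 50))  # 30.0 (the exact middle sample)
print(percentile(durations_ms, 95))  # 48.0 (interpolated between 40.0 and 50.0)
```
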
self._timings.clear() + self._resource_samples.clear() + self._start_time = None + self._end_time = None + + +class ResultsWriter: + """Writes benchmark results to various formats.""" + + def __init__(self, output_dir: Path): + """ + Initialize the results writer. + + Args: + output_dir: Directory to write results to + """ + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def write_json(self, result: BenchmarkResult, filename: Optional[str] = None) -> Path: + """ + Write result to JSON file. + + Args: + result: Benchmark result to write + filename: Optional filename (default: benchmark_name_timestamp.json) + + Returns: + Path to written file + """ + if filename is None: + timestamp = result.timestamp.strftime("%Y%m%d_%H%M%S") + filename = f"{result.benchmark_name.replace(' ', '_').lower()}_{timestamp}.json" + + filepath = self.output_dir / filename + + with open(filepath, 'w') as f: + f.write(result.to_json()) + + return filepath + + def write_csv( + self, + results: List[BenchmarkResult], + filename: str = "benchmark_results.csv" + ) -> Path: + """ + Write multiple results to CSV file. + + Args: + results: List of benchmark results + filename: Output filename + + Returns: + Path to written file + """ + filepath = self.output_dir / filename + + fieldnames = [ + "benchmark_name", "timestamp", "concurrent_users", + "total_requests", "successful_requests", "failed_requests", + "avg_response_time_ms", "min_response_time_ms", "max_response_time_ms", + "p50_response_time_ms", "p95_response_time_ms", "p99_response_time_ms", + "requests_per_second", "error_rate_percent", "passed", + "peak_cpu_percent", "peak_memory_mb", + ] + + with open(filepath, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore') + writer.writeheader() + + for result in results: + row = result.to_dict() + row["timestamp"] = result.timestamp.strftime("%Y-%m-%d %H:%M:%S") + writer.writerow(row) + + return filepath + + def write_summary(self, results: List[BenchmarkResult], filename: str = "summary.txt") -> Path: + """ + Write a human-readable summary of results. + + Args: + results: List of benchmark results + filename: Output filename + + Returns: + Path to written file + """ + filepath = self.output_dir / filename + + lines = [ + "=" * 60, + "BENCHMARK RESULTS SUMMARY", + "=" * 60, + "", + ] + + for result in results: + lines.extend([ + f"Benchmark: {result.benchmark_name}", + f" Concurrent Users: {result.concurrent_users}", + f" Total Requests: {result.total_requests}", + f" Success Rate: {100 - result.error_rate_percent:.1f}%", + f" Avg Response Time: {result.avg_response_time_ms:.2f}ms", + f" P95 Response Time: {result.p95_response_time_ms:.2f}ms", + f" Requests/sec: {result.requests_per_second:.2f}", + f" Status: {'PASSED' if result.passed else 'FAILED'}", + "", + ]) + + lines.extend([ + "=" * 60, + ]) + + with open(filepath, 'w') as f: + f.write('\n'.join(lines)) + + return filepath diff --git a/benchmark/core/runner.py b/benchmark/core/runner.py new file mode 100644 index 0000000..4f10c23 --- /dev/null +++ b/benchmark/core/runner.py @@ -0,0 +1,275 @@ +""" +Benchmark runner - orchestrates benchmark execution. + +Handles Docker container management, benchmark lifecycle, and result collection. 
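
To show how results typically flow to disk, here is a small, hypothetical sketch using the `ResultsWriter` defined above. The output directory and all result values are invented for the example.

```python
from pathlib import Path

from benchmark.core.metrics import BenchmarkResult, ResultsWriter

# A made-up result, as MetricsCollector.get_result() might return after a short run.
result = BenchmarkResult(
    benchmark_name="Channel Concurrency",
    total_requests=1200,
    successful_requests=1188,
    failed_requests=12,
    avg_response_time_ms=142.7,
    p95_response_time_ms=410.3,
    requests_per_second=39.6,
    error_rate_percent=1.0,
    concurrent_users=30,
    passed=True,
)

writer = ResultsWriter(Path("./results/example"))
print(writer.write_json(result))                      # per-result JSON file
print(writer.write_csv([result], "example.csv"))      # one row per result
print(writer.write_summary([result], "example.txt"))  # human-readable summary
```
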
+""" + +import asyncio +import time +from pathlib import Path +from typing import Optional, Dict, Any, List, Type +from datetime import datetime + +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn +from rich.table import Table +from rich.panel import Panel + +from benchmark.core.config import BenchmarkConfig, ConfigLoader, load_config +from benchmark.core.base import BaseBenchmark +from benchmark.core.metrics import BenchmarkResult, ResultsWriter + + +console = Console() + + +class BenchmarkRunner: + """ + Orchestrates benchmark execution. + + Handles: + - Loading configuration + - Managing Docker containers with resource constraints + - Running benchmarks + - Collecting and reporting results + """ + + def __init__( + self, + config: Optional[BenchmarkConfig] = None, + profile_id: str = "default", + output_dir: Optional[Path] = None, + ): + """ + Initialize the benchmark runner. + + Args: + config: Optional pre-loaded configuration + profile_id: Compute profile to use + output_dir: Directory for benchmark results + """ + self.config = config or load_config(profile_id) + self.profile_id = profile_id + + # Set up output directory + if output_dir is None: + output_dir = Path(__file__).parent.parent.parent / "results" + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + self.results_writer = ResultsWriter(self.output_dir) + self._benchmarks: Dict[str, Type[BaseBenchmark]] = {} + self._results: List[BenchmarkResult] = [] + + def register_benchmark(self, benchmark_class: Type[BaseBenchmark]) -> None: + """ + Register a benchmark class for execution. + + Args: + benchmark_class: The benchmark class to register + """ + self._benchmarks[benchmark_class.name] = benchmark_class + + async def run_benchmark( + self, + benchmark_class: Type[BaseBenchmark], + **kwargs: Any, + ) -> BenchmarkResult: + """ + Run a single benchmark. + + Args: + benchmark_class: The benchmark class to run + **kwargs: Additional arguments to pass to the benchmark + + Returns: + BenchmarkResult from the benchmark + """ + console.print(Panel( + f"[bold blue]Running: {benchmark_class.name}[/bold blue]\n" + f"[dim]{benchmark_class.description}[/dim]", + title="Benchmark", + border_style="blue", + )) + + # Create benchmark instance + benchmark = benchmark_class(self.config) + + # Execute with progress display + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console, + ) as progress: + task = progress.add_task("Running benchmark...", total=100) + + try: + # Setup phase + progress.update(task, description="Setting up...", completed=10) + + # Execute benchmark + result = await benchmark.execute() + + progress.update(task, description="Collecting results...", completed=90) + + # Store result + self._results.append(result) + + progress.update(task, description="Complete!", completed=100) + + except Exception as e: + console.print(f"[red]Error running benchmark: {e}[/red]") + raise + + # Display result summary + self._display_result_summary(result) + + return result + + async def run_all(self) -> List[BenchmarkResult]: + """ + Run all registered benchmarks. 
+ + Returns: + List of BenchmarkResult from all benchmarks + """ + if not self._benchmarks: + console.print("[yellow]No benchmarks registered![/yellow]") + return [] + + console.print(Panel( + f"[bold green]Running {len(self._benchmarks)} benchmark(s)[/bold green]\n" + f"Profile: {self.profile_id}\n" + f"Target: {self.config.target_url}", + title="Benchmark Suite", + border_style="green", + )) + + results = [] + for name, benchmark_class in self._benchmarks.items(): + try: + result = await self.run_benchmark(benchmark_class) + results.append(result) + except Exception as e: + console.print(f"[red]Benchmark '{name}' failed: {e}[/red]") + + # Write all results + self._write_results(results) + + return results + + def _display_result_summary(self, result: BenchmarkResult) -> None: + """Display a summary of benchmark results.""" + table = Table(title=f"Results: {result.benchmark_name}") + table.add_column("Metric", style="cyan") + table.add_column("Value", justify="right") + + table.add_row("Max Concurrent Users", str(result.concurrent_users)) + table.add_row("Total Requests", str(result.total_requests)) + table.add_row("Successful", str(result.successful_requests)) + table.add_row("Failed", str(result.failed_requests)) + table.add_row("Avg Response Time", f"{result.avg_response_time_ms:.2f} ms") + table.add_row("P95 Response Time", f"{result.p95_response_time_ms:.2f} ms") + table.add_row("P99 Response Time", f"{result.p99_response_time_ms:.2f} ms") + table.add_row("Requests/sec", f"{result.requests_per_second:.2f}") + + # Color-code error rate + error_color = "green" if result.error_rate_percent < 1 else "yellow" if result.error_rate_percent < 5 else "red" + table.add_row("Error Rate", f"[{error_color}]{result.error_rate_percent:.2f}%[/{error_color}]") + + if result.peak_cpu_percent > 0: + table.add_row("Peak CPU", f"{result.peak_cpu_percent:.1f}%") + table.add_row("Peak Memory", f"{result.peak_memory_mb:.1f} MB") + + console.print(table) + + def _write_results(self, results: List[BenchmarkResult]) -> None: + """Write benchmark results to files.""" + if not results: + return + + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + + # Write JSON for each result + for result in results: + json_path = self.results_writer.write_json(result) + console.print(f"[dim]Results written to: {json_path}[/dim]") + + # Write combined CSV + csv_path = self.results_writer.write_csv( + results, + filename=f"benchmark_results_{timestamp}.csv" + ) + console.print(f"[dim]CSV written to: {csv_path}[/dim]") + + # Write summary + summary_path = self.results_writer.write_summary( + results, + filename=f"summary_{timestamp}.txt" + ) + console.print(f"[dim]Summary written to: {summary_path}[/dim]") + + def display_final_summary(self) -> None: + """Display a final summary of all benchmark results.""" + if not self._results: + return + + console.print("\n") + console.print(Panel( + "[bold]BENCHMARK SUITE COMPLETE[/bold]", + border_style="green", + )) + + # Summary table + table = Table(title="Final Results Summary") + table.add_column("Benchmark", style="cyan") + table.add_column("Max Users", justify="right") + table.add_column("Requests", justify="right") + table.add_column("Avg Time", justify="right") + table.add_column("P95 Time", justify="right") + table.add_column("RPS", justify="right") + table.add_column("Errors", justify="right") + + for result in self._results: + error_color = "green" if result.error_rate_percent < 1 else "yellow" if result.error_rate_percent < 5 else "red" + table.add_row( + 
result.benchmark_name, + str(result.concurrent_users), + str(result.total_requests), + f"{result.avg_response_time_ms:.0f}ms", + f"{result.p95_response_time_ms:.0f}ms", + f"{result.requests_per_second:.1f}", + f"[{error_color}]{result.error_rate_percent:.1f}%[/{error_color}]", + ) + + console.print(table) + console.print() + + +async def run_benchmarks( + benchmarks: List[Type[BaseBenchmark]], + profile_id: str = "default", + output_dir: Optional[Path] = None, +) -> List[BenchmarkResult]: + """ + Convenience function to run multiple benchmarks. + + Args: + benchmarks: List of benchmark classes to run + profile_id: Compute profile to use + output_dir: Output directory for results + + Returns: + List of benchmark results + """ + runner = BenchmarkRunner(profile_id=profile_id, output_dir=output_dir) + + for benchmark_class in benchmarks: + runner.register_benchmark(benchmark_class) + + results = await runner.run_all() + runner.display_final_summary() + + return results diff --git a/benchmark/scenarios/__init__.py b/benchmark/scenarios/__init__.py new file mode 100644 index 0000000..14ab6e8 --- /dev/null +++ b/benchmark/scenarios/__init__.py @@ -0,0 +1,11 @@ +""" +Benchmark scenarios module. + +Contains specific benchmark implementations for different Open WebUI features. +""" + +from benchmark.scenarios.channels import ChannelConcurrencyBenchmark + +__all__ = [ + "ChannelConcurrencyBenchmark", +] diff --git a/benchmark/scenarios/channels.py b/benchmark/scenarios/channels.py new file mode 100644 index 0000000..ceee0e0 --- /dev/null +++ b/benchmark/scenarios/channels.py @@ -0,0 +1,619 @@ +""" +Channel concurrency benchmark. + +Tests how many concurrent users can be in a Channel before the system +starts experiencing performance degradation. +""" + +import asyncio +import random +import string +import time +from typing import Optional, Dict, Any, List +from datetime import datetime + +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn +from rich.live import Live +from rich.table import Table + +from benchmark.core.base import BaseBenchmark +from benchmark.core.config import BenchmarkConfig +from benchmark.core.metrics import MetricsCollector, BenchmarkResult +from benchmark.clients.http_client import OpenWebUIClient, ClientPool +from benchmark.clients.websocket_client import WebSocketClient, WebSocketPool + + +console = Console() + + +def generate_message_content(min_length: int = 50, max_length: int = 500) -> str: + """Generate random message content.""" + length = random.randint(min_length, max_length) + words = [] + current_length = 0 + + while current_length < length: + word_length = random.randint(3, 10) + word = ''.join(random.choices(string.ascii_lowercase, k=word_length)) + words.append(word) + current_length += word_length + 1 + + return ' '.join(words)[:length] + + +class ChannelConcurrencyBenchmark(BaseBenchmark): + """ + Benchmark for testing channel concurrent user capacity. + + This benchmark: + 1. Creates a test channel + 2. Progressively adds users up to a maximum + 3. Has each user send messages at a configured rate + 4. Measures response times and error rates at each level + 5. 
Identifies the point where performance degrades + """ + + name = "Channel Concurrency" + description = "Test concurrent user capacity in Open WebUI Channels" + version = "1.0.0" + + def __init__(self, config: BenchmarkConfig): + """Initialize the channel concurrency benchmark.""" + super().__init__(config) + + self._admin_client: Optional[OpenWebUIClient] = None + self._client_pool: Optional[ClientPool] = None + self._ws_pool: Optional[WebSocketPool] = None + self._test_channel_id: Optional[str] = None + self._created_users: List[str] = [] # Track user IDs for cleanup + + async def setup(self) -> None: + """ + Set up the benchmark environment. + + Uses admin credentials to: + 1. Create a test channel + 2. Create temporary benchmark users dynamically + + Requires ADMIN_USER_EMAIL and ADMIN_USER_PASSWORD in environment. + """ + # Validate that we have admin credentials configured + if not self.config.admin_user: + raise RuntimeError( + "Admin user credentials not configured. " + "Set ADMIN_USER_EMAIL and ADMIN_USER_PASSWORD environment variables." + ) + + # Create admin client + self._admin_client = OpenWebUIClient( + self.config.target_url, + self.config.request_timeout, + ) + await self._admin_client.connect() + + # Wait for service to be ready + if not await self._admin_client.wait_for_ready(): + raise RuntimeError("Open WebUI service not ready") + + # Authenticate admin (must already exist) + admin_config = self.config.admin_user + try: + await self._admin_client.signin(admin_config.email, admin_config.password) + except Exception as e: + raise RuntimeError( + f"Failed to sign in as admin ({admin_config.email}). " + f"Make sure this user exists in Open WebUI. Error: {e}" + ) + + # Create test channel + channel_name = f"benchmark-channel-{int(time.time())}" + channel = await self._admin_client.create_channel( + name=channel_name, + description="Benchmark test channel - will be deleted after test", + access_control=None, # Public channel for benchmark + ) + self._test_channel_id = channel["id"] + + # Initialize client pool + self._client_pool = ClientPool( + self.config.target_url, + self.config.request_timeout, + ) + + async def run(self) -> BenchmarkResult: + """ + Execute the channel concurrency benchmark. + + Progressively increases concurrent users and measures performance + at each level. 
+ + Returns: + BenchmarkResult with metrics from the benchmark + """ + channel_config = self.config.channels + max_users = channel_config.max_concurrent_users + step_size = channel_config.user_step_size + sustain_time = channel_config.sustain_time + message_frequency = channel_config.message_frequency + + all_results: List[BenchmarkResult] = [] + + # Calculate total levels for progress + total_levels = (max_users + step_size - 1) // step_size + current_level = 0 + + # Progressive load testing + current_users = step_size + + console.print(f"\n[bold cyan]Testing {step_size} to {max_users} concurrent users (step: {step_size})[/bold cyan]") + console.print(f"[dim]Each level runs for {sustain_time}s with {self.config.cooldown_seconds}s cooldown[/dim]\n") + + while current_users <= max_users: + current_level += 1 + console.print(f"[yellow]━━━ Level {current_level}/{total_levels}: {current_users} users ━━━[/yellow]") + + # Run benchmark at this level + result = await self._run_at_user_level( + user_count=current_users, + duration=sustain_time, + message_frequency=message_frequency, + ) + + all_results.append(result) + + # Show level results + status_color = "green" if result.error_rate_percent < 5 else "yellow" if result.error_rate_percent < 10 else "red" + console.print(f" Requests: {result.total_requests} | " + f"Avg: {result.avg_response_time_ms:.1f}ms | " + f"P95: {result.p95_response_time_ms:.1f}ms | " + f"[{status_color}]Errors: {result.error_rate_percent:.1f}%[/{status_color}]") + + # Check if we should stop (too many errors) + if result.error_rate_percent > self.config.thresholds.max_error_rate_percent * 2: + console.print(f"[red]⚠ Stopping early - error rate too high ({result.error_rate_percent:.1f}%)[/red]") + break + + # Increase user count + current_users += step_size + + # Brief cooldown between levels (except last) + if current_users <= max_users: + console.print(f"[dim] Cooldown {self.config.cooldown_seconds}s...[/dim]") + await asyncio.sleep(self.config.cooldown_seconds) + + console.print() + + # Find the best sustainable user count + return self._analyze_results(all_results) + + async def _run_at_user_level( + self, + user_count: int, + duration: float, + message_frequency: float, + ) -> BenchmarkResult: + """ + Run benchmark at a specific user count level. 
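
The progressive ramp above is easiest to reason about as a schedule of user levels plus an early-stop rule. Below is a minimal sketch of that logic in isolation; the step size, maximum, and error values are illustrative.

```python
from typing import List


def user_levels(max_users: int, step: int) -> List[int]:
    """Levels tested by the ramp: step, 2*step, ... up to max_users."""
    return list(range(step, max_users + 1, step))


def should_stop_early(error_rate_percent: float, max_error_rate_percent: float) -> bool:
    """Mirror of the early-stop rule above: abort once errors exceed twice the threshold."""
    return error_rate_percent > max_error_rate_percent * 2


print(user_levels(50, 10))          # [10, 20, 30, 40, 50]
print(should_stop_early(3.5, 1.0))  # True -> remaining levels are skipped
```
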
+ + Args: + user_count: Number of concurrent users + duration: How long to sustain the load + message_frequency: Messages per second per user + + Returns: + BenchmarkResult for this level + """ + metrics = MetricsCollector() + + # Create benchmark users via admin API with progress + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console, + transient=True, + ) as progress: + task = progress.add_task(f"Creating {user_count} users...", total=user_count * 2) + + def update_progress(current: int, total: int): + progress.update(task, completed=current) + + clients = await self._client_pool.create_benchmark_users( + admin_client=self._admin_client, + count=user_count, + email_pattern="benchmark_user_{n}@test.local", + password="benchmark_pass_123", + name_pattern="Benchmark User {n}", + progress_callback=update_progress, + ) + + console.print(f" [green]✓[/green] Created {len(clients)} users") + + # Calculate message interval + if message_frequency > 0: + message_interval = 1.0 / message_frequency + else: + message_interval = float('inf') + + # Run the actual benchmark + console.print(f" [cyan]▶[/cyan] Running load test for {duration}s...") + metrics.start() + + # Create tasks for each user + end_time = time.time() + duration + tasks = [] + + for client in clients: + task = asyncio.create_task( + self._user_activity( + client=client, + channel_id=self._test_channel_id, + message_interval=message_interval, + end_time=end_time, + metrics=metrics, + ) + ) + tasks.append(task) + + # Wait for all tasks to complete + await asyncio.gather(*tasks, return_exceptions=True) + + metrics.stop() + + # Clean up: close clients and delete benchmark users + await self._client_pool.close_all() + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console, + transient=True, + ) as progress: + task = progress.add_task(f"Cleaning up {user_count} users...", total=user_count) + + def cleanup_progress(current: int, total: int): + progress.update(task, completed=current) + + await self._client_pool.cleanup_benchmark_users( + self._admin_client, + progress_callback=cleanup_progress, + ) + + console.print(f" [green]✓[/green] Cleanup complete") + + # Get result + result = metrics.get_result( + benchmark_name=f"Channel @ {user_count} users", + concurrent_users=user_count, + metadata={"user_count": user_count, "duration": duration}, + ) + + return result + + async def _user_activity( + self, + client: OpenWebUIClient, + channel_id: str, + message_interval: float, + end_time: float, + metrics: MetricsCollector, + ) -> None: + """ + Simulate user activity in a channel. 
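
The load pattern in `_run_at_user_level` boils down to one asyncio task per simulated user, all running against a shared deadline and a shared metrics collector. A simplified, self-contained sketch of that pattern follows; the `worker` coroutine and its sleep-based "request" are placeholders, not the real client calls.

```python
import asyncio
import random
import time

from benchmark.core.metrics import MetricsCollector


async def worker(user_id: int, end_time: float, metrics: MetricsCollector) -> None:
    """One simulated user: keep issuing operations until the deadline passes."""
    while time.time() < end_time:
        with metrics.time_operation("fake_request", metadata={"user": user_id}):
            await asyncio.sleep(random.uniform(0.01, 0.05))  # placeholder for an HTTP call
        await asyncio.sleep(0.1)  # pacing between operations


async def run_level(user_count: int, duration: float) -> None:
    metrics = MetricsCollector()
    metrics.start()

    end_time = time.time() + duration
    tasks = [asyncio.create_task(worker(i, end_time, metrics)) for i in range(user_count)]
    await asyncio.gather(*tasks, return_exceptions=True)

    metrics.stop()
    result = metrics.get_result(f"sketch @ {user_count} users", concurrent_users=user_count)
    print(result.requests_per_second, result.p95_response_time_ms)


if __name__ == "__main__":
    asyncio.run(run_level(user_count=5, duration=2.0))
```
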
+ + Args: + client: HTTP client for the user + channel_id: Channel to send messages to + message_interval: Time between messages + end_time: When to stop + metrics: Metrics collector + """ + message_size = self.config.channels.message_size + + while time.time() < end_time: + try: + # Generate message + content = generate_message_content( + min_length=message_size.get("min", 50), + max_length=message_size.get("max", 500), + ) + + # Send message and time it + start = time.time() + try: + await client.post_message(channel_id, content) + duration_ms = (time.time() - start) * 1000 + metrics.record_timing( + operation="post_message", + duration_ms=duration_ms, + success=True, + metadata={"user": client.user.email if client.user else "unknown"}, + ) + except Exception as e: + duration_ms = (time.time() - start) * 1000 + metrics.record_timing( + operation="post_message", + duration_ms=duration_ms, + success=False, + error=str(e), + ) + + # Also fetch messages periodically + if random.random() < 0.3: # 30% chance + start = time.time() + try: + await client.get_channel_messages(channel_id, limit=20) + duration_ms = (time.time() - start) * 1000 + metrics.record_timing( + operation="get_messages", + duration_ms=duration_ms, + success=True, + ) + except Exception as e: + duration_ms = (time.time() - start) * 1000 + metrics.record_timing( + operation="get_messages", + duration_ms=duration_ms, + success=False, + error=str(e), + ) + + # Wait before next message + # Add some jitter to avoid thundering herd + jitter = random.uniform(0.8, 1.2) + await asyncio.sleep(message_interval * jitter) + + except asyncio.CancelledError: + break + except Exception: + # Continue on other errors + await asyncio.sleep(0.5) + + def _analyze_results(self, results: List[BenchmarkResult]) -> BenchmarkResult: + """ + Analyze results from all user levels. + + Finds the maximum sustainable user count and returns a summary result. 
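
On the pacing above: with a `message_frequency` of 0.5 messages per second, the base interval is 2 s, and the ±20% jitter spreads each user's sends over roughly 1.6–2.4 s so clients do not fire in lockstep. A tiny illustrative sketch:

```python
import random

message_frequency = 0.5                      # messages per second per user (from config)
message_interval = 1.0 / message_frequency   # 2.0 seconds between messages

# Same jitter factor as _user_activity: each wait lands in roughly [1.6 s, 2.4 s].
waits = [message_interval * random.uniform(0.8, 1.2) for _ in range(5)]
print([round(w, 2) for w in waits])
```
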
+ + Args: + results: Results from each user level + + Returns: + Summary BenchmarkResult + """ + if not results: + return BenchmarkResult( + benchmark_name=self.name, + timestamp=datetime.utcnow(), + passed=False, + ) + + # Find the highest user count that passed thresholds + max_sustainable_users = 0 + best_result = results[0] + + for result in results: + # Check if this level meets thresholds + passes_response_time = ( + result.p95_response_time_ms <= self.config.thresholds.max_p95_response_time_ms + ) + passes_error_rate = ( + result.error_rate_percent <= self.config.thresholds.max_error_rate_percent + ) + + if passes_response_time and passes_error_rate: + if result.concurrent_users > max_sustainable_users: + max_sustainable_users = result.concurrent_users + best_result = result + + # Create summary result + summary = BenchmarkResult( + benchmark_name=self.name, + timestamp=datetime.utcnow(), + concurrent_users=max_sustainable_users, + total_requests=sum(r.total_requests for r in results), + successful_requests=sum(r.successful_requests for r in results), + failed_requests=sum(r.failed_requests for r in results), + avg_response_time_ms=best_result.avg_response_time_ms, + min_response_time_ms=min(r.min_response_time_ms for r in results if r.min_response_time_ms > 0), + max_response_time_ms=max(r.max_response_time_ms for r in results), + p50_response_time_ms=best_result.p50_response_time_ms, + p95_response_time_ms=best_result.p95_response_time_ms, + p99_response_time_ms=best_result.p99_response_time_ms, + requests_per_second=best_result.requests_per_second, + total_duration_seconds=sum(r.total_duration_seconds for r in results), + error_rate_percent=best_result.error_rate_percent, + iterations=len(results), + metadata={ + "max_sustainable_users": max_sustainable_users, + "tested_levels": [r.concurrent_users for r in results], + "results_by_level": [ + { + "users": r.concurrent_users, + "p95_ms": r.p95_response_time_ms, + "error_rate": r.error_rate_percent, + } + for r in results + ], + }, + ) + + # Determine if passed + summary.passed = max_sustainable_users > 0 + + return summary + + async def teardown(self) -> None: + """Clean up benchmark resources.""" + # Delete test channel + if self._test_channel_id and self._admin_client: + try: + await self._admin_client.delete_channel(self._test_channel_id) + except Exception: + pass + + # Close admin client + if self._admin_client: + await self._admin_client.close() + + # Close any remaining pooled clients + if self._client_pool: + await self._client_pool.close_all() + + # Close WebSocket connections + if self._ws_pool: + await self._ws_pool.close_all() + + +class ChannelWebSocketBenchmark(BaseBenchmark): + """ + Benchmark for testing channel WebSocket scalability. + + Tests real-time message delivery through WebSockets rather than + HTTP polling. 
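
The selection rule in `_analyze_results` can be summarised as: a level "passes" when both its P95 latency and its error rate are within the configured thresholds, and the reported capacity is the highest passing level. A compact sketch with made-up per-level numbers and the default threshold values:

```python
from typing import List, Tuple

# (users, p95_ms, error_rate_percent) per tested level -- illustrative values only.
levels: List[Tuple[int, float, float]] = [
    (10, 180.0, 0.0),
    (20, 320.0, 0.2),
    (30, 950.0, 0.8),
    (40, 3400.0, 0.9),   # fails: P95 above threshold
    (50, 5200.0, 6.5),   # fails: both thresholds exceeded
]

MAX_P95_MS = 3000.0      # thresholds.max_p95_response_time_ms
MAX_ERROR_RATE = 1.0     # thresholds.max_error_rate_percent

passing = [u for u, p95, err in levels if p95 <= MAX_P95_MS and err <= MAX_ERROR_RATE]
max_sustainable_users = max(passing, default=0)
print(max_sustainable_users)  # 30
```
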
+ """ + + name = "Channel WebSocket" + description = "Test WebSocket scalability in Open WebUI Channels" + version = "1.0.0" + + def __init__(self, config: BenchmarkConfig): + """Initialize the WebSocket benchmark.""" + super().__init__(config) + + self._admin_client: Optional[OpenWebUIClient] = None + self._client_pool: Optional[ClientPool] = None + self._ws_pool: Optional[WebSocketPool] = None + self._test_channel_id: Optional[str] = None + + async def setup(self) -> None: + """Set up benchmark environment.""" + # Similar to ChannelConcurrencyBenchmark setup + self._admin_client = OpenWebUIClient( + self.config.target_url, + self.config.request_timeout, + ) + await self._admin_client.connect() + + if not await self._admin_client.wait_for_ready(): + raise RuntimeError("Open WebUI service not ready") + + # Authenticate admin + admin_config = self.config.admin_user + if admin_config: + try: + await self._admin_client.signin(admin_config.email, admin_config.password) + except Exception: + await self._admin_client.signup( + admin_config.email, + admin_config.password, + admin_config.name, + ) + else: + try: + await self._admin_client.signin("admin@benchmark.local", "benchmark_admin_123") + except Exception: + await self._admin_client.signup("admin@benchmark.local", "benchmark_admin_123", "Benchmark Admin") + + # Create test channel + channel_name = f"benchmark-ws-channel-{int(time.time())}" + channel = await self._admin_client.create_channel( + name=channel_name, + description="WebSocket benchmark channel", + ) + self._test_channel_id = channel["id"] + + # Initialize pools + self._client_pool = ClientPool(self.config.target_url, self.config.request_timeout) + self._ws_pool = WebSocketPool(self.config.target_url, self.config.websocket_timeout) + + async def run(self) -> BenchmarkResult: + """ + Execute WebSocket benchmark. + + Tests message delivery latency through WebSockets. 
+ """ + metrics = MetricsCollector() + metrics.start() + + channel_config = self.config.channels + user_count = min(channel_config.max_concurrent_users, 50) # Limit for WS test + + # Create HTTP clients for users + clients = await self._client_pool.create_clients( + count=user_count, + email_pattern=self.config.user_template.email_pattern, + password=self.config.user_template.password, + name_pattern=self.config.user_template.name_pattern, + ) + + # Create WebSocket connections + tokens = [client.token for client in clients if client.token] + ws_clients = await self._ws_pool.create_connections(tokens) + + # Run test - measure message propagation time + duration = channel_config.sustain_time + end_time = time.time() + duration + + message_count = 0 + while time.time() < end_time: + # Pick random sender + sender = random.choice(clients) + + # Track start time + start = time.time() + + try: + # Send message via HTTP + content = f"Test message {message_count}" + await sender.post_message(self._test_channel_id, content) + + duration_ms = (time.time() - start) * 1000 + metrics.record_timing( + operation="send_message", + duration_ms=duration_ms, + success=True, + ) + message_count += 1 + + except Exception as e: + metrics.record_timing( + operation="send_message", + duration_ms=(time.time() - start) * 1000, + success=False, + error=str(e), + ) + + await asyncio.sleep(0.5) # Message rate + + metrics.stop() + + return metrics.get_result( + benchmark_name=self.name, + concurrent_users=user_count, + metadata={ + "websocket_connections": len(ws_clients), + "messages_sent": message_count, + }, + ) + + async def teardown(self) -> None: + """Clean up resources.""" + if self._test_channel_id and self._admin_client: + try: + await self._admin_client.delete_channel(self._test_channel_id) + except Exception: + pass + + if self._admin_client: + await self._admin_client.close() + + if self._client_pool: + await self._client_pool.close_all() + + if self._ws_pool: + await self._ws_pool.close_all() diff --git a/benchmark/utils/__init__.py b/benchmark/utils/__init__.py new file mode 100644 index 0000000..af15f92 --- /dev/null +++ b/benchmark/utils/__init__.py @@ -0,0 +1,11 @@ +""" +Utility modules for the benchmark suite. +""" + +from benchmark.utils.docker import DockerManager, DockerComposeManager, ContainerStats + +__all__ = [ + "DockerManager", + "DockerComposeManager", + "ContainerStats", +] diff --git a/benchmark/utils/docker.py b/benchmark/utils/docker.py new file mode 100644 index 0000000..d0093d6 --- /dev/null +++ b/benchmark/utils/docker.py @@ -0,0 +1,414 @@ +""" +Docker utilities for benchmark environment management. + +Provides functions for managing Docker containers with specific resource constraints. +""" + +import asyncio +import time +from pathlib import Path +from typing import Optional, Dict, Any, List +from dataclasses import dataclass + +try: + import docker + from docker.models.containers import Container + DOCKER_AVAILABLE = True +except ImportError: + DOCKER_AVAILABLE = False + Container = None + +from benchmark.core.config import ComputeProfile + + +@dataclass +class ContainerStats: + """Container resource usage statistics.""" + cpu_percent: float + memory_usage_mb: float + memory_limit_mb: float + memory_percent: float + network_rx_bytes: int + network_tx_bytes: int + timestamp: float + + +class DockerManager: + """ + Manages Docker containers for benchmark environments. 
+ + Provides methods for starting, stopping, and monitoring + Open WebUI containers with specific resource constraints. + """ + + def __init__(self): + """Initialize the Docker manager.""" + if not DOCKER_AVAILABLE: + raise ImportError("Docker SDK not available. Install with: pip install docker") + + self._client = docker.from_env() + self._containers: Dict[str, Container] = {} + + def start_open_webui( + self, + profile: ComputeProfile, + name: str = "open-webui-benchmark", + image: str = "ghcr.io/open-webui/open-webui:main", + port: int = 3000, + environment: Optional[Dict[str, str]] = None, + volumes: Optional[Dict[str, Dict]] = None, + ) -> Container: + """ + Start an Open WebUI container with specified resource constraints. + + Args: + profile: Compute profile defining resource constraints + name: Container name + image: Docker image to use + port: Host port to expose + environment: Additional environment variables + volumes: Volume mounts + + Returns: + Container instance + """ + # Remove existing container if present + try: + existing = self._client.containers.get(name) + existing.remove(force=True) + except docker.errors.NotFound: + pass + + # Build environment + env = { + "WEBUI_SECRET_KEY": "benchmark-secret-key", + } + if environment: + env.update(environment) + + # Build volume mounts + vol_mounts = {} + if volumes: + vol_mounts.update(volumes) + + # Calculate resource constraints + resources = profile.resources + docker_config = profile.docker + + # Convert memory string to bytes + memory_limit = self._parse_memory(resources.memory) + memory_reservation = self._parse_memory(resources.memory_reservation) + memswap_limit = self._parse_memory(resources.memory_swap) + + # Start container + container = self._client.containers.run( + image=image, + name=name, + detach=True, + ports={"8080/tcp": port}, + environment=env, + volumes=vol_mounts, + # Resource constraints + cpu_shares=docker_config.cpu_shares, + cpu_period=docker_config.cpu_period, + cpu_quota=docker_config.cpu_quota, + mem_limit=memory_limit, + mem_reservation=memory_reservation, + memswap_limit=memswap_limit, + # Other options + restart_policy={"Name": "unless-stopped"}, + remove=False, + ) + + self._containers[name] = container + return container + + def stop_container(self, name: str, timeout: int = 10) -> bool: + """ + Stop and remove a container. + + Args: + name: Container name + timeout: Timeout for graceful stop + + Returns: + True if successful + """ + try: + container = self._containers.get(name) + if container is None: + container = self._client.containers.get(name) + + container.stop(timeout=timeout) + container.remove() + + if name in self._containers: + del self._containers[name] + + return True + except Exception: + return False + + def get_container_stats(self, name: str) -> Optional[ContainerStats]: + """ + Get current resource usage statistics for a container. 
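
One detail worth making explicit from the profile-to-Docker mapping above: with the default `cpu_period` of 100000 µs, the `cpu_quota` corresponding to N CPUs is simply `N * cpu_period`, which is how the quotas in `compute_profiles.yaml` are derived. A small sketch of that relationship:

```python
def cpu_quota_for(cpus: float, cpu_period: int = 100_000) -> int:
    """CFS quota (microseconds per period) equivalent to `cpus` full cores."""
    return int(cpus * cpu_period)


# Matches the quotas used by the bundled profiles (assuming the 100 ms default period).
for cpus in (1.0, 2.0, 4.0, 8.0):
    print(cpus, "CPUs ->", cpu_quota_for(cpus))
# 1.0 CPUs -> 100000, 2.0 -> 200000, 4.0 -> 400000, 8.0 -> 800000
```
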
+ + Args: + name: Container name + + Returns: + ContainerStats or None if container not found + """ + try: + container = self._containers.get(name) + if container is None: + container = self._client.containers.get(name) + + stats = container.stats(stream=False) + + # Calculate CPU percentage + cpu_delta = ( + stats["cpu_stats"]["cpu_usage"]["total_usage"] - + stats["precpu_stats"]["cpu_usage"]["total_usage"] + ) + system_delta = ( + stats["cpu_stats"]["system_cpu_usage"] - + stats["precpu_stats"]["system_cpu_usage"] + ) + num_cpus = stats["cpu_stats"]["online_cpus"] + + if system_delta > 0: + cpu_percent = (cpu_delta / system_delta) * num_cpus * 100 + else: + cpu_percent = 0.0 + + # Calculate memory usage + memory_usage = stats["memory_stats"].get("usage", 0) + memory_limit = stats["memory_stats"].get("limit", 1) + memory_percent = (memory_usage / memory_limit) * 100 + + # Get network stats + networks = stats.get("networks", {}) + network_rx = sum(net.get("rx_bytes", 0) for net in networks.values()) + network_tx = sum(net.get("tx_bytes", 0) for net in networks.values()) + + return ContainerStats( + cpu_percent=cpu_percent, + memory_usage_mb=memory_usage / (1024 * 1024), + memory_limit_mb=memory_limit / (1024 * 1024), + memory_percent=memory_percent, + network_rx_bytes=network_rx, + network_tx_bytes=network_tx, + timestamp=time.time(), + ) + except Exception: + return None + + async def collect_stats_async( + self, + name: str, + duration: float, + interval: float = 1.0, + ) -> List[ContainerStats]: + """ + Collect container stats over a period of time. + + Args: + name: Container name + duration: Duration to collect stats + interval: Time between samples + + Returns: + List of ContainerStats samples + """ + samples = [] + end_time = time.time() + duration + + while time.time() < end_time: + stats = self.get_container_stats(name) + if stats: + samples.append(stats) + await asyncio.sleep(interval) + + return samples + + def wait_for_healthy( + self, + name: str, + timeout: float = 60.0, + interval: float = 2.0, + ) -> bool: + """ + Wait for a container to become healthy. + + Args: + name: Container name + timeout: Maximum time to wait + interval: Time between checks + + Returns: + True if container is healthy, False if timeout + """ + try: + container = self._containers.get(name) + if container is None: + container = self._client.containers.get(name) + + elapsed = 0.0 + while elapsed < timeout: + container.reload() + status = container.status + + if status == "running": + # Check health if available + health = container.attrs.get("State", {}).get("Health", {}) + health_status = health.get("Status", "none") + + if health_status in ("healthy", "none"): + return True + + time.sleep(interval) + elapsed += interval + + return False + except Exception: + return False + + def _parse_memory(self, memory_str: str) -> int: + """Parse memory string (e.g., '8g') to bytes.""" + memory_str = memory_str.lower().strip() + + multipliers = { + 'b': 1, + 'k': 1024, + 'kb': 1024, + 'm': 1024 * 1024, + 'mb': 1024 * 1024, + 'g': 1024 * 1024 * 1024, + 'gb': 1024 * 1024 * 1024, + } + + for suffix, multiplier in multipliers.items(): + if memory_str.endswith(suffix): + value = memory_str[:-len(suffix)] + return int(float(value) * multiplier) + + return int(memory_str) + + def cleanup_all(self) -> None: + """Stop and remove all managed containers.""" + for name in list(self._containers.keys()): + self.stop_container(name) + + +class DockerComposeManager: + """ + Manages Docker Compose environments for benchmarking. 
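
The CPU figure in `get_container_stats` follows the usual delta-based formula for the Docker stats API: compare the change in container CPU time against the change in total system CPU time between two samples, then scale by the number of online CPUs. A standalone sketch with invented sample numbers:

```python
# Shaped like container.stats(stream=False); the numbers are invented for illustration.
stats = {
    "cpu_stats": {
        "cpu_usage": {"total_usage": 4_000_000_000},
        "system_cpu_usage": 120_000_000_000,
        "online_cpus": 2,
    },
    "precpu_stats": {
        "cpu_usage": {"total_usage": 3_200_000_000},
        "system_cpu_usage": 118_000_000_000,
    },
}

cpu_delta = (
    stats["cpu_stats"]["cpu_usage"]["total_usage"]
    - stats["precpu_stats"]["cpu_usage"]["total_usage"]
)
system_delta = (
    stats["cpu_stats"]["system_cpu_usage"]
    - stats["precpu_stats"]["system_cpu_usage"]
)
num_cpus = stats["cpu_stats"]["online_cpus"]

cpu_percent = (cpu_delta / system_delta) * num_cpus * 100 if system_delta > 0 else 0.0
print(round(cpu_percent, 1))  # 80.0, i.e. about 0.8 of a core busy over the sampling window
```
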
+ + Uses existing docker-compose.yaml files from the Open WebUI repository. + """ + + def __init__(self, compose_dir: Path): + """ + Initialize the compose manager. + + Args: + compose_dir: Directory containing docker-compose.yaml + """ + self.compose_dir = Path(compose_dir) + self._project_name = "open-webui-benchmark" + + async def up( + self, + profile: Optional[ComputeProfile] = None, + build: bool = False, + detach: bool = True, + ) -> bool: + """ + Start the Docker Compose environment. + + Args: + profile: Optional compute profile for resource constraints + build: Whether to build images + detach: Whether to run in background + + Returns: + True if successful + """ + cmd = ["docker", "compose", "-p", self._project_name] + + # Add compose file + compose_file = self.compose_dir / "docker-compose.yaml" + if compose_file.exists(): + cmd.extend(["-f", str(compose_file)]) + + cmd.append("up") + + if build: + cmd.append("--build") + if detach: + cmd.append("-d") + + # TODO: Apply resource constraints from profile + # This would require modifying the compose file or using override files + + process = await asyncio.create_subprocess_exec( + *cmd, + cwd=self.compose_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + return process.returncode == 0 + + async def down(self, volumes: bool = False) -> bool: + """ + Stop the Docker Compose environment. + + Args: + volumes: Whether to remove volumes + + Returns: + True if successful + """ + cmd = ["docker", "compose", "-p", self._project_name, "down"] + + if volumes: + cmd.append("-v") + + process = await asyncio.create_subprocess_exec( + *cmd, + cwd=self.compose_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + await process.communicate() + return process.returncode == 0 + + async def logs(self, follow: bool = False, tail: int = 100) -> str: + """ + Get logs from the Docker Compose environment. 
+ + Args: + follow: Whether to follow logs + tail: Number of lines to show + + Returns: + Log output + """ + cmd = ["docker", "compose", "-p", self._project_name, "logs"] + + if not follow: + cmd.extend(["--tail", str(tail)]) + + process = await asyncio.create_subprocess_exec( + *cmd, + cwd=self.compose_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + return stdout.decode() diff --git a/config/benchmark_config.yaml b/config/benchmark_config.yaml new file mode 100644 index 0000000..f0d06f1 --- /dev/null +++ b/config/benchmark_config.yaml @@ -0,0 +1,95 @@ +# Open WebUI Benchmark Configuration +# Global settings for benchmark execution + +benchmark: + # Default target URL for Open WebUI instance + target_url: "http://localhost:8080" + + # Default timeout for HTTP requests (seconds) + request_timeout: 30 + + # Default timeout for WebSocket connections (seconds) + websocket_timeout: 60 + + # Number of times to repeat the full benchmark (for averaging results) + # Set to 1 for capacity testing, higher for statistical significance + iterations: 1 + + # Warm-up requests before actual benchmark + warmup_requests: 10 + + # Cool-down period between benchmarks (seconds) + cooldown_seconds: 5 + +# Output configuration +output: + # Directory for benchmark results (relative to benchmark root) + results_dir: "results" + + # Formats for output (json, csv, html) + formats: + - json + - csv + + # Include detailed timing information + include_timing_details: true + + # Include resource metrics from Docker + include_resource_metrics: true + +# Logging configuration +logging: + level: "INFO" + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "logs/benchmark.log" + +# Default test user configuration +test_users: + admin: + email: "admin@benchmark.local" + password: "benchmark_admin_password_123" + name: "Benchmark Admin" + role: "admin" + + user_template: + email_pattern: "user{n}@benchmark.local" + password: "benchmark_user_password_123" + name_pattern: "Test User {n}" + role: "user" + +# Channel benchmark specific settings +channels: + # Maximum number of concurrent users to test + max_concurrent_users: 100 + + # Step size for ramping up users + user_step_size: 10 + + # Ramp-up time per step (seconds) + ramp_up_time: 5 + + # Sustain time at each level (seconds) + sustain_time: 30 + + # Message frequency per user (messages per second) + message_frequency: 0.5 + + # Message size distribution (bytes) + message_size: + min: 50 + max: 500 + avg: 200 + +# Performance thresholds for pass/fail criteria +thresholds: + # Maximum acceptable response time (ms) + max_response_time_ms: 2000 + + # Maximum acceptable 95th percentile response time (ms) + max_p95_response_time_ms: 3000 + + # Maximum acceptable error rate (percentage) + max_error_rate_percent: 1.0 + + # Minimum acceptable requests per second + min_requests_per_second: 10 diff --git a/config/compute_profiles.yaml b/config/compute_profiles.yaml new file mode 100644 index 0000000..60bc1c9 --- /dev/null +++ b/config/compute_profiles.yaml @@ -0,0 +1,82 @@ +# Open WebUI Benchmark - Compute Profiles +# Defines the resource constraints for different benchmark configurations + +profiles: + # Default profile for local MacBook testing + # Represents a realistic Docker container configuration + default: + name: "Default Local" + description: "Standard profile for local development and testing on MacBook" + resources: + cpus: 2.0 + memory: "8g" + memory_swap: "8g" + 
memory_reservation: "4g" + docker: + cpu_shares: 1024 + cpu_period: 100000 + cpu_quota: 200000 # 2 CPUs worth + + # Minimal profile for testing lower bounds + minimal: + name: "Minimal" + description: "Minimal resources to test lower performance bounds" + resources: + cpus: 1.0 + memory: "4g" + memory_swap: "4g" + memory_reservation: "2g" + docker: + cpu_shares: 512 + cpu_period: 100000 + cpu_quota: 100000 # 1 CPU worth + + # Standard cloud profile (small VM equivalent) + cloud_small: + name: "Cloud Small" + description: "Equivalent to a small cloud VM (2 vCPU, 4GB RAM)" + resources: + cpus: 2.0 + memory: "4g" + memory_swap: "4g" + memory_reservation: "2g" + docker: + cpu_shares: 1024 + cpu_period: 100000 + cpu_quota: 200000 + + # Medium cloud profile + cloud_medium: + name: "Cloud Medium" + description: "Equivalent to a medium cloud VM (4 vCPU, 8GB RAM)" + resources: + cpus: 4.0 + memory: "8g" + memory_swap: "8g" + memory_reservation: "4g" + docker: + cpu_shares: 2048 + cpu_period: 100000 + cpu_quota: 400000 + + # Large cloud profile + cloud_large: + name: "Cloud Large" + description: "Equivalent to a large cloud VM (8 vCPU, 16GB RAM)" + resources: + cpus: 8.0 + memory: "16g" + memory_swap: "16g" + memory_reservation: "8g" + docker: + cpu_shares: 4096 + cpu_period: 100000 + cpu_quota: 800000 + +# Default profile to use if not specified +default_profile: "default" + +# Network configuration for benchmarks +network: + name: "open-webui-benchmark" + driver: "bridge" diff --git a/docker/docker-compose.benchmark.yaml b/docker/docker-compose.benchmark.yaml new file mode 100644 index 0000000..1fc6beb --- /dev/null +++ b/docker/docker-compose.benchmark.yaml @@ -0,0 +1,40 @@ +# Docker Compose configuration for Open WebUI benchmarking +# This configuration includes resource constraints based on compute profiles + +services: + open-webui: + image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG:-main} + container_name: open-webui-benchmark + volumes: + - open-webui-benchmark-data:/app/backend/data + ports: + # Maps host port (default 8080) to container port 8080 (Open WebUI default) + - "${OPEN_WEBUI_PORT:-8080}:8080" + environment: + - WEBUI_SECRET_KEY=${WEBUI_SECRET_KEY:-benchmark-secret-key} + - ENABLE_WEBSOCKET_SUPPORT=true + # Disable external services for isolated benchmarking + - OLLAMA_BASE_URL= + - OPENAI_API_KEY= + extra_hosts: + - host.docker.internal:host-gateway + restart: unless-stopped + # Resource constraints - adjust based on compute profile + # Default profile: 2 CPUs, 8GB RAM + deploy: + resources: + limits: + cpus: "${CPU_LIMIT:-2.0}" + memory: "${MEMORY_LIMIT:-8g}" + reservations: + cpus: "${CPU_RESERVATION:-1.0}" + memory: "${MEMORY_RESERVATION:-4g}" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + +volumes: + open-webui-benchmark-data: diff --git a/docker/run.sh b/docker/run.sh new file mode 100755 index 0000000..283ab69 --- /dev/null +++ b/docker/run.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Run Open WebUI for benchmarking with a specific compute profile + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BENCHMARK_DIR="$(dirname "$SCRIPT_DIR")" + +# Default profile values (matches "default" compute profile) +CPU_LIMIT="${CPU_LIMIT:-2.0}" +MEMORY_LIMIT="${MEMORY_LIMIT:-8g}" +CPU_RESERVATION="${CPU_RESERVATION:-1.0}" +MEMORY_RESERVATION="${MEMORY_RESERVATION:-4g}" +OPEN_WEBUI_PORT="${OPEN_WEBUI_PORT:-8080}" + +# Parse arguments +PROFILE="${1:-default}" + +case 
"$PROFILE" in + default) + CPU_LIMIT="2.0" + MEMORY_LIMIT="8g" + CPU_RESERVATION="1.0" + MEMORY_RESERVATION="4g" + ;; + minimal) + CPU_LIMIT="1.0" + MEMORY_LIMIT="4g" + CPU_RESERVATION="0.5" + MEMORY_RESERVATION="2g" + ;; + cloud_small) + CPU_LIMIT="2.0" + MEMORY_LIMIT="4g" + CPU_RESERVATION="1.0" + MEMORY_RESERVATION="2g" + ;; + cloud_medium) + CPU_LIMIT="4.0" + MEMORY_LIMIT="8g" + CPU_RESERVATION="2.0" + MEMORY_RESERVATION="4g" + ;; + cloud_large) + CPU_LIMIT="8.0" + MEMORY_LIMIT="16g" + CPU_RESERVATION="4.0" + MEMORY_RESERVATION="8g" + ;; + *) + echo "Unknown profile: $PROFILE" + echo "Available profiles: default, minimal, cloud_small, cloud_medium, cloud_large" + exit 1 + ;; +esac + +echo "Starting Open WebUI with profile: $PROFILE" +echo " CPU Limit: $CPU_LIMIT" +echo " Memory Limit: $MEMORY_LIMIT" +echo " Port: $OPEN_WEBUI_PORT" + +export CPU_LIMIT +export MEMORY_LIMIT +export CPU_RESERVATION +export MEMORY_RESERVATION +export OPEN_WEBUI_PORT + +cd "$SCRIPT_DIR" +docker compose -f docker-compose.benchmark.yaml up -d + +echo "" +echo "Open WebUI started at http://localhost:$OPEN_WEBUI_PORT" +echo "To stop: docker compose -f docker-compose.benchmark.yaml down" diff --git a/examples/custom_benchmark.py b/examples/custom_benchmark.py new file mode 100644 index 0000000..f154a7a --- /dev/null +++ b/examples/custom_benchmark.py @@ -0,0 +1,133 @@ +""" +Example showing how to create a custom benchmark. + +This demonstrates extending the base benchmark class to create +new benchmark scenarios. +""" + +import asyncio +import time +from typing import Optional + +from benchmark.core.base import BaseBenchmark +from benchmark.core.config import BenchmarkConfig +from benchmark.core.metrics import BenchmarkResult +from benchmark.clients.http_client import OpenWebUIClient + + +class APILatencyBenchmark(BaseBenchmark): + """ + Example custom benchmark that measures API endpoint latencies. + + This benchmark tests various API endpoints to establish baseline + latency measurements. 
+ """ + + name = "API Latency Baseline" + description = "Measure baseline latency for key API endpoints" + version = "1.0.0" + + # Endpoints to test + ENDPOINTS = [ + ("GET", "/api/v1/channels/"), + ("GET", "/api/v1/auths/"), + ("GET", "/health"), + ] + + def __init__(self, config: BenchmarkConfig): + """Initialize the benchmark.""" + super().__init__(config) + self._client: Optional[OpenWebUIClient] = None + + async def setup(self) -> None: + """Set up the benchmark environment.""" + self._client = OpenWebUIClient( + self.config.target_url, + self.config.request_timeout, + ) + await self._client.connect() + + # Wait for service + if not await self._client.wait_for_ready(): + raise RuntimeError("Open WebUI service not ready") + + # Authenticate + try: + await self._client.signin( + "admin@benchmark.local", + "benchmark_admin_123", + ) + except Exception: + await self._client.signup( + "admin@benchmark.local", + "benchmark_admin_123", + "Benchmark Admin", + ) + + async def run(self) -> BenchmarkResult: + """Execute the benchmark.""" + self.metrics.start() + + # Number of requests per endpoint + requests_per_endpoint = 100 + + for method, endpoint in self.ENDPOINTS: + for _ in range(requests_per_endpoint): + start = time.time() + success = True + error = None + + try: + if method == "GET": + response = await self._client.client.get( + endpoint, + headers=self._client.headers, + ) + response.raise_for_status() + except Exception as e: + success = False + error = str(e) + + duration_ms = (time.time() - start) * 1000 + + self.metrics.record_timing( + operation=f"{method} {endpoint}", + duration_ms=duration_ms, + success=success, + error=error, + ) + + # Small delay between requests + await asyncio.sleep(0.01) + + self.metrics.stop() + + return self.metrics.get_result( + benchmark_name=self.name, + metadata={ + "endpoints_tested": len(self.ENDPOINTS), + "requests_per_endpoint": requests_per_endpoint, + }, + ) + + async def teardown(self) -> None: + """Clean up resources.""" + if self._client: + await self._client.close() + + +async def main(): + """Run the custom benchmark.""" + from benchmark.core.config import load_config + from benchmark.core.runner import BenchmarkRunner + + config = load_config("default") + + runner = BenchmarkRunner(config=config) + result = await runner.run_benchmark(APILatencyBenchmark) + + runner.display_final_summary() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/run_channel_benchmark.py b/examples/run_channel_benchmark.py new file mode 100644 index 0000000..abfa606 --- /dev/null +++ b/examples/run_channel_benchmark.py @@ -0,0 +1,62 @@ +""" +Example script demonstrating how to run the channel concurrency benchmark. + +This script can be run directly or used as a reference for programmatic benchmark execution. 
+""" + +import asyncio +from pathlib import Path + +from benchmark.core.config import load_config +from benchmark.core.runner import BenchmarkRunner +from benchmark.scenarios.channels import ChannelConcurrencyBenchmark + + +async def main(): + """Run the channel concurrency benchmark with custom settings.""" + + # Load configuration with the default compute profile + config = load_config( + profile_id="default", + overrides={ + "target_url": "http://localhost:3000", + } + ) + + # Customize channel benchmark settings + config.channels.max_concurrent_users = 50 # Test up to 50 users + config.channels.user_step_size = 10 # Increase by 10 users each level + config.channels.sustain_time = 20 # Run each level for 20 seconds + config.channels.message_frequency = 0.5 # 0.5 messages per second per user + + # Create benchmark runner + runner = BenchmarkRunner( + config=config, + profile_id="default", + output_dir=Path("./results"), + ) + + # Run the benchmark + print("Starting Channel Concurrency Benchmark...") + print(f"Target: {config.target_url}") + print(f"Max users: {config.channels.max_concurrent_users}") + print() + + result = await runner.run_benchmark(ChannelConcurrencyBenchmark) + + # Display results + runner.display_final_summary() + + # Access specific metrics + print(f"\n--- Benchmark Complete ---") + print(f"Maximum sustainable users: {result.metadata.get('max_sustainable_users', 'N/A')}") + print(f"Average response time: {result.avg_response_time_ms:.2f}ms") + print(f"P95 response time: {result.p95_response_time_ms:.2f}ms") + print(f"Error rate: {result.error_rate_percent:.2f}%") + print(f"Passed: {'Yes' if result.passed else 'No'}") + + return result + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e07100a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,57 @@ +[project] +name = "open-webui-benchmark" +version = "0.1.0" +description = "Benchmarking suite for Open WebUI" +readme = "README.md" +requires-python = ">=3.11" +license = { text = "MIT" } + +# Dependencies - using existing Open WebUI dependencies where possible +dependencies = [ + # Already in Open WebUI - reusing + "httpx>=0.28.0", # HTTP client (already in Open WebUI) + "aiohttp>=3.12.0", # Async HTTP (already in Open WebUI) + "python-socketio>=5.13.0", # WebSocket client (already in Open WebUI) + "pydantic>=2.11.0", # Data validation (already in Open WebUI) + "PyYAML>=6.0", # YAML parsing (already in Open WebUI via dependencies) + "python-dotenv>=1.0.0", # Environment file loading (already in Open WebUI) + + # Benchmark-specific additions + "locust>=2.32.0", # Load testing framework + "rich>=13.0.0", # Beautiful terminal output + "docker>=7.0.0", # Docker SDK for resource management + "psutil>=5.9.0", # System monitoring (already in Open WebUI) + "pandas>=2.2.0", # Data analysis (already in Open WebUI) + "matplotlib>=3.8.0", # Plotting results +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "black>=24.0.0", + "ruff>=0.5.0", +] + +[project.scripts] +owb = "benchmark.cli:main" +open-webui-benchmark = "benchmark.cli:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["benchmark"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.black] +line-length = 88 +target-version = ["py311"] + +[tool.ruff] +line-length = 88 +target-version = "py311" diff --git a/tests/test_core.py 
new file mode 100644
index 0000000..a8a47de
--- /dev/null
+++ b/tests/test_core.py
@@ -0,0 +1,174 @@
+"""
+Tests for the benchmark core modules.
+"""
+
+import pytest
+import asyncio
+from datetime import datetime
+
+from benchmark.core.config import (
+    BenchmarkConfig,
+    ConfigLoader,
+    ComputeProfile,
+    ResourceConfig,
+    DockerConfig,
+)
+from benchmark.core.metrics import MetricsCollector, BenchmarkResult, TimingRecord
+
+
+class TestMetricsCollector:
+    """Tests for MetricsCollector."""
+
+    def test_time_operation_success(self):
+        """Test timing a successful operation."""
+        collector = MetricsCollector()
+
+        with collector.time_operation("test_op"):
+            pass  # Simulate operation
+
+        result = collector.get_result("test")
+        assert result.total_requests == 1
+        assert result.successful_requests == 1
+        assert result.failed_requests == 0
+
+    def test_time_operation_failure(self):
+        """Test timing a failed operation."""
+        collector = MetricsCollector()
+
+        try:
+            with collector.time_operation("test_op"):
+                raise ValueError("Test error")
+        except ValueError:
+            pass
+
+        result = collector.get_result("test")
+        assert result.total_requests == 1
+        assert result.successful_requests == 0
+        assert result.failed_requests == 1
+        assert "Test error" in result.errors
+
+    def test_record_timing(self):
+        """Test manual timing recording."""
+        collector = MetricsCollector()
+
+        collector.record_timing("op1", 100.0, success=True)
+        collector.record_timing("op2", 200.0, success=True)
+        collector.record_timing("op3", 300.0, success=False, error="Fail")
+
+        result = collector.get_result("test")
+        assert result.total_requests == 3
+        assert result.successful_requests == 2
+        assert result.failed_requests == 1
+
+    def test_percentile_calculation(self):
+        """Test percentile calculations."""
+        collector = MetricsCollector()
+
+        # Add 100 timings with known values
+        for i in range(1, 101):
+            collector.record_timing("op", float(i), success=True)
+
+        result = collector.get_result("test")
+
+        # P50 should be around 50
+        assert 49 <= result.p50_response_time_ms <= 51
+
+        # P95 should be around 95
+        assert 94 <= result.p95_response_time_ms <= 96
+
+        # P99 should be around 99
+        assert 98 <= result.p99_response_time_ms <= 100
+
+    def test_reset(self):
+        """Test metrics reset."""
+        collector = MetricsCollector()
+
+        collector.record_timing("op", 100.0, success=True)
+        collector.reset()
+
+        result = collector.get_result("test")
+        assert result.total_requests == 0
+
+
+class TestBenchmarkResult:
+    """Tests for BenchmarkResult."""
+
+    def test_to_dict(self):
+        """Test dictionary conversion."""
+        result = BenchmarkResult(
+            benchmark_name="Test",
+            total_requests=100,
+            successful_requests=95,
+            failed_requests=5,
+            avg_response_time_ms=150.5,
+            concurrent_users=10,
+        )
+
+        d = result.to_dict()
+
+        assert d["benchmark_name"] == "Test"
+        assert d["total_requests"] == 100
+        assert d["successful_requests"] == 95
+        assert d["avg_response_time_ms"] == 150.5
+
+    def test_to_json(self):
+        """Test JSON conversion."""
+        result = BenchmarkResult(
+            benchmark_name="Test",
+            total_requests=50,
+        )
+
+        json_str = result.to_json()
+
+        assert "Test" in json_str
+        assert "50" in json_str
+
+
+class TestConfigLoader:
+    """Tests for ConfigLoader."""
+
+    def test_load_compute_profiles(self, tmp_path):
+        """Test loading compute profiles from YAML."""
+        # Create test config file
+        config_dir = tmp_path / "config"
+        config_dir.mkdir()
+
+        profiles_yaml = """
+profiles:
+  test:
+    name: "Test Profile"
+    description: "A test profile"
+    resources:
+      cpus: 2.0
+      memory: "4g"
+      memory_swap: "4g"
+      memory_reservation: "2g"
+    docker:
+      cpu_shares: 1024
+      cpu_period: 100000
+      cpu_quota: 200000
+"""
+        (config_dir / "compute_profiles.yaml").write_text(profiles_yaml)
+
+        loader = ConfigLoader(config_dir)
+        profiles = loader.load_compute_profiles()
+
+        assert "test" in profiles
+        assert profiles["test"].name == "Test Profile"
+        assert profiles["test"].resources.cpus == 2.0
+
+
+class TestTimingRecord:
+    """Tests for TimingRecord."""
+
+    def test_duration_calculation(self):
+        """Test duration calculation."""
+        record = TimingRecord(
+            operation="test",
+            start_time=0.0,
+            end_time=0.5,
+            success=True,
+        )
+
+        assert record.duration_seconds == 0.5
+        assert record.duration_ms == 500.0