diff --git a/benchmark-scripts/stream_density_latency_oa.py b/benchmark-scripts/stream_density_latency_oa.py index 38563fc..aa0c940 100644 --- a/benchmark-scripts/stream_density_latency_oa.py +++ b/benchmark-scripts/stream_density_latency_oa.py @@ -24,6 +24,7 @@ import psutil import shlex import subprocess +import shutil from typing import Dict, List, Optional from dataclasses import dataclass, field from datetime import datetime @@ -40,6 +41,7 @@ class IterationResult: passed: bool memory_percent: float timestamp: str + station_results: Dict = field(default_factory=dict) # Per-station validation results @dataclass @@ -146,6 +148,8 @@ def run(self) -> DensityResult: workers = 1 best_result: Optional[IterationResult] = None + consecutive_failures = 0 # Track consecutive 0-transaction failures + MAX_CONSECUTIVE_FAILURES = 2 # Stop after this many consecutive failures for iteration in range(1, self.MAX_ITERATIONS + 1): print(f"\n{'='*60}") @@ -188,10 +192,27 @@ def run(self) -> DensityResult: if current_latency <= self.target_latency_ms and current_latency > 0: print(f" ✓ PASSED (latency {current_latency/1000:.1f}s <= {self.target_latency_ms/1000:.1f}s)") best_result = result + consecutive_failures = 0 # Reset on success workers += self.worker_increment elif current_latency == 0: - print(f" ⚠ NO DATA - No latency measurements collected") - print(f" Trying next iteration anyway...") + consecutive_failures += 1 + print(f" ⚠ NO DATA - No latency measurements collected ({consecutive_failures}/{MAX_CONSECUTIVE_FAILURES} consecutive failures)") + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected (likely GPU OOM)") + print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}") + break + print(f" Trying next iteration...") + workers += self.worker_increment + elif current_latency < 0: + consecutive_failures += 1 + print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected") + print(f" This usually means a video-loop ID collision in the metrics file.") + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected") + print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}") + break + print(f" Discarding this iteration result; trying next...") + self.iterations.pop() # Remove the corrupt entry already appended above workers += self.worker_increment elif current_latency < 0: print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected") @@ -203,8 +224,8 @@ def run(self) -> DensityResult: print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)") break - # Final cleanup - self._docker_compose("--profile parallel down") + # Final cleanup (with volumes to ensure clean state for next run) + self._docker_compose("--profile parallel down -v") # Build result density_result = DensityResult( @@ -229,16 +250,39 @@ def _run_iteration(self, workers: int) -> IterationResult: Run a single benchmark iteration with specified workers. Polls for transactions instead of waiting fixed duration. + Cleans MinIO data before each iteration for clean results. """ + iteration_num = len(self.iterations) + 1 + # Set environment for workers self.env_vars["WORKERS"] = str(workers) self.env_vars["VLM_WORKERS"] = str(workers) self.env_vars["SERVICE_MODE"] = "parallel" + # Set LOOP_COUNT to get enough transactions + # Each loop generates 1 transaction per station (worker) + # Need min_transactions per worker, so LOOP_COUNT = min_transactions + self.env_vars["LOOP_COUNT"] = str(self.min_transactions) - # Stop any existing containers + # Stop any existing containers (with volume cleanup to clear sync state) print("Stopping existing containers...") - self._docker_compose("--profile parallel down") - time.sleep(5) + self._docker_compose("--profile parallel down -v") + time.sleep(3) + + # Clean MinIO data for fresh start + print("Cleaning MinIO data for fresh iteration...") + self._clean_minio() + + # Clean sync volume (CRITICAL: prevents 2PC deadlock on iteration 2+) + # Note: docker compose down -v should handle this, but double-check + print("Cleaning sync volume...") + self._clean_sync_volume() + + # Clean station result files from previous iteration + self._clean_station_results() + + # Clean metrics files so transaction count starts from 0 + print("Cleaning metrics files...") + self._clean_metrics_files() # Start containers print("Starting containers...") @@ -275,6 +319,11 @@ def _run_iteration(self, workers: int) -> IterationResult: vlm_metrics = self._collect_vlm_logger_metrics() memory_percent = psutil.virtual_memory().percent + # Collect station results before stopping containers + print("Collecting station results...") + station_results = self._collect_station_results(iteration_num, workers) + print(f" Collected results from {len(station_results)} station(s)") + # Stop containers print("Stopping containers...") self._docker_compose("--profile parallel down") @@ -288,7 +337,8 @@ def _run_iteration(self, workers: int) -> IterationResult: total_transactions=vlm_metrics.get("total_transactions", 0), passed=True, # Will be updated after comparison memory_percent=memory_percent, - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + station_results=station_results ) def _collect_vlm_logger_metrics(self) -> Dict: @@ -409,6 +459,122 @@ def _clean_metrics_files(self): except OSError: pass + def _clean_minio(self): + """Clean MinIO data bucket before each iteration.""" + print("Cleaning MinIO data...") + + # Stop MinIO container if running + subprocess.run( + "docker stop oa_minio 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # Remove MinIO container + subprocess.run( + "docker rm oa_minio 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # Remove MinIO volume + subprocess.run( + "docker volume rm take-away_minio_data 2>/dev/null || true", + shell=True, + capture_output=True + ) + + print(" MinIO data cleared") + + def _clean_sync_volume(self): + """Clean pipeline sync volume to prevent 2PC deadlock between iterations. + + The pipeline_sync volume contains ready/prepare/commit signal files. + If not cleaned, stale files from previous iteration cause 2PC protocol + to deadlock (pipelines wait for COMMIT that was already issued). + """ + print("Cleaning pipeline sync volume...") + + # Remove the sync volume - it will be recreated on next compose up + result = subprocess.run( + "docker volume rm take-away_pipeline_sync 2>/dev/null || true", + shell=True, + capture_output=True, + text=True + ) + + # Also try without the project prefix (depends on compose project name) + subprocess.run( + "docker volume rm pipeline_sync 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # List remaining sync-related volumes for debugging + result = subprocess.run( + "docker volume ls | grep -i sync || true", + shell=True, + capture_output=True, + text=True + ) + if result.stdout.strip(): + print(f" Warning: Remaining sync volumes: {result.stdout.strip()}") + else: + print(" Pipeline sync volume cleared") + + def _clean_station_results(self): + """Clean previous station result files.""" + patterns = [ + "station_*_report.md", + "station_*_summary.json" + ] + + for pattern in patterns: + for f in glob.glob(os.path.join(self.results_dir, pattern)): + try: + os.remove(f) + except OSError: + pass + + print(" Station results cleared") + + def _collect_station_results(self, iteration_num: int, workers: int) -> Dict: + """Collect station results for this iteration.""" + station_results = {} + + for station_id in range(1, workers + 1): + summary_file = os.path.join(self.results_dir, f"station_{station_id}_summary.json") + report_file = os.path.join(self.results_dir, f"station_{station_id}_report.md") + + if os.path.exists(summary_file): + try: + with open(summary_file, 'r') as f: + summary_data = json.load(f) + station_results[f"station_{station_id}"] = summary_data + + # Write to iteration-specific file (avoid shutil.copy permission issues) + iter_summary_file = os.path.join( + self.results_dir, + f"iteration_{iteration_num}_station_{station_id}_summary.json" + ) + with open(iter_summary_file, 'w') as f: + json.dump(summary_data, f, indent=2) + + if os.path.exists(report_file): + iter_report_file = os.path.join( + self.results_dir, + f"iteration_{iteration_num}_station_{station_id}_report.md" + ) + with open(report_file, 'r') as f: + report_content = f.read() + with open(iter_report_file, 'w') as f: + f.write(report_content) + + except (json.JSONDecodeError, IOError, PermissionError) as e: + print(f" Warning: Could not read station {station_id} summary: {e}") + + return station_results + def _check_memory_available(self, workers: int) -> bool: """Check if memory is available for workers.""" mem = psutil.virtual_memory() @@ -447,7 +613,8 @@ def _export_results(self, result: DensityResult): "total_transactions": it.total_transactions, "passed": it.passed, "memory_percent": it.memory_percent, - "timestamp": it.timestamp + "timestamp": it.timestamp, + "station_results": it.station_results } for it in result.iterations ],