Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 176 additions & 9 deletions benchmark-scripts/stream_density_latency_oa.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import psutil
import shlex
import subprocess
import shutil
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
Expand All @@ -40,6 +41,7 @@ class IterationResult:
passed: bool
memory_percent: float
timestamp: str
station_results: Dict = field(default_factory=dict) # Per-station validation results


@dataclass
Expand Down Expand Up @@ -146,6 +148,8 @@ def run(self) -> DensityResult:

workers = 1
best_result: Optional[IterationResult] = None
consecutive_failures = 0 # Track consecutive 0-transaction failures
MAX_CONSECUTIVE_FAILURES = 2 # Stop after this many consecutive failures

for iteration in range(1, self.MAX_ITERATIONS + 1):
print(f"\n{'='*60}")
Expand Down Expand Up @@ -188,10 +192,27 @@ def run(self) -> DensityResult:
if current_latency <= self.target_latency_ms and current_latency > 0:
print(f" ✓ PASSED (latency {current_latency/1000:.1f}s <= {self.target_latency_ms/1000:.1f}s)")
best_result = result
consecutive_failures = 0 # Reset on success
workers += self.worker_increment
elif current_latency == 0:
print(f" ⚠ NO DATA - No latency measurements collected")
print(f" Trying next iteration anyway...")
consecutive_failures += 1
print(f" ⚠ NO DATA - No latency measurements collected ({consecutive_failures}/{MAX_CONSECUTIVE_FAILURES} consecutive failures)")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected (likely GPU OOM)")
print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}")
break
print(f" Trying next iteration...")
workers += self.worker_increment
elif current_latency < 0:
consecutive_failures += 1
print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected")
print(f" This usually means a video-loop ID collision in the metrics file.")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected")
print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}")
break
print(f" Discarding this iteration result; trying next...")
self.iterations.pop() # Remove the corrupt entry already appended above
workers += self.worker_increment
elif current_latency < 0:
print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected")
Expand All @@ -203,8 +224,8 @@ def run(self) -> DensityResult:
print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)")
break

# Final cleanup
self._docker_compose("--profile parallel down")
# Final cleanup (with volumes to ensure clean state for next run)
self._docker_compose("--profile parallel down -v")

# Build result
density_result = DensityResult(
Expand All @@ -229,16 +250,39 @@ def _run_iteration(self, workers: int) -> IterationResult:
Run a single benchmark iteration with specified workers.

Polls for transactions instead of waiting fixed duration.
Cleans MinIO data before each iteration for clean results.
"""
iteration_num = len(self.iterations) + 1

# Set environment for workers
self.env_vars["WORKERS"] = str(workers)
self.env_vars["VLM_WORKERS"] = str(workers)
self.env_vars["SERVICE_MODE"] = "parallel"
# Set LOOP_COUNT to get enough transactions
# Each loop generates 1 transaction per station (worker)
# Need min_transactions per worker, so LOOP_COUNT = min_transactions
self.env_vars["LOOP_COUNT"] = str(self.min_transactions)

# Stop any existing containers
# Stop any existing containers (with volume cleanup to clear sync state)
print("Stopping existing containers...")
self._docker_compose("--profile parallel down")
time.sleep(5)
self._docker_compose("--profile parallel down -v")
time.sleep(3)

# Clean MinIO data for fresh start
print("Cleaning MinIO data for fresh iteration...")
self._clean_minio()

# Clean sync volume (CRITICAL: prevents 2PC deadlock on iteration 2+)
# Note: docker compose down -v should handle this, but double-check
print("Cleaning sync volume...")
self._clean_sync_volume()

# Clean station result files from previous iteration
self._clean_station_results()

# Clean metrics files so transaction count starts from 0
print("Cleaning metrics files...")
self._clean_metrics_files()

# Start containers
print("Starting containers...")
Expand Down Expand Up @@ -275,6 +319,11 @@ def _run_iteration(self, workers: int) -> IterationResult:
vlm_metrics = self._collect_vlm_logger_metrics()
memory_percent = psutil.virtual_memory().percent

# Collect station results before stopping containers
print("Collecting station results...")
station_results = self._collect_station_results(iteration_num, workers)
print(f" Collected results from {len(station_results)} station(s)")

# Stop containers
print("Stopping containers...")
self._docker_compose("--profile parallel down")
Expand All @@ -288,7 +337,8 @@ def _run_iteration(self, workers: int) -> IterationResult:
total_transactions=vlm_metrics.get("total_transactions", 0),
passed=True, # Will be updated after comparison
memory_percent=memory_percent,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
station_results=station_results
)

def _collect_vlm_logger_metrics(self) -> Dict:
Expand Down Expand Up @@ -409,6 +459,122 @@ def _clean_metrics_files(self):
except OSError:
pass

def _clean_minio(self):
"""Clean MinIO data bucket before each iteration."""
print("Cleaning MinIO data...")

# Stop MinIO container if running
subprocess.run(
"docker stop oa_minio 2>/dev/null || true",
shell=True,
capture_output=True
)

# Remove MinIO container
subprocess.run(
"docker rm oa_minio 2>/dev/null || true",
shell=True,
capture_output=True
)

# Remove MinIO volume
subprocess.run(
"docker volume rm take-away_minio_data 2>/dev/null || true",
shell=True,
capture_output=True
)

print(" MinIO data cleared")

def _clean_sync_volume(self):
"""Clean pipeline sync volume to prevent 2PC deadlock between iterations.

The pipeline_sync volume contains ready/prepare/commit signal files.
If not cleaned, stale files from previous iteration cause 2PC protocol
to deadlock (pipelines wait for COMMIT that was already issued).
"""
print("Cleaning pipeline sync volume...")

# Remove the sync volume - it will be recreated on next compose up
result = subprocess.run(
"docker volume rm take-away_pipeline_sync 2>/dev/null || true",
shell=True,
capture_output=True,
text=True
)

# Also try without the project prefix (depends on compose project name)
subprocess.run(
"docker volume rm pipeline_sync 2>/dev/null || true",
shell=True,
capture_output=True
)

# List remaining sync-related volumes for debugging
result = subprocess.run(
"docker volume ls | grep -i sync || true",
shell=True,
capture_output=True,
text=True
)
if result.stdout.strip():
print(f" Warning: Remaining sync volumes: {result.stdout.strip()}")
else:
print(" Pipeline sync volume cleared")

def _clean_station_results(self):
"""Clean previous station result files."""
patterns = [
"station_*_report.md",
"station_*_summary.json"
]

for pattern in patterns:
for f in glob.glob(os.path.join(self.results_dir, pattern)):
try:
os.remove(f)
except OSError:
pass

print(" Station results cleared")

def _collect_station_results(self, iteration_num: int, workers: int) -> Dict:
"""Collect station results for this iteration."""
station_results = {}

for station_id in range(1, workers + 1):
summary_file = os.path.join(self.results_dir, f"station_{station_id}_summary.json")
report_file = os.path.join(self.results_dir, f"station_{station_id}_report.md")

if os.path.exists(summary_file):
try:
with open(summary_file, 'r') as f:
summary_data = json.load(f)
station_results[f"station_{station_id}"] = summary_data

# Write to iteration-specific file (avoid shutil.copy permission issues)
iter_summary_file = os.path.join(
self.results_dir,
f"iteration_{iteration_num}_station_{station_id}_summary.json"
)
with open(iter_summary_file, 'w') as f:
json.dump(summary_data, f, indent=2)

if os.path.exists(report_file):
iter_report_file = os.path.join(
self.results_dir,
f"iteration_{iteration_num}_station_{station_id}_report.md"
)
with open(report_file, 'r') as f:
report_content = f.read()
with open(iter_report_file, 'w') as f:
f.write(report_content)

except (json.JSONDecodeError, IOError, PermissionError) as e:
print(f" Warning: Could not read station {station_id} summary: {e}")

return station_results

def _check_memory_available(self, workers: int) -> bool:
"""Check if memory is available for workers."""
mem = psutil.virtual_memory()
Expand Down Expand Up @@ -447,7 +613,8 @@ def _export_results(self, result: DensityResult):
"total_transactions": it.total_transactions,
"passed": it.passed,
"memory_percent": it.memory_percent,
"timestamp": it.timestamp
"timestamp": it.timestamp,
"station_results": it.station_results
}
for it in result.iterations
],
Expand Down
Loading