Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 30 additions & 146 deletions benchmark-scripts/benchmark_order_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
Order Accuracy Benchmark Script

Orchestrates benchmark execution for the Order Accuracy pipeline.
Integrates with performance-tools for metrics collection and stream density analysis.
Integrates with performance-tools for metrics collection.

Usage:
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --pipelines 1
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 14.95
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300

Note: For stream density testing, use the application-specific scripts directly:
- Take-Away: stream_density_latency_oa.py (RTSP/workers based)
- Dine-In: stream_density_oa_dine_in.py (concurrent images based)
"""

import argparse
Expand All @@ -18,30 +21,29 @@
import json
import csv
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from typing import List, Dict, Optional

# Import from performance-tools benchmark scripts
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import stream_density
from stream_density_latency_oa import OrderAccuracyStreamDensity


class OrderAccuracyBenchmark:
"""
Benchmark orchestrator for Order Accuracy pipeline.

Supports two modes:
1. Fixed workers: Runs N station workers for specified duration
2. Stream density: Finds maximum workers that maintain target throughput

Runs N station workers for specified duration and collects metrics.
Order Accuracy uses WORKERS to scale, not traditional pipelines.
Each worker processes one RTSP station stream.

For stream density testing, use application-specific scripts:
- Take-Away: stream_density_latency_oa.py
- Dine-In: stream_density_oa_dine_in.py
"""

# Default configuration
DEFAULT_INIT_DURATION = 120 # seconds
DEFAULT_DURATION = 300 # seconds
DEFAULT_TARGET_FPS = 14.95
DEFAULT_WORKERS = 1

def __init__(
Expand Down Expand Up @@ -154,66 +156,6 @@ def run_fixed_workers(

return results

def run_stream_density(
    self,
    target_fps: float,
    init_duration: int,
    density_increment: Optional[int] = None,
    container_name: str = "order-accuracy-vlm"
) -> Tuple[int, bool, Dict]:
    """
    Run the stream density benchmark and export its outcome.

    Publishes the run settings into ``self.env_vars``, delegates the
    search to ``OrderAccuracyStreamDensity`` and writes the collected
    results through ``self._export_results``.

    Args:
        target_fps: Target FPS to maintain per stream
        init_duration: Warmup duration in seconds
        density_increment: Increment exported as PIPELINE_INC; left
            unset when falsy (None or 0)
        container_name: Container to monitor

    Returns:
        Tuple of (max_pipelines, met_target, metrics_dict)
    """
    rule = '=' * 60
    banner = (
        f"\n{rule}",
        "Order Accuracy Benchmark - Stream Density Mode",
        f"Target FPS: {target_fps}",
        f"Init Duration: {init_duration}s",
        f"Container: {container_name}",
        f"{rule}\n",
    )
    for banner_line in banner:
        print(banner_line)

    # Settings are handed to the density runner through the shared
    # environment mapping; every value is stored as a string.
    overrides = [
        ("TARGET_FPS", target_fps),
        ("INIT_DURATION", init_duration),
        ("CONTAINER_NAME", container_name),
    ]
    if density_increment:
        overrides.append(("PIPELINE_INC", density_increment))
    for key, value in overrides:
        self.env_vars[key] = str(value)

    # Order Accuracy specific stream density implementation
    density_runner = OrderAccuracyStreamDensity(
        self.env_vars,
        self.compose_files,
        self.results_dir
    )
    outcome = density_runner.run_iterations(
        target_fps=target_fps,
        container_name=container_name
    )
    max_pipelines, met_target = outcome

    # Final metrics are read only after the iterations have finished
    metrics = density_runner.get_final_metrics()

    summary = {
        "mode": "stream_density",
        "target_fps": target_fps,
        "max_pipelines": max_pipelines,
        "met_target": met_target,
        "metrics": metrics
    }
    self._export_results(summary, "stream_density")

    return max_pipelines, met_target, metrics

def _clean_pipeline_logs(self):
"""Remove previous pipeline log files."""
import glob
Expand Down Expand Up @@ -474,11 +416,15 @@ def parse_args():
# Run with 2 workers for 5 minutes
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300

# Run stream density to find max workers at target throughput
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 15.0
# Run with 1 worker (default)
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml

For stream density testing, use application-specific scripts:
# Take-Away (RTSP/workers based)
python stream_density_latency_oa.py --compose_files ../../docker-compose.yaml

# Run stream density with custom increment
python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 14.95 --density_increment 2
# Dine-In (concurrent images based)
python stream_density_oa_dine_in.py --compose_file ../../docker-compose.yml
"""
)

Expand All @@ -493,24 +439,8 @@ def parse_args():
parser.add_argument(
'--workers',
type=int,
default=0,
help='Number of station workers to run (0 = use stream density mode)'
)

parser.add_argument(
'--target_fps',
type=float,
nargs='*',
default=None,
help='Target FPS for stream density mode'
)

parser.add_argument(
'--container_names',
type=str,
nargs='*',
default=None,
help='Container names for stream density (1:1 with target_fps)'
default=1,
help='Number of station workers to run'
)

parser.add_argument(
Expand All @@ -527,13 +457,6 @@ def parse_args():
help='Benchmark duration in seconds'
)

parser.add_argument(
'--density_increment',
type=int,
default=None,
help='Worker increment for stream density'
)

parser.add_argument(
'--results_dir',
type=str,
Expand Down Expand Up @@ -586,53 +509,14 @@ def main():
target_device=args.target_device
)

# Determine mode
target_fps_list = args.target_fps if args.target_fps else []
container_names = args.container_names if args.container_names else []

if args.workers > 0:
# Fixed workers mode
print("Running in fixed workers mode...")
results = benchmark.run_fixed_workers(
workers=args.workers,
init_duration=args.init_duration,
duration=args.duration
)
print(f"\nBenchmark complete. Results: {results}")

elif target_fps_list:
# Stream density mode
print("Running in stream density mode...")

if len(target_fps_list) > 1 and len(target_fps_list) != len(container_names):
print("Error: Number of target_fps values must match container_names")
sys.exit(1)

for i, target_fps in enumerate(target_fps_list):
container_name = container_names[i] if container_names else "oa_service"

max_workers, met_target, metrics = benchmark.run_stream_density(
target_fps=target_fps,
init_duration=args.init_duration,
density_increment=args.density_increment,
container_name=container_name
)

print(f"\n{'='*60}")
print(f"Stream Density Results:")
print(f" Target FPS: {target_fps}")
print(f" Max Workers: {max_workers}")
print(f" Met Target: {met_target}")
print(f"{'='*60}")
else:
# Default: single worker
print("No mode specified. Running with 1 worker...")
results = benchmark.run_fixed_workers(
workers=1,
init_duration=args.init_duration,
duration=args.duration
)
print(f"\nBenchmark complete. Results: {results}")
# Run fixed workers benchmark
print(f"Running benchmark with {args.workers} worker(s)...")
results = benchmark.run_fixed_workers(
workers=args.workers,
init_duration=args.init_duration,
duration=args.duration
)
print(f"\nBenchmark complete. Results: {results}")


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion benchmark-scripts/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ numpy>=1.26.0
pandas>=2.1.0
natsort>=8.4.0
matplotlib==3.10.3
psutil
psutil
aiohttp>=3.9.0
105 changes: 8 additions & 97 deletions benchmark-scripts/stream_density_oa_dine_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,10 @@ def run(self) -> DineInDensityResult:
print(f"Memory threshold exceeded. Stopping at density={density - self.density_increment}")
break

# Clean previous metrics
self._clean_metrics_files()
# Clean previous metrics only on first iteration
# (subsequent iterations append to metrics files)
if iteration == 1:
self._clean_metrics_files()

# Start services if not running (first iteration)
if iteration == 1:
Expand Down Expand Up @@ -500,22 +502,17 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult:
successful = len(latencies)
failed = len(results) - successful

# Calculate metrics
# Calculate metrics from HTTP response latencies
avg_latency = sum(latencies) / len(latencies) if latencies else 0
sorted_latencies = sorted(latencies) if latencies else [0]
p95_idx = int(len(sorted_latencies) * 0.95) if sorted_latencies else 0
p95_latency = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)]
min_latency = min(sorted_latencies) if sorted_latencies else 0
max_latency = max(sorted_latencies) if sorted_latencies else 0

# Collect VLM metrics from logger files
vlm_metrics = self._collect_vlm_logger_metrics()
avg_tps = vlm_metrics.get("avg_tps", 0.0)

# If VLM metrics have better latency data, use them
if vlm_metrics.get("avg_latency_ms", 0) > 0:
avg_latency = vlm_metrics["avg_latency_ms"]
p95_latency = vlm_metrics.get("p95_latency_ms", p95_latency)
# TPS calculated from successful requests / total time
total_time_sec = max_latency / 1000 if max_latency > 0 else 1
avg_tps = successful / total_time_sec if successful > 0 else 0.0

# System metrics
memory_percent = psutil.virtual_memory().percent
Expand All @@ -537,92 +534,6 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult:
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)

def _collect_vlm_logger_metrics(self) -> Dict:
    """
    Collect metrics from vlm_metrics_logger output files.

    Parses vlm_application_metrics_*.txt files found in the results
    directory, in /tmp and in the "results" folder next to the compose
    file. Latency is derived from matching start/end events that share
    an id; TPS is averaged over the ``tps=`` values found on lines
    containing "ovms_metrics".

    Returns:
        Dictionary with keys "total_transactions", "avg_latency_ms",
        "p95_latency_ms" and "avg_tps". All values stay at zero when no
        log file is found or no complete start/end pair exists.
    """
    metrics = {
        "total_transactions": 0,
        "avg_latency_ms": 0.0,
        "p95_latency_ms": 0.0,
        "avg_tps": 0.0
    }

    # Search in results dir and common locations
    search_paths = [
        os.path.join(self.results_dir, "vlm_application_metrics_*.txt"),
        "/tmp/vlm_application_metrics_*.txt",
        os.path.join(Path(self.compose_file).parent, "results", "vlm_application_metrics_*.txt")
    ]

    # The search locations may overlap (e.g. results_dir is the compose
    # "results" folder). Keep each physical file once so its TPS samples
    # are not weighted twice.
    log_files = []
    seen_paths = set()
    for pattern in search_paths:
        for candidate in glob.glob(pattern):
            resolved = os.path.realpath(candidate)
            if resolved not in seen_paths:
                seen_paths.add(resolved)
                log_files.append(candidate)

    if not log_files:
        logger.debug("No vlm_metrics_logger files found")
        return metrics

    # Compile once instead of resolving the patterns for every line.
    id_re = re.compile(r'id=([\w-]+)')
    event_re = re.compile(r'event=(\w+)')
    ts_re = re.compile(r'timestamp_ms=(\d+)')
    # Only capture text that float() accepts; a malformed value such as
    # "1.2.3" yields its valid prefix instead of raising ValueError.
    tps_re = re.compile(r'tps=(\d+\.?\d*|\.\d+)')

    start_times = {}
    end_times = {}
    tps_values = []

    for log_file in log_files:
        try:
            # errors="replace" keeps a stray undecodable byte from
            # aborting the whole collection with UnicodeDecodeError.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    # Parse unique_id, event, and timestamp
                    id_match = id_re.search(line)
                    event_match = event_re.search(line)
                    ts_match = ts_re.search(line)

                    if id_match and event_match and ts_match:
                        unique_id = id_match.group(1)
                        event = event_match.group(1)
                        timestamp = int(ts_match.group(1))

                        if event == "start":
                            start_times[unique_id] = timestamp
                        elif event == "end":
                            end_times[unique_id] = timestamp

                    # Parse TPS from custom events
                    if "ovms_metrics" in line:
                        tps_match = tps_re.search(line)
                        if tps_match:
                            tps_values.append(float(tps_match.group(1)))

        except OSError as e:
            logger.warning(f"Error reading metrics file {log_file}: {e}")
            continue

    # Calculate latencies from start/end pairs; non-positive durations
    # are treated as invalid and dropped.
    latencies = [
        end_times[unique_id] - started
        for unique_id, started in start_times.items()
        if unique_id in end_times and end_times[unique_id] - started > 0
    ]

    metrics["total_transactions"] = len(latencies)

    if latencies:
        metrics["avg_latency_ms"] = sum(latencies) / len(latencies)
        sorted_latencies = sorted(latencies)
        # Clamp the index so short lists fall back to the largest sample.
        p95_idx = int(len(sorted_latencies) * 0.95)
        metrics["p95_latency_ms"] = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)]

    if tps_values:
        metrics["avg_tps"] = sum(tps_values) / len(tps_values)

    logger.debug(f"VLM metrics collected: {metrics}")
    return metrics

def _get_latency_metric(self, result: DineInIterationResult) -> float:
"""Get the configured latency metric value."""
if self.latency_metric == "avg":
Expand Down
Loading