From c269394678d2f2aa4b46477a2689d2d53549205d Mon Sep 17 00:00:00 2001 From: Jitendra Saini Date: Wed, 18 Feb 2026 13:39:10 +0530 Subject: [PATCH] improvement for order accuracy --- benchmark-scripts/benchmark_order_accuracy.py | 176 +++--------------- benchmark-scripts/requirements.txt | 3 +- .../stream_density_oa_dine_in.py | 105 +---------- 3 files changed, 40 insertions(+), 244 deletions(-) diff --git a/benchmark-scripts/benchmark_order_accuracy.py b/benchmark-scripts/benchmark_order_accuracy.py index d67c430..3e2dd02 100644 --- a/benchmark-scripts/benchmark_order_accuracy.py +++ b/benchmark-scripts/benchmark_order_accuracy.py @@ -2,11 +2,14 @@ Order Accuracy Benchmark Script Orchestrates benchmark execution for the Order Accuracy pipeline. -Integrates with performance-tools for metrics collection and stream density analysis. +Integrates with performance-tools for metrics collection. Usage: - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --pipelines 1 - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 14.95 + python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300 + +Note: For stream density testing, use the application-specific scripts directly: + - Take-Away: stream_density_latency_oa.py (RTSP/workers based) + - Dine-In: stream_density_oa_dine_in.py (concurrent images based) """ import argparse @@ -18,30 +21,29 @@ import json import csv from pathlib import Path -from typing import List, Dict, Optional, Tuple +from typing import List, Dict, Optional # Import from performance-tools benchmark scripts sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import stream_density -from stream_density_latency_oa import OrderAccuracyStreamDensity class OrderAccuracyBenchmark: """ Benchmark orchestrator for Order Accuracy pipeline. - Supports two modes: - 1. Fixed workers: Runs N station workers for specified duration - 2. 
Stream density: Finds maximum workers that maintain target throughput - + Runs N station workers for specified duration and collects metrics. Order Accuracy uses WORKERS to scale, not traditional pipelines. Each worker processes one RTSP station stream. + + For stream density testing, use application-specific scripts: + - Take-Away: stream_density_latency_oa.py + - Dine-In: stream_density_oa_dine_in.py """ # Default configuration DEFAULT_INIT_DURATION = 120 # seconds DEFAULT_DURATION = 300 # seconds - DEFAULT_TARGET_FPS = 14.95 DEFAULT_WORKERS = 1 def __init__( @@ -154,66 +156,6 @@ def run_fixed_workers( return results - def run_stream_density( - self, - target_fps: float, - init_duration: int, - density_increment: Optional[int] = None, - container_name: str = "order-accuracy-vlm" - ) -> Tuple[int, bool, Dict]: - """ - Run stream density benchmark to find maximum pipelines. - - Args: - target_fps: Target FPS to maintain per stream - init_duration: Warmup duration in seconds - density_increment: Pipeline count increment (auto if None) - container_name: Container to monitor - - Returns: - Tuple of (max_pipelines, met_target, metrics_dict) - """ - print(f"\n{'='*60}") - print(f"Order Accuracy Benchmark - Stream Density Mode") - print(f"Target FPS: {target_fps}") - print(f"Init Duration: {init_duration}s") - print(f"Container: {container_name}") - print(f"{'='*60}\n") - - # Configure stream density - self.env_vars["TARGET_FPS"] = str(target_fps) - self.env_vars["INIT_DURATION"] = str(init_duration) - self.env_vars["CONTAINER_NAME"] = container_name - - if density_increment: - self.env_vars["PIPELINE_INC"] = str(density_increment) - - # Use Order Accuracy specific stream density - oa_density = OrderAccuracyStreamDensity( - self.env_vars, - self.compose_files, - self.results_dir - ) - - max_pipelines, met_target = oa_density.run_iterations( - target_fps=target_fps, - container_name=container_name - ) - - # Collect final metrics - metrics = oa_density.get_final_metrics() 
- - # Export results - self._export_results({ - "mode": "stream_density", - "target_fps": target_fps, - "max_pipelines": max_pipelines, - "met_target": met_target, - "metrics": metrics - }, "stream_density") - - return max_pipelines, met_target, metrics - def _clean_pipeline_logs(self): """Remove previous pipeline log files.""" import glob @@ -474,11 +416,15 @@ def parse_args(): # Run with 2 workers for 5 minutes python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300 - # Run stream density to find max workers at target throughput - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 15.0 + # Run with 1 worker (default) + python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml + +For stream density testing, use application-specific scripts: + # Take-Away (RTSP/workers based) + python stream_density_latency_oa.py --compose_files ../../docker-compose.yaml - # Run stream density with custom increment - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --target_fps 14.95 --density_increment 2 + # Dine-In (concurrent images based) + python stream_density_oa_dine_in.py --compose_file ../../docker-compose.yml """ ) @@ -493,24 +439,8 @@ def parse_args(): parser.add_argument( '--workers', type=int, - default=0, - help='Number of station workers to run (0 = use stream density mode)' - ) - - parser.add_argument( - '--target_fps', - type=float, - nargs='*', - default=None, - help='Target FPS for stream density mode' - ) - - parser.add_argument( - '--container_names', - type=str, - nargs='*', - default=None, - help='Container names for stream density (1:1 with target_fps)' + default=1, + help='Number of station workers to run' ) parser.add_argument( @@ -527,13 +457,6 @@ def parse_args(): help='Benchmark duration in seconds' ) - parser.add_argument( - '--density_increment', - type=int, - default=None, - help='Worker increment for stream density' - ) - 
parser.add_argument( '--results_dir', type=str, @@ -586,53 +509,14 @@ def main(): target_device=args.target_device ) - # Determine mode - target_fps_list = args.target_fps if args.target_fps else [] - container_names = args.container_names if args.container_names else [] - - if args.workers > 0: - # Fixed workers mode - print("Running in fixed workers mode...") - results = benchmark.run_fixed_workers( - workers=args.workers, - init_duration=args.init_duration, - duration=args.duration - ) - print(f"\nBenchmark complete. Results: {results}") - - elif target_fps_list: - # Stream density mode - print("Running in stream density mode...") - - if len(target_fps_list) > 1 and len(target_fps_list) != len(container_names): - print("Error: Number of target_fps values must match container_names") - sys.exit(1) - - for i, target_fps in enumerate(target_fps_list): - container_name = container_names[i] if container_names else "oa_service" - - max_workers, met_target, metrics = benchmark.run_stream_density( - target_fps=target_fps, - init_duration=args.init_duration, - density_increment=args.density_increment, - container_name=container_name - ) - - print(f"\n{'='*60}") - print(f"Stream Density Results:") - print(f" Target FPS: {target_fps}") - print(f" Max Workers: {max_workers}") - print(f" Met Target: {met_target}") - print(f"{'='*60}") - else: - # Default: single worker - print("No mode specified. Running with 1 worker...") - results = benchmark.run_fixed_workers( - workers=1, - init_duration=args.init_duration, - duration=args.duration - ) - print(f"\nBenchmark complete. Results: {results}") + # Run fixed workers benchmark + print(f"Running benchmark with {args.workers} worker(s)...") + results = benchmark.run_fixed_workers( + workers=args.workers, + init_duration=args.init_duration, + duration=args.duration + ) + print(f"\nBenchmark complete. 
Results: {results}") if __name__ == '__main__': diff --git a/benchmark-scripts/requirements.txt b/benchmark-scripts/requirements.txt index 8b97a3a..7b11fba 100644 --- a/benchmark-scripts/requirements.txt +++ b/benchmark-scripts/requirements.txt @@ -3,4 +3,5 @@ numpy>=1.26.0 pandas>=2.1.0 natsort>=8.4.0 matplotlib==3.10.3 -psutil \ No newline at end of file +psutil +aiohttp>=3.9.0 \ No newline at end of file diff --git a/benchmark-scripts/stream_density_oa_dine_in.py b/benchmark-scripts/stream_density_oa_dine_in.py index 6f2fefa..e20c40d 100644 --- a/benchmark-scripts/stream_density_oa_dine_in.py +++ b/benchmark-scripts/stream_density_oa_dine_in.py @@ -406,8 +406,10 @@ def run(self) -> DineInDensityResult: print(f"Memory threshold exceeded. Stopping at density={density - self.density_increment}") break - # Clean previous metrics - self._clean_metrics_files() + # Clean previous metrics only on first iteration + # (subsequent iterations append to metrics files) + if iteration == 1: + self._clean_metrics_files() # Start services if not running (first iteration) if iteration == 1: @@ -500,7 +502,7 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: successful = len(latencies) failed = len(results) - successful - # Calculate metrics + # Calculate metrics from HTTP response latencies avg_latency = sum(latencies) / len(latencies) if latencies else 0 sorted_latencies = sorted(latencies) if latencies else [0] p95_idx = int(len(sorted_latencies) * 0.95) if sorted_latencies else 0 @@ -508,14 +510,9 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: min_latency = min(sorted_latencies) if sorted_latencies else 0 max_latency = max(sorted_latencies) if sorted_latencies else 0 - # Collect VLM metrics from logger files - vlm_metrics = self._collect_vlm_logger_metrics() - avg_tps = vlm_metrics.get("avg_tps", 0.0) - - # If VLM metrics have better latency data, use them - if vlm_metrics.get("avg_latency_ms", 0) > 0: - 
avg_latency = vlm_metrics["avg_latency_ms"] - p95_latency = vlm_metrics.get("p95_latency_ms", p95_latency) + # TPS calculated from successful requests / total time + total_time_sec = max_latency / 1000 if max_latency > 0 else 1 + avg_tps = successful / total_time_sec if successful > 0 else 0.0 # System metrics memory_percent = psutil.virtual_memory().percent @@ -537,92 +534,6 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) - def _collect_vlm_logger_metrics(self) -> Dict: - """ - Collect metrics from vlm_metrics_logger output files. - - Parses vlm_application_metrics_*.txt files to extract latency and TPS. - - Returns: - Dictionary with VLM latency metrics - """ - metrics = { - "total_transactions": 0, - "avg_latency_ms": 0.0, - "p95_latency_ms": 0.0, - "avg_tps": 0.0 - } - - # Search in results dir and common locations - search_paths = [ - os.path.join(self.results_dir, "vlm_application_metrics_*.txt"), - "/tmp/vlm_application_metrics_*.txt", - os.path.join(Path(self.compose_file).parent, "results", "vlm_application_metrics_*.txt") - ] - - log_files = [] - for pattern in search_paths: - log_files.extend(glob.glob(pattern)) - - if not log_files: - logger.debug("No vlm_metrics_logger files found") - return metrics - - start_times = {} - end_times = {} - tps_values = [] - - for log_file in log_files: - try: - with open(log_file, 'r') as f: - for line in f: - # Parse unique_id, event, and timestamp - id_match = re.search(r'id=([\w_-]+)', line) - event_match = re.search(r'event=(\w+)', line) - ts_match = re.search(r'timestamp_ms=(\d+)', line) - - if id_match and event_match and ts_match: - unique_id = id_match.group(1) - event = event_match.group(1) - timestamp = int(ts_match.group(1)) - - if event == "start": - start_times[unique_id] = timestamp - elif event == "end": - end_times[unique_id] = timestamp - - # Parse TPS from custom events - if "ovms_metrics" in line: - tps_match 
= re.search(r'tps=([\d.]+)', line) - if tps_match: - tps_values.append(float(tps_match.group(1))) - - except (IOError, OSError) as e: - logger.warning(f"Error reading metrics file {log_file}: {e}") - continue - - # Calculate latencies from start/end pairs - latencies = [] - for unique_id in start_times: - if unique_id in end_times: - latency_ms = end_times[unique_id] - start_times[unique_id] - if latency_ms > 0: # Filter invalid - latencies.append(latency_ms) - - metrics["total_transactions"] = len(latencies) - - if latencies: - metrics["avg_latency_ms"] = sum(latencies) / len(latencies) - sorted_latencies = sorted(latencies) - p95_idx = int(len(sorted_latencies) * 0.95) - metrics["p95_latency_ms"] = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)] - - if tps_values: - metrics["avg_tps"] = sum(tps_values) / len(tps_values) - - logger.debug(f"VLM metrics collected: {metrics}") - return metrics - def _get_latency_metric(self, result: DineInIterationResult) -> float: """Get the configured latency metric value.""" if self.latency_metric == "avg":