From 9916eca82e2db47bd06277b77ff16f140e6db442 Mon Sep 17 00:00:00 2001 From: Jitendra Saini Date: Sat, 28 Feb 2026 16:08:52 +0530 Subject: [PATCH 1/2] take away stream density fix --- .../stream_density_latency_oa.py | 211 ++++++++++- .../stream_density_oa_dine_in.py | 330 +++++++++++++++++- 2 files changed, 514 insertions(+), 27 deletions(-) diff --git a/benchmark-scripts/stream_density_latency_oa.py b/benchmark-scripts/stream_density_latency_oa.py index 0b2dcdd..f8baf0c 100644 --- a/benchmark-scripts/stream_density_latency_oa.py +++ b/benchmark-scripts/stream_density_latency_oa.py @@ -24,6 +24,7 @@ import psutil import shlex import subprocess +import shutil from typing import Dict, List, Optional from dataclasses import dataclass, field from datetime import datetime @@ -40,6 +41,7 @@ class IterationResult: passed: bool memory_percent: float timestamp: str + station_results: Dict = field(default_factory=dict) # Per-station validation results @dataclass @@ -141,11 +143,13 @@ def run(self) -> DensityResult: print(f"Latency Metric: {self.latency_metric}") print(f"Worker Increment: {self.worker_increment}") print(f"Init Duration: {self.init_duration}s") - print(f"Min Transactions: {self.min_transactions}") + print(f"Min Transactions: {self.min_transactions} per worker (scales with worker count)") print("=" * 70) workers = 1 best_result: Optional[IterationResult] = None + consecutive_failures = 0 # Track consecutive 0-transaction failures + MAX_CONSECUTIVE_FAILURES = 2 # Stop after this many consecutive failures for iteration in range(1, self.MAX_ITERATIONS + 1): print(f"\n{'='*60}") @@ -163,9 +167,10 @@ def run(self) -> DensityResult: # Run benchmark iteration result = self._run_iteration(workers) - # Check if we got enough transactions - if result.total_transactions < self.min_transactions: - print(f"WARNING: Only got {result.total_transactions} transactions (need {self.min_transactions})") + # Check if we got enough transactions (scaled by worker count) + required_transactions = self.min_transactions * workers + if result.total_transactions < required_transactions: + print(f"WARNING: Only got {result.total_transactions} transactions (need {required_transactions} = {self.min_transactions} × {workers} workers)") print("Latency measurement may be unreliable.") self.iterations.append(result) @@ -187,17 +192,34 @@ def run(self) -> DensityResult: if current_latency <= self.target_latency_ms and current_latency > 0: print(f" ✓ PASSED (latency {current_latency/1000:.1f}s <= {self.target_latency_ms/1000:.1f}s)") best_result = result + consecutive_failures = 0 # Reset on success workers += self.worker_increment elif current_latency == 0: - print(f" ⚠ NO DATA - No latency measurements collected") - print(f" Trying next iteration anyway...") + consecutive_failures += 1 + print(f" ⚠ NO DATA - No latency measurements collected ({consecutive_failures}/{MAX_CONSECUTIVE_FAILURES} consecutive failures)") + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected (likely GPU OOM)") + print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}") + break + print(f" Trying next iteration...") + workers += self.worker_increment + elif current_latency < 0: + consecutive_failures += 1 + print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected") + print(f" This usually means a video-loop ID collision in the metrics file.") + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + print(f"\n ✗ STOPPING: {MAX_CONSECUTIVE_FAILURES} consecutive failures detected") + print(f" Maximum sustainable workers: {best_result.workers if best_result else 0}") + break + print(f" Discarding this iteration result; trying next...") + self.iterations.pop() # Remove the corrupt entry already appended above workers += self.worker_increment else: print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)") break - # Final cleanup - self._docker_compose("--profile parallel down") + # Final cleanup (with volumes to ensure clean state for next run) + self._docker_compose("--profile parallel down -v") # Build result density_result = DensityResult( @@ -222,16 +244,35 @@ def _run_iteration(self, workers: int) -> IterationResult: Run a single benchmark iteration with specified workers. Polls for transactions instead of waiting fixed duration. + Cleans MinIO data before each iteration for clean results. """ + iteration_num = len(self.iterations) + 1 + # Set environment for workers self.env_vars["WORKERS"] = str(workers) self.env_vars["VLM_WORKERS"] = str(workers) self.env_vars["SERVICE_MODE"] = "parallel" - # Stop any existing containers + # Stop any existing containers (with volume cleanup to clear sync state) print("Stopping existing containers...") - self._docker_compose("--profile parallel down") - time.sleep(5) + self._docker_compose("--profile parallel down -v") + time.sleep(3) + + # Clean MinIO data for fresh start + print("Cleaning MinIO data for fresh iteration...") + self._clean_minio() + + # Clean sync volume (CRITICAL: prevents 2PC deadlock on iteration 2+) + # Note: docker compose down -v should handle this, but double-check + print("Cleaning sync volume...") + self._clean_sync_volume() + + # Clean station result files from previous iteration + self._clean_station_results() + + # Clean metrics files so transaction count starts from 0 + print("Cleaning metrics files...") + self._clean_metrics_files() # Start containers print("Starting containers...") @@ -241,12 +282,13 @@ def _run_iteration(self, workers: int) -> IterationResult: print(f"Waiting {self.init_duration}s for initialization...") time.sleep(self.init_duration) - # Poll for transactions until we have enough - print(f"Waiting for {self.min_transactions} transactions...") + # Poll for transactions until we have enough (scaled by worker count) + required_transactions = self.min_transactions * workers + print(f"Waiting for {required_transactions} transactions ({self.min_transactions} per worker × {workers} worker(s))...") start_time = time.time() transactions = 0 - while transactions < self.min_transactions: + while transactions < required_transactions: elapsed = time.time() - start_time if elapsed > self.MAX_WAIT_SEC: @@ -257,9 +299,9 @@ def _run_iteration(self, workers: int) -> IterationResult: metrics = self._collect_vlm_logger_metrics() transactions = metrics.get("total_transactions", 0) - if transactions < self.min_transactions: - remaining = self.min_transactions - transactions - print(f" Transactions: {transactions}/{self.min_transactions} " + if transactions < required_transactions: + remaining = required_transactions - transactions + print(f" Transactions: {transactions}/{required_transactions} " f"(waiting for {remaining} more, {elapsed:.0f}s elapsed)") time.sleep(self.POLL_INTERVAL_SEC) @@ -267,6 +309,11 @@ def _run_iteration(self, workers: int) -> IterationResult: vlm_metrics = self._collect_vlm_logger_metrics() memory_percent = psutil.virtual_memory().percent + # Collect station results before stopping containers + print("Collecting station results...") + station_results = self._collect_station_results(iteration_num, workers) + print(f" Collected results from {len(station_results)} station(s)") + # Stop containers print("Stopping containers...") self._docker_compose("--profile parallel down") @@ -280,7 +327,8 @@ def _run_iteration(self, workers: int) -> IterationResult: total_transactions=vlm_metrics.get("total_transactions", 0), passed=True, # Will be updated after comparison memory_percent=memory_percent, - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + station_results=station_results ) def _collect_vlm_logger_metrics(self) -> Dict: @@ -324,6 +372,14 @@ def _collect_vlm_logger_metrics(self) -> Dict: timestamp = int(ts_match.group(1)) if event == "start": + # If this ID already has a completed start+end pair (video loop + # reuse), save it under a unique key before overwriting so the + # latency isn't corrupted by end(loop N) - start(loop N+1). + if unique_id in start_times and unique_id in end_times: + saved_key = f"{unique_id}_run{len([k for k in start_times if k.startswith(unique_id)])}" + start_times[saved_key] = start_times[unique_id] + end_times[saved_key] = end_times[unique_id] + del end_times[unique_id] start_times[unique_id] = timestamp elif event == "end": end_times[unique_id] = timestamp @@ -393,6 +449,122 @@ def _clean_metrics_files(self): except OSError: pass + def _clean_minio(self): + """Clean MinIO data bucket before each iteration.""" + print("Cleaning MinIO data...") + + # Stop MinIO container if running + subprocess.run( + "docker stop oa_minio 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # Remove MinIO container + subprocess.run( + "docker rm oa_minio 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # Remove MinIO volume + subprocess.run( + "docker volume rm take-away_minio_data 2>/dev/null || true", + shell=True, + capture_output=True + ) + + print(" MinIO data cleared") + + def _clean_sync_volume(self): + """Clean pipeline sync volume to prevent 2PC deadlock between iterations. + + The pipeline_sync volume contains ready/prepare/commit signal files. + If not cleaned, stale files from previous iteration cause 2PC protocol + to deadlock (pipelines wait for COMMIT that was already issued). + """ + print("Cleaning pipeline sync volume...") + + # Remove the sync volume - it will be recreated on next compose up + result = subprocess.run( + "docker volume rm take-away_pipeline_sync 2>/dev/null || true", + shell=True, + capture_output=True, + text=True + ) + + # Also try without the project prefix (depends on compose project name) + subprocess.run( + "docker volume rm pipeline_sync 2>/dev/null || true", + shell=True, + capture_output=True + ) + + # List remaining sync-related volumes for debugging + result = subprocess.run( + "docker volume ls | grep -i sync || true", + shell=True, + capture_output=True, + text=True + ) + if result.stdout.strip(): + print(f" Warning: Remaining sync volumes: {result.stdout.strip()}") + else: + print(" Pipeline sync volume cleared") + + def _clean_station_results(self): + """Clean previous station result files.""" + patterns = [ + "station_*_report.md", + "station_*_summary.json" + ] + + for pattern in patterns: + for f in glob.glob(os.path.join(self.results_dir, pattern)): + try: + os.remove(f) + except OSError: + pass + + print(" Station results cleared") + + def _collect_station_results(self, iteration_num: int, workers: int) -> Dict: + """Collect station results for this iteration.""" + station_results = {} + + for station_id in range(1, workers + 1): + summary_file = os.path.join(self.results_dir, f"station_{station_id}_summary.json") + report_file = os.path.join(self.results_dir, f"station_{station_id}_report.md") + + if os.path.exists(summary_file): + try: + with open(summary_file, 'r') as f: + summary_data = json.load(f) + station_results[f"station_{station_id}"] = summary_data + + # Write to iteration-specific file (avoid shutil.copy permission issues) + iter_summary_file = os.path.join( + self.results_dir, + f"iteration_{iteration_num}_station_{station_id}_summary.json" + ) + with open(iter_summary_file, 'w') as f: + json.dump(summary_data, f, indent=2) + + if os.path.exists(report_file): + iter_report_file = os.path.join( + self.results_dir, + f"iteration_{iteration_num}_station_{station_id}_report.md" + ) + with open(report_file, 'r') as f: + report_content = f.read() + with open(iter_report_file, 'w') as f: + f.write(report_content) + + except (json.JSONDecodeError, IOError, PermissionError) as e: + print(f" Warning: Could not read station {station_id} summary: {e}") + + return station_results + def _check_memory_available(self, workers: int) -> bool: """Check if memory is available for workers.""" mem = psutil.virtual_memory() @@ -431,7 +603,8 @@ def _export_results(self, result: DensityResult): "total_transactions": it.total_transactions, "passed": it.passed, "memory_percent": it.memory_percent, - "timestamp": it.timestamp + "timestamp": it.timestamp, + "station_results": it.station_results } for it in result.iterations ], diff --git a/benchmark-scripts/stream_density_oa_dine_in.py b/benchmark-scripts/stream_density_oa_dine_in.py index 7cf63ec..ca51304 100644 --- a/benchmark-scripts/stream_density_oa_dine_in.py +++ b/benchmark-scripts/stream_density_oa_dine_in.py @@ -238,7 +238,30 @@ async def validate_image( if resp.status == 200: result["success"] = True - result["response"] = await resp.json() + response_data = await resp.json() + result["response"] = response_data + # Log validation outcome + accuracy = response_data.get("accuracy_score", 0) + complete = response_data.get("order_complete", False) + matched = response_data.get("matched_items", []) + missing = response_data.get("missing_items", []) + extra = response_data.get("extra_items", []) + mismatches = response_data.get("quantity_mismatches", []) + status_str = "COMPLETE" if complete else "INCOMPLETE" + logger.info( + f"[VALIDATE] {request_id} | {status_str} | " + f"accuracy={accuracy:.2f} | latency={result['latency_ms']:.0f}ms | " + f"matched={len(matched)} missing={len(missing)} " + f"extra={len(extra)} mismatches={len(mismatches)}" + ) + if missing: + missing_names = [item.get('name', item) for item in missing] + logger.info(f"[VALIDATE] {request_id} | MISSING: {missing_names}") + if extra: + extra_names = [item.get('name', item) for item in extra] + logger.info(f"[VALIDATE] {request_id} | EXTRA: {extra_names}") + if mismatches: + logger.info(f"[VALIDATE] {request_id} | QUANTITY MISMATCHES: {mismatches}") else: result["error"] = f"HTTP {resp.status}: {await resp.text()}" @@ -352,6 +375,8 @@ def __init__( init_duration: int = 60, min_requests: int = MIN_REQUESTS_PER_ITERATION, request_timeout: int = 300, + single_run: bool = False, + concurrent_images: int = 1, max_iterations: int = DEFAULT_MAX_ITERATIONS ): self.compose_file = compose_file @@ -363,6 +388,8 @@ def __init__( self.init_duration = init_duration self.min_requests = min_requests self.request_timeout = request_timeout + self.single_run = single_run + self.concurrent_images = concurrent_images self.max_iterations = max_iterations # Resolve paths relative to compose file @@ -382,6 +409,9 @@ def __init__( logger.info(f" Images: {self.images_dir}") logger.info(f" Orders: {self.orders_file}") logger.info(f" Target Latency: {self.target_latency_ms}ms") + logger.info(f" Single Run Mode: {self.single_run}") + if self.single_run: + logger.info(f" Concurrent Images: {self.concurrent_images}") def run(self) -> DineInDensityResult: """ @@ -394,11 +424,13 @@ def run(self) -> DineInDensityResult: """ self._print_header() - density = 1 + # In single_run mode, just run once with specified concurrent_images + density = self.concurrent_images if self.single_run else 1 + max_iter = 1 if self.single_run else self.max_iterations best_result: Optional[DineInIterationResult] = None total_images = 0 - for iteration in range(1, self.max_iterations + 1): + for iteration in range(1, max_iter + 1): print(f"\n{'='*70}") print(f"Iteration {iteration}: Testing density={density} concurrent images") print(f"{'='*70}") @@ -503,6 +535,27 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: latencies = [r["latency_ms"] for r in results if r.get("success")] successful = len(latencies) failed = len(results) - successful + + # Log per-request validation summary + for r in results: + if r.get("success") and r.get("response"): + resp = r["response"] + complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0) + matched = len(resp.get("matched_items", [])) + missing = len(resp.get("missing_items", [])) + extra = len(resp.get("extra_items", [])) + print( + f" [{r.get('image_id', '?')}] " + f"{'✓ COMPLETE' if complete else '✗ INCOMPLETE'} | " + f"accuracy={accuracy:.2f} | matched={matched} missing={missing} extra={extra} | " + f"latency={r['latency_ms']:.0f}ms" + ) + elif not r.get("success"): + print(f" [{r.get('image_id', '?')}] ✗ FAILED - {r.get('error', 'unknown error')}") + + # Print detailed validation results + self._print_detailed_validation_results(results) # Calculate metrics from HTTP response latencies avg_latency = sum(latencies) / len(latencies) if latencies else 0 @@ -520,7 +573,7 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: memory_percent = psutil.virtual_memory().percent cpu_percent = psutil.cpu_percent(interval=0.5) - return DineInIterationResult( + iteration_result = DineInIterationResult( density=density, avg_latency_ms=avg_latency, p95_latency_ms=p95_latency, @@ -535,6 +588,9 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: cpu_percent=cpu_percent, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) + # Attach raw results for detailed validation export + iteration_result._raw_results = results + return iteration_result def _get_latency_metric(self, result: DineInIterationResult) -> float: """Get the configured latency metric value.""" @@ -546,6 +602,70 @@ def _get_latency_metric(self, result: DineInIterationResult) -> float: return result.max_latency_ms return result.avg_latency_ms + def _print_detailed_validation_results(self, results: List[Dict]): + """Print detailed validation results with item breakdowns.""" + print("\n" + "=" * 70) + print("VALIDATION DETAILS") + print("=" * 70) + + for r in results: + image_id = r.get("image_id", "unknown") + latency_ms = r.get("latency_ms", 0) + + if not r.get("success"): + print(f"\n🖼️ {image_id} - ❌ Request Failed") + print(f" Error: {r.get('error', 'unknown error')}") + print(f" Latency: {latency_ms:.0f}ms") + continue + + resp = r.get("response", {}) + order_complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0.0) + matched_items = resp.get("matched_items", []) + missing_items = resp.get("missing_items", []) + extra_items = resp.get("extra_items", []) + qty_mismatches = resp.get("quantity_mismatches", []) + + complete_icon = "✅" if order_complete else "❌" + complete_label = "Order Complete" if order_complete else "Order Incomplete" + + print(f"\n🖼️ {image_id}") + print(f" {complete_icon} {complete_label}") + print(f" Accuracy: {accuracy * 100:.0f}%") + print(f" Latency: {latency_ms:.0f}ms") + + if matched_items: + print(f" ✔️ Matched Items:") + for item in matched_items: + name = item.get("detected_name") or item.get("expected_name", "?") + qty = item.get("quantity", 1) + sim = item.get("similarity", 0) + print(f" • {name} (×{qty}) - {sim * 100:.0f}% match") + + if missing_items: + print(f" ⚠️ Missing Items:") + for item in missing_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + print(f" • {name} (×{qty})") + + if extra_items: + print(f" ➕ Extra Items Detected:") + for item in extra_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + print(f" • {name} (×{qty})") + + if qty_mismatches: + print(f" 🔢 Quantity Mismatches:") + for item in qty_mismatches: + name = item.get("item", "?") + exp = item.get("expected_quantity", "?") + got = item.get("detected_quantity", "?") + print(f" • {name}: expected ×{exp}, detected ×{got}") + + print("\n" + "=" * 70) + def _start_services(self): """Start dine-in services via docker compose.""" print("Starting dine-in services...") @@ -576,7 +696,7 @@ def _wait_for_ready(self): if resp.status == 200: ready = True break - except (urllib.error.URLError, TimeoutError, ConnectionResetError, ConnectionRefusedError, OSError): + except Exception: pass time.sleep(5) @@ -703,6 +823,29 @@ def _export_results(self, result: DineInDensityResult): } if result.best_iteration else None } + # Collect all per-request validation details across all iterations + all_validations = [] + for it in result.iterations: + if hasattr(it, '_raw_results'): + for r in it._raw_results: + if r.get("success") and r.get("response"): + resp = r["response"] + all_validations.append({ + "density": it.density, + "request_id": r.get("request_id"), + "image_id": r.get("image_id"), + "latency_ms": round(r["latency_ms"], 2), + "order_complete": resp.get("order_complete"), + "accuracy_score": resp.get("accuracy_score"), + "matched_items": resp.get("matched_items", []), + "missing_items": resp.get("missing_items", []), + "extra_items": resp.get("extra_items", []), + "quantity_mismatches": resp.get("quantity_mismatches", []), + "metrics": resp.get("metrics") + }) + if all_validations: + result_dict["validation_details"] = all_validations + # Export JSON json_path = self.results_dir / f"dinein_density_results_{timestamp}.json" with open(json_path, 'w') as f: @@ -720,7 +863,163 @@ def _export_results(self, result: DineInDensityResult): f.write(f"{it.successful_requests},{it.failed_requests},{it.passed},") f.write(f"{it.memory_percent:.1f},{it.cpu_percent:.1f}\n") print(f"Summary exported to: {csv_path}") - + + # Export per-request validation details CSV + val_csv_path = self.results_dir / f"dinein_validation_details_{timestamp}.csv" + with open(val_csv_path, 'w') as f: + f.write("density,request_id,image_id,latency_ms,order_complete,accuracy_score," + "matched,missing,extra,mismatches\n") + for it in result.iterations: + raw = getattr(it, '_raw_results', []) + for r in raw: + if r.get("success") and r.get("response"): + resp = r["response"] + f.write( + f"{it.density},{r.get('request_id','')},{r.get('image_id','')}," + f"{r['latency_ms']:.0f},{resp.get('order_complete','')}," + f"{resp.get('accuracy_score', 0):.2f}," + f"{len(resp.get('matched_items',[]))}," + f"{len(resp.get('missing_items',[]))}," + f"{len(resp.get('extra_items',[]))}," + f"{len(resp.get('quantity_mismatches',[]))}\n" + ) + print(f"Validation details exported to: {val_csv_path}") + + # Export Markdown report + self._export_markdown_report(result, timestamp) + + def _export_markdown_report(self, result: DineInDensityResult, timestamp: str): + """Export a human-readable Markdown validation report per iteration and image.""" + md_path = self.results_dir / f"dinein_validation_report_{timestamp}.md" + + with open(md_path, 'w') as f: + f.write(f"# Dine-In Stream Density — Validation Report\n\n") + f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} \n") + f.write(f"**Target Latency:** {result.target_latency_ms:.0f} ms " + f"({result.target_latency_ms / 1000:.1f} s) \n") + f.write(f"**Max Density Achieved:** {result.max_density} concurrent image(s) \n") + f.write(f"**Met Target:** {'✅ Yes' if result.met_target else '❌ No'} \n") + f.write(f"**Total Images Processed:** {result.total_images_processed} \n\n") + f.write("---\n\n") + + for it in result.iterations: + status_icon = "✅" if it.passed else "❌" + f.write(f"## {status_icon} Iteration — Density {it.density} " + f"({'PASS' if it.passed else 'FAIL'})\n\n") + + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + f.write(f"| Concurrent Images | {it.density} |\n") + f.write(f"| Avg Latency | {it.avg_latency_ms:.0f} ms " + f"({it.avg_latency_ms / 1000:.2f} s) |\n") + f.write(f"| P95 Latency | {it.p95_latency_ms:.0f} ms " + f"({it.p95_latency_ms / 1000:.2f} s) |\n") + f.write(f"| Min / Max Latency | {it.min_latency_ms:.0f} ms / " + f"{it.max_latency_ms:.0f} ms |\n") + f.write(f"| Avg TPS | {it.avg_tps:.2f} |\n") + f.write(f"| Requests | {it.successful_requests} succeeded / " + f"{it.failed_requests} failed |\n") + f.write(f"| Memory | {it.memory_percent:.1f}% |\n") + f.write(f"| CPU | {it.cpu_percent:.1f}% |\n") + f.write(f"| Timestamp | {it.timestamp} |\n\n") + + # Per-image validation results + raw = getattr(it, '_raw_results', []) + if not raw: + f.write("_No per-image results available for this iteration._\n\n") + f.write("---\n\n") + continue + + for r in raw: + image_id = r.get("image_id", "unknown") + latency_ms = r.get("latency_ms", 0) + + if not r.get("success"): + f.write(f"### 🖼️ `{image_id}` — ❌ Request Failed\n\n") + f.write(f"> **Error:** {r.get('error', 'unknown error')} \n") + f.write(f"> **Latency:** {latency_ms:.0f} ms\n\n") + continue + + resp = r.get("response", {}) + order_complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0.0) + matched_items = resp.get("matched_items", []) + missing_items = resp.get("missing_items", []) + extra_items = resp.get("extra_items", []) + qty_mismatches = resp.get("quantity_mismatches", []) + metrics = resp.get("metrics") or {} + + complete_icon = "✅" if order_complete else "❌" + complete_label = "Order Complete" if order_complete else "Order Incomplete" + + f.write(f"### 🖼️ `{image_id}`\n\n") + f.write(f"**✅ Validation Result**\n\n") + f.write(f"{complete_icon} **{complete_label}** \n") + f.write(f"**Accuracy:** {accuracy * 100:.0f}% \n") + f.write(f"**Latency:** {latency_ms:.0f} ms \n\n") + + if matched_items: + f.write(f"**✔️ Matched Items**\n\n") + for item in matched_items: + name = item.get("detected_name") or item.get("expected_name", "?") + qty = item.get("quantity", 1) + sim = item.get("similarity", 0) + f.write(f"- {name} (×{qty}) _{sim * 100:.0f}% match_\n") + f.write("\n") + + if missing_items: + f.write(f"**⚠️ Missing Items**\n\n") + for item in missing_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if extra_items: + f.write(f"**➕ Extra Items Detected**\n\n") + for item in extra_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if qty_mismatches: + f.write(f"**🔢 Quantity Mismatches**\n\n") + for item in qty_mismatches: + name = item.get("item", "?") + exp = item.get("expected_quantity", "?") + got = item.get("detected_quantity", "?") + f.write(f"- {name}: expected ×{exp}, detected ×{got}\n") + f.write("\n") + + if metrics: + f.write(f"**📊 Performance Metrics**\n\n") + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + for k, v in metrics.items(): + f.write(f"| {k} | {v} |\n") + f.write("\n") + + f.write("---\n\n") + + # Final summary section + f.write("## 📋 Summary Table\n\n") + f.write("| Density | Avg Latency | P95 Latency | TPS | Succeeded | Failed | Status |\n") + f.write("|---------|-------------|-------------|-----|-----------|--------|--------|\n") + for it in result.iterations: + status = "✅ PASS" if it.passed else "❌ FAIL" + f.write( + f"| {it.density} " + f"| {it.avg_latency_ms:.0f} ms ({it.avg_latency_ms / 1000:.2f}s) " + f"| {it.p95_latency_ms:.0f} ms ({it.p95_latency_ms / 1000:.2f}s) " + f"| {it.avg_tps:.2f} " + f"| {it.successful_requests} " + f"| {it.failed_requests} " + f"| {status} |\n" + ) + + print(f"Markdown report exported to: {md_path}") + def _print_summary(self, result: DineInDensityResult): """Print final summary.""" print("\n" + "=" * 70) @@ -861,11 +1160,22 @@ def main(): default=env_results_dir, help=f"Directory for results output (default: {env_results_dir}, env: RESULTS_DIR)" ) + parser.add_argument( + "--single_run", + action="store_true", + help="Run a single benchmark iteration without density scaling (simple benchmark mode)" + ) + parser.add_argument( + "--concurrent_images", + type=int, + default=1, + help="Number of concurrent images for single_run mode (default: 1)" + ) parser.add_argument( "--max_iterations", type=int, default=50, - help="Maximum number of density iterations to run (default: 50)" + help="Maximum iterations for density scaling (default: 50)" ) args = parser.parse_args() @@ -880,7 +1190,9 @@ def main(): print(f" REQUEST_TIMEOUT: {args.request_timeout}") print(f" API_ENDPOINT: {args.api_endpoint}") print(f" RESULTS_DIR: {args.results_dir}") - print(f" MAX_ITERATIONS: {args.max_iterations}") + print(f" SINGLE_RUN: {args.single_run}") + if args.single_run: + print(f" CONCURRENT_IMAGES: {args.concurrent_images}") print() # Validate compose file exists @@ -902,6 +1214,8 @@ def main(): init_duration=args.init_duration, min_requests=args.min_requests, request_timeout=args.request_timeout, + single_run=args.single_run, + concurrent_images=args.concurrent_images, max_iterations=args.max_iterations ) From 6a35613bf8bd25287da58e05bc6d5035ddd74228 Mon Sep 17 00:00:00 2001 From: Jitendra Saini Date: Mon, 2 Mar 2026 02:14:03 +0530 Subject: [PATCH 2/2] updated the density script --- benchmark-scripts/stream_density_latency_oa.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/benchmark-scripts/stream_density_latency_oa.py b/benchmark-scripts/stream_density_latency_oa.py index f8baf0c..3b895ab 100644 --- a/benchmark-scripts/stream_density_latency_oa.py +++ b/benchmark-scripts/stream_density_latency_oa.py @@ -252,6 +252,10 @@ def _run_iteration(self, workers: int) -> IterationResult: self.env_vars["WORKERS"] = str(workers) self.env_vars["VLM_WORKERS"] = str(workers) self.env_vars["SERVICE_MODE"] = "parallel" + # Set LOOP_COUNT to get enough transactions + # Each loop generates 1 transaction per station (worker) + # Need min_transactions per worker, so LOOP_COUNT = min_transactions + self.env_vars["LOOP_COUNT"] = str(self.min_transactions) # Stop any existing containers (with volume cleanup to clear sync state) print("Stopping existing containers...")