From ff6b3d2e454f43750a745a6621519bd0fe0f5a11 Mon Sep 17 00:00:00 2001 From: Jitendra Saini Date: Wed, 25 Feb 2026 11:28:26 +0530 Subject: [PATCH 1/2] order accuracy bug fixes and improvement --- benchmark-scripts/benchmark_order_accuracy.py | 163 ++---------- .../stream_density_latency_oa.py | 36 ++- .../stream_density_oa_dine_in.py | 249 ++++++++++++++++-- 3 files changed, 273 insertions(+), 175 deletions(-) diff --git a/benchmark-scripts/benchmark_order_accuracy.py b/benchmark-scripts/benchmark_order_accuracy.py index 17fa969..3e2dd02 100644 --- a/benchmark-scripts/benchmark_order_accuracy.py +++ b/benchmark-scripts/benchmark_order_accuracy.py @@ -101,34 +101,24 @@ def run_fixed_workers( self, workers: int, init_duration: int, - duration: int, - profile: str = "parallel", - iterations: int = 0 + duration: int ) -> Dict: """ Run benchmark with fixed number of station workers. Args: - workers: Number of concurrent station workers + workers: Number of concurrent station workers (RTSP streams) init_duration: Warmup duration in seconds - duration: Benchmark duration in seconds (ignored if iterations > 0) - profile: Docker compose profile to use (parallel, benchmark, worker) - iterations: Number of iterations per worker (0 = use duration-based) + duration: Benchmark duration in seconds Returns: Dictionary with benchmark results """ - is_iteration_mode = iterations > 0 - print(f"\n{'='*60}") print(f"Order Accuracy Benchmark - Fixed Workers Mode") print(f"Workers: {workers}") - print(f"Profile: {profile}") - if is_iteration_mode: - print(f"Iterations: {iterations} per worker") - else: - print(f"Init Duration: {init_duration}s") - print(f"Benchmark Duration: {duration}s") + print(f"Init Duration: {init_duration}s") + print(f"Benchmark Duration: {duration}s") print(f"{'='*60}\n") # Clean previous logs @@ -139,26 +129,17 @@ def run_fixed_workers( self.env_vars["VLM_WORKERS"] = str(workers) self.env_vars["SERVICE_MODE"] = "parallel" - # Set iterations for dine-in mode - if 
is_iteration_mode: - self.env_vars["ITERATIONS"] = str(iterations) - - # Start containers with specified profile + # Start containers with parallel profile print("Starting containers...") - self.docker_compose_cmd(f"--profile {profile} up", "-d") + self.docker_compose_cmd("--profile parallel up", "-d") # Wait for initialization print(f"Waiting {init_duration}s for initialization...") time.sleep(init_duration) - if is_iteration_mode: - # Wait for workers to complete iterations - print(f"Waiting for workers to complete {iterations} iterations...") - self._wait_for_workers_completion(workers, profile, timeout=duration if duration > 0 else 3600) - else: - # Run benchmark for fixed duration - print(f"Running benchmark for {duration}s...") - time.sleep(duration) + # Run benchmark + print(f"Running benchmark for {duration}s...") + time.sleep(duration) # Collect metrics results = self._collect_metrics(workers) @@ -166,108 +147,15 @@ def run_fixed_workers( # Collect VLM metrics from vlm_metrics_logger results["vlm_metrics"] = self._collect_vlm_logger_metrics() - # Collect dine-in results if in iteration mode - if is_iteration_mode: - results["worker_results"] = self._collect_worker_results() - # Stop containers print("Stopping containers...") - self.docker_compose_cmd(f"--profile {profile} down") + self.docker_compose_cmd("--profile parallel down") # Export results self._export_results(results, "fixed_workers") return results - def _wait_for_workers_completion(self, workers: int, profile: str, timeout: int = 3600): - """Wait for all worker containers to complete.""" - import subprocess - - start_time = time.time() - check_interval = 10 # seconds - - while time.time() - start_time < timeout: - # Check if any worker containers are still running - cmd = f"docker compose {' '.join(f'-f {f}' for f in self.compose_files)} --profile {profile} ps --format json" - result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=self.env_vars) - - if result.returncode != 
0: - print(f"Warning: Could not check container status: {result.stderr}") - time.sleep(check_interval) - continue - - # Parse JSON output (docker compose ps outputs one JSON per line) - running_workers = 0 - for line in result.stdout.strip().split('\n'): - if not line: - continue - try: - container = json.loads(line) - name = container.get('Name', container.get('name', '')) - state = container.get('State', container.get('state', '')) - if 'worker' in name.lower() and state == 'running': - running_workers += 1 - except json.JSONDecodeError: - continue - - if running_workers == 0: - print("All workers completed.") - return - - print(f" {running_workers} workers still running...") - time.sleep(check_interval) - - print(f"Warning: Timeout reached after {timeout}s. Workers may not have completed.") - - def _collect_worker_results(self) -> Dict: - """Collect results from dine-in worker output files.""" - import glob - - worker_results = { - "total_iterations": 0, - "successful": 0, - "failed": 0, - "avg_latency_ms": 0.0, - "results_files": [] - } - - # Look for worker result files - patterns = ["worker_*.json", "results_*.json", "*results*.json"] - - for pattern in patterns: - for f in glob.glob(os.path.join(self.results_dir, pattern)): - try: - with open(f, 'r') as fp: - data = json.load(fp) - worker_results["results_files"].append(f) - - # Aggregate stats if present - if "stats" in data: - stats = data["stats"] - worker_results["total_iterations"] += stats.get("total_iterations", 0) - worker_results["successful"] += stats.get("successful_iterations", 0) - worker_results["failed"] += stats.get("failed_iterations", 0) - except (json.JSONDecodeError, IOError) as e: - print(f"Warning: Could not read results from {f}: {e}") - - # Calculate average latency - if worker_results["successful"] > 0: - total_latency = 0 - count = 0 - for f in worker_results["results_files"]: - try: - with open(f, 'r') as fp: - data = json.load(fp) - if "stats" in data and "avg_latency_ms" in 
data["stats"]: - total_latency += data["stats"]["avg_latency_ms"] - count += 1 - except: - pass - if count > 0: - worker_results["avg_latency_ms"] = total_latency / count - - return worker_results - def _clean_pipeline_logs(self): """Remove previous pipeline log files.""" import glob @@ -525,14 +413,11 @@ def parse_args(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Take-Away: Run with 2 workers for 5 minutes (RTSP streams) + # Run with 2 workers for 5 minutes python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300 - # Dine-In: Run with 2 workers, 10 iterations each (image-based) - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yml --workers 2 --iterations 10 --profile benchmark - - # Dine-In: Quick test with 1 iteration - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yml --workers 1 --iterations 1 --profile benchmark --skip_perf_tools + # Run with 1 worker (default) + python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml For stream density testing, use application-specific scripts: # Take-Away (RTSP/workers based) @@ -558,20 +443,6 @@ def parse_args(): help='Number of station workers to run' ) - parser.add_argument( - '--profile', - type=str, - default='parallel', - help='Docker compose profile to use (parallel for take-away, benchmark for dine-in)' - ) - - parser.add_argument( - '--iterations', - type=int, - default=0, - help='Number of iterations per worker (dine-in mode). 0 = use duration-based.' 
- ) - parser.add_argument( '--init_duration', type=int, @@ -583,7 +454,7 @@ def parse_args(): '--duration', type=int, default=300, - help='Benchmark duration in seconds (ignored if --iterations > 0)' + help='Benchmark duration in seconds' ) parser.add_argument( @@ -643,9 +514,7 @@ def main(): results = benchmark.run_fixed_workers( workers=args.workers, init_duration=args.init_duration, - duration=args.duration, - profile=args.profile, - iterations=args.iterations + duration=args.duration ) print(f"\nBenchmark complete. Results: {results}") diff --git a/benchmark-scripts/stream_density_latency_oa.py b/benchmark-scripts/stream_density_latency_oa.py index 0b2dcdd..38563fc 100644 --- a/benchmark-scripts/stream_density_latency_oa.py +++ b/benchmark-scripts/stream_density_latency_oa.py @@ -141,7 +141,7 @@ def run(self) -> DensityResult: print(f"Latency Metric: {self.latency_metric}") print(f"Worker Increment: {self.worker_increment}") print(f"Init Duration: {self.init_duration}s") - print(f"Min Transactions: {self.min_transactions}") + print(f"Min Transactions: {self.min_transactions} per worker (scales with worker count)") print("=" * 70) workers = 1 @@ -163,9 +163,10 @@ def run(self) -> DensityResult: # Run benchmark iteration result = self._run_iteration(workers) - # Check if we got enough transactions - if result.total_transactions < self.min_transactions: - print(f"WARNING: Only got {result.total_transactions} transactions (need {self.min_transactions})") + # Check if we got enough transactions (scaled by worker count) + required_transactions = self.min_transactions * workers + if result.total_transactions < required_transactions: + print(f"WARNING: Only got {result.total_transactions} transactions (need {required_transactions} = {self.min_transactions} × {workers} workers)") print("Latency measurement may be unreliable.") self.iterations.append(result) @@ -192,6 +193,12 @@ def run(self) -> DensityResult: print(f" ⚠ NO DATA - No latency measurements collected") 
print(f" Trying next iteration anyway...") workers += self.worker_increment + elif current_latency < 0: + print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected") + print(f" This usually means a video-loop ID collision in the metrics file.") + print(f" Discarding this iteration result; trying next...") + self.iterations.pop() # Remove the corrupt entry already appended above + workers += self.worker_increment else: print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)") break @@ -241,12 +248,13 @@ def _run_iteration(self, workers: int) -> IterationResult: print(f"Waiting {self.init_duration}s for initialization...") time.sleep(self.init_duration) - # Poll for transactions until we have enough - print(f"Waiting for {self.min_transactions} transactions...") + # Poll for transactions until we have enough (scaled by worker count) + required_transactions = self.min_transactions * workers + print(f"Waiting for {required_transactions} transactions ({self.min_transactions} per worker × {workers} worker(s))...") start_time = time.time() transactions = 0 - while transactions < self.min_transactions: + while transactions < required_transactions: elapsed = time.time() - start_time if elapsed > self.MAX_WAIT_SEC: @@ -257,9 +265,9 @@ def _run_iteration(self, workers: int) -> IterationResult: metrics = self._collect_vlm_logger_metrics() transactions = metrics.get("total_transactions", 0) - if transactions < self.min_transactions: - remaining = self.min_transactions - transactions - print(f" Transactions: {transactions}/{self.min_transactions} " + if transactions < required_transactions: + remaining = required_transactions - transactions + print(f" Transactions: {transactions}/{required_transactions} " f"(waiting for {remaining} more, {elapsed:.0f}s elapsed)") time.sleep(self.POLL_INTERVAL_SEC) @@ -324,6 +332,14 @@ def _collect_vlm_logger_metrics(self) -> Dict: timestamp = int(ts_match.group(1)) if event == "start": 
+ # If this ID already has a completed start+end pair (video loop + # reuse), save it under a unique key before overwriting so the + # latency isn't corrupted by end(loop N) - start(loop N+1). + if unique_id in start_times and unique_id in end_times: + saved_key = f"{unique_id}_run{len([k for k in start_times if k.startswith(unique_id)])}" + start_times[saved_key] = start_times[unique_id] + end_times[saved_key] = end_times[unique_id] + del end_times[unique_id] start_times[unique_id] = timestamp elif event == "end": end_times[unique_id] = timestamp diff --git a/benchmark-scripts/stream_density_oa_dine_in.py b/benchmark-scripts/stream_density_oa_dine_in.py index 7cf63ec..64a14db 100644 --- a/benchmark-scripts/stream_density_oa_dine_in.py +++ b/benchmark-scripts/stream_density_oa_dine_in.py @@ -238,7 +238,30 @@ async def validate_image( if resp.status == 200: result["success"] = True - result["response"] = await resp.json() + response_data = await resp.json() + result["response"] = response_data + # Log validation outcome + accuracy = response_data.get("accuracy_score", 0) + complete = response_data.get("order_complete", False) + matched = response_data.get("matched_items", []) + missing = response_data.get("missing_items", []) + extra = response_data.get("extra_items", []) + mismatches = response_data.get("quantity_mismatches", []) + status_str = "COMPLETE" if complete else "INCOMPLETE" + logger.info( + f"[VALIDATE] {request_id} | {status_str} | " + f"accuracy={accuracy:.2f} | latency={result['latency_ms']:.0f}ms | " + f"matched={len(matched)} missing={len(missing)} " + f"extra={len(extra)} mismatches={len(mismatches)}" + ) + if missing: + missing_names = [item.get('name', item) for item in missing] + logger.info(f"[VALIDATE] {request_id} | MISSING: {missing_names}") + if extra: + extra_names = [item.get('name', item) for item in extra] + logger.info(f"[VALIDATE] {request_id} | EXTRA: {extra_names}") + if mismatches: + logger.info(f"[VALIDATE] {request_id} | QUANTITY 
MISMATCHES: {mismatches}") else: result["error"] = f"HTTP {resp.status}: {await resp.text()}" @@ -335,7 +358,7 @@ class DineInStreamDensity: # Configuration constants DEFAULT_TARGET_LATENCY_MS = 15000 # 15 seconds - DEFAULT_MAX_ITERATIONS = 50 + MAX_ITERATIONS = 50 MEMORY_SAFETY_THRESHOLD_PERCENT = 90 MIN_REQUESTS_PER_ITERATION = 3 @@ -351,8 +374,7 @@ def __init__( density_increment: int = 1, init_duration: int = 60, min_requests: int = MIN_REQUESTS_PER_ITERATION, - request_timeout: int = 300, - max_iterations: int = DEFAULT_MAX_ITERATIONS + request_timeout: int = 300 ): self.compose_file = compose_file self.results_dir = Path(results_dir) @@ -363,7 +385,6 @@ def __init__( self.init_duration = init_duration self.min_requests = min_requests self.request_timeout = request_timeout - self.max_iterations = max_iterations # Resolve paths relative to compose file compose_dir = Path(compose_file).parent @@ -398,7 +419,7 @@ def run(self) -> DineInDensityResult: best_result: Optional[DineInIterationResult] = None total_images = 0 - for iteration in range(1, self.max_iterations + 1): + for iteration in range(1, self.MAX_ITERATIONS + 1): print(f"\n{'='*70}") print(f"Iteration {iteration}: Testing density={density} concurrent images") print(f"{'='*70}") @@ -503,6 +524,24 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: latencies = [r["latency_ms"] for r in results if r.get("success")] successful = len(latencies) failed = len(results) - successful + + # Log per-request validation summary + for r in results: + if r.get("success") and r.get("response"): + resp = r["response"] + complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0) + matched = len(resp.get("matched_items", [])) + missing = len(resp.get("missing_items", [])) + extra = len(resp.get("extra_items", [])) + print( + f" [{r.get('image_id', '?')}] " + f"{'✓ COMPLETE' if complete else '✗ INCOMPLETE'} | " + f"accuracy={accuracy:.2f} | matched={matched} 
missing={missing} extra={extra} | " + f"latency={r['latency_ms']:.0f}ms" + ) + elif not r.get("success"): + print(f" [{r.get('image_id', '?')}] ✗ FAILED - {r.get('error', 'unknown error')}") # Calculate metrics from HTTP response latencies avg_latency = sum(latencies) / len(latencies) if latencies else 0 @@ -520,7 +559,7 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: memory_percent = psutil.virtual_memory().percent cpu_percent = psutil.cpu_percent(interval=0.5) - return DineInIterationResult( + iteration_result = DineInIterationResult( density=density, avg_latency_ms=avg_latency, p95_latency_ms=p95_latency, @@ -535,6 +574,9 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: cpu_percent=cpu_percent, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) + # Attach raw results for detailed validation export + iteration_result._raw_results = results + return iteration_result def _get_latency_metric(self, result: DineInIterationResult) -> float: """Get the configured latency metric value.""" @@ -576,7 +618,7 @@ def _wait_for_ready(self): if resp.status == 200: ready = True break - except (urllib.error.URLError, TimeoutError, ConnectionResetError, ConnectionRefusedError, OSError): + except Exception: pass time.sleep(5) @@ -703,6 +745,29 @@ def _export_results(self, result: DineInDensityResult): } if result.best_iteration else None } + # Collect all per-request validation details across all iterations + all_validations = [] + for it in result.iterations: + if hasattr(it, '_raw_results'): + for r in it._raw_results: + if r.get("success") and r.get("response"): + resp = r["response"] + all_validations.append({ + "density": it.density, + "request_id": r.get("request_id"), + "image_id": r.get("image_id"), + "latency_ms": round(r["latency_ms"], 2), + "order_complete": resp.get("order_complete"), + "accuracy_score": resp.get("accuracy_score"), + "matched_items": resp.get("matched_items", []), + 
"missing_items": resp.get("missing_items", []), + "extra_items": resp.get("extra_items", []), + "quantity_mismatches": resp.get("quantity_mismatches", []), + "metrics": resp.get("metrics") + }) + if all_validations: + result_dict["validation_details"] = all_validations + # Export JSON json_path = self.results_dir / f"dinein_density_results_{timestamp}.json" with open(json_path, 'w') as f: @@ -720,7 +785,163 @@ def _export_results(self, result: DineInDensityResult): f.write(f"{it.successful_requests},{it.failed_requests},{it.passed},") f.write(f"{it.memory_percent:.1f},{it.cpu_percent:.1f}\n") print(f"Summary exported to: {csv_path}") - + + # Export per-request validation details CSV + val_csv_path = self.results_dir / f"dinein_validation_details_{timestamp}.csv" + with open(val_csv_path, 'w') as f: + f.write("density,request_id,image_id,latency_ms,order_complete,accuracy_score," + "matched,missing,extra,mismatches\n") + for it in result.iterations: + raw = getattr(it, '_raw_results', []) + for r in raw: + if r.get("success") and r.get("response"): + resp = r["response"] + f.write( + f"{it.density},{r.get('request_id','')},{r.get('image_id','')}," + f"{r['latency_ms']:.0f},{resp.get('order_complete','')}," + f"{resp.get('accuracy_score', 0):.2f}," + f"{len(resp.get('matched_items',[]))}," + f"{len(resp.get('missing_items',[]))}," + f"{len(resp.get('extra_items',[]))}," + f"{len(resp.get('quantity_mismatches',[]))}\n" + ) + print(f"Validation details exported to: {val_csv_path}") + + # Export Markdown report + self._export_markdown_report(result, timestamp) + + def _export_markdown_report(self, result: DineInDensityResult, timestamp: str): + """Export a human-readable Markdown validation report per iteration and image.""" + md_path = self.results_dir / f"dinein_validation_report_{timestamp}.md" + + with open(md_path, 'w') as f: + f.write(f"# Dine-In Stream Density — Validation Report\n\n") + f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} \n") 
+ f.write(f"**Target Latency:** {result.target_latency_ms:.0f} ms " + f"({result.target_latency_ms / 1000:.1f} s) \n") + f.write(f"**Max Density Achieved:** {result.max_density} concurrent image(s) \n") + f.write(f"**Met Target:** {'✅ Yes' if result.met_target else '❌ No'} \n") + f.write(f"**Total Images Processed:** {result.total_images_processed} \n\n") + f.write("---\n\n") + + for it in result.iterations: + status_icon = "✅" if it.passed else "❌" + f.write(f"## {status_icon} Iteration — Density {it.density} " + f"({'PASS' if it.passed else 'FAIL'})\n\n") + + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + f.write(f"| Concurrent Images | {it.density} |\n") + f.write(f"| Avg Latency | {it.avg_latency_ms:.0f} ms " + f"({it.avg_latency_ms / 1000:.2f} s) |\n") + f.write(f"| P95 Latency | {it.p95_latency_ms:.0f} ms " + f"({it.p95_latency_ms / 1000:.2f} s) |\n") + f.write(f"| Min / Max Latency | {it.min_latency_ms:.0f} ms / " + f"{it.max_latency_ms:.0f} ms |\n") + f.write(f"| Avg TPS | {it.avg_tps:.2f} |\n") + f.write(f"| Requests | {it.successful_requests} succeeded / " + f"{it.failed_requests} failed |\n") + f.write(f"| Memory | {it.memory_percent:.1f}% |\n") + f.write(f"| CPU | {it.cpu_percent:.1f}% |\n") + f.write(f"| Timestamp | {it.timestamp} |\n\n") + + # Per-image validation results + raw = getattr(it, '_raw_results', []) + if not raw: + f.write("_No per-image results available for this iteration._\n\n") + f.write("---\n\n") + continue + + for r in raw: + image_id = r.get("image_id", "unknown") + latency_ms = r.get("latency_ms", 0) + + if not r.get("success"): + f.write(f"### 🖼️ `{image_id}` — ❌ Request Failed\n\n") + f.write(f"> **Error:** {r.get('error', 'unknown error')} \n") + f.write(f"> **Latency:** {latency_ms:.0f} ms\n\n") + continue + + resp = r.get("response", {}) + order_complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0.0) + matched_items = resp.get("matched_items", []) + missing_items = 
resp.get("missing_items", []) + extra_items = resp.get("extra_items", []) + qty_mismatches = resp.get("quantity_mismatches", []) + metrics = resp.get("metrics") or {} + + complete_icon = "✅" if order_complete else "❌" + complete_label = "Order Complete" if order_complete else "Order Incomplete" + + f.write(f"### 🖼️ `{image_id}`\n\n") + f.write(f"**✅ Validation Result**\n\n") + f.write(f"{complete_icon} **{complete_label}** \n") + f.write(f"**Accuracy:** {accuracy * 100:.0f}% \n") + f.write(f"**Latency:** {latency_ms:.0f} ms \n\n") + + if matched_items: + f.write(f"**✔️ Matched Items**\n\n") + for item in matched_items: + name = item.get("detected_name") or item.get("expected_name", "?") + qty = item.get("quantity", 1) + sim = item.get("similarity", 0) + f.write(f"- {name} (×{qty}) _{sim * 100:.0f}% match_\n") + f.write("\n") + + if missing_items: + f.write(f"**⚠️ Missing Items**\n\n") + for item in missing_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if extra_items: + f.write(f"**➕ Extra Items Detected**\n\n") + for item in extra_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if qty_mismatches: + f.write(f"**🔢 Quantity Mismatches**\n\n") + for item in qty_mismatches: + name = item.get("item", "?") + exp = item.get("expected_quantity", "?") + got = item.get("detected_quantity", "?") + f.write(f"- {name}: expected ×{exp}, detected ×{got}\n") + f.write("\n") + + if metrics: + f.write(f"**📊 Performance Metrics**\n\n") + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + for k, v in metrics.items(): + f.write(f"| {k} | {v} |\n") + f.write("\n") + + f.write("---\n\n") + + # Final summary section + f.write("## 📋 Summary Table\n\n") + f.write("| Density | Avg Latency | P95 Latency | TPS | Succeeded | Failed | Status |\n") + 
f.write("|---------|-------------|-------------|-----|-----------|--------|--------|\n") + for it in result.iterations: + status = "✅ PASS" if it.passed else "❌ FAIL" + f.write( + f"| {it.density} " + f"| {it.avg_latency_ms:.0f} ms ({it.avg_latency_ms / 1000:.2f}s) " + f"| {it.p95_latency_ms:.0f} ms ({it.p95_latency_ms / 1000:.2f}s) " + f"| {it.avg_tps:.2f} " + f"| {it.successful_requests} " + f"| {it.failed_requests} " + f"| {status} |\n" + ) + + print(f"Markdown report exported to: {md_path}") + def _print_summary(self, result: DineInDensityResult): """Print final summary.""" print("\n" + "=" * 70) @@ -861,12 +1082,6 @@ def main(): default=env_results_dir, help=f"Directory for results output (default: {env_results_dir}, env: RESULTS_DIR)" ) - parser.add_argument( - "--max_iterations", - type=int, - default=50, - help="Maximum number of density iterations to run (default: 50)" - ) args = parser.parse_args() @@ -880,7 +1095,6 @@ def main(): print(f" REQUEST_TIMEOUT: {args.request_timeout}") print(f" API_ENDPOINT: {args.api_endpoint}") print(f" RESULTS_DIR: {args.results_dir}") - print(f" MAX_ITERATIONS: {args.max_iterations}") print() # Validate compose file exists @@ -901,8 +1115,7 @@ def main(): density_increment=args.density_increment, init_duration=args.init_duration, min_requests=args.min_requests, - request_timeout=args.request_timeout, - max_iterations=args.max_iterations + request_timeout=args.request_timeout ) result = tester.run() From 63f6c552782bbe62a34f30bdba7144068c985bc8 Mon Sep 17 00:00:00 2001 From: Jitendra Saini Date: Wed, 25 Feb 2026 19:28:40 +0530 Subject: [PATCH 2/2] result for dine iin benchmark --- benchmark-scripts/benchmark_order_accuracy.py | 184 ++++++++++++++++-- .../stream_density_oa_dine_in.py | 111 ++++++++++- 2 files changed, 273 insertions(+), 22 deletions(-) diff --git a/benchmark-scripts/benchmark_order_accuracy.py b/benchmark-scripts/benchmark_order_accuracy.py index 3e2dd02..df8dd5a 100644 --- 
a/benchmark-scripts/benchmark_order_accuracy.py +++ b/benchmark-scripts/benchmark_order_accuracy.py @@ -101,24 +101,34 @@ def run_fixed_workers( self, workers: int, init_duration: int, - duration: int + duration: int, + profile: str = "parallel", + iterations: int = 0 ) -> Dict: """ Run benchmark with fixed number of station workers. Args: - workers: Number of concurrent station workers (RTSP streams) + workers: Number of concurrent station workers init_duration: Warmup duration in seconds - duration: Benchmark duration in seconds + duration: Benchmark duration in seconds (ignored if iterations > 0) + profile: Docker compose profile to use (parallel for take-away, benchmark for dine-in) + iterations: Number of iterations per worker (0 = use duration-based) Returns: Dictionary with benchmark results """ + is_iteration_mode = iterations > 0 + print(f"\n{'='*60}") print(f"Order Accuracy Benchmark - Fixed Workers Mode") print(f"Workers: {workers}") - print(f"Init Duration: {init_duration}s") - print(f"Benchmark Duration: {duration}s") + print(f"Profile: {profile}") + if is_iteration_mode: + print(f"Iterations: {iterations} per worker") + else: + print(f"Init Duration: {init_duration}s") + print(f"Benchmark Duration: {duration}s") print(f"{'='*60}\n") # Clean previous logs @@ -129,17 +139,26 @@ def run_fixed_workers( self.env_vars["VLM_WORKERS"] = str(workers) self.env_vars["SERVICE_MODE"] = "parallel" - # Start containers with parallel profile + # Set iterations for dine-in mode + if is_iteration_mode: + self.env_vars["ITERATIONS"] = str(iterations) + + # Start containers with specified profile print("Starting containers...") - self.docker_compose_cmd("--profile parallel up", "-d") + self.docker_compose_cmd(f"--profile {profile} up", "-d") # Wait for initialization print(f"Waiting {init_duration}s for initialization...") time.sleep(init_duration) - # Run benchmark - print(f"Running benchmark for {duration}s...") - time.sleep(duration) + if is_iteration_mode: + # Wait 
for workers to complete iterations + print(f"Waiting for workers to complete {iterations} iterations...") + self._wait_for_workers_completion(workers, profile, timeout=duration if duration > 0 else 3600) + else: + # Run benchmark for fixed duration + print(f"Running benchmark for {duration}s...") + time.sleep(duration) # Collect metrics results = self._collect_metrics(workers) @@ -147,15 +166,127 @@ def run_fixed_workers( # Collect VLM metrics from vlm_metrics_logger results["vlm_metrics"] = self._collect_vlm_logger_metrics() + # Collect dine-in results if in iteration mode + if is_iteration_mode: + results["worker_results"] = self._collect_worker_results() + # Stop containers print("Stopping containers...") - self.docker_compose_cmd("--profile parallel down") + self.docker_compose_cmd(f"--profile {profile} down") # Export results self._export_results(results, "fixed_workers") return results + def _wait_for_workers_completion(self, workers: int, profile: str, timeout: int = 3600): + """Wait for all worker containers to complete.""" + import subprocess + + start_time = time.time() + check_interval = 10 # seconds + + while time.time() - start_time < timeout: + # Check if any worker containers are still running + cmd = f"docker compose {' '.join(f'-f {f}' for f in self.compose_files)} --profile {profile} ps --format json" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=self.env_vars) + + if result.returncode != 0: + print(f"Warning: Could not check container status: {result.stderr}") + time.sleep(check_interval) + continue + + # Parse JSON output (docker compose ps outputs one JSON per line) + running_workers = 0 + for line in result.stdout.strip().split('\n'): + if not line: + continue + try: + container = json.loads(line) + name = container.get('Name', container.get('name', '')) + state = container.get('State', container.get('state', '')) + if 'worker' in name.lower() and state == 'running': + running_workers += 1 + except 
json.JSONDecodeError: + continue + + if running_workers == 0: + print("All workers completed.") + return + + print(f" {running_workers} workers still running...") + time.sleep(check_interval) + + print(f"Warning: Timeout reached after {timeout}s. Workers may not have completed.") + + def _collect_worker_results(self) -> Dict: + """Collect results from dine-in worker output files.""" + import glob + + worker_results = { + "total_iterations": 0, + "successful": 0, + "failed": 0, + "avg_latency_ms": 0.0, + "results_files": [], + "details": [] + } + + # Look for worker result files (only worker_*.json pattern to avoid duplicates) + seen_files = set() + for f in glob.glob(os.path.join(self.results_dir, "worker_*.json")): + if f in seen_files: + continue + seen_files.add(f) + try: + with open(f, 'r') as fp: + data = json.load(fp) + worker_results["results_files"].append(f) + + # Worker results have stats at root level, not in "stats" key + worker_results["total_iterations"] += data.get("total_iterations", 0) + worker_results["successful"] += data.get("successful_iterations", 0) + worker_results["failed"] += data.get("failed_iterations", 0) + + # Collect detailed results + if "results" in data: + for result in data["results"]: + worker_results["details"].append({ + "worker_id": result.get("worker_id"), + "order_id": result.get("order_id"), + "success": result.get("success"), + "order_complete": result.get("order_complete"), + "accuracy_score": result.get("accuracy_score"), + "items_detected": result.get("items_detected"), + "items_expected": result.get("items_expected"), + "missing_items": result.get("missing_items", 0), + "extra_items": result.get("extra_items", 0), + "missing_items_list": result.get("missing_items_list", []), + "extra_items_list": result.get("extra_items_list", []), + "total_latency_ms": result.get("total_latency_ms"), + "tps": result.get("tps") + }) + except (json.JSONDecodeError, IOError) as e: + print(f"Warning: Could not read results from {f}: {e}") 
+ + # Calculate average latency from root level avg_latency_ms + if worker_results["successful"] > 0: + total_latency = 0 + count = 0 + for f in worker_results["results_files"]: + try: + with open(f, 'r') as fp: + data = json.load(fp) + if "avg_latency_ms" in data: + total_latency += data["avg_latency_ms"] + count += 1 + except: + pass + if count > 0: + worker_results["avg_latency_ms"] = total_latency / count + + return worker_results + def _clean_pipeline_logs(self): """Remove previous pipeline log files.""" import glob @@ -413,11 +544,14 @@ def parse_args(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Run with 2 workers for 5 minutes + # Take-Away: Run with 2 workers for 5 minutes (RTSP streams) python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml --workers 2 --duration 300 - # Run with 1 worker (default) - python benchmark_order_accuracy.py --compose_file ../../docker-compose.yaml + # Dine-In: Run with 2 workers, 10 iterations each (image-based) + python benchmark_order_accuracy.py --compose_file ../../docker-compose.yml --workers 2 --iterations 10 --profile benchmark + + # Dine-In: Quick test with 1 iteration + python benchmark_order_accuracy.py --compose_file ../../docker-compose.yml --workers 1 --iterations 1 --profile benchmark --skip_perf_tools For stream density testing, use application-specific scripts: # Take-Away (RTSP/workers based) @@ -443,6 +577,20 @@ def parse_args(): help='Number of station workers to run' ) + parser.add_argument( + '--profile', + type=str, + default='parallel', + help='Docker compose profile to use (parallel for take-away, benchmark for dine-in)' + ) + + parser.add_argument( + '--iterations', + type=int, + default=0, + help='Number of iterations per worker (dine-in mode). 0 = use duration-based.' 
+ ) + parser.add_argument( '--init_duration', type=int, @@ -454,7 +602,7 @@ def parse_args(): '--duration', type=int, default=300, - help='Benchmark duration in seconds' + help='Benchmark duration in seconds (ignored if --iterations > 0)' ) parser.add_argument( @@ -514,10 +662,12 @@ def main(): results = benchmark.run_fixed_workers( workers=args.workers, init_duration=args.init_duration, - duration=args.duration + duration=args.duration, + profile=args.profile, + iterations=args.iterations ) print(f"\nBenchmark complete. Results: {results}") if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/benchmark-scripts/stream_density_oa_dine_in.py b/benchmark-scripts/stream_density_oa_dine_in.py index 64a14db..ca51304 100644 --- a/benchmark-scripts/stream_density_oa_dine_in.py +++ b/benchmark-scripts/stream_density_oa_dine_in.py @@ -358,7 +358,7 @@ class DineInStreamDensity: # Configuration constants DEFAULT_TARGET_LATENCY_MS = 15000 # 15 seconds - MAX_ITERATIONS = 50 + DEFAULT_MAX_ITERATIONS = 50 MEMORY_SAFETY_THRESHOLD_PERCENT = 90 MIN_REQUESTS_PER_ITERATION = 3 @@ -374,7 +374,10 @@ def __init__( density_increment: int = 1, init_duration: int = 60, min_requests: int = MIN_REQUESTS_PER_ITERATION, - request_timeout: int = 300 + request_timeout: int = 300, + single_run: bool = False, + concurrent_images: int = 1, + max_iterations: int = DEFAULT_MAX_ITERATIONS ): self.compose_file = compose_file self.results_dir = Path(results_dir) @@ -385,6 +388,9 @@ def __init__( self.init_duration = init_duration self.min_requests = min_requests self.request_timeout = request_timeout + self.single_run = single_run + self.concurrent_images = concurrent_images + self.max_iterations = max_iterations # Resolve paths relative to compose file compose_dir = Path(compose_file).parent @@ -403,6 +409,9 @@ def __init__( logger.info(f" Images: {self.images_dir}") logger.info(f" Orders: {self.orders_file}") logger.info(f" Target Latency: {self.target_latency_ms}ms") + 
def _print_detailed_validation_results(self, results: List[Dict]):
    """Print a per-image validation report with item breakdowns.

    For each entry of ``results`` the report shows either the request error
    (when ``success`` is falsy) or, read from the entry's ``response`` dict,
    the completion flag, accuracy, latency and the matched / missing /
    extra / quantity-mismatched items.

    This is a diagnostics printer, so it is deliberately tolerant: values
    that are present but ``None`` (``response``, ``latency_ms``,
    ``accuracy_score``, ``similarity``) fall back to their defaults, and item
    entries that are plain values rather than dicts are printed as-is instead
    of raising.

    Args:
        results: One dict per request; the keys read here are ``image_id``,
            ``latency_ms``, ``success``, ``error`` and ``response``.
    """
    divider = "=" * 70

    def describe(item, name_keys):
        """Return ``(name, quantity)`` for one item entry."""
        if not isinstance(item, dict):
            return str(item), 1
        name = next((item[key] for key in name_keys if item.get(key)), "?")
        return name, item.get("quantity", 1)

    print("\n" + divider)
    print("VALIDATION DETAILS")
    print(divider)

    for r in results:
        image_id = r.get("image_id", "unknown")
        # `or 0` covers a key that exists but holds None.
        latency_ms = r.get("latency_ms") or 0

        if not r.get("success"):
            print(f"\n🖼️ {image_id} - ❌ Request Failed")
            print(f" Error: {r.get('error', 'unknown error')}")
            print(f" Latency: {latency_ms:.0f}ms")
            continue

        resp = r.get("response") or {}
        order_complete = resp.get("order_complete", False)
        accuracy = resp.get("accuracy_score") or 0.0
        matched_items = resp.get("matched_items") or []
        missing_items = resp.get("missing_items") or []
        extra_items = resp.get("extra_items") or []
        qty_mismatches = resp.get("quantity_mismatches") or []

        complete_icon = "✅" if order_complete else "❌"
        complete_label = "Order Complete" if order_complete else "Order Incomplete"

        print(f"\n🖼️ {image_id}")
        print(f" {complete_icon} {complete_label}")
        print(f" Accuracy: {accuracy * 100:.0f}%")
        print(f" Latency: {latency_ms:.0f}ms")

        if matched_items:
            print(" ✔️ Matched Items:")
            for item in matched_items:
                name, qty = describe(item, ("detected_name", "expected_name"))
                sim = (item.get("similarity") or 0) if isinstance(item, dict) else 0
                print(f" • {name} (×{qty}) - {sim * 100:.0f}% match")

        if missing_items:
            print(" ⚠️ Missing Items:")
            for item in missing_items:
                name, qty = describe(item, ("name",))
                print(f" • {name} (×{qty})")

        if extra_items:
            print(" ➕ Extra Items Detected:")
            for item in extra_items:
                name, qty = describe(item, ("name",))
                print(f" • {name} (×{qty})")

        if qty_mismatches:
            print(" 🔢 Quantity Mismatches:")
            for item in qty_mismatches:
                if not isinstance(item, dict):
                    print(f" • {item}")
                    continue
                name = item.get("item", "?")
                exp = item.get("expected_quantity", "?")
                got = item.get("detected_quantity", "?")
                print(f" • {name}: expected ×{exp}, detected ×{got}")

    print("\n" + divider)
"--max_iterations", + type=int, + default=50, + help="Maximum iterations for density scaling (default: 50)" + ) args = parser.parse_args() @@ -1095,6 +1190,9 @@ def main(): print(f" REQUEST_TIMEOUT: {args.request_timeout}") print(f" API_ENDPOINT: {args.api_endpoint}") print(f" RESULTS_DIR: {args.results_dir}") + print(f" SINGLE_RUN: {args.single_run}") + if args.single_run: + print(f" CONCURRENT_IMAGES: {args.concurrent_images}") print() # Validate compose file exists @@ -1115,7 +1213,10 @@ def main(): density_increment=args.density_increment, init_duration=args.init_duration, min_requests=args.min_requests, - request_timeout=args.request_timeout + request_timeout=args.request_timeout, + single_run=args.single_run, + concurrent_images=args.concurrent_images, + max_iterations=args.max_iterations ) result = tester.run()