diff --git a/benchmark-scripts/benchmark_order_accuracy.py b/benchmark-scripts/benchmark_order_accuracy.py index 17fa969..df8dd5a 100644 --- a/benchmark-scripts/benchmark_order_accuracy.py +++ b/benchmark-scripts/benchmark_order_accuracy.py @@ -112,7 +112,7 @@ def run_fixed_workers( workers: Number of concurrent station workers init_duration: Warmup duration in seconds duration: Benchmark duration in seconds (ignored if iterations > 0) - profile: Docker compose profile to use (parallel, benchmark, worker) + profile: Docker compose profile to use (parallel for take-away, benchmark for dine-in) iterations: Number of iterations per worker (0 = use duration-based) Returns: @@ -228,29 +228,48 @@ def _collect_worker_results(self) -> Dict: "successful": 0, "failed": 0, "avg_latency_ms": 0.0, - "results_files": [] + "results_files": [], + "details": [] } - # Look for worker result files - patterns = ["worker_*.json", "results_*.json", "*results*.json"] - - for pattern in patterns: - for f in glob.glob(os.path.join(self.results_dir, pattern)): - try: - with open(f, 'r') as fp: - data = json.load(fp) - worker_results["results_files"].append(f) - - # Aggregate stats if present - if "stats" in data: - stats = data["stats"] - worker_results["total_iterations"] += stats.get("total_iterations", 0) - worker_results["successful"] += stats.get("successful_iterations", 0) - worker_results["failed"] += stats.get("failed_iterations", 0) - except (json.JSONDecodeError, IOError) as e: - print(f"Warning: Could not read results from {f}: {e}") - - # Calculate average latency + # Look for worker result files (only worker_*.json pattern to avoid duplicates) + seen_files = set() + for f in glob.glob(os.path.join(self.results_dir, "worker_*.json")): + if f in seen_files: + continue + seen_files.add(f) + try: + with open(f, 'r') as fp: + data = json.load(fp) + worker_results["results_files"].append(f) + + # Worker results have stats at root level, not in "stats" key + worker_results["total_iterations"] += data.get("total_iterations", 0) + worker_results["successful"] += data.get("successful_iterations", 0) + worker_results["failed"] += data.get("failed_iterations", 0) + + # Collect detailed results + if "results" in data: + for result in data["results"]: + worker_results["details"].append({ + "worker_id": result.get("worker_id"), + "order_id": result.get("order_id"), + "success": result.get("success"), + "order_complete": result.get("order_complete"), + "accuracy_score": result.get("accuracy_score"), + "items_detected": result.get("items_detected"), + "items_expected": result.get("items_expected"), + "missing_items": result.get("missing_items", 0), + "extra_items": result.get("extra_items", 0), + "missing_items_list": result.get("missing_items_list", []), + "extra_items_list": result.get("extra_items_list", []), + "total_latency_ms": result.get("total_latency_ms"), + "tps": result.get("tps") + }) + except (json.JSONDecodeError, IOError) as e: + print(f"Warning: Could not read results from {f}: {e}") + + # Calculate average latency from root level avg_latency_ms if worker_results["successful"] > 0: total_latency = 0 count = 0 @@ -258,8 +277,8 @@ def _collect_worker_results(self) -> Dict: try: with open(f, 'r') as fp: data = json.load(fp) - if "stats" in data and "avg_latency_ms" in data["stats"]: - total_latency += data["stats"]["avg_latency_ms"] + if "avg_latency_ms" in data: + total_latency += data["avg_latency_ms"] count += 1 except: pass @@ -651,4 +670,4 @@ def main(): if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/benchmark-scripts/stream_density_latency_oa.py b/benchmark-scripts/stream_density_latency_oa.py index 0b2dcdd..38563fc 100644 --- a/benchmark-scripts/stream_density_latency_oa.py +++ b/benchmark-scripts/stream_density_latency_oa.py @@ -141,7 +141,7 @@ def run(self) -> DensityResult: print(f"Latency Metric: {self.latency_metric}") print(f"Worker Increment: {self.worker_increment}") print(f"Init Duration: {self.init_duration}s") - print(f"Min Transactions: {self.min_transactions}") + print(f"Min Transactions: {self.min_transactions} per worker (scales with worker count)") print("=" * 70) workers = 1 @@ -163,9 +163,10 @@ def run(self) -> DensityResult: # Run benchmark iteration result = self._run_iteration(workers) - # Check if we got enough transactions - if result.total_transactions < self.min_transactions: - print(f"WARNING: Only got {result.total_transactions} transactions (need {self.min_transactions})") + # Check if we got enough transactions (scaled by worker count) + required_transactions = self.min_transactions * workers + if result.total_transactions < required_transactions: + print(f"WARNING: Only got {result.total_transactions} transactions (need {required_transactions} = {self.min_transactions} × {workers} workers)") print("Latency measurement may be unreliable.") self.iterations.append(result) @@ -192,6 +193,12 @@ def run(self) -> DensityResult: print(f" ⚠ NO DATA - No latency measurements collected") print(f" Trying next iteration anyway...") workers += self.worker_increment + elif current_latency < 0: + print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected") + print(f" This usually means a video-loop ID collision in the metrics file.") + print(f" Discarding this iteration result; trying next...") + self.iterations.pop() # Remove the corrupt entry already appended above + workers += self.worker_increment else: print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)") break @@ -241,12 +248,13 @@ def _run_iteration(self, workers: int) -> IterationResult: print(f"Waiting {self.init_duration}s for initialization...") time.sleep(self.init_duration) - # Poll for transactions until we have enough - print(f"Waiting for {self.min_transactions} transactions...") + # Poll for transactions until we have enough (scaled by worker count) + required_transactions = self.min_transactions * workers + print(f"Waiting for {required_transactions} transactions ({self.min_transactions} per worker × {workers} worker(s))...") start_time = time.time() transactions = 0 - while transactions < self.min_transactions: + while transactions < required_transactions: elapsed = time.time() - start_time if elapsed > self.MAX_WAIT_SEC: @@ -257,9 +265,9 @@ def _run_iteration(self, workers: int) -> IterationResult: metrics = self._collect_vlm_logger_metrics() transactions = metrics.get("total_transactions", 0) - if transactions < self.min_transactions: - remaining = self.min_transactions - transactions - print(f" Transactions: {transactions}/{self.min_transactions} " + if transactions < required_transactions: + remaining = required_transactions - transactions + print(f" Transactions: {transactions}/{required_transactions} " f"(waiting for {remaining} more, {elapsed:.0f}s elapsed)") time.sleep(self.POLL_INTERVAL_SEC) @@ -324,6 +332,14 @@ def _collect_vlm_logger_metrics(self) -> Dict: timestamp = int(ts_match.group(1)) if event == "start": + # If this ID already has a completed start+end pair (video loop + # reuse), save it under a unique key before overwriting so the + # latency isn't corrupted by end(loop N) - start(loop N+1). + if unique_id in start_times and unique_id in end_times: + saved_key = f"{unique_id}_run{len([k for k in start_times if k.startswith(unique_id)])}" + start_times[saved_key] = start_times[unique_id] + end_times[saved_key] = end_times[unique_id] + del end_times[unique_id] start_times[unique_id] = timestamp elif event == "end": end_times[unique_id] = timestamp diff --git a/benchmark-scripts/stream_density_oa_dine_in.py b/benchmark-scripts/stream_density_oa_dine_in.py index 7cf63ec..ca51304 100644 --- a/benchmark-scripts/stream_density_oa_dine_in.py +++ b/benchmark-scripts/stream_density_oa_dine_in.py @@ -238,7 +238,30 @@ async def validate_image( if resp.status == 200: result["success"] = True - result["response"] = await resp.json() + response_data = await resp.json() + result["response"] = response_data + # Log validation outcome + accuracy = response_data.get("accuracy_score", 0) + complete = response_data.get("order_complete", False) + matched = response_data.get("matched_items", []) + missing = response_data.get("missing_items", []) + extra = response_data.get("extra_items", []) + mismatches = response_data.get("quantity_mismatches", []) + status_str = "COMPLETE" if complete else "INCOMPLETE" + logger.info( + f"[VALIDATE] {request_id} | {status_str} | " + f"accuracy={accuracy:.2f} | latency={result['latency_ms']:.0f}ms | " + f"matched={len(matched)} missing={len(missing)} " + f"extra={len(extra)} mismatches={len(mismatches)}" + ) + if missing: + missing_names = [item.get('name', item) for item in missing] + logger.info(f"[VALIDATE] {request_id} | MISSING: {missing_names}") + if extra: + extra_names = [item.get('name', item) for item in extra] + logger.info(f"[VALIDATE] {request_id} | EXTRA: {extra_names}") + if mismatches: + logger.info(f"[VALIDATE] {request_id} | QUANTITY MISMATCHES: {mismatches}") else: result["error"] = f"HTTP {resp.status}: {await resp.text()}" @@ -352,6 +375,8 @@ def __init__( init_duration: int = 60, min_requests: int = MIN_REQUESTS_PER_ITERATION, request_timeout: int = 300, + single_run: bool = False, + concurrent_images: int = 1, max_iterations: int = DEFAULT_MAX_ITERATIONS ): self.compose_file = compose_file @@ -363,6 +388,8 @@ def __init__( self.init_duration = init_duration self.min_requests = min_requests self.request_timeout = request_timeout + self.single_run = single_run + self.concurrent_images = concurrent_images self.max_iterations = max_iterations # Resolve paths relative to compose file @@ -382,6 +409,9 @@ def __init__( logger.info(f" Images: {self.images_dir}") logger.info(f" Orders: {self.orders_file}") logger.info(f" Target Latency: {self.target_latency_ms}ms") + logger.info(f" Single Run Mode: {self.single_run}") + if self.single_run: + logger.info(f" Concurrent Images: {self.concurrent_images}") def run(self) -> DineInDensityResult: """ @@ -394,11 +424,13 @@ def run(self) -> DineInDensityResult: """ self._print_header() - density = 1 + # In single_run mode, just run once with specified concurrent_images + density = self.concurrent_images if self.single_run else 1 + max_iter = 1 if self.single_run else self.max_iterations best_result: Optional[DineInIterationResult] = None total_images = 0 - for iteration in range(1, self.max_iterations + 1): + for iteration in range(1, max_iter + 1): print(f"\n{'='*70}") print(f"Iteration {iteration}: Testing density={density} concurrent images") print(f"{'='*70}") @@ -503,6 +535,27 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: latencies = [r["latency_ms"] for r in results if r.get("success")] successful = len(latencies) failed = len(results) - successful + + # Log per-request validation summary + for r in results: + if r.get("success") and r.get("response"): + resp = r["response"] + complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0) + matched = len(resp.get("matched_items", [])) + missing = len(resp.get("missing_items", [])) + extra = len(resp.get("extra_items", [])) + print( + f" [{r.get('image_id', '?')}] " + f"{'✓ COMPLETE' if complete else '✗ INCOMPLETE'} | " + f"accuracy={accuracy:.2f} | matched={matched} missing={missing} extra={extra} | " + f"latency={r['latency_ms']:.0f}ms" + ) + elif not r.get("success"): + print(f" [{r.get('image_id', '?')}] ✗ FAILED - {r.get('error', 'unknown error')}") + + # Print detailed validation results + self._print_detailed_validation_results(results) # Calculate metrics from HTTP response latencies avg_latency = sum(latencies) / len(latencies) if latencies else 0 @@ -520,7 +573,7 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: memory_percent = psutil.virtual_memory().percent cpu_percent = psutil.cpu_percent(interval=0.5) - return DineInIterationResult( + iteration_result = DineInIterationResult( density=density, avg_latency_ms=avg_latency, p95_latency_ms=p95_latency, @@ -535,6 +588,9 @@ def _run_iteration(self, density: int, iteration: int) -> DineInIterationResult: cpu_percent=cpu_percent, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) + # Attach raw results for detailed validation export + iteration_result._raw_results = results + return iteration_result def _get_latency_metric(self, result: DineInIterationResult) -> float: """Get the configured latency metric value.""" @@ -546,6 +602,70 @@ def _get_latency_metric(self, result: DineInIterationResult) -> float: return result.max_latency_ms return result.avg_latency_ms + def _print_detailed_validation_results(self, results: List[Dict]): + """Print detailed validation results with item breakdowns.""" + print("\n" + "=" * 70) + print("VALIDATION DETAILS") + print("=" * 70) + + for r in results: + image_id = r.get("image_id", "unknown") + latency_ms = r.get("latency_ms", 0) + + if not r.get("success"): + print(f"\n🖼️ {image_id} - ❌ Request Failed") + print(f" Error: {r.get('error', 'unknown error')}") + print(f" Latency: {latency_ms:.0f}ms") + continue + + resp = r.get("response", {}) + order_complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0.0) + matched_items = resp.get("matched_items", []) + missing_items = resp.get("missing_items", []) + extra_items = resp.get("extra_items", []) + qty_mismatches = resp.get("quantity_mismatches", []) + + complete_icon = "✅" if order_complete else "❌" + complete_label = "Order Complete" if order_complete else "Order Incomplete" + + print(f"\n🖼️ {image_id}") + print(f" {complete_icon} {complete_label}") + print(f" Accuracy: {accuracy * 100:.0f}%") + print(f" Latency: {latency_ms:.0f}ms") + + if matched_items: + print(f" ✔️ Matched Items:") + for item in matched_items: + name = item.get("detected_name") or item.get("expected_name", "?") + qty = item.get("quantity", 1) + sim = item.get("similarity", 0) + print(f" • {name} (×{qty}) - {sim * 100:.0f}% match") + + if missing_items: + print(f" ⚠️ Missing Items:") + for item in missing_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + print(f" • {name} (×{qty})") + + if extra_items: + print(f" ➕ Extra Items Detected:") + for item in extra_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + print(f" • {name} (×{qty})") + + if qty_mismatches: + print(f" 🔢 Quantity Mismatches:") + for item in qty_mismatches: + name = item.get("item", "?") + exp = item.get("expected_quantity", "?") + got = item.get("detected_quantity", "?") + print(f" • {name}: expected ×{exp}, detected ×{got}") + + print("\n" + "=" * 70) + def _start_services(self): """Start dine-in services via docker compose.""" print("Starting dine-in services...") @@ -576,7 +696,7 @@ def _wait_for_ready(self): if resp.status == 200: ready = True break - except (urllib.error.URLError, TimeoutError, ConnectionResetError, ConnectionRefusedError, OSError): + except Exception: pass time.sleep(5) @@ -703,6 +823,29 @@ def _export_results(self, result: DineInDensityResult): } if result.best_iteration else None } + # Collect all per-request validation details across all iterations + all_validations = [] + for it in result.iterations: + if hasattr(it, '_raw_results'): + for r in it._raw_results: + if r.get("success") and r.get("response"): + resp = r["response"] + all_validations.append({ + "density": it.density, + "request_id": r.get("request_id"), + "image_id": r.get("image_id"), + "latency_ms": round(r["latency_ms"], 2), + "order_complete": resp.get("order_complete"), + "accuracy_score": resp.get("accuracy_score"), + "matched_items": resp.get("matched_items", []), + "missing_items": resp.get("missing_items", []), + "extra_items": resp.get("extra_items", []), + "quantity_mismatches": resp.get("quantity_mismatches", []), + "metrics": resp.get("metrics") + }) + if all_validations: + result_dict["validation_details"] = all_validations + # Export JSON json_path = self.results_dir / f"dinein_density_results_{timestamp}.json" with open(json_path, 'w') as f: @@ -720,7 +863,163 @@ def _export_results(self, result: DineInDensityResult): f.write(f"{it.successful_requests},{it.failed_requests},{it.passed},") f.write(f"{it.memory_percent:.1f},{it.cpu_percent:.1f}\n") print(f"Summary exported to: {csv_path}") - + + # Export per-request validation details CSV + val_csv_path = self.results_dir / f"dinein_validation_details_{timestamp}.csv" + with open(val_csv_path, 'w') as f: + f.write("density,request_id,image_id,latency_ms,order_complete,accuracy_score," + "matched,missing,extra,mismatches\n") + for it in result.iterations: + raw = getattr(it, '_raw_results', []) + for r in raw: + if r.get("success") and r.get("response"): + resp = r["response"] + f.write( + f"{it.density},{r.get('request_id','')},{r.get('image_id','')}," + f"{r['latency_ms']:.0f},{resp.get('order_complete','')}," + f"{resp.get('accuracy_score', 0):.2f}," + f"{len(resp.get('matched_items',[]))}," + f"{len(resp.get('missing_items',[]))}," + f"{len(resp.get('extra_items',[]))}," + f"{len(resp.get('quantity_mismatches',[]))}\n" + ) + print(f"Validation details exported to: {val_csv_path}") + + # Export Markdown report + self._export_markdown_report(result, timestamp) + + def _export_markdown_report(self, result: DineInDensityResult, timestamp: str): + """Export a human-readable Markdown validation report per iteration and image.""" + md_path = self.results_dir / f"dinein_validation_report_{timestamp}.md" + + with open(md_path, 'w') as f: + f.write(f"# Dine-In Stream Density — Validation Report\n\n") + f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} \n") + f.write(f"**Target Latency:** {result.target_latency_ms:.0f} ms " + f"({result.target_latency_ms / 1000:.1f} s) \n") + f.write(f"**Max Density Achieved:** {result.max_density} concurrent image(s) \n") + f.write(f"**Met Target:** {'✅ Yes' if result.met_target else '❌ No'} \n") + f.write(f"**Total Images Processed:** {result.total_images_processed} \n\n") + f.write("---\n\n") + + for it in result.iterations: + status_icon = "✅" if it.passed else "❌" + f.write(f"## {status_icon} Iteration — Density {it.density} " + f"({'PASS' if it.passed else 'FAIL'})\n\n") + + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + f.write(f"| Concurrent Images | {it.density} |\n") + f.write(f"| Avg Latency | {it.avg_latency_ms:.0f} ms " + f"({it.avg_latency_ms / 1000:.2f} s) |\n") + f.write(f"| P95 Latency | {it.p95_latency_ms:.0f} ms " + f"({it.p95_latency_ms / 1000:.2f} s) |\n") + f.write(f"| Min / Max Latency | {it.min_latency_ms:.0f} ms / " + f"{it.max_latency_ms:.0f} ms |\n") + f.write(f"| Avg TPS | {it.avg_tps:.2f} |\n") + f.write(f"| Requests | {it.successful_requests} succeeded / " + f"{it.failed_requests} failed |\n") + f.write(f"| Memory | {it.memory_percent:.1f}% |\n") + f.write(f"| CPU | {it.cpu_percent:.1f}% |\n") + f.write(f"| Timestamp | {it.timestamp} |\n\n") + + # Per-image validation results + raw = getattr(it, '_raw_results', []) + if not raw: + f.write("_No per-image results available for this iteration._\n\n") + f.write("---\n\n") + continue + + for r in raw: + image_id = r.get("image_id", "unknown") + latency_ms = r.get("latency_ms", 0) + + if not r.get("success"): + f.write(f"### 🖼️ `{image_id}` — ❌ Request Failed\n\n") + f.write(f"> **Error:** {r.get('error', 'unknown error')} \n") + f.write(f"> **Latency:** {latency_ms:.0f} ms\n\n") + continue + + resp = r.get("response", {}) + order_complete = resp.get("order_complete", False) + accuracy = resp.get("accuracy_score", 0.0) + matched_items = resp.get("matched_items", []) + missing_items = resp.get("missing_items", []) + extra_items = resp.get("extra_items", []) + qty_mismatches = resp.get("quantity_mismatches", []) + metrics = resp.get("metrics") or {} + + complete_icon = "✅" if order_complete else "❌" + complete_label = "Order Complete" if order_complete else "Order Incomplete" + + f.write(f"### 🖼️ `{image_id}`\n\n") + f.write(f"**✅ Validation Result**\n\n") + f.write(f"{complete_icon} **{complete_label}** \n") + f.write(f"**Accuracy:** {accuracy * 100:.0f}% \n") + f.write(f"**Latency:** {latency_ms:.0f} ms \n\n") + + if matched_items: + f.write(f"**✔️ Matched Items**\n\n") + for item in matched_items: + name = item.get("detected_name") or item.get("expected_name", "?") + qty = item.get("quantity", 1) + sim = item.get("similarity", 0) + f.write(f"- {name} (×{qty}) _{sim * 100:.0f}% match_\n") + f.write("\n") + + if missing_items: + f.write(f"**⚠️ Missing Items**\n\n") + for item in missing_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if extra_items: + f.write(f"**➕ Extra Items Detected**\n\n") + for item in extra_items: + name = item.get("name", "?") + qty = item.get("quantity", 1) + f.write(f"- {name} (×{qty})\n") + f.write("\n") + + if qty_mismatches: + f.write(f"**🔢 Quantity Mismatches**\n\n") + for item in qty_mismatches: + name = item.get("item", "?") + exp = item.get("expected_quantity", "?") + got = item.get("detected_quantity", "?") + f.write(f"- {name}: expected ×{exp}, detected ×{got}\n") + f.write("\n") + + if metrics: + f.write(f"**📊 Performance Metrics**\n\n") + f.write(f"| Metric | Value |\n") + f.write(f"|--------|-------|\n") + for k, v in metrics.items(): + f.write(f"| {k} | {v} |\n") + f.write("\n") + + f.write("---\n\n") + + # Final summary section + f.write("## 📋 Summary Table\n\n") + f.write("| Density | Avg Latency | P95 Latency | TPS | Succeeded | Failed | Status |\n") + f.write("|---------|-------------|-------------|-----|-----------|--------|--------|\n") + for it in result.iterations: + status = "✅ PASS" if it.passed else "❌ FAIL" + f.write( + f"| {it.density} " + f"| {it.avg_latency_ms:.0f} ms ({it.avg_latency_ms / 1000:.2f}s) " + f"| {it.p95_latency_ms:.0f} ms ({it.p95_latency_ms / 1000:.2f}s) " + f"| {it.avg_tps:.2f} " + f"| {it.successful_requests} " + f"| {it.failed_requests} " + f"| {status} |\n" + ) + + print(f"Markdown report exported to: {md_path}") + def _print_summary(self, result: DineInDensityResult): """Print final summary.""" print("\n" + "=" * 70) @@ -861,11 +1160,22 @@ def main(): default=env_results_dir, help=f"Directory for results output (default: {env_results_dir}, env: RESULTS_DIR)" ) + parser.add_argument( + "--single_run", + action="store_true", + help="Run a single benchmark iteration without density scaling (simple benchmark mode)" + ) + parser.add_argument( + "--concurrent_images", + type=int, + default=1, + help="Number of concurrent images for single_run mode (default: 1)" + ) parser.add_argument( "--max_iterations", type=int, default=50, - help="Maximum number of density iterations to run (default: 50)" + help="Maximum iterations for density scaling (default: 50)" ) args = parser.parse_args() @@ -880,7 +1190,9 @@ def main(): print(f" REQUEST_TIMEOUT: {args.request_timeout}") print(f" API_ENDPOINT: {args.api_endpoint}") print(f" RESULTS_DIR: {args.results_dir}") - print(f" MAX_ITERATIONS: {args.max_iterations}") + print(f" SINGLE_RUN: {args.single_run}") + if args.single_run: + print(f" CONCURRENT_IMAGES: {args.concurrent_images}") print() # Validate compose file exists @@ -902,6 +1214,8 @@ def main(): init_duration=args.init_duration, min_requests=args.min_requests, request_timeout=args.request_timeout, + single_run=args.single_run, + concurrent_images=args.concurrent_images, max_iterations=args.max_iterations )