Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 44 additions & 25 deletions benchmark-scripts/benchmark_order_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def run_fixed_workers(
workers: Number of concurrent station workers
init_duration: Warmup duration in seconds
duration: Benchmark duration in seconds (ignored if iterations > 0)
profile: Docker compose profile to use (parallel, benchmark, worker)
profile: Docker compose profile to use (parallel for take-away, benchmark for dine-in)
iterations: Number of iterations per worker (0 = use duration-based)

Returns:
Expand Down Expand Up @@ -228,38 +228,57 @@ def _collect_worker_results(self) -> Dict:
"successful": 0,
"failed": 0,
"avg_latency_ms": 0.0,
"results_files": []
"results_files": [],
"details": []
}

# Look for worker result files
patterns = ["worker_*.json", "results_*.json", "*results*.json"]

for pattern in patterns:
for f in glob.glob(os.path.join(self.results_dir, pattern)):
try:
with open(f, 'r') as fp:
data = json.load(fp)
worker_results["results_files"].append(f)

# Aggregate stats if present
if "stats" in data:
stats = data["stats"]
worker_results["total_iterations"] += stats.get("total_iterations", 0)
worker_results["successful"] += stats.get("successful_iterations", 0)
worker_results["failed"] += stats.get("failed_iterations", 0)
except (json.JSONDecodeError, IOError) as e:
print(f"Warning: Could not read results from {f}: {e}")

# Calculate average latency
# Look for worker result files (only worker_*.json pattern to avoid duplicates)
seen_files = set()
for f in glob.glob(os.path.join(self.results_dir, "worker_*.json")):
if f in seen_files:
continue
seen_files.add(f)
try:
with open(f, 'r') as fp:
data = json.load(fp)
worker_results["results_files"].append(f)

# Worker results have stats at root level, not in "stats" key
worker_results["total_iterations"] += data.get("total_iterations", 0)
worker_results["successful"] += data.get("successful_iterations", 0)
worker_results["failed"] += data.get("failed_iterations", 0)

# Collect detailed results
if "results" in data:
for result in data["results"]:
worker_results["details"].append({
"worker_id": result.get("worker_id"),
"order_id": result.get("order_id"),
"success": result.get("success"),
"order_complete": result.get("order_complete"),
"accuracy_score": result.get("accuracy_score"),
"items_detected": result.get("items_detected"),
"items_expected": result.get("items_expected"),
"missing_items": result.get("missing_items", 0),
"extra_items": result.get("extra_items", 0),
"missing_items_list": result.get("missing_items_list", []),
"extra_items_list": result.get("extra_items_list", []),
"total_latency_ms": result.get("total_latency_ms"),
"tps": result.get("tps")
})
except (json.JSONDecodeError, IOError) as e:
print(f"Warning: Could not read results from {f}: {e}")

# Calculate average latency from root level avg_latency_ms
if worker_results["successful"] > 0:
total_latency = 0
count = 0
for f in worker_results["results_files"]:
try:
with open(f, 'r') as fp:
data = json.load(fp)
if "stats" in data and "avg_latency_ms" in data["stats"]:
total_latency += data["stats"]["avg_latency_ms"]
if "avg_latency_ms" in data:
total_latency += data["avg_latency_ms"]
count += 1
except:
pass
Expand Down Expand Up @@ -651,4 +670,4 @@ def main():


if __name__ == '__main__':
main()
main()
36 changes: 26 additions & 10 deletions benchmark-scripts/stream_density_latency_oa.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def run(self) -> DensityResult:
print(f"Latency Metric: {self.latency_metric}")
print(f"Worker Increment: {self.worker_increment}")
print(f"Init Duration: {self.init_duration}s")
print(f"Min Transactions: {self.min_transactions}")
print(f"Min Transactions: {self.min_transactions} per worker (scales with worker count)")
print("=" * 70)

workers = 1
Expand All @@ -163,9 +163,10 @@ def run(self) -> DensityResult:
# Run benchmark iteration
result = self._run_iteration(workers)

# Check if we got enough transactions
if result.total_transactions < self.min_transactions:
print(f"WARNING: Only got {result.total_transactions} transactions (need {self.min_transactions})")
# Check if we got enough transactions (scaled by worker count)
required_transactions = self.min_transactions * workers
if result.total_transactions < required_transactions:
print(f"WARNING: Only got {result.total_transactions} transactions (need {required_transactions} = {self.min_transactions} × {workers} workers)")
print("Latency measurement may be unreliable.")

self.iterations.append(result)
Expand All @@ -192,6 +193,12 @@ def run(self) -> DensityResult:
print(f" ⚠ NO DATA - No latency measurements collected")
print(f" Trying next iteration anyway...")
workers += self.worker_increment
elif current_latency < 0:
print(f" ⚠ CORRUPTED METRICS - Negative latency {current_latency:.0f}ms detected")
print(f" This usually means a video-loop ID collision in the metrics file.")
print(f" Discarding this iteration result; trying next...")
self.iterations.pop() # Remove the corrupt entry already appended above
workers += self.worker_increment
else:
print(f" ✗ FAILED (latency {current_latency/1000:.1f}s > {self.target_latency_ms/1000:.1f}s)")
break
Expand Down Expand Up @@ -241,12 +248,13 @@ def _run_iteration(self, workers: int) -> IterationResult:
print(f"Waiting {self.init_duration}s for initialization...")
time.sleep(self.init_duration)

# Poll for transactions until we have enough
print(f"Waiting for {self.min_transactions} transactions...")
# Poll for transactions until we have enough (scaled by worker count)
required_transactions = self.min_transactions * workers
print(f"Waiting for {required_transactions} transactions ({self.min_transactions} per worker × {workers} worker(s))...")
start_time = time.time()
transactions = 0

while transactions < self.min_transactions:
while transactions < required_transactions:
elapsed = time.time() - start_time

if elapsed > self.MAX_WAIT_SEC:
Expand All @@ -257,9 +265,9 @@ def _run_iteration(self, workers: int) -> IterationResult:
metrics = self._collect_vlm_logger_metrics()
transactions = metrics.get("total_transactions", 0)

if transactions < self.min_transactions:
remaining = self.min_transactions - transactions
print(f" Transactions: {transactions}/{self.min_transactions} "
if transactions < required_transactions:
remaining = required_transactions - transactions
print(f" Transactions: {transactions}/{required_transactions} "
f"(waiting for {remaining} more, {elapsed:.0f}s elapsed)")
time.sleep(self.POLL_INTERVAL_SEC)

Expand Down Expand Up @@ -324,6 +332,14 @@ def _collect_vlm_logger_metrics(self) -> Dict:
timestamp = int(ts_match.group(1))

if event == "start":
# If this ID already has a completed start+end pair (video loop
# reuse), save it under a unique key before overwriting so the
# latency isn't corrupted by end(loop N) - start(loop N+1).
if unique_id in start_times and unique_id in end_times:
saved_key = f"{unique_id}_run{len([k for k in start_times if k.startswith(unique_id)])}"
start_times[saved_key] = start_times[unique_id]
end_times[saved_key] = end_times[unique_id]
del end_times[unique_id]
start_times[unique_id] = timestamp
elif event == "end":
end_times[unique_id] = timestamp
Expand Down
Loading
Loading