From a0c4b311bc76e7bbba9b3a545f8c043b669dab27 Mon Sep 17 00:00:00 2001 From: dmytroprokopenko-techmagic Date: Tue, 17 Mar 2026 10:43:44 +0200 Subject: [PATCH 1/3] feat: TM-175 - update apply correspondences to run on separate thread, add semophore, add cancel token --- web_api/app/alignment.py | 44 ++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/web_api/app/alignment.py b/web_api/app/alignment.py index 1c368018a..9f5b4f3f1 100644 --- a/web_api/app/alignment.py +++ b/web_api/app/alignment.py @@ -1,9 +1,11 @@ # pylint: disable=import-error +import asyncio import base64 import gzip import io import json import struct +import threading import numpy as np import torch @@ -20,6 +22,9 @@ api = FastAPI() +# Limits concurrent GPU computations to 1; additional requests queue asynchronously. +_gpu_semaphore = asyncio.Semaphore(1) + @api.exception_handler(Exception) async def generic_handler(request: Request, exc: Exception): @@ -376,14 +381,37 @@ async def apply_correspondences(request: Request): ) print(f"[apply_correspondences] params: {params}") - relaxed_field, warped_image = apply_correspondences_to_image( - correspondences_dict=correspondences_dict, - image=image_tensor, - src_mask=src_mask_tensor, - tgt_mask=tgt_mask_tensor, - tgt_image=tgt_image_tensor, - **params, - ) + cancel_event = threading.Event() + + async with _gpu_semaphore: + if await request.is_disconnected(): + print("[apply_correspondences] Client disconnected while waiting in queue") + return Response(status_code=499) + + compute_task = asyncio.get_event_loop().run_in_executor( + None, + lambda: apply_correspondences_to_image( + correspondences_dict=correspondences_dict, + image=image_tensor, + src_mask=src_mask_tensor, + tgt_mask=tgt_mask_tensor, + tgt_image=tgt_image_tensor, + cancel_event=cancel_event, + **params, + ), + ) + + # Poll for client disconnect while computation runs in thread pool. + while not compute_task.done(): + if await request.is_disconnected(): + print("[apply_correspondences] Client disconnected, cancelling computation") + cancel_event.set() + # Wait for the thread to finish (it will exit early on next iteration check) + await compute_task + return Response(status_code=499) + await asyncio.sleep(0.5) + + relaxed_field, warped_image = await compute_task relaxed_field_np = relaxed_field.cpu().numpy().astype(np.float32) warped_image_np = warped_image.cpu().numpy().astype(np.float32) From d92ee95f4d8dcfbdb24b8b3f6d81464574e65372 Mon Sep 17 00:00:00 2001 From: dmytroprokopenko-techmagic Date: Tue, 17 Mar 2026 10:43:56 +0200 Subject: [PATCH 2/3] feat: TM-175 - update apply correspondences to run on separate thread, add semophore, add cancel token --- zetta_utils/internal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zetta_utils/internal b/zetta_utils/internal index 169abd527..ed05e204a 160000 --- a/zetta_utils/internal +++ b/zetta_utils/internal @@ -1 +1 @@ -Subproject commit 169abd52771d8a8a58c06d3adc30847d5d45067b +Subproject commit ed05e204aa75a10b4113fc09cf605b382759f253 From bf792168cb20641ead4cbe88527fe0eaa8b64f79 Mon Sep 17 00:00:00 2001 From: dmytroprokopenko-techmagic Date: Wed, 18 Mar 2026 09:29:09 +0200 Subject: [PATCH 3/3] feat: update submodule --- zetta_utils/internal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zetta_utils/internal b/zetta_utils/internal index 6e0cde6f4..bb0e1032c 160000 --- a/zetta_utils/internal +++ b/zetta_utils/internal @@ -1 +1 @@ -Subproject commit 6e0cde6f45413e084df2f437bde4ce17845d612b +Subproject commit bb0e1032ce81467db17d786d7fcefbeb41434cc5