Skip to content

Commit be8d685

Browse files
author
ACI Bot
committed
improve: qdrant upsert fallback
1 parent 3b31917 commit be8d685

File tree

2 files changed

+152
-39
lines changed

2 files changed

+152
-39
lines changed

src/aci/infrastructure/vector_store/qdrant.py

Lines changed: 110 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ def __init__(
3131
vector_size: int = 1536,
3232
api_key: str | None = None,
3333
url: str | None = None,
34+
timeout_seconds: float = 60.0,
35+
write_retry_attempts: int = 3,
36+
write_retry_backoff_seconds: float = 0.5,
3437
):
3538
url = (url or "").strip()
3639
if not url and host.startswith(("http://", "https://")):
@@ -41,14 +44,20 @@ def __init__(
4144
self._collection_name = collection_name
4245
self._vector_size = vector_size
4346
self._api_key = api_key or None
47+
self._timeout_seconds = timeout_seconds
48+
self._write_retry_attempts = max(1, write_retry_attempts)
49+
self._write_retry_backoff_seconds = max(0.0, write_retry_backoff_seconds)
4450
self._client: AsyncQdrantClient | None = None
4551
self._initialized_collections: set[str] = set()
4652
self._init_locks: dict[str, asyncio.Lock] = {}
4753

4854
async def _get_client(self) -> AsyncQdrantClient:
4955
"""Get or create the Qdrant client."""
5056
if self._client is None:
51-
client_kwargs: dict = {"api_key": self._api_key}
57+
client_kwargs: dict = {
58+
"api_key": self._api_key,
59+
"timeout": self._timeout_seconds,
60+
}
5261
if self._url:
5362
client_kwargs["url"] = self._url
5463
else:
@@ -57,6 +66,33 @@ async def _get_client(self) -> AsyncQdrantClient:
5766
self._client = AsyncQdrantClient(**client_kwargs)
5867
return self._client
5968

69+
def _format_exception_chain(self, exc: Exception) -> str:
70+
"""Return a readable error string including nested causes."""
71+
parts: list[str] = []
72+
current: Exception | None = exc
73+
depth = 0
74+
while current is not None and depth < 5:
75+
message = str(current).strip()
76+
if not message:
77+
message = repr(current)
78+
parts.append(f"{type(current).__name__}: {message}")
79+
current = current.__cause__
80+
depth += 1
81+
return " <- ".join(parts)
82+
83+
def _is_retryable_exception(self, exc: Exception) -> bool:
84+
"""Return True for transient timeout-style failures."""
85+
current: Exception | None = exc
86+
depth = 0
87+
while current is not None and depth < 5:
88+
name = type(current).__name__.lower()
89+
message = str(current).lower()
90+
if "timeout" in name or "timed out" in message or "timeout" in message:
91+
return True
92+
current = current.__cause__
93+
depth += 1
94+
return False
95+
6096
def set_collection(self, collection_name: str) -> None:
    """Point all subsequent operations at *collection_name*."""
    self._collection_name = collection_name
@@ -143,15 +179,38 @@ async def upsert(
143179
if "artifact_type" not in payload:
144180
payload = {**payload, "artifact_type": "chunk"}
145181

146-
try:
147-
await client.upsert(
148-
collection_name=target_collection,
149-
points=[
150-
models.PointStruct(id=chunk_id, vector=vector, payload=payload)
151-
],
152-
)
153-
except Exception as e:
154-
raise VectorStoreError(f"Failed to upsert vector: {e}") from e
182+
last_error: Exception | None = None
183+
for attempt in range(1, self._write_retry_attempts + 1):
184+
try:
185+
await client.upsert(
186+
collection_name=target_collection,
187+
points=[
188+
models.PointStruct(id=chunk_id, vector=vector, payload=payload)
189+
],
190+
)
191+
return
192+
except Exception as e:
193+
last_error = e
194+
if attempt < self._write_retry_attempts and self._is_retryable_exception(e):
195+
delay = self._write_retry_backoff_seconds * (2 ** (attempt - 1))
196+
logger.warning(
197+
"Qdrant upsert timeout (attempt %s/%s) for chunk '%s' in collection '%s'; retrying in %.2fs",
198+
attempt,
199+
self._write_retry_attempts,
200+
chunk_id,
201+
target_collection,
202+
delay,
203+
)
204+
if delay > 0:
205+
await asyncio.sleep(delay)
206+
continue
207+
break
208+
209+
details = self._format_exception_chain(last_error or Exception("unknown error"))
210+
raise VectorStoreError(
211+
f"Failed to upsert vector in collection '{target_collection}' after "
212+
f"{self._write_retry_attempts} attempt(s): {details}"
213+
) from last_error
155214

156215
async def upsert_batch(
157216
self,
@@ -163,24 +222,48 @@ async def upsert_batch(
163222
await self.initialize(target_collection)
164223
client = await self._get_client()
165224

166-
try:
167-
qdrant_points = [
168-
models.PointStruct(
169-
id=chunk_id,
170-
vector=vector,
171-
payload=(
172-
payload if "artifact_type" in payload
173-
else {**payload, "artifact_type": "chunk"}
174-
),
175-
)
176-
for chunk_id, vector, payload in points
177-
]
178-
await client.upsert(
179-
collection_name=target_collection,
180-
points=qdrant_points,
225+
qdrant_points = [
226+
models.PointStruct(
227+
id=chunk_id,
228+
vector=vector,
229+
payload=(
230+
payload if "artifact_type" in payload
231+
else {**payload, "artifact_type": "chunk"}
232+
),
181233
)
182-
except Exception as e:
183-
raise VectorStoreError(f"Failed to batch upsert vectors: {e}") from e
234+
for chunk_id, vector, payload in points
235+
]
236+
237+
last_error: Exception | None = None
238+
for attempt in range(1, self._write_retry_attempts + 1):
239+
try:
240+
await client.upsert(
241+
collection_name=target_collection,
242+
points=qdrant_points,
243+
)
244+
return
245+
except Exception as e:
246+
last_error = e
247+
if attempt < self._write_retry_attempts and self._is_retryable_exception(e):
248+
delay = self._write_retry_backoff_seconds * (2 ** (attempt - 1))
249+
logger.warning(
250+
"Qdrant batch upsert timeout (attempt %s/%s) for %s point(s) in collection '%s'; retrying in %.2fs",
251+
attempt,
252+
self._write_retry_attempts,
253+
len(points),
254+
target_collection,
255+
delay,
256+
)
257+
if delay > 0:
258+
await asyncio.sleep(delay)
259+
continue
260+
break
261+
262+
details = self._format_exception_chain(last_error or Exception("unknown error"))
263+
raise VectorStoreError(
264+
f"Failed to batch upsert {len(points)} vector(s) in collection "
265+
f"'{target_collection}' after {self._write_retry_attempts} attempt(s): {details}"
266+
) from last_error
184267

185268
async def delete_by_file(self, file_path: str, collection_name: str | None = None) -> int:
186269
"""Delete all vectors for a file, return count deleted."""

src/aci/services/indexing_service.py

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ async def _embed_and_store_chunks(
723723

724724
# Collect file info from this batch before writing vectors
725725
batch_file_infos: dict[str, dict] = {}
726+
batch_points: list[tuple[str, list[float], dict]] = []
726727

727728
# Time Qdrant upsert operations
728729
qdrant_start = time.time()
@@ -739,17 +740,17 @@ async def _embed_and_store_chunks(
739740
"artifact_type": ArtifactType.CHUNK.value,
740741
**chunk.metadata,
741742
}
742-
await self._vector_store.upsert(
743-
chunk.chunk_id,
744-
embedding,
745-
payload,
746-
collection_name=collection_name,
747-
)
743+
batch_points.append((chunk.chunk_id, embedding, payload))
748744

749745
file_path = pending_info["file_path"] if pending_info else None
750746
if file_path and file_path not in persisted_files:
751747
batch_file_infos[file_path] = pending_info
752748

749+
await self._upsert_points(
750+
batch_points,
751+
collection_name=collection_name,
752+
)
753+
753754
qdrant_duration_ms = (time.time() - qdrant_start) * 1000
754755

755756
# Log Qdrant operation with structured fields
@@ -818,6 +819,7 @@ async def _embed_and_store_chunks(
818819
actual=len(embeddings),
819820
)
820821

822+
summary_points: list[tuple[str, list[float], dict]] = []
821823
for summary, embedding in zip(batch, embeddings, strict=False):
822824
payload = {
823825
"file_path": summary.file_path,
@@ -828,19 +830,47 @@ async def _embed_and_store_chunks(
828830
"name": summary.name,
829831
**summary.metadata,
830832
}
831-
await self._vector_store.upsert(
832-
summary.artifact_id,
833-
embedding,
834-
payload,
835-
collection_name=collection_name,
836-
)
833+
summary_points.append((summary.artifact_id, embedding, payload))
834+
835+
await self._upsert_points(
836+
summary_points,
837+
collection_name=collection_name,
838+
)
837839

838840
self._report_progress(
839841
total_chunks + min(i + self._batch_size, total_summaries),
840842
total_items,
841843
f"Embedded {min(i + self._batch_size, total_summaries)} summaries",
842844
)
843845

846+
async def _upsert_points(
847+
self,
848+
points: list[tuple[str, list[float], dict]],
849+
*,
850+
collection_name: str | None,
851+
) -> None:
852+
"""
853+
Upsert points with best-effort batching.
854+
855+
Uses vector-store batch upsert when available, and falls back to
856+
per-point upsert for compatibility with lightweight test doubles.
857+
"""
858+
if not points:
859+
return
860+
861+
upsert_batch = getattr(self._vector_store, "upsert_batch", None)
862+
if callable(upsert_batch):
863+
await upsert_batch(points, collection_name=collection_name)
864+
return
865+
866+
for chunk_id, embedding, payload in points:
867+
await self._vector_store.upsert(
868+
chunk_id,
869+
embedding,
870+
payload,
871+
collection_name=collection_name,
872+
)
873+
844874
async def update_incremental(
845875
self, root_path: Path, *, max_workers: int | None = None
846876
) -> IndexingResult:

0 commit comments

Comments
 (0)