Skip to content

Commit 5b57e17

Browse files
committed
fix: async row_reader in _ingest_schemas + X2BEE DB validation
Fixed coroutine handling: row_reader returns coroutine for async DBs (PostgreSQL), needs await. Used asyncio.iscoroutine() check. Validated on real X2BEE production DB (AWS RDS PostgreSQL): ai_lab_main: 6 tables, 19,843 rows → 19,843 nodes in 36.7s - pr_goods_base: 1,008 products - pr_goods_sold_hist: 3,370 sales (FK → products) - pr_goods_user_feedback: 15,374 reviews (FK → products) Batch processing: 15K feedback rows split into 10K + 5K batches Search: "Wine Niagara" → instant hit ✅
1 parent 1f85899 commit 5b57e17

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

src/synaptic/extensions/db_ingester.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,8 @@ async def _ingest_schemas(
695695
has_fk = [s for s in regular if s.foreign_keys]
696696

697697
for schema in no_fk + has_fk:
698-
rows = await row_reader(schema.name) if asyncio.iscoroutinefunction(row_reader) else row_reader(schema.name)
698+
result = row_reader(schema.name)
699+
rows = await result if asyncio.iscoroutine(result) else result
699700
if not rows:
700701
continue
701702

@@ -721,7 +722,8 @@ async def _ingest_schemas(
721722

722723
# M:N join tables → direct edges (no intermediate nodes)
723724
for schema in join_tables:
724-
rows = await row_reader(schema.name) if asyncio.iscoroutinefunction(row_reader) else row_reader(schema.name)
725+
result = row_reader(schema.name)
726+
rows = await result if asyncio.iscoroutine(result) else result
725727
if not rows:
726728
continue
727729

0 commit comments

Comments
 (0)