@@ -162,9 +162,7 @@ async def detect_deletes(
162162 else :
163163 live_pks = live_result # type: ignore[assignment]
164164
165- deleted_rows = await self ._store .find_deleted_pks (
166- self ._source_url , schema .name , live_pks
167- )
165+ deleted_rows = await self ._store .find_deleted_pks (self ._source_url , schema .name , live_pks )
168166 if not deleted_rows :
169167 return 0
170168
@@ -274,14 +272,10 @@ async def sync_table(
274272 row_is_new .append (False )
275273 continue
276274 pk_str = canonical_pk (pk_val )
277- existing = await self ._store .get_node_id (
278- self ._source_url , schema .name , pk_str
279- )
275+ existing = await self ._store .get_node_id (self ._source_url , schema .name , pk_str )
280276 row_is_new .append (existing is None )
281277 if existing is not None and fk_map :
282- snapshot = await self ._store .get_fk_edges (
283- self ._source_url , schema .name , pk_str
284- )
278+ snapshot = await self ._store .get_fk_edges (self ._source_url , schema .name , pk_str )
285279 if snapshot :
286280 prior_fks [pk_str ] = snapshot
287281
@@ -335,11 +329,7 @@ async def sync_table(
335329 node_id = deterministic_row_id (self ._source_url , schema .name , pk_val )
336330 fk_snapshot : dict [str , str ] | None = None
337331 if fk_map :
338- fk_snapshot = {
339- col : str (row [col ])
340- for col in fk_map
341- if row .get (col ) is not None
342- }
332+ fk_snapshot = {col : str (row [col ]) for col in fk_map if row .get (col ) is not None }
343333 pk_batch .append ((canonical_pk (pk_val ), node_id , None , fk_snapshot ))
344334
345335 change_val = row .get (change_col )
@@ -384,9 +374,7 @@ async def _prune_stale_fk_edges(
384374 rows : list [dict [str , Any ]],
385375 prior_fks : dict [str , dict [str , str ]],
386376 ) -> int :
387- return await _prune_stale_fk_edges (
388- graph , schema , fk_map , rows , prior_fks , self ._source_url
389- )
377+ return await _prune_stale_fk_edges (graph , schema , fk_map , rows , prior_fks , self ._source_url )
390378
391379
392380async def _prune_stale_fk_edges (
@@ -429,19 +417,14 @@ async def _prune_stale_fk_edges(
429417 if target_table is None :
430418 continue
431419
432- old_target_node = deterministic_row_id (
433- source_url , target_table , old_target_pk
434- )
420+ old_target_node = deterministic_row_id (source_url , target_table , old_target_pk )
435421
436422 if source_node not in edge_cache :
437423 edge_cache [source_node ] = await graph .backend .get_edges (
438424 source_node , direction = "outgoing"
439425 )
440426 for edge in edge_cache [source_node ]:
441- if (
442- edge .target_id == old_target_node
443- and edge .kind == EdgeKind .RELATED
444- ):
427+ if edge .target_id == old_target_node and edge .kind == EdgeKind .RELATED :
445428 await graph .backend .delete_edge (edge .id )
446429 removed += 1
447430 edge_cache [source_node ] = [
@@ -506,9 +489,7 @@ async def sync_table(
506489 # Hash mode always does a full read — there is no watermark
507490 # to filter on. Pass `where_clause=None` so the SQLite
508491 # reader still applies the LIMIT but skips the WHERE.
509- reader_result = row_reader (
510- schema .name , where_clause = None , where_params = ()
511- )
492+ reader_result = row_reader (schema .name , where_clause = None , where_params = ())
512493 if hasattr (reader_result , "__await__" ):
513494 rows = await reader_result # type: ignore[misc]
514495 else :
@@ -535,12 +516,8 @@ async def sync_table(
535516 new_hash = row_hash (row )
536517 new_hashes [pk_str ] = new_hash
537518
538- prior_hash = await self ._store .get_row_hash (
539- self ._source_url , schema .name , pk_str
540- )
541- existing_node = await self ._store .get_node_id (
542- self ._source_url , schema .name , pk_str
543- )
519+ prior_hash = await self ._store .get_row_hash (self ._source_url , schema .name , pk_str )
520+ existing_node = await self ._store .get_node_id (self ._source_url , schema .name , pk_str )
544521
545522 if existing_node is None :
546523 stats .added += 1
@@ -549,19 +526,15 @@ async def sync_table(
549526 stats .updated += 1
550527 to_ingest .append (row )
551528 if fk_map :
552- snapshot = await self ._store .get_fk_edges (
553- self ._source_url , schema .name , pk_str
554- )
529+ snapshot = await self ._store .get_fk_edges (self ._source_url , schema .name , pk_str )
555530 if snapshot :
556531 prior_fks [pk_str ] = snapshot
557532 # else: unchanged — skip
558533
559534 if not to_ingest :
560535 # Nothing changed; no state advance needed beyond the
561536 # last_sync_at heartbeat for monitoring.
562- prior_state = await self ._store .load_state (
563- self ._source_url , schema .name
564- )
537+ prior_state = await self ._store .load_state (self ._source_url , schema .name )
565538 new_state = TableSyncState (
566539 source_url = self ._source_url ,
567540 table_name = schema .name ,
@@ -611,27 +584,15 @@ async def sync_table(
611584 if pk_val is None :
612585 continue
613586 pk_str = canonical_pk (pk_val )
614- node_id = deterministic_row_id (
615- self ._source_url , schema .name , pk_val
616- )
587+ node_id = deterministic_row_id (self ._source_url , schema .name , pk_val )
617588 fk_snapshot : dict [str , str ] | None = None
618589 if fk_map :
619- fk_snapshot = {
620- col : str (row [col ])
621- for col in fk_map
622- if row .get (col ) is not None
623- }
624- pk_batch .append (
625- (pk_str , node_id , new_hashes .get (pk_str ), fk_snapshot )
626- )
590+ fk_snapshot = {col : str (row [col ]) for col in fk_map if row .get (col ) is not None }
591+ pk_batch .append ((pk_str , node_id , new_hashes .get (pk_str ), fk_snapshot ))
627592
628- await self ._store .upsert_pk_batch (
629- self ._source_url , schema .name , pk_batch
630- )
593+ await self ._store .upsert_pk_batch (self ._source_url , schema .name , pk_batch )
631594
632- prior_state = await self ._store .load_state (
633- self ._source_url , schema .name
634- )
595+ prior_state = await self ._store .load_state (self ._source_url , schema .name )
635596 row_count = (prior_state .row_count if prior_state else 0 ) + stats .added
636597 new_state = TableSyncState (
637598 source_url = self ._source_url ,
0 commit comments