From a48cb4fd3513715cd8912a7579e2d248487c65d2 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Thu, 19 Feb 2026 14:02:40 +0200 Subject: [PATCH 1/7] Rename *Piped functions to *Chunked --- .../src/Cardano/DbSync/Api/Ledger.hs | 2 +- .../src/Cardano/DbSync/Era/Byron/Insert.hs | 2 +- .../src/Cardano/DbSync/Era/Universal/Epoch.hs | 12 +++---- .../DbSync/Era/Universal/Insert/GovAction.hs | 6 ++-- .../DbSync/Era/Universal/Insert/Grouped.hs | 36 +++++++++---------- cardano-db/src/Cardano/Db/Statement/Base.hs | 8 ++--- .../src/Cardano/Db/Statement/ConsumedTxOut.hs | 8 ++--- .../Db/Statement/GovernanceAndVoting.hs | 4 +-- .../src/Cardano/Db/Statement/MultiAsset.hs | 4 +-- .../Cardano/Db/Statement/StakeDelegation.hs | 12 +++---- .../Cardano/Db/Statement/Variants/TxOut.hs | 12 +++---- 11 files changed, 53 insertions(+), 53 deletions(-) diff --git a/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs b/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs index 244ea3593..1fd12259c 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs @@ -134,7 +134,7 @@ storePage syncEnv blkEra percQuantum (n, ls) = do txOuts <- mapM (prepareTxOut syncEnv blkEra) ls txOutIds <- lift $ DB.insertBulkTxOut False $ etoTxOut . fst <$> txOuts let maTxOuts = concatMap (mkmaTxOuts txOutVariantType) $ zip txOutIds (snd <$> txOuts) - void . lift $ DB.insertBulkMaTxOutPiped [maTxOuts] + void . lift $ DB.insertBulkMaTxOutChunked [maTxOuts] where txOutVariantType = getTxOutVariantType syncEnv trce = getTrace syncEnv diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs index 873383130..6770c7dd7 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs @@ -314,7 +314,7 @@ insertByronTx' syncEnv blkId tx blockIndex = do -- Update consumed TxOut records if enabled whenConsumeOrPruneTxOut syncEnv $ lift $ - DB.updateListTxOutConsumedByTxIdBP [prepUpdate txId <$> resolvedInputs] + DB.updateListTxOutConsumedByTxIdChunked [prepUpdate txId <$> resolvedInputs] -- Return fee amount for caching/epoch calculations pure $ unDbLovelace $ vfFee valFee diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs index 427066d01..c27905aba 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs @@ -220,8 +220,8 @@ insertEpochStake syncEnv nw epochNo stakeChunk = do dbStakes <- mapM mkStake stakeChunk let chunckDbStakes = DB.chunkForBulkQuery (Proxy @DB.EpochStake) Nothing dbStakes - -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline - lift $ DB.insertBulkEpochStakePiped dbConstraintEpochStake chunckDbStakes + -- minimising the bulk inserts into hundred thousand chunks to improve performance + lift $ DB.insertBulkEpochStakeChunked dbConstraintEpochStake chunckDbStakes where mkStake :: (StakeCred, (Shelley.Coin, PoolKeyHash)) -> @@ -250,8 +250,8 @@ insertRewards syncEnv nw earnedEpoch spendableEpoch rewardsChunk = do dbRewards <- concatMapM mkRewards rewardsChunk DB.ManualDbConstraints {..} <- liftIO $ readTVarIO $ envDbConstraints syncEnv let chunckDbRewards = DB.chunkForBulkQuery (Proxy @DB.Reward) Nothing dbRewards - -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline - lift $ DB.insertBulkRewardsPiped dbConstraintRewards chunckDbRewards + -- minimising the bulk inserts into hundred thousand chunks to improve performance + lift $ DB.insertBulkRewardsChunked dbConstraintRewards chunckDbRewards where mkRewards :: (StakeCred, Set Generic.Reward) -> @@ -294,8 +294,8 @@ insertRewardRests :: insertRewardRests syncEnv nw earnedEpoch spendableEpoch rewardsChunk = do dbRewards <- concatMapM mkRewards rewardsChunk let chunckDbRewards = DB.chunkForBulkQuery (Proxy @DB.RewardRest) Nothing dbRewards - -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline - lift $ DB.insertBulkRewardRestsPiped chunckDbRewards + -- minimising the bulk inserts into hundred thousand chunks to improve performance + lift $ DB.insertBulkRewardRestsChunked chunckDbRewards where mkRewards :: (StakeCred, Set Generic.RewardRest) -> diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs index 7125bd46f..fa4fcaf61 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/GovAction.hs @@ -129,7 +129,7 @@ insertGovActionProposal syncEnv blkId txId govExpiresAt mcgs (index, (govId, pp) let withdrawalChunks = DB.chunkForBulkQuery (Proxy @DB.TreasuryWithdrawal) Nothing withdrawals -- Process all chunks to create treasury withdrawals with resolved IDs allTreasuryWithdrawals <- mapM processChunk withdrawalChunks - -- Insert all chunks in a single pipeline operation + -- Insert all chunks lift $ DB.insertBulkTreasuryWithdrawal allTreasuryWithdrawals where processChunk chunk = do @@ -381,8 +381,8 @@ insertDrepDistr e pSnapshot = do drepChunks = DB.chunkForBulkQuery (Proxy @DB.DrepDistr) Nothing drepEntries -- Process all chunks to create DRep distribution entries allDrepDistrs <- mapM processChunk drepChunks - -- Insert all chunks in a single pipeline operation - lift $ DB.insertBulkDrepDistrPiped allDrepDistrs + -- Insert all chunks + lift $ DB.insertBulkDrepDistrChunked allDrepDistrs where processChunk = mapM mkEntry diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs index 3fb612a47..bf469d7f6 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs @@ -116,11 +116,11 @@ insertBlockGroupedData syncEnv grouped = do -- Sequential TxOut processing (generates required IDs) txOutIds <- concat <$> mapM (lift . DB.insertBulkTxOut disInOut) txOutChunks -- Execute independent operations (TxIn, Metadata, Mint) in parallel - txInIds <- executePreparedTxInPiped preparedTxIn + txInIds <- executePreparedTxInChunked preparedTxIn -- TxOut-dependent operations (MaTxOut + UTxO consumption) maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped - executePreparedMetadataPiped preparedMetadata - executePreparedMintPiped preparedMint + executePreparedMetadataChunked preparedMetadata + executePreparedMintChunked preparedMint -- Process UTxO consumption (depends on txOutIds) processUtxoConsumption syncEnv grouped txOutIds @@ -332,22 +332,22 @@ prepareMintProcessing _syncEnv grouped = { pmtChunks = DB.chunkForBulkQuery (Proxy @DB.MaTxMint) Nothing $ groupedTxMint grouped } --- | Execute prepared TxIn operations (using pipeline) -executePreparedTxInPiped :: PreparedTxIn -> ExceptT SyncNodeError DB.DbM [DB.TxInId] -executePreparedTxInPiped prepared = +-- | Execute prepared TxIn operations in chunks +executePreparedTxInChunked :: PreparedTxIn -> ExceptT SyncNodeError DB.DbM [DB.TxInId] +executePreparedTxInChunked prepared = if ptiSkip prepared then pure [] - else lift $ DB.insertBulkTxInPiped (ptiChunks prepared) + else lift $ DB.insertBulkTxInChunked (ptiChunks prepared) --- | Execute prepared Metadata operations (using pipeline) -executePreparedMetadataPiped :: PreparedMetadata -> ExceptT SyncNodeError DB.DbM () -executePreparedMetadataPiped prepared = - void $ lift $ DB.insertBulkTxMetadataPiped (pmRemoveJsonb prepared) (pmChunks prepared) +-- | Execute prepared Metadata operations in chunks +executePreparedMetadataChunked :: PreparedMetadata -> ExceptT SyncNodeError DB.DbM () +executePreparedMetadataChunked prepared = + void $ lift $ DB.insertBulkTxMetadataChunked (pmRemoveJsonb prepared) (pmChunks prepared) --- | Execute prepared Mint operations (using pipeline) -executePreparedMintPiped :: PreparedMint -> ExceptT SyncNodeError DB.DbM () -executePreparedMintPiped prepared = - void $ lift $ DB.insertBulkMaTxMintPiped (pmtChunks prepared) +-- | Execute prepared Mint operations in chunks +executePreparedMintChunked :: PreparedMint -> ExceptT SyncNodeError DB.DbM () +executePreparedMintChunked prepared = + void $ lift $ DB.insertBulkMaTxMintChunked (pmtChunks prepared) -- | Process MaTxOut operations (depends on TxOut IDs) processMaTxOuts :: SyncEnv -> [DB.TxOutIdW] -> BlockGroupedData -> ExceptT SyncNodeError DB.DbM [DB.MaTxOutIdW] @@ -357,7 +357,7 @@ processMaTxOuts syncEnv txOutIds grouped = do concatMap (mkmaTxOuts txOutVariantType) $ zip txOutIds (snd <$> groupedTxOut grouped) maTxOutChunks = DB.chunkForBulkQueryWith (DB.getMaTxOutBulkSize txOutVariantType) maTxOuts - lift $ DB.insertBulkMaTxOutPiped maTxOutChunks + lift $ DB.insertBulkMaTxOutChunked maTxOutChunks -- | Process UTxO consumption updates (depends on TxOut IDs) processUtxoConsumption :: SyncEnv -> BlockGroupedData -> [DB.TxOutIdW] -> ExceptT SyncNodeError DB.DbM () @@ -377,12 +377,12 @@ processUtxoConsumption syncEnv grouped txOutIds = do unless (null hashBasedUpdates) $ void $ lift $ - DB.updateConsumedByTxHashPiped txOutVariantType hashUpdateChunks + DB.updateConsumedByTxHashChunked txOutVariantType hashUpdateChunks -- Individual process ID-based updates unless (null idBasedUpdates) $ void $ lift $ - DB.updateListTxOutConsumedByTxIdBP idUpdateChunks + DB.updateListTxOutConsumedByTxIdChunked idUpdateChunks -- Log failures mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs diff --git a/cardano-db/src/Cardano/Db/Statement/Base.hs b/cardano-db/src/Cardano/Db/Statement/Base.hs index 3b65935a2..e9d3f1fc0 100644 --- a/cardano-db/src/Cardano/Db/Statement/Base.hs +++ b/cardano-db/src/Cardano/Db/Statement/Base.hs @@ -978,8 +978,8 @@ insertBulkTxMetadataStmt removeJsonb = , map SCB.txMetadataTxId xs ) -insertBulkTxMetadataPiped :: HasCallStack => Bool -> [[SCB.TxMetadata]] -> DbM [Id.TxMetadataId] -insertBulkTxMetadataPiped removeJsonb txMetaChunks = +insertBulkTxMetadataChunked :: HasCallStack => Bool -> [[SCB.TxMetadata]] -> DbM [Id.TxMetadataId] +insertBulkTxMetadataChunked removeJsonb txMetaChunks = runSession mkDbCallStack $ concat <$> traverse (\chunk -> HsqlSes.statement chunk (insertBulkTxMetadataStmt removeJsonb)) txMetaChunks @@ -1365,8 +1365,8 @@ insertBulkTxInStmt = , map SCB.txInRedeemerId xs ) -insertBulkTxInPiped :: HasCallStack => [[SCB.TxIn]] -> DbM [Id.TxInId] -insertBulkTxInPiped txInChunks = +insertBulkTxInChunked :: HasCallStack => [[SCB.TxIn]] -> DbM [Id.TxInId] +insertBulkTxInChunked txInChunks = concat <$> runSession mkDbCallStack diff --git a/cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs b/cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs index 49a43cc4e..1d72ab050 100644 --- a/cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs +++ b/cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs @@ -531,11 +531,11 @@ data BulkConsumedByHash = BulkConsumedByHash , bchConsumingTxId :: !Id.TxId } -updateConsumedByTxHashPiped :: +updateConsumedByTxHashChunked :: TxOutVariantType -> [[BulkConsumedByHash]] -> DbM () -updateConsumedByTxHashPiped txOutVariantType consumedData = do +updateConsumedByTxHashChunked txOutVariantType consumedData = do unless (null consumedData) $ do case txOutVariantType of TxOutVariantCore -> do @@ -673,8 +673,8 @@ migrateTxOutDbTool bulkSize txOutVariantType = do -------------------------------------------------------------------------------- -- | Update a list of TxOut consumed by TxId mappings using bulked statements -updateListTxOutConsumedByTxIdBP :: [[(TxOutIdW, Id.TxId)]] -> DbM () -updateListTxOutConsumedByTxIdBP chunks = do +updateListTxOutConsumedByTxIdChunked :: [[(TxOutIdW, Id.TxId)]] -> DbM () +updateListTxOutConsumedByTxIdChunked chunks = do unless (null chunks) $ do !_results <- runSession mkDbCallStack $ diff --git a/cardano-db/src/Cardano/Db/Statement/GovernanceAndVoting.hs b/cardano-db/src/Cardano/Db/Statement/GovernanceAndVoting.hs index 642689106..1a729e7e6 100644 --- a/cardano-db/src/Cardano/Db/Statement/GovernanceAndVoting.hs +++ b/cardano-db/src/Cardano/Db/Statement/GovernanceAndVoting.hs @@ -256,8 +256,8 @@ insertBulkDrepDistrStmt = , map SGV.drepDistrActiveUntil xs ) -insertBulkDrepDistrPiped :: HasCallStack => [[SGV.DrepDistr]] -> DbM () -insertBulkDrepDistrPiped drepDistrChunks = +insertBulkDrepDistrChunked :: HasCallStack => [[SGV.DrepDistr]] -> DbM () +insertBulkDrepDistrChunked drepDistrChunks = runSession mkDbCallStack $ traverse_ (`HsqlSes.statement` insertBulkDrepDistrStmt) drepDistrChunks diff --git a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs index f0d776cb2..ffac4dd2e 100644 --- a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs +++ b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs @@ -78,8 +78,8 @@ insertBulkMaTxMintStmt = , map SMA.maTxMintIdent xs ) -insertBulkMaTxMintPiped :: HasCallStack => [[SMA.MaTxMint]] -> DbM [Id.MaTxMintId] -insertBulkMaTxMintPiped maTxMintChunks = +insertBulkMaTxMintChunked :: HasCallStack => [[SMA.MaTxMint]] -> DbM [Id.MaTxMintId] +insertBulkMaTxMintChunked maTxMintChunks = concat <$> runSession mkDbCallStack diff --git a/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs b/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs index 19a7ef45d..231a5b511 100644 --- a/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs +++ b/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs @@ -95,8 +95,8 @@ insertBulkEpochStake dbConstraintEpochStake epochStakes = HsqlSes.statement epochStakes $ insertBulkEpochStakeStmt dbConstraintEpochStake -insertBulkEpochStakePiped :: Bool -> [[SS.EpochStake]] -> DbM () -insertBulkEpochStakePiped dbConstraintEpochStake epochStakeChunks = +insertBulkEpochStakeChunked :: Bool -> [[SS.EpochStake]] -> DbM () +insertBulkEpochStakeChunked dbConstraintEpochStake epochStakeChunks = runSession mkDbCallStack $ traverse_ (\chunk -> HsqlSes.statement chunk (insertBulkEpochStakeStmt dbConstraintEpochStake)) epochStakeChunks @@ -185,8 +185,8 @@ insertBulkRewards dbConstraintRewards rewards = HsqlSes.statement rewards $ insertBulkRewardsStmt dbConstraintRewards -insertBulkRewardsPiped :: Bool -> [[SS.Reward]] -> DbM () -insertBulkRewardsPiped dbConstraintRewards rewardChunks = +insertBulkRewardsChunked :: Bool -> [[SS.Reward]] -> DbM () +insertBulkRewardsChunked dbConstraintRewards rewardChunks = runSession mkDbCallStack $ traverse_ (\chunk -> HsqlSes.statement chunk (insertBulkRewardsStmt dbConstraintRewards)) rewardChunks @@ -358,8 +358,8 @@ insertBulkRewardRests rewardRests = runSession mkDbCallStack $ HsqlSes.statement rewardRests insertBulkRewardRestsStmt -insertBulkRewardRestsPiped :: [[SS.RewardRest]] -> DbM () -insertBulkRewardRestsPiped rewardRestChunks = +insertBulkRewardRestsChunked :: [[SS.RewardRest]] -> DbM () +insertBulkRewardRestsChunked rewardRestChunks = runSession mkDbCallStack $ traverse_ (`HsqlSes.statement` insertBulkRewardRestsStmt) rewardRestChunks diff --git a/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs b/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs index d38568a39..1f7823779 100644 --- a/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs +++ b/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs @@ -136,9 +136,9 @@ insertBulkAddressTxOutStmt = , map SVA.txOutAddressAddressId xs ) -insertBulkTxOutPiped :: Bool -> [[TxOutW]] -> DbM [TxOutIdW] -insertBulkTxOutPiped _ [] = pure [] -insertBulkTxOutPiped disInOut chunks = +insertBulkTxOutChunked :: Bool -> [[TxOutW]] -> DbM [TxOutIdW] +insertBulkTxOutChunked _ [] = pure [] +insertBulkTxOutChunked disInOut chunks = if disInOut then pure [] else case getFirstNonEmpty chunks of @@ -592,9 +592,9 @@ insertBulkAddressMaTxOutStmt = , map SVA.maTxOutAddressIdent xs ) -insertBulkMaTxOutPiped :: [[MaTxOutW]] -> DbM [MaTxOutIdW] -insertBulkMaTxOutPiped [] = pure [] -insertBulkMaTxOutPiped chunks = +insertBulkMaTxOutChunked :: [[MaTxOutW]] -> DbM [MaTxOutIdW] +insertBulkMaTxOutChunked [] = pure [] +insertBulkMaTxOutChunked chunks = case getFirstNonEmpty chunks of Nothing -> pure [] Just (CMaTxOutW _) -> do From 226892aa94eb92a47953b2daf538b80776810ba3 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Thu, 19 Feb 2026 14:28:21 +0200 Subject: [PATCH 2/7] Use pipelining in grouped inserts --- .../DbSync/Era/Universal/Insert/Grouped.hs | 134 +++++++----------- 1 file changed, 48 insertions(+), 86 deletions(-) diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs index bf469d7f6..6f85247f4 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs @@ -35,6 +35,8 @@ import Cardano.DbSync.Era.Shelley.Generic.Util (unTxHash) import Cardano.DbSync.Era.Shelley.Query import Cardano.DbSync.Error (SyncNodeError (..), mkSyncNodeCallStack) import Cardano.Prelude +import qualified Hasql.Pipeline as HsqlP +import qualified Hasql.Session as HsqlSes -- | Group data within the same block, to insert them together in batches -- @@ -90,7 +92,6 @@ instance Semigroup BlockGroupedData where (groupedTxFees tgd1 + groupedTxFees tgd2) (groupedTxOutSum tgd1 + groupedTxOutSum tgd2) --- | Parallel implementation with single connection coordination insertBlockGroupedData :: SyncEnv -> BlockGroupedData -> @@ -98,34 +99,55 @@ insertBlockGroupedData :: insertBlockGroupedData syncEnv grouped = do disInOut <- liftIO $ getDisableInOutState syncEnv - -- Parallel preparation of independent data - (preparedTxIn, preparedMetadata, preparedMint, txOutChunks) <- liftIO $ do - a1 <- async $ pure $ prepareTxInProcessing syncEnv grouped - a2 <- async $ pure $ prepareMetadataProcessing syncEnv grouped - a3 <- async $ pure $ prepareMintProcessing syncEnv grouped - a4 <- async $ do - let txOutData = etoTxOut . fst <$> groupedTxOut grouped - bulkSize = DB.getTxOutBulkSize (getTxOutVariantType syncEnv) - pure $ DB.chunkForBulkQueryWith bulkSize txOutData - - r1 <- wait a1 - r2 <- wait a2 - r3 <- wait a3 - r4 <- wait a4 - pure (r1, r2, r3, r4) - -- Sequential TxOut processing (generates required IDs) - txOutIds <- concat <$> mapM (lift . DB.insertBulkTxOut disInOut) txOutChunks - -- Execute independent operations (TxIn, Metadata, Mint) in parallel - txInIds <- executePreparedTxInChunked preparedTxIn - -- TxOut-dependent operations (MaTxOut + UTxO consumption) - maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped - executePreparedMetadataChunked preparedMetadata - executePreparedMintChunked preparedMint + let skipTxIn = getSkipTxIn syncEnv + rmJsonb = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv + txOutVariantType = getTxOutVariantType syncEnv - -- Process UTxO consumption (depends on txOutIds) + -- Chunk data for bulk inserts + txInChunks = DB.chunkForBulkQuery (Proxy @DB.TxIn) Nothing $ etiTxIn <$> groupedTxIn grouped + metaChunks = DB.chunkForBulkQuery (Proxy @DB.TxMetadata) (Just $ envIsJsonbInSchema syncEnv) $ groupedTxMetadata grouped + mintChunks = DB.chunkForBulkQuery (Proxy @DB.MaTxMint) Nothing $ groupedTxMint grouped + txOutData = etoTxOut . fst <$> groupedTxOut grouped + txOutChunks = DB.chunkForBulkQueryWith (DB.getTxOutBulkSize txOutVariantType) txOutData + + -- Pipeline all bulk inserts: TxOut + TxIn + Metadata + Mint + (txOutIds, txInIds) <- lift $ DB.runSession DB.mkDbCallStack $ HsqlSes.pipeline $ do + -- TxOut + txOutResults <- + if disInOut + then pure [] + else case txOutVariantType of + DB.TxOutVariantCore -> do + ids <- for txOutChunks $ \chunk -> + HsqlP.statement (map extractCoreTxOut chunk) DB.insertBulkCoreTxOutStmt + pure $ map DB.VCTxOutIdW (concat ids) + DB.TxOutVariantAddress -> do + ids <- for txOutChunks $ \chunk -> + HsqlP.statement (map extractVariantTxOut chunk) DB.insertBulkAddressTxOutStmt + pure $ map DB.VATxOutIdW (concat ids) + -- TxIn + txInResults <- + if skipTxIn + then pure [] + else for txInChunks $ \chunk -> HsqlP.statement chunk DB.insertBulkTxInStmt + -- Metadata + Mint + for_ metaChunks $ \chunk -> HsqlP.statement chunk (DB.insertBulkTxMetadataStmt rmJsonb) + for_ mintChunks $ \chunk -> HsqlP.statement chunk DB.insertBulkMaTxMintStmt + pure (txOutResults, concat txInResults) + + -- TxOut-dependent operations + maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped processUtxoConsumption syncEnv grouped txOutIds pure $ makeMinId syncEnv txInIds txOutIds maTxOutIds + where + extractCoreTxOut :: DB.TxOutW -> VC.TxOutCore + extractCoreTxOut (DB.VCTxOutW txOut) = txOut + extractCoreTxOut _ = panic "Unexpected VATxOutW in CoreTxOut list" + + extractVariantTxOut :: DB.TxOutW -> VA.TxOutAddress + extractVariantTxOut (DB.VATxOutW txOut _) = txOut + extractVariantTxOut _ = panic "Unexpected VCTxOutW in VariantTxOut list" mkmaTxOuts :: DB.TxOutVariantType -> (DB.TxOutIdW, [MissingMaTxOut]) -> [DB.MaTxOutW] mkmaTxOuts _txOutVariantType (txOutId, mmtos) = mkmaTxOut <$> mmtos @@ -290,65 +312,9 @@ matches txIn eutxo = DB.VATxOutW vTxOut _ -> VA.txOutAddressIndex vTxOut ----------------------------------------------------------------------------------------------------------------------------------- --- PARALLEL PROCESSING HELPER FUNCTIONS +-- HELPER FUNCTIONS ----------------------------------------------------------------------------------------------------------------------------------- --- | Prepared TxIn data for async execution -data PreparedTxIn = PreparedTxIn - { ptiChunks :: ![[DB.TxIn]] - , ptiSkip :: !Bool - } - --- | Prepared Metadata data for async execution -data PreparedMetadata = PreparedMetadata - { pmChunks :: ![[DB.TxMetadata]] - , pmRemoveJsonb :: !Bool - } - --- | Prepared Mint data for async execution -data PreparedMint where - PreparedMint :: {pmtChunks :: ![[DB.MaTxMint]]} -> PreparedMint - --- | Prepare TxIn processing (can run in parallel with TxOut) -prepareTxInProcessing :: SyncEnv -> BlockGroupedData -> PreparedTxIn -prepareTxInProcessing syncEnv grouped = - PreparedTxIn - { ptiChunks = DB.chunkForBulkQuery (Proxy @DB.TxIn) Nothing $ etiTxIn <$> groupedTxIn grouped - , ptiSkip = getSkipTxIn syncEnv - } - --- | Prepare Metadata processing (fully independent) -prepareMetadataProcessing :: SyncEnv -> BlockGroupedData -> PreparedMetadata -prepareMetadataProcessing syncEnv grouped = - PreparedMetadata - { pmChunks = DB.chunkForBulkQuery (Proxy @DB.TxMetadata) (Just $ envIsJsonbInSchema syncEnv) $ groupedTxMetadata grouped - , pmRemoveJsonb = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv - } - --- | Prepare Mint processing (fully independent) -prepareMintProcessing :: SyncEnv -> BlockGroupedData -> PreparedMint -prepareMintProcessing _syncEnv grouped = - PreparedMint - { pmtChunks = DB.chunkForBulkQuery (Proxy @DB.MaTxMint) Nothing $ groupedTxMint grouped - } - --- | Execute prepared TxIn operations in chunks -executePreparedTxInChunked :: PreparedTxIn -> ExceptT SyncNodeError DB.DbM [DB.TxInId] -executePreparedTxInChunked prepared = - if ptiSkip prepared - then pure [] - else lift $ DB.insertBulkTxInChunked (ptiChunks prepared) - --- | Execute prepared Metadata operations in chunks -executePreparedMetadataChunked :: PreparedMetadata -> ExceptT SyncNodeError DB.DbM () -executePreparedMetadataChunked prepared = - void $ lift $ DB.insertBulkTxMetadataChunked (pmRemoveJsonb prepared) (pmChunks prepared) - --- | Execute prepared Mint operations in chunks -executePreparedMintChunked :: PreparedMint -> ExceptT SyncNodeError DB.DbM () -executePreparedMintChunked prepared = - void $ lift $ DB.insertBulkMaTxMintChunked (pmtChunks prepared) - -- | Process MaTxOut operations (depends on TxOut IDs) processMaTxOuts :: SyncEnv -> [DB.TxOutIdW] -> BlockGroupedData -> ExceptT SyncNodeError DB.DbM [DB.MaTxOutIdW] processMaTxOuts syncEnv txOutIds grouped = do @@ -386,10 +352,6 @@ processUtxoConsumption syncEnv grouped txOutIds = do -- Log failures mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs ------------------------------------------------------------------------------------------------------------------------------------ --- PARALLEL PROCESSING HELPER FUNCTIONS (NO PIPELINES) ------------------------------------------------------------------------------------------------------------------------------------ - -- | Helper function to create MinIds result makeMinId :: SyncEnv -> [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper makeMinId syncEnv txInIds txOutIds maTxOutIds = From ea7399ebf2ba5394b0c4c1e7643dd05544f403d6 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Thu, 19 Feb 2026 14:41:36 +0200 Subject: [PATCH 3/7] Deleted unused inserts functions Due to pipelining we use the *Stmt directly --- cardano-db/src/Cardano/Db/Statement/Base.hs | 16 +------ .../src/Cardano/Db/Statement/MultiAsset.hs | 11 +---- .../Cardano/Db/Statement/Variants/TxOut.hs | 42 ------------------- 3 files changed, 2 insertions(+), 67 deletions(-) diff --git a/cardano-db/src/Cardano/Db/Statement/Base.hs b/cardano-db/src/Cardano/Db/Statement/Base.hs index e9d3f1fc0..ab39eb41f 100644 --- a/cardano-db/src/Cardano/Db/Statement/Base.hs +++ b/cardano-db/src/Cardano/Db/Statement/Base.hs @@ -15,7 +15,7 @@ module Cardano.Db.Statement.Base where import Cardano.BM.Data.Trace (Trace) import Cardano.BM.Trace (logInfo, logWarning, nullTracer) import Cardano.Ledger.BaseTypes (SlotNo (..)) -import Cardano.Prelude (ByteString, HasCallStack, Int64, MonadIO (..), Proxy (..), Word64, for, textShow, void) +import Cardano.Prelude (ByteString, HasCallStack, Int64, MonadIO (..), Proxy (..), Word64, textShow, void) import Data.Functor.Contravariant ((>$<)) import Data.List (partition) import Data.Maybe (fromMaybe, isJust) @@ -978,11 +978,6 @@ insertBulkTxMetadataStmt removeJsonb = , map SCB.txMetadataTxId xs ) -insertBulkTxMetadataChunked :: HasCallStack => Bool -> [[SCB.TxMetadata]] -> DbM [Id.TxMetadataId] -insertBulkTxMetadataChunked removeJsonb txMetaChunks = - runSession mkDbCallStack $ - concat <$> traverse (\chunk -> HsqlSes.statement chunk (insertBulkTxMetadataStmt removeJsonb)) txMetaChunks - -------------------------------------------------------------------------------- -- CollateralTxIn -------------------------------------------------------------------------------- @@ -1365,15 +1360,6 @@ insertBulkTxInStmt = , map SCB.txInRedeemerId xs ) -insertBulkTxInChunked :: HasCallStack => [[SCB.TxIn]] -> DbM [Id.TxInId] -insertBulkTxInChunked txInChunks = - concat - <$> runSession - mkDbCallStack - ( for txInChunks $ \chunk -> - HsqlSes.statement chunk insertBulkTxInStmt - ) - -------------------------------------------------------------------------------- queryTxInCount :: HasCallStack => DbM Word64 queryTxInCount = diff --git a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs index ffac4dd2e..be534c6d9 100644 --- a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs +++ b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs @@ -3,7 +3,7 @@ module Cardano.Db.Statement.MultiAsset where -import Cardano.Prelude (ByteString, HasCallStack, for) +import Cardano.Prelude (ByteString, HasCallStack) import Data.Functor.Contravariant (Contravariant (..)) import qualified Data.Text as Text import qualified Data.Text.Encoding as TextEnc @@ -77,12 +77,3 @@ insertBulkMaTxMintStmt = , map SMA.maTxMintTxId xs , map SMA.maTxMintIdent xs ) - -insertBulkMaTxMintChunked :: HasCallStack => [[SMA.MaTxMint]] -> DbM [Id.MaTxMintId] -insertBulkMaTxMintChunked maTxMintChunks = - concat - <$> runSession - mkDbCallStack - ( for maTxMintChunks $ \chunk -> - HsqlSes.statement chunk insertBulkMaTxMintStmt - ) diff --git a/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs b/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs index 1f7823779..5aff9d0b7 100644 --- a/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs +++ b/cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs @@ -136,48 +136,6 @@ insertBulkAddressTxOutStmt = , map SVA.txOutAddressAddressId xs ) -insertBulkTxOutChunked :: Bool -> [[TxOutW]] -> DbM [TxOutIdW] -insertBulkTxOutChunked _ [] = pure [] -insertBulkTxOutChunked disInOut chunks = - if disInOut - then pure [] - else case getFirstNonEmpty chunks of - Nothing -> pure [] - Just (VCTxOutW _) -> do - coreIds <- - concat - <$> runSession - mkDbCallStack - ( traverse - ( \chunk -> - let coreTxOuts = map extractCoreTxOut chunk - in HsqlSes.statement coreTxOuts insertBulkCoreTxOutStmt - ) - chunks - ) - pure $ map VCTxOutIdW coreIds - Just (VATxOutW _ _) -> do - addressIds <- - concat - <$> runSession - mkDbCallStack - ( traverse - ( \chunk -> - let variantTxOuts = map extractVariantTxOut chunk - in HsqlSes.statement variantTxOuts insertBulkAddressTxOutStmt - ) - chunks - ) - pure $ map VATxOutIdW addressIds - where - extractCoreTxOut :: TxOutW -> SVC.TxOutCore - extractCoreTxOut (VCTxOutW txOut) = txOut - extractCoreTxOut (VATxOutW _ _) = error "Unexpected VATxOutW in CoreTxOut list" - - extractVariantTxOut :: TxOutW -> SVA.TxOutAddress - extractVariantTxOut (VATxOutW txOut _) = txOut - extractVariantTxOut (VCTxOutW _) = error "Unexpected VCTxOutW in VariantTxOut list" - insertBulkTxOut :: Bool -> [TxOutW] -> DbM [TxOutIdW] insertBulkTxOut disInOut txOutWs = if disInOut From 29d20137ea36e21f61a43a796457691af45bede0 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Tue, 24 Feb 2026 10:32:46 +0200 Subject: [PATCH 4/7] Use insertCheckUnique for MultiAsset --- cardano-db/src/Cardano/Db/Statement/MultiAsset.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs index be534c6d9..a37117c3f 100644 --- a/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs +++ b/cardano-db/src/Cardano/Db/Statement/MultiAsset.hs @@ -17,7 +17,7 @@ import Cardano.Db.Schema.Core.MultiAsset (MaTxMint) import qualified Cardano.Db.Schema.Core.MultiAsset as SMA import qualified Cardano.Db.Schema.Ids as Id import Cardano.Db.Statement.Function.Core (ResultType (..), ResultTypeBulk (..), runSession) -import Cardano.Db.Statement.Function.Insert (insert) +import Cardano.Db.Statement.Function.Insert (insertCheckUnique) import Cardano.Db.Statement.Function.InsertBulk (insertBulk) import Cardano.Db.Types (DbInt65, DbM) @@ -28,7 +28,7 @@ import Cardano.Db.Types (DbInt65, DbM) -- | INSERT -------------------------------------------------------------------- insertMultiAssetStmt :: HsqlStmt.Statement SMA.MultiAsset Id.MultiAssetId insertMultiAssetStmt = - insert + insertCheckUnique SMA.multiAssetEncoder (WithResult $ HsqlD.singleRow $ Id.idDecoder Id.MultiAssetId) From 619e6c9e8d25ce84ce4ffdbb0b7d5c01c8683721 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Tue, 24 Feb 2026 10:34:19 +0200 Subject: [PATCH 5/7] Simplify runtime constraint handling --- .../Db/Statement/Function/InsertBulk.hs | 78 ------------------- .../Cardano/Db/Statement/StakeDelegation.hs | 39 +++++----- 2 files changed, 18 insertions(+), 99 deletions(-) diff --git a/cardano-db/src/Cardano/Db/Statement/Function/InsertBulk.hs b/cardano-db/src/Cardano/Db/Statement/Function/InsertBulk.hs index ef62590fe..92fe289ea 100644 --- a/cardano-db/src/Cardano/Db/Statement/Function/InsertBulk.hs +++ b/cardano-db/src/Cardano/Db/Statement/Function/InsertBulk.hs @@ -12,8 +12,6 @@ module Cardano.Db.Statement.Function.InsertBulk ( -- * Convenience Functions insertBulk, insertBulkJsonb, - insertBulkMaybeIgnore, - insertBulkMaybeIgnoreWithConstraint, ) where @@ -168,79 +166,3 @@ insertBulkJsonb :: ResultTypeBulk r -> HsqlS.Statement [a] r insertBulkJsonb = insertBulkWith NoConflict - ------------------------------------------------------------------------------------------------------------------------------------ --- PERFORMANCE-OPTIMIZED FUNCTIONS FOR ManualDbConstraints PATTERN ------------------------------------------------------------------------------------------------------------------------------------ - --- | High-performance bulk insert with conditional conflict handling. --- --- Optimized for the ManualDbConstraints pattern where constraint existence --- is determined at runtime. Uses fastest simple insert when constraints don't --- exist, switches to conflict handling only when needed. --- --- ==== Parameters --- * @constraintExists@: Runtime flag indicating if constraints are present. --- * @extract@: Function to extract fields from a list of records. --- * @encoder@: Encoder for the extracted fields. --- * @returnIds@: Result type indicating whether to return generated IDs. --- * @statement@: The prepared statement that can be executed. -insertBulkMaybeIgnore :: - forall a b r. - DbInfo a => - Bool -> -- Whether constraint exists (from ManualDbConstraints) - ([a] -> b) -> - HsqlE.Params b -> - ResultTypeBulk r -> - HsqlS.Statement [a] r -insertBulkMaybeIgnore constraintExists extract enc returnIds = - if constraintExists - then insertBulkWith conflictStrategy False extract enc returnIds - else insertBulk extract enc returnIds -- Fastest when no constraint exists - where - conflictStrategy = case uniqueFields (Proxy @a) of - [] -> IgnoreWithConstraint (autoConstraintName (Proxy @a)) -- For generated columns - cols -> IgnoreWithColumns cols -- For normal columns - --- | Conditional bulk insert with custom constraint name specification. --- --- Similar to `insertBulkMaybeIgnore` but allows specifying a custom constraint --- name for special cases where the auto-derived constraint name doesn't match --- the actual database constraint. --- --- ==== Parameters --- * @constraintExists@: Runtime flag indicating if constraints are present. --- * @constraintName@: Custom name of the constraint to handle conflicts on. --- * @extract@: Function to extract fields from a list of records. --- * @encoder@: Encoder for the extracted fields. --- * @returnIds@: Result type indicating whether to return generated IDs. --- * @statement@: The prepared statement that can be executed. -insertBulkMaybeIgnoreWithConstraint :: - forall a b r. - DbInfo a => - Bool -> -- Whether constraint exists - Text.Text -> -- Custom constraint name - ([a] -> b) -> - HsqlE.Params b -> - ResultTypeBulk r -> - HsqlS.Statement [a] r -insertBulkMaybeIgnoreWithConstraint constraintExists constraintName extract enc returnIds = - if constraintExists - then insertBulkWith (IgnoreWithConstraint constraintName) False extract enc returnIds - else insertBulk extract enc returnIds - ------------------------------------------------------------------------------------------------------------------------------------ --- HELPER FUNCTIONS ------------------------------------------------------------------------------------------------------------------------------------ - --- | Auto-derives PostgreSQL constraint names following standard conventions. --- --- Generates constraint names in the format "unique_{table_name}" which matches --- PostgreSQL's default naming convention for unique constraints. Used internally --- by bulk insert functions when constraint names need to be inferred. --- --- ==== Parameters --- * @proxy@: Type proxy for the table type. --- * @constraintName@: Generated constraint name following PostgreSQL conventions. -autoConstraintName :: DbInfo a => Proxy a -> Text.Text -autoConstraintName p = "unique_" <> tableName p diff --git a/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs b/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs index 231a5b511..05f1caedb 100644 --- a/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs +++ b/cardano-db/src/Cardano/Db/Statement/StakeDelegation.hs @@ -24,9 +24,10 @@ import qualified Cardano.Db.Schema.Core.Base as SCB import qualified Cardano.Db.Schema.Core.EpochAndProtocol as SEP import qualified Cardano.Db.Schema.Core.StakeDelegation as SS import qualified Cardano.Db.Schema.Ids as Id +import Cardano.Db.Statement.Constraint (constraintNameEpochStake, constraintNameReward, unConstraintNameDB) import Cardano.Db.Statement.Function.Core (ResultType (..), ResultTypeBulk (..), bulkEncoder, runSession) import Cardano.Db.Statement.Function.Insert (insert, insertCheckUnique) -import Cardano.Db.Statement.Function.InsertBulk (insertBulk, insertBulkMaybeIgnore, insertBulkMaybeIgnoreWithConstraint) +import Cardano.Db.Statement.Function.InsertBulk (ConflictStrategy (..), insertBulk, insertBulkWith) import Cardano.Db.Statement.Function.Query (adaSumDecoder, countAll) import Cardano.Db.Statement.Types (DbInfo (..)) import Cardano.Db.Types (Ada, DbLovelace, DbM, RewardSource, dbLovelaceDecoder, rewardSourceDecoder, rewardSourceEncoder) @@ -75,11 +76,19 @@ queryDelegationScript = -- | INSERT -------------------------------------------------------------------- insertBulkEpochStakeStmt :: Bool -> HsqlStmt.Statement [SS.EpochStake] () insertBulkEpochStakeStmt dbConstraintEpochStake = - insertBulkMaybeIgnore - dbConstraintEpochStake - extractEpochStake - SS.epochStakeBulkEncoder - NoResultBulk + if dbConstraintEpochStake + then + insertBulkWith + (IgnoreWithConstraint $ unConstraintNameDB constraintNameEpochStake) + False + extractEpochStake + SS.epochStakeBulkEncoder + NoResultBulk + else + insertBulk + extractEpochStake + SS.epochStakeBulkEncoder + NoResultBulk where extractEpochStake :: [SS.EpochStake] -> ([Id.StakeAddressId], [Id.PoolHashId], [DbLovelace], [Word64]) extractEpochStake xs = @@ -89,12 +98,6 @@ insertBulkEpochStakeStmt dbConstraintEpochStake = , map SS.epochStakeEpochNo xs ) -insertBulkEpochStake :: Bool -> [SS.EpochStake] -> DbM () -insertBulkEpochStake dbConstraintEpochStake epochStakes = - runSession mkDbCallStack $ - HsqlSes.statement epochStakes $ - insertBulkEpochStakeStmt dbConstraintEpochStake - insertBulkEpochStakeChunked :: Bool -> [[SS.EpochStake]] -> DbM () insertBulkEpochStakeChunked dbConstraintEpochStake epochStakeChunks = runSession mkDbCallStack $ @@ -158,9 +161,9 @@ insertBulkRewardsStmt :: Bool -> HsqlStmt.Statement [SS.Reward] () insertBulkRewardsStmt dbConstraintRewards = if dbConstraintRewards then - insertBulkMaybeIgnoreWithConstraint - True - "unique_reward" + insertBulkWith + (IgnoreWithConstraint $ unConstraintNameDB constraintNameReward) + False extractReward SS.rewardBulkEncoder NoResultBulk @@ -179,12 +182,6 @@ insertBulkRewardsStmt dbConstraintRewards = , map SS.rewardPoolId xs ) -insertBulkRewards :: Bool -> [SS.Reward] -> DbM () -insertBulkRewards dbConstraintRewards rewards = - runSession mkDbCallStack $ - HsqlSes.statement rewards $ - insertBulkRewardsStmt dbConstraintRewards - insertBulkRewardsChunked :: Bool -> [[SS.Reward]] -> DbM () insertBulkRewardsChunked dbConstraintRewards rewardChunks = runSession mkDbCallStack $ From 1b1e831882fed6e1a95c36f4abe8699428b291f9 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Tue, 24 Feb 2026 10:35:29 +0200 Subject: [PATCH 6/7] Remove unique_drep_distr in favor of an index --- .../Db/Schema/Core/GovernanceAndVoting.hs | 1 - schema/migration-2-0046-20250223.sql | 17 +++++++++++++++++ schema/migration-4-0009-20250223.sql | 2 ++ 3 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 schema/migration-2-0046-20250223.sql create mode 100644 schema/migration-4-0009-20250223.sql diff --git a/cardano-db/src/Cardano/Db/Schema/Core/GovernanceAndVoting.hs b/cardano-db/src/Cardano/Db/Schema/Core/GovernanceAndVoting.hs index c35a2da3e..516214f5f 100644 --- a/cardano-db/src/Cardano/Db/Schema/Core/GovernanceAndVoting.hs +++ b/cardano-db/src/Cardano/Db/Schema/Core/GovernanceAndVoting.hs @@ -103,7 +103,6 @@ data DrepDistr = DrepDistr type instance Key DrepDistr = Id.DrepDistrId instance DbInfo DrepDistr where - uniqueFields _ = ["hash_id", "epoch_no"] unnestParamTypes _ = [ ("hash_id", "bigint[]") , ("amount", "bigint[]") diff --git a/schema/migration-2-0046-20250223.sql b/schema/migration-2-0046-20250223.sql new file mode 100644 index 000000000..ec97a53c1 --- /dev/null +++ b/schema/migration-2-0046-20250223.sql @@ -0,0 +1,17 @@ +CREATE FUNCTION migrate() RETURNS void AS $$ +DECLARE + next_version int ; +BEGIN + SELECT stage_two + 1 INTO next_version FROM schema_version ; + IF next_version = 46 THEN + + ALTER TABLE "drep_distr" DROP CONSTRAINT IF EXISTS "unique_drep_distr" ; + + UPDATE schema_version SET stage_two = next_version ; + RAISE NOTICE 'DB has been migrated to stage_two version %', next_version ; + END IF ; +END ; +$$ LANGUAGE plpgsql ; + +SELECT migrate() ; +DROP FUNCTION migrate() ; diff --git a/schema/migration-4-0009-20250223.sql b/schema/migration-4-0009-20250223.sql new file mode 100644 index 000000000..12646c109 --- /dev/null +++ b/schema/migration-4-0009-20250223.sql @@ -0,0 +1,2 @@ + +CREATE INDEX IF NOT EXISTS idx_drep_distr_hash_id_epoch_no ON drep_distr(hash_id, epoch_no) ; From e0d067e333cab52fa7b2c07b6a23e92f859d6612 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Mon, 9 Mar 2026 15:15:41 +0200 Subject: [PATCH 7/7] remove unused runDbPoolLogged --- .../src/Cardano/DbSync/Database.hs | 2 +- cardano-db/src/Cardano/Db/Run.hs | 30 ------------------- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/cardano-db-sync/src/Cardano/DbSync/Database.hs b/cardano-db-sync/src/Cardano/DbSync/Database.hs index 86d841714..7887d01db 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Database.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Database.hs @@ -86,7 +86,7 @@ runDbThread syncEnv queue = do updateBlockMetrics = do let metricsSetters = envMetricSetters syncEnv void $ async $ do - mBlock <- DB.runDbPoolLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) DB.queryLatestBlock + mBlock <- DB.runDbPoolTransLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) Nothing DB.queryLatestBlock liftIO $ whenJust mBlock $ \block -> do let blockNo = BlockNo $ fromMaybe 0 $ DB.blockBlockNo block slotNo = SlotNo $ fromMaybe 0 $ DB.blockSlotNo block diff --git a/cardano-db/src/Cardano/Db/Run.hs b/cardano-db/src/Cardano/Db/Run.hs index 7bbeff78a..1b7d96db0 100644 --- a/cardano-db/src/Cardano/Db/Run.hs +++ b/cardano-db/src/Cardano/Db/Run.hs @@ -198,36 +198,6 @@ runDbPoolTransLogged tracer dbEnv mIsolationLevel action = do HsqlS.statement () commitTransactionStmt pure value -runDbPoolLogged :: - MonadUnliftIO m => - Trace IO Text -> - DbEnv -> - DbM a -> - m a -runDbPoolLogged tracer dbEnv action = do - case dbPoolConnection dbEnv of - Nothing -> throwIO $ DbSessionError mkDbCallStack "No connection pool available in DbEnv" - Just pool -> do - runIohkLogging tracer $ do - liftIO $ withResource pool $ \conn -> do - result <- HsqlS.run (transactionSession conn) conn - case result of - Left sessionErr -> throwIO $ DbSessionError mkDbCallStack ("Pool transaction error: " <> formatSessionError sessionErr) - Right dbResult -> pure dbResult - where - transactionSession conn = do - HsqlS.statement () (beginTransactionStmt RepeatableRead) - result <- liftIO $ try @SomeException $ do - let tempDbEnv = createDbEnv conn (dbPoolConnection dbEnv) (dbTracer dbEnv) - runReaderT (runDbM action) tempDbEnv - case result of - Left err -> do - HsqlS.statement () rollbackTransactionStmt - liftIO $ throwIO err - Right value -> do - HsqlS.statement () commitTransactionStmt - pure value - -- | External service database runner with error handling -- -- Designed for external services (like SMASH server) that manage their own connection pools.