2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs
@@ -134,7 +134,7 @@ storePage syncEnv blkEra percQuantum (n, ls) = do
txOuts <- mapM (prepareTxOut syncEnv blkEra) ls
txOutIds <- lift $ DB.insertBulkTxOut False $ etoTxOut . fst <$> txOuts
let maTxOuts = concatMap (mkmaTxOuts txOutVariantType) $ zip txOutIds (snd <$> txOuts)
-  void . lift $ DB.insertBulkMaTxOutPiped [maTxOuts]
+  void . lift $ DB.insertBulkMaTxOutChunked [maTxOuts]
where
txOutVariantType = getTxOutVariantType syncEnv
trce = getTrace syncEnv
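Note: throughout this PR the *Piped bulk helpers are renamed to *Chunked, which better matches what they do: each issues one bulk statement per chunk inside a single session rather than driving a libpq pipeline. A sketch of the presumed shape of the new helper, modelled on the removed insertBulkTxInPiped and insertBulkTxMetadataPiped bodies later in this diff; the statement name insertBulkMaTxOutStmt and the exact wrapper types are assumptions, not taken from the PR:

  insertBulkMaTxOutChunked :: HasCallStack => [[MaTxOutW]] -> DbM [MaTxOutIdW]
  insertBulkMaTxOutChunked maTxOutChunks =
    runSession mkDbCallStack $
      concat
        <$> traverse
          -- one bulk INSERT per chunk, all on the same session
          (\chunk -> HsqlSes.statement chunk insertBulkMaTxOutStmt)
          maTxOutChunks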
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync/Database.hs
@@ -86,7 +86,7 @@ runDbThread syncEnv queue = do
updateBlockMetrics = do
let metricsSetters = envMetricSetters syncEnv
void $ async $ do
-  mBlock <- DB.runDbPoolLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) DB.queryLatestBlock
+  mBlock <- DB.runDbPoolTransLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) Nothing DB.queryLatestBlock
liftIO $ whenJust mBlock $ \block -> do
let blockNo = BlockNo $ fromMaybe 0 $ DB.blockBlockNo block
slotNo = SlotNo $ fromMaybe 0 $ DB.blockSlotNo block
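Note: the metrics thread now goes through the transactional pool runner and passes the isolation level explicitly. A sketch of the new call shape, extracted from the hunk above; the Maybe DB.Block result type and the reading of Nothing as "use the backend default isolation" are assumptions, and the runner itself appears in the Run.hs hunk below:

  queryLatestBlockLogged :: MonadUnliftIO m => SyncEnv -> m (Maybe DB.Block)
  queryLatestBlockLogged syncEnv =
    DB.runDbPoolTransLogged
      (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) -- no-op tracer when none is configured
      (envDbEnv syncEnv)
      Nothing -- no pinned isolation level; the deleted runner hard-coded RepeatableRead
      DB.queryLatestBlock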
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs
@@ -314,7 +314,7 @@ insertByronTx' syncEnv blkId tx blockIndex = do
-- Update consumed TxOut records if enabled
whenConsumeOrPruneTxOut syncEnv $
lift $
-      DB.updateListTxOutConsumedByTxIdBP [prepUpdate txId <$> resolvedInputs]
+      DB.updateListTxOutConsumedByTxIdChunked [prepUpdate txId <$> resolvedInputs]

-- Return fee amount for caching/epoch calculations
pure $ unDbLovelace $ vfFee valFee
12 changes: 6 additions & 6 deletions cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs
@@ -220,8 +220,8 @@ insertEpochStake syncEnv nw epochNo stakeChunk = do
dbStakes <- mapM mkStake stakeChunk
let chunckDbStakes = DB.chunkForBulkQuery (Proxy @DB.EpochStake) Nothing dbStakes

-  -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline
-  lift $ DB.insertBulkEpochStakePiped dbConstraintEpochStake chunckDbStakes
+  -- minimising the bulk inserts into hundred thousand chunks to improve performance
+  lift $ DB.insertBulkEpochStakeChunked dbConstraintEpochStake chunckDbStakes
where
mkStake ::
(StakeCred, (Shelley.Coin, PoolKeyHash)) ->
@@ -250,8 +250,8 @@ insertRewards syncEnv nw earnedEpoch spendableEpoch rewardsChunk = do
dbRewards <- concatMapM mkRewards rewardsChunk
DB.ManualDbConstraints {..} <- liftIO $ readTVarIO $ envDbConstraints syncEnv
let chunckDbRewards = DB.chunkForBulkQuery (Proxy @DB.Reward) Nothing dbRewards
-  -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline
-  lift $ DB.insertBulkRewardsPiped dbConstraintRewards chunckDbRewards
+  -- minimising the bulk inserts into hundred thousand chunks to improve performance
+  lift $ DB.insertBulkRewardsChunked dbConstraintRewards chunckDbRewards
where
mkRewards ::
(StakeCred, Set Generic.Reward) ->
@@ -294,8 +294,8 @@ insertRewardRests ::
insertRewardRests syncEnv nw earnedEpoch spendableEpoch rewardsChunk = do
dbRewards <- concatMapM mkRewards rewardsChunk
let chunckDbRewards = DB.chunkForBulkQuery (Proxy @DB.RewardRest) Nothing dbRewards
-  -- minimising the bulk inserts into hundred thousand chunks to improve performance with pipeline
-  lift $ DB.insertBulkRewardRestsPiped chunckDbRewards
+  -- minimising the bulk inserts into hundred thousand chunks to improve performance
+  lift $ DB.insertBulkRewardRestsChunked chunckDbRewards
where
mkRewards ::
(StakeCred, Set Generic.RewardRest) ->
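Note: the stake and reward paths all funnel through DB.chunkForBulkQuery, whose definition is outside this diff; per the comments it bounds each bulk statement at roughly a hundred thousand rows. A self-contained stand-in illustrating the splitting (name, signature, and sizing are hypothetical):

  import Data.List (unfoldr)

  -- Hypothetical stand-in for DB.chunkForBulkQueryWith: split rows into chunks
  -- of at most bulkSize so no single bulk INSERT binds an unbounded payload.
  chunkForBulk :: Int -> [a] -> [[a]]
  chunkForBulk bulkSize
    | bulkSize <= 0 = \rows -> [rows]
    | otherwise = unfoldr $ \rows ->
        if null rows then Nothing else Just (splitAt bulkSize rows)

  -- map length (chunkForBulk 100000 (replicate 250000 ())) == [100000,100000,50000]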
@@ -129,7 +129,7 @@ insertGovActionProposal syncEnv blkId txId govExpiresAt mcgs (index, (govId, pp)
let withdrawalChunks = DB.chunkForBulkQuery (Proxy @DB.TreasuryWithdrawal) Nothing withdrawals
-- Process all chunks to create treasury withdrawals with resolved IDs
allTreasuryWithdrawals <- mapM processChunk withdrawalChunks
-  -- Insert all chunks in a single pipeline operation
+  -- Insert all chunks
lift $ DB.insertBulkTreasuryWithdrawal allTreasuryWithdrawals
where
processChunk chunk = do
@@ -381,8 +381,8 @@ insertDrepDistr e pSnapshot = do
drepChunks = DB.chunkForBulkQuery (Proxy @DB.DrepDistr) Nothing drepEntries
-- Process all chunks to create DRep distribution entries
allDrepDistrs <- mapM processChunk drepChunks
-  -- Insert all chunks in a single pipeline operation
-  lift $ DB.insertBulkDrepDistrPiped allDrepDistrs
+  -- Insert all chunks
+  lift $ DB.insertBulkDrepDistrChunked allDrepDistrs
where
processChunk = mapM mkEntry

140 changes: 51 additions & 89 deletions cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs
@@ -35,6 +35,8 @@ import Cardano.DbSync.Era.Shelley.Generic.Util (unTxHash)
import Cardano.DbSync.Era.Shelley.Query
import Cardano.DbSync.Error (SyncNodeError (..), mkSyncNodeCallStack)
import Cardano.Prelude
+ import qualified Hasql.Pipeline as HsqlP
+ import qualified Hasql.Session as HsqlSes

-- | Group data within the same block, to insert them together in batches
--
@@ -90,42 +92,62 @@ instance Semigroup BlockGroupedData where
(groupedTxFees tgd1 + groupedTxFees tgd2)
(groupedTxOutSum tgd1 + groupedTxOutSum tgd2)

- -- | Parallel implementation with single connection coordination
insertBlockGroupedData ::
SyncEnv ->
BlockGroupedData ->
ExceptT SyncNodeError DB.DbM DB.MinIdsWrapper
insertBlockGroupedData syncEnv grouped = do
disInOut <- liftIO $ getDisableInOutState syncEnv

-  -- Parallel preparation of independent data
-  (preparedTxIn, preparedMetadata, preparedMint, txOutChunks) <- liftIO $ do
-    a1 <- async $ pure $ prepareTxInProcessing syncEnv grouped
-    a2 <- async $ pure $ prepareMetadataProcessing syncEnv grouped
-    a3 <- async $ pure $ prepareMintProcessing syncEnv grouped
-    a4 <- async $ do
-      let txOutData = etoTxOut . fst <$> groupedTxOut grouped
-          bulkSize = DB.getTxOutBulkSize (getTxOutVariantType syncEnv)
-      pure $ DB.chunkForBulkQueryWith bulkSize txOutData
-
-    r1 <- wait a1
-    r2 <- wait a2
-    r3 <- wait a3
-    r4 <- wait a4
-    pure (r1, r2, r3, r4)
-  -- Sequential TxOut processing (generates required IDs)
-  txOutIds <- concat <$> mapM (lift . DB.insertBulkTxOut disInOut) txOutChunks
-  -- Execute independent operations (TxIn, Metadata, Mint) in parallel
-  txInIds <- executePreparedTxInPiped preparedTxIn
-  -- TxOut-dependent operations (MaTxOut + UTxO consumption)
-  maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped
-  executePreparedMetadataPiped preparedMetadata
-  executePreparedMintPiped preparedMint
+  let skipTxIn = getSkipTxIn syncEnv
+      rmJsonb = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv
+      txOutVariantType = getTxOutVariantType syncEnv

-  -- Process UTxO consumption (depends on txOutIds)
+      -- Chunk data for bulk inserts
+      txInChunks = DB.chunkForBulkQuery (Proxy @DB.TxIn) Nothing $ etiTxIn <$> groupedTxIn grouped
+      metaChunks = DB.chunkForBulkQuery (Proxy @DB.TxMetadata) (Just $ envIsJsonbInSchema syncEnv) $ groupedTxMetadata grouped
+      mintChunks = DB.chunkForBulkQuery (Proxy @DB.MaTxMint) Nothing $ groupedTxMint grouped
+      txOutData = etoTxOut . fst <$> groupedTxOut grouped
+      txOutChunks = DB.chunkForBulkQueryWith (DB.getTxOutBulkSize txOutVariantType) txOutData

+  -- Pipeline all bulk inserts: TxOut + TxIn + Metadata + Mint
+  (txOutIds, txInIds) <- lift $ DB.runSession DB.mkDbCallStack $ HsqlSes.pipeline $ do
+    -- TxOut
+    txOutResults <-
+      if disInOut
+        then pure []
+        else case txOutVariantType of
+          DB.TxOutVariantCore -> do
+            ids <- for txOutChunks $ \chunk ->
+              HsqlP.statement (map extractCoreTxOut chunk) DB.insertBulkCoreTxOutStmt
+            pure $ map DB.VCTxOutIdW (concat ids)
+          DB.TxOutVariantAddress -> do
+            ids <- for txOutChunks $ \chunk ->
+              HsqlP.statement (map extractVariantTxOut chunk) DB.insertBulkAddressTxOutStmt
+            pure $ map DB.VATxOutIdW (concat ids)
+    -- TxIn
+    txInResults <-
+      if skipTxIn
+        then pure []
+        else for txInChunks $ \chunk -> HsqlP.statement chunk DB.insertBulkTxInStmt
+    -- Metadata + Mint
+    for_ metaChunks $ \chunk -> HsqlP.statement chunk (DB.insertBulkTxMetadataStmt rmJsonb)
+    for_ mintChunks $ \chunk -> HsqlP.statement chunk DB.insertBulkMaTxMintStmt
+    pure (txOutResults, concat txInResults)

+  -- TxOut-dependent operations
+  maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped
processUtxoConsumption syncEnv grouped txOutIds

pure $ makeMinId syncEnv txInIds txOutIds maTxOutIds
+  where
+    extractCoreTxOut :: DB.TxOutW -> VC.TxOutCore
+    extractCoreTxOut (DB.VCTxOutW txOut) = txOut
+    extractCoreTxOut _ = panic "Unexpected VATxOutW in CoreTxOut list"
+
+    extractVariantTxOut :: DB.TxOutW -> VA.TxOutAddress
+    extractVariantTxOut (DB.VATxOutW txOut _) = txOut
+    extractVariantTxOut _ = panic "Unexpected VCTxOutW in VariantTxOut list"

mkmaTxOuts :: DB.TxOutVariantType -> (DB.TxOutIdW, [MissingMaTxOut]) -> [DB.MaTxOutW]
mkmaTxOuts _txOutVariantType (txOutId, mmtos) = mkmaTxOut <$> mmtos
@@ -290,65 +312,9 @@ matches txIn eutxo =
DB.VATxOutW vTxOut _ -> VA.txOutAddressIndex vTxOut

-----------------------------------------------------------------------------------------------------------------------------------
- -- PARALLEL PROCESSING HELPER FUNCTIONS
+ -- HELPER FUNCTIONS
-----------------------------------------------------------------------------------------------------------------------------------

- -- | Prepared TxIn data for async execution
- data PreparedTxIn = PreparedTxIn
-   { ptiChunks :: ![[DB.TxIn]]
-   , ptiSkip :: !Bool
-   }
-
- -- | Prepared Metadata data for async execution
- data PreparedMetadata = PreparedMetadata
-   { pmChunks :: ![[DB.TxMetadata]]
-   , pmRemoveJsonb :: !Bool
-   }
-
- -- | Prepared Mint data for async execution
- data PreparedMint where
-   PreparedMint :: {pmtChunks :: ![[DB.MaTxMint]]} -> PreparedMint
-
- -- | Prepare TxIn processing (can run in parallel with TxOut)
- prepareTxInProcessing :: SyncEnv -> BlockGroupedData -> PreparedTxIn
- prepareTxInProcessing syncEnv grouped =
-   PreparedTxIn
-     { ptiChunks = DB.chunkForBulkQuery (Proxy @DB.TxIn) Nothing $ etiTxIn <$> groupedTxIn grouped
-     , ptiSkip = getSkipTxIn syncEnv
-     }
-
- -- | Prepare Metadata processing (fully independent)
- prepareMetadataProcessing :: SyncEnv -> BlockGroupedData -> PreparedMetadata
- prepareMetadataProcessing syncEnv grouped =
-   PreparedMetadata
-     { pmChunks = DB.chunkForBulkQuery (Proxy @DB.TxMetadata) (Just $ envIsJsonbInSchema syncEnv) $ groupedTxMetadata grouped
-     , pmRemoveJsonb = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv
-     }
-
- -- | Prepare Mint processing (fully independent)
- prepareMintProcessing :: SyncEnv -> BlockGroupedData -> PreparedMint
- prepareMintProcessing _syncEnv grouped =
-   PreparedMint
-     { pmtChunks = DB.chunkForBulkQuery (Proxy @DB.MaTxMint) Nothing $ groupedTxMint grouped
-     }
-
- -- | Execute prepared TxIn operations (using pipeline)
- executePreparedTxInPiped :: PreparedTxIn -> ExceptT SyncNodeError DB.DbM [DB.TxInId]
- executePreparedTxInPiped prepared =
-   if ptiSkip prepared
-     then pure []
-     else lift $ DB.insertBulkTxInPiped (ptiChunks prepared)
-
- -- | Execute prepared Metadata operations (using pipeline)
- executePreparedMetadataPiped :: PreparedMetadata -> ExceptT SyncNodeError DB.DbM ()
- executePreparedMetadataPiped prepared =
-   void $ lift $ DB.insertBulkTxMetadataPiped (pmRemoveJsonb prepared) (pmChunks prepared)
-
- -- | Execute prepared Mint operations (using pipeline)
- executePreparedMintPiped :: PreparedMint -> ExceptT SyncNodeError DB.DbM ()
- executePreparedMintPiped prepared =
-   void $ lift $ DB.insertBulkMaTxMintPiped (pmtChunks prepared)

-- | Process MaTxOut operations (depends on TxOut IDs)
processMaTxOuts :: SyncEnv -> [DB.TxOutIdW] -> BlockGroupedData -> ExceptT SyncNodeError DB.DbM [DB.MaTxOutIdW]
processMaTxOuts syncEnv txOutIds grouped = do
@@ -357,7 +323,7 @@ processMaTxOuts syncEnv txOutIds grouped = do
concatMap (mkmaTxOuts txOutVariantType) $
zip txOutIds (snd <$> groupedTxOut grouped)
maTxOutChunks = DB.chunkForBulkQueryWith (DB.getMaTxOutBulkSize txOutVariantType) maTxOuts
-  lift $ DB.insertBulkMaTxOutPiped maTxOutChunks
+  lift $ DB.insertBulkMaTxOutChunked maTxOutChunks

-- | Process UTxO consumption updates (depends on TxOut IDs)
processUtxoConsumption :: SyncEnv -> BlockGroupedData -> [DB.TxOutIdW] -> ExceptT SyncNodeError DB.DbM ()
Expand All @@ -377,19 +343,15 @@ processUtxoConsumption syncEnv grouped txOutIds = do
unless (null hashBasedUpdates) $
void $
lift $
-        DB.updateConsumedByTxHashPiped txOutVariantType hashUpdateChunks
+        DB.updateConsumedByTxHashChunked txOutVariantType hashUpdateChunks
-- Individual process ID-based updates
unless (null idBasedUpdates) $
void $
lift $
-        DB.updateListTxOutConsumedByTxIdBP idUpdateChunks
+        DB.updateListTxOutConsumedByTxIdChunked idUpdateChunks
-- Log failures
mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs

- -----------------------------------------------------------------------------------------------------------------------------------
- -- PARALLEL PROCESSING HELPER FUNCTIONS (NO PIPELINES)
- -----------------------------------------------------------------------------------------------------------------------------------

-- | Helper function to create MinIds result
makeMinId :: SyncEnv -> [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper
makeMinId syncEnv txInIds txOutIds maTxOutIds =
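Note: this is the core of the refactor. Instead of preparing work on async threads and running each bulk insert as its own round trip, all bulk inserts for a block are composed into one Hasql pipeline on the single connection, so the statements are batched to PostgreSQL together and their results collected at the end. A distilled sketch of the pattern, assuming hasql's Hasql.Session.pipeline and Hasql.Pipeline.statement API that this file now imports; the statement and type names are placeholders, not the PR's:

  import qualified Hasql.Pipeline as HsqlP
  import qualified Hasql.Session as HsqlSes
  import Hasql.Statement (Statement)

  -- Pipeline is Applicative: both traversals below are dispatched in one batch,
  -- and the two id lists come back once the whole pipeline completes.
  bulkInsertPipelined ::
    Statement [a] [aId] ->
    Statement [b] [bId] ->
    [[a]] ->
    [[b]] ->
    HsqlSes.Session ([aId], [bId])
  bulkInsertPipelined stmtA stmtB chunksA chunksB =
    HsqlSes.pipeline $
      (,)
        <$> (concat <$> traverse (\chunk -> HsqlP.statement chunk stmtA) chunksA)
        <*> (concat <$> traverse (\chunk -> HsqlP.statement chunk stmtB) chunksB)

Ordering still matters around the pipeline: processMaTxOuts and processUtxoConsumption stay outside it because they consume the txOutIds the pipeline returns.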
30 changes: 0 additions & 30 deletions cardano-db/src/Cardano/Db/Run.hs
@@ -198,36 +198,6 @@ runDbPoolTransLogged tracer dbEnv mIsolationLevel action = do
HsqlS.statement () commitTransactionStmt
pure value

- runDbPoolLogged ::
-   MonadUnliftIO m =>
-   Trace IO Text ->
-   DbEnv ->
-   DbM a ->
-   m a
- runDbPoolLogged tracer dbEnv action = do
-   case dbPoolConnection dbEnv of
-     Nothing -> throwIO $ DbSessionError mkDbCallStack "No connection pool available in DbEnv"
-     Just pool -> do
-       runIohkLogging tracer $ do
-         liftIO $ withResource pool $ \conn -> do
-           result <- HsqlS.run (transactionSession conn) conn
-           case result of
-             Left sessionErr -> throwIO $ DbSessionError mkDbCallStack ("Pool transaction error: " <> formatSessionError sessionErr)
-             Right dbResult -> pure dbResult
-   where
-     transactionSession conn = do
-       HsqlS.statement () (beginTransactionStmt RepeatableRead)
-       result <- liftIO $ try @SomeException $ do
-         let tempDbEnv = createDbEnv conn (dbPoolConnection dbEnv) (dbTracer dbEnv)
-         runReaderT (runDbM action) tempDbEnv
-       case result of
-         Left err -> do
-           HsqlS.statement () rollbackTransactionStmt
-           liftIO $ throwIO err
-         Right value -> do
-           HsqlS.statement () commitTransactionStmt
-           pure value

-- | External service database runner with error handling
--
-- Designed for external services (like SMASH server) that manage their own connection pools.
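Note: with runDbPoolLogged deleted, its behaviour should be recoverable from the surviving runner by pinning the isolation level it used to hard-code. A compatibility sketch, assuming the runner's extra parameter is Maybe IsolationLevel, which the deleted body's beginTransactionStmt RepeatableRead and the Nothing passed in Database.hs above suggest:

  -- Compatibility shim, not part of this PR.
  runDbPoolLoggedCompat ::
    MonadUnliftIO m =>
    Trace IO Text ->
    DbEnv ->
    DbM a ->
    m a
  runDbPoolLoggedCompat tracer dbEnv =
    runDbPoolTransLogged tracer dbEnv (Just RepeatableRead) -- pin the old isolation level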
@@ -103,7 +103,6 @@ data DrepDistr = DrepDistr

type instance Key DrepDistr = Id.DrepDistrId
instance DbInfo DrepDistr where
-   uniqueFields _ = ["hash_id", "epoch_no"]
unnestParamTypes _ =
[ ("hash_id", "bigint[]")
, ("amount", "bigint[]")
16 changes: 1 addition & 15 deletions cardano-db/src/Cardano/Db/Statement/Base.hs
@@ -15,7 +15,7 @@ module Cardano.Db.Statement.Base where
import Cardano.BM.Data.Trace (Trace)
import Cardano.BM.Trace (logInfo, logWarning, nullTracer)
import Cardano.Ledger.BaseTypes (SlotNo (..))
- import Cardano.Prelude (ByteString, HasCallStack, Int64, MonadIO (..), Proxy (..), Word64, for, textShow, void)
+ import Cardano.Prelude (ByteString, HasCallStack, Int64, MonadIO (..), Proxy (..), Word64, textShow, void)
import Data.Functor.Contravariant ((>$<))
import Data.List (partition)
import Data.Maybe (fromMaybe, isJust)
@@ -978,11 +978,6 @@ insertBulkTxMetadataStmt removeJsonb =
, map SCB.txMetadataTxId xs
)

- insertBulkTxMetadataPiped :: HasCallStack => Bool -> [[SCB.TxMetadata]] -> DbM [Id.TxMetadataId]
- insertBulkTxMetadataPiped removeJsonb txMetaChunks =
-   runSession mkDbCallStack $
-     concat <$> traverse (\chunk -> HsqlSes.statement chunk (insertBulkTxMetadataStmt removeJsonb)) txMetaChunks

--------------------------------------------------------------------------------
-- CollateralTxIn
--------------------------------------------------------------------------------
@@ -1365,15 +1360,6 @@ insertBulkTxInStmt =
, map SCB.txInRedeemerId xs
)

- insertBulkTxInPiped :: HasCallStack => [[SCB.TxIn]] -> DbM [Id.TxInId]
- insertBulkTxInPiped txInChunks =
-   concat
-     <$> runSession
-       mkDbCallStack
-       ( for txInChunks $ \chunk ->
-           HsqlSes.statement chunk insertBulkTxInStmt
-       )

--------------------------------------------------------------------------------
queryTxInCount :: HasCallStack => DbM Word64
queryTxInCount =
8 changes: 4 additions & 4 deletions cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs
@@ -531,11 +531,11 @@ data BulkConsumedByHash = BulkConsumedByHash
, bchConsumingTxId :: !Id.TxId
}

- updateConsumedByTxHashPiped ::
+ updateConsumedByTxHashChunked ::
TxOutVariantType ->
[[BulkConsumedByHash]] ->
DbM ()
- updateConsumedByTxHashPiped txOutVariantType consumedData = do
+ updateConsumedByTxHashChunked txOutVariantType consumedData = do
unless (null consumedData) $ do
case txOutVariantType of
TxOutVariantCore -> do
@@ -673,8 +673,8 @@ migrateTxOutDbTool bulkSize txOutVariantType = do
--------------------------------------------------------------------------------

-- | Update a list of TxOut consumed by TxId mappings using bulked statements
- updateListTxOutConsumedByTxIdBP :: [[(TxOutIdW, Id.TxId)]] -> DbM ()
- updateListTxOutConsumedByTxIdBP chunks = do
+ updateListTxOutConsumedByTxIdChunked :: [[(TxOutIdW, Id.TxId)]] -> DbM ()
+ updateListTxOutConsumedByTxIdChunked chunks = do
unless (null chunks) $ do
!_results <-
runSession mkDbCallStack $
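Note: call sites that already hold one block's worth of updates pass a singleton outer list, as insertByronTx' does earlier in this diff. A usage sketch (markConsumed is a hypothetical helper, not from the PR):

  -- Mark each resolved input as consumed by its spending transaction,
  -- submitting the whole batch as a single chunk.
  markConsumed :: [(TxOutIdW, Id.TxId)] -> DbM ()
  markConsumed pairs = updateListTxOutConsumedByTxIdChunked [pairs]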