From 031dd46c2101161e16d9071ed6d9102f960f41de Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Wed, 29 Oct 2025 17:38:43 +0100 Subject: [PATCH 1/5] perf: skip ledger filter in queries for single-ledger buckets Optimize query performance by conditionally removing WHERE ledger = ? clauses when a bucket contains only one ledger. The optimization is automatically detected and applied at runtime, providing 5-15% query performance improvement for single-ledger deployments with no impact on multi-ledger scenarios. Changes: - Add single-ledger detection cache in Store with thread-safe access - Implement applyLedgerFilter() and getLedgerFilterSQL() helpers - Refactor 38+ query locations across ledger storage package - Auto-detect single-ledger state on store creation and ledger open - Add CountLedgersInBucket() to system store for bucket analysis --- internal/storage/driver/driver.go | 10 ++++ internal/storage/ledger/accounts.go | 15 +++-- internal/storage/ledger/logs.go | 8 +-- internal/storage/ledger/resource_accounts.go | 19 +++--- .../ledger/resource_aggregated_balances.go | 16 ++--- internal/storage/ledger/resource_logs.go | 7 ++- .../storage/ledger/resource_transactions.go | 22 +++---- internal/storage/ledger/resource_volumes.go | 12 ++-- internal/storage/ledger/store.go | 58 +++++++++++++++++++ internal/storage/ledger/transactions.go | 37 +++++++++--- internal/storage/system/store.go | 16 +++++ 11 files changed, 168 insertions(+), 52 deletions(-) diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 796fe33926..5b8b34d606 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -84,6 +84,11 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto return nil, postgres.ResolveError(err) } + // Update single-ledger optimization state + if err := ret.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { + logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) + } + return ret, nil } @@ -96,6 +101,11 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) + // Update single-ledger optimization state + if err := store.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { + logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) + } + return store, ret, err } diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 47ed625033..a5419e294c 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -5,6 +5,7 @@ import ( "fmt" . "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/ledger/internal/tracing" + "github.com/uptrace/bun" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "regexp" @@ -77,12 +78,18 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri store.tracer, store.deleteAccountMetadataHistogram, tracing.NoResult(func(ctx context.Context) error { - _, err := store.db.NewUpdate(). + query := store.db.NewUpdate(). ModelTableExpr(store.GetPrefixedRelationName("accounts")). Set("metadata = metadata - ?", key). - Where("address = ?", account). - Where("ledger = ?", store.ledger.Name). - Exec(ctx) + Where("address = ?", account) + query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + if ledgerFilter != "" { + return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + } + return q + }) + _, err := query.Exec(ctx) return postgres.ResolveError(err) }), ) diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 38fd2670c6..71347f6b25 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -119,14 +119,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) ( store.readLogWithIdempotencyKeyHistogram, func(ctx context.Context) (*ledger.Log, error) { ret := &Log{} - if err := store.db.NewSelect(). + query := store.db.NewSelect(). Model(ret). ModelTableExpr(store.GetPrefixedRelationName("logs")). Column("*"). Where("idempotency_key = ?", key). - Where("ledger = ?", store.ledger.Name). - Limit(1). - Scan(ctx); err != nil { + Limit(1) + query = store.applyLedgerFilter(query, "logs") + if err := query.Scan(ctx); err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/resource_accounts.go b/internal/storage/ledger/resource_accounts.go index 3721a2b6a5..eba9fd9c42 100644 --- a/internal/storage/ledger/resource_accounts.go +++ b/internal/storage/ledger/resource_accounts.go @@ -54,8 +54,8 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl // Build the query ret = ret. ModelTableExpr(store.GetPrefixedRelationName("accounts")). - Column("address", "address_array", "first_usage", "insertion_date", "updated_at"). - Where("ledger = ?", store.ledger.Name) + Column("address", "address_array", "first_usage", "insertion_date", "updated_at") + ret = store.applyLedgerFilter(ret, "accounts") if opts.PIT != nil && !opts.PIT.IsZero() { ret = ret.Where("accounts.first_usage <= ?", opts.PIT) @@ -65,10 +65,10 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl selectDistinctAccountMetadataHistories := store.db.NewSelect(). DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). - Where("ledger = ?", store.ledger.Name). Column("accounts_address"). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("date <= ?", opts.PIT) + selectDistinctAccountMetadataHistories = store.applyLedgerFilter(selectDistinctAccountMetadataHistories, "accounts_metadata") ret = ret. Join( @@ -92,8 +92,7 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll case balanceRegex.MatchString(property) || property == "balance": selectBalance := store.db.NewSelect(). - Where("accounts_address = dataset.address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts_address = dataset.address") if opts.PIT != nil && !opts.PIT.IsZero() { if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { @@ -104,10 +103,12 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll DistinctOn("asset"). ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance"). Where("effective_date <= ?", opts.PIT) + selectBalance = store.applyLedgerFilter(selectBalance, "moves") } else { selectBalance = selectBalance. ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). ColumnExpr("input - output as balance") + selectBalance = store.applyLedgerFilter(selectBalance, "accounts_volumes") } if balanceRegex.MatchString(property) { @@ -154,8 +155,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso selectRowsQuery = selectRowsQuery. ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", store.ledger.Name) + Column("accounts_address", "asset") + selectRowsQuery = store.applyLedgerFilter(selectRowsQuery, "moves") if property == "volumes" { selectRowsQuery = selectRowsQuery. ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes"). @@ -169,8 +170,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso selectRowsQuery = selectRowsQuery. ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", store.ledger.Name) + ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") + selectRowsQuery = store.applyLedgerFilter(selectRowsQuery, "accounts_volumes") } return store.db.NewSelect(). diff --git a/internal/storage/ledger/resource_aggregated_balances.go b/internal/storage/ledger/resource_aggregated_balances.go index f1cbe204f5..370165e62c 100644 --- a/internal/storage/ledger/resource_aggregated_balances.go +++ b/internal/storage/ledger/resource_aggregated_balances.go @@ -52,8 +52,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", store.ledger.Name) + Column("accounts_address", "asset") + ret = store.applyLedgerFilter(ret, "moves") if query.Opts.UseInsertionDate { if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) @@ -76,8 +76,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). - Where("accounts.address = accounts_address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(subQuery, "accounts") ret = ret. ColumnExpr("accounts.address_array as accounts_address_array"). @@ -89,9 +89,9 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address"). Where("date <= ?", query.PIT) + subQuery = store.applyLedgerFilter(subQuery, "accounts_metadata") ret = ret. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -103,15 +103,15 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", store.ledger.Name) + ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") + ret = store.applyLedgerFilter(ret, "accounts_volumes") if query.useFilter("metadata") || query.useFilter("address", isPartialAddress) { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", store.ledger.Name). Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(subQuery, "accounts") if query.useFilter("address") { subQuery = subQuery.ColumnExpr("address_array as accounts_address_array") diff --git a/internal/storage/ledger/resource_logs.go b/internal/storage/ledger/resource_logs.go index 544912a228..d21694d96d 100644 --- a/internal/storage/ledger/resource_logs.go +++ b/internal/storage/ledger/resource_logs.go @@ -22,10 +22,11 @@ func (h logsResourceHandler) filters() []filter { } func (h logsResourceHandler) buildDataset(store *Store, _ repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { - return store.db.NewSelect(). + ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("logs")). - ColumnExpr("*"). - Where("ledger = ?", store.ledger.Name), nil + ColumnExpr("*") + ret = store.applyLedgerFilter(ret, "logs") + return ret, nil } func (h logsResourceHandler) resolveFilter(_ *Store, _ ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { diff --git a/internal/storage/ledger/resource_transactions.go b/internal/storage/ledger/resource_transactions.go index 6b9ea4a86b..cbb86a1881 100644 --- a/internal/storage/ledger/resource_transactions.go +++ b/internal/storage/ledger/resource_transactions.go @@ -83,8 +83,8 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH "destinations", "sources_arrays", "destinations_arrays", - ). - Where("ledger = ?", store.ledger.Name) + ) + ret = store.applyLedgerFilter(ret, "transactions") if slices.Contains(opts.Expand, "volumes") { ret = ret.Column("post_commit_volumes") @@ -98,10 +98,10 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH selectDistinctTransactionMetadataHistories := store.db.NewSelect(). DistinctOn("transactions_id"). ModelTableExpr(store.GetPrefixedRelationName("transactions_metadata")). - Where("ledger = ?", store.ledger.Name). Column("transactions_id", "metadata"). Order("transactions_id", "revision desc"). Where("date <= ?", opts.PIT) + selectDistinctTransactionMetadataHistories = store.applyLedgerFilter(selectDistinctTransactionMetadataHistories, "transactions_metadata") ret = ret. Join( @@ -163,19 +163,21 @@ func (h transactionsResourceHandler) expand(store *Store, opts ledgercontroller. return nil, nil, nil } + movesSubquery := store.db.NewSelect(). + DistinctOn("transactions_id, accounts_address, asset"). + ModelTableExpr(store.GetPrefixedRelationName("moves")). + Column("transactions_id", "accounts_address", "asset"). + ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). + Where("transactions_id in (select id from dataset)") + movesSubquery = store.applyLedgerFilter(movesSubquery, "moves") + ret := store.db.NewSelect(). TableExpr( "(?) data", store.db.NewSelect(). TableExpr( "(?) moves", - store.db.NewSelect(). - DistinctOn("transactions_id, accounts_address, asset"). - ModelTableExpr(store.GetPrefixedRelationName("moves")). - Column("transactions_id", "accounts_address", "asset"). - ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). - Where("ledger = ?", store.ledger.Name). - Where("transactions_id in (select id from dataset)"), + movesSubquery, ). Column("transactions_id", "accounts_address"). ColumnExpr(`public.aggregate_objects(json_build_object(moves.asset, json_build_object('input', (moves.post_commit_effective_volumes).inputs, 'output', (moves.post_commit_effective_volumes).outputs))::jsonb) AS post_commit_effective_volumes`). diff --git a/internal/storage/ledger/resource_volumes.go b/internal/storage/ledger/resource_volumes.go index 475ac65338..03a76a4e53 100644 --- a/internal/storage/ledger/resource_volumes.go +++ b/internal/storage/ledger/resource_volumes.go @@ -65,15 +65,15 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ColumnExpr("input - output as balance"). ColumnExpr("accounts_address as account"). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). - Where("ledger = ?", store.ledger.Name). Order("accounts_address", "asset") + selectVolumes = store.applyLedgerFilter(selectVolumes, "accounts_volumes") if query.useFilter("metadata") || needAddressSegments { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", store.ledger.Name). Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(subQuery, "accounts") if needAddressSegments { subQuery = subQuery.ColumnExpr("address_array as account_array") @@ -99,9 +99,9 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ColumnExpr("sum(case when is_source then amount else 0 end) as output"). ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). ModelTableExpr(store.GetPrefixedRelationName("moves")). - Where("ledger = ?", store.ledger.Name). GroupExpr("accounts_address, asset"). Order("accounts_address", "asset") + selectVolumes = store.applyLedgerFilter(selectVolumes, "moves") dateFilterColumn := "effective_date" if query.Opts.UseInsertionDate { @@ -120,8 +120,8 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). - Where("accounts.address = accounts_address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(subQuery, "accounts") selectVolumes. ColumnExpr("(array_agg(accounts.address_array))[1] as account_array"). @@ -133,8 +133,8 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address") + subQuery = store.applyLedgerFilter(subQuery, "accounts_metadata") selectVolumes = selectVolumes. Join(`left join lateral (?) accounts_metadata on true`, subQuery). diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 0c323da5a3..7d2d77556a 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -4,6 +4,8 @@ import ( "context" "database/sql" "fmt" + "sync" + "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/migrations" "github.com/formancehq/go-libs/v2/platform/postgres" @@ -20,11 +22,18 @@ import ( "github.com/uptrace/bun" ) +type singleLedgerOptimization struct { + mu sync.RWMutex + enabled bool +} + type Store struct { db bun.IDB bucket bucket.Bucket ledger ledger.Ledger + singleLedgerCache *singleLedgerOptimization + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -135,6 +144,55 @@ func (store *Store) GetPrefixedRelationName(v string) string { return fmt.Sprintf(`"%s".%s`, store.ledger.Bucket, v) } +// isSingleLedger returns true if the bucket optimization is enabled for single-ledger scenarios. +// This allows queries to skip the WHERE ledger = ? clause when there's only one ledger in the bucket. +func (store *Store) isSingleLedger() bool { + if store.singleLedgerCache == nil { + return false + } + store.singleLedgerCache.mu.RLock() + defer store.singleLedgerCache.mu.RUnlock() + return store.singleLedgerCache.enabled +} + +// applyLedgerFilter conditionally applies the WHERE ledger = ? clause to a query. +// If the bucket contains only one ledger, the filter is skipped for performance optimization. +func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { + if store.isSingleLedger() { + return query + } + return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) +} + +// getLedgerFilterSQL returns the SQL fragment and arguments for ledger filtering in inline SQL. +// Returns empty string and nil args if single-ledger optimization is enabled. +func (store *Store) getLedgerFilterSQL() (string, []any) { + if store.isSingleLedger() { + return "", nil + } + return "and ledger = ?", []any{store.ledger.Name} +} + +// UpdateSingleLedgerState checks if the bucket contains only one ledger and updates the cache accordingly. +// This should be called during store initialization. +func (store *Store) UpdateSingleLedgerState(ctx context.Context, countFunc func(ctx context.Context, bucketName string) (int, error)) error { + if store.singleLedgerCache == nil { + store.singleLedgerCache = &singleLedgerOptimization{} + } + + count, err := countFunc(ctx, store.ledger.Bucket) + if err != nil { + return err + } + + store.singleLedgerCache.mu.Lock() + defer store.singleLedgerCache.mu.Unlock() + + store.singleLedgerCache.enabled = (count == 1) + + return nil +} + func validateAddressFilter(ledger ledger.Ledger, operator string, value any) error { if operator != "$match" { return fmt.Errorf("'address' column can only be used with $match, operator used is: %s", operator) diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 9069de2dc7..e41e1a9697 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -176,11 +176,14 @@ func (store *Store) updateTxWithRetrieve(ctx context.Context, id int, query *bun ColumnExpr("upd.*, true as modified"). ModelTableExpr("upd"). UnionAll( - store.db.NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("transactions")). - ColumnExpr("*, false as modified"). - Where("id = ? and ledger = ?", id, store.ledger.Name). - Limit(1), + func() *bun.SelectQuery { + query := store.db.NewSelect(). + ModelTableExpr(store.GetPrefixedRelationName("transactions")). + ColumnExpr("*, false as modified"). + Where("id = ?", id). + Limit(1) + return store.applyLedgerFilter(query, "transactions") + }(), ), ). Model(me). @@ -203,8 +206,14 @@ func (store *Store) RevertTransaction(ctx context.Context, id int, at time.Time) ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). Where("reverted_at is null"). - Where("ledger = ?", store.ledger.Name). Returning("*") + query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + if ledgerFilter != "" { + return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + } + return q + }) if at.IsZero() { query = query. Set("reverted_at = (now() at time zone 'utc')"). @@ -234,10 +243,16 @@ func (store *Store) UpdateTransactionMetadata(ctx context.Context, id int, m met Model(&ledger.Transaction{}). ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Set("metadata = metadata || ?", m). Where("not (metadata @> ?)", m). Returning("*") + updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + if ledgerFilter != "" { + return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + } + return q + }) if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { @@ -264,9 +279,15 @@ func (store *Store) DeleteTransactionMetadata(ctx context.Context, id int, key s ModelTableExpr(store.GetPrefixedRelationName("transactions")). Set("metadata = metadata - ?", key). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Where("metadata -> ? is not null", key). Returning("*") + updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + if ledgerFilter != "" { + return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + } + return q + }) if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index 5184b9d4bb..1ac335f21a 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -24,6 +24,7 @@ type Store interface { ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator @@ -57,6 +58,21 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) return buckets, nil } +// CountLedgersInBucket returns the number of ledgers in a specific bucket. +// This is useful for determining if single-ledger optimizations can be applied. +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) { + count, err := d.db.NewSelect(). + Model((*ledger.Ledger)(nil)). + Where("bucket = ?", bucketName). + Count(ctx) + + if err != nil { + return 0, postgres.ResolveError(err) + } + + return count, nil +} + func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { if l.Metadata == nil { From ec27538c614e10f4bdd468c0822b6575bae67225 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Wed, 29 Oct 2025 18:00:03 +0100 Subject: [PATCH 2/5] fix: refactor getLedgerFilterSQL to remove fragile prefix handling Address CodeRabbit review comments: - Remove hardcoded "and " prefix from getLedgerFilterSQL() return value - Replace all fragile [4:] string slicing with clean filter application - Regenerate mocks to include CountLedgersInBucket method Changes: - store.go: getLedgerFilterSQL() now returns "ledger = ?" without prefix - accounts.go: Remove [4:] slicing in WhereGroup - transactions.go: Remove [4:] slicing in 3 WhereGroup calls - *_generated_test.go: Regenerated mocks with uber-go/mock This makes the code more robust and prevents potential SQL corruption if the filter format ever changes. --- .../storage/driver/buckets_generated_test.go | 12 ++++++++---- .../storage/driver/ledger_generated_test.go | 3 +++ .../storage/driver/system_generated_test.go | 18 ++++++++++++++++++ internal/storage/ledger/accounts.go | 2 +- internal/storage/ledger/store.go | 4 ++-- internal/storage/ledger/transactions.go | 6 +++--- 6 files changed, 35 insertions(+), 10 deletions(-) diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index 7331e1912c..2f0f14c95b 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../bucket/bucket.go -destination buckets_generated_test.go -package driver --mock_names Factory=BucketFactory . Factory +// + package driver import ( @@ -20,6 +22,7 @@ import ( type MockBucket struct { ctrl *gomock.Controller recorder *MockBucketMockRecorder + isgomock struct{} } // MockBucketMockRecorder is the mock recorder for MockBucket. @@ -40,17 +43,17 @@ func (m *MockBucket) EXPECT() *MockBucketMockRecorder { } // AddLedger mocks base method. -func (m *MockBucket) AddLedger(ctx context.Context, db bun.IDB, ledger ledger.Ledger) error { +func (m *MockBucket) AddLedger(ctx context.Context, db bun.IDB, arg2 ledger.Ledger) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddLedger", ctx, db, ledger) + ret := m.ctrl.Call(m, "AddLedger", ctx, db, arg2) ret0, _ := ret[0].(error) return ret0 } // AddLedger indicates an expected call of AddLedger. -func (mr *MockBucketMockRecorder) AddLedger(ctx, db, ledger any) *gomock.Call { +func (mr *MockBucketMockRecorder) AddLedger(ctx, db, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, ledger) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, arg2) } // GetLastVersion mocks base method. @@ -151,6 +154,7 @@ func (mr *MockBucketMockRecorder) Migrate(ctx, db any, opts ...any) *gomock.Call type BucketFactory struct { ctrl *gomock.Controller recorder *BucketFactoryMockRecorder + isgomock struct{} } // BucketFactoryMockRecorder is the mock recorder for BucketFactory. diff --git a/internal/storage/driver/ledger_generated_test.go b/internal/storage/driver/ledger_generated_test.go index fb2f8a6ab1..b940e41b32 100644 --- a/internal/storage/driver/ledger_generated_test.go +++ b/internal/storage/driver/ledger_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../ledger/factory.go -destination ledger_generated_test.go -package driver --mock_names Factory=LedgerStoreFactory . Factory +// + package driver import ( @@ -18,6 +20,7 @@ import ( type LedgerStoreFactory struct { ctrl *gomock.Controller recorder *LedgerStoreFactoryMockRecorder + isgomock struct{} } // LedgerStoreFactoryMockRecorder is the mock recorder for LedgerStoreFactory. diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index d6afce5735..90d8befcaf 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -3,6 +3,8 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../system/store.go -destination system_generated_test.go -package driver --mock_names Store=SystemStore . Store +// + package driver import ( @@ -21,6 +23,7 @@ import ( type SystemStore struct { ctrl *gomock.Controller recorder *SystemStoreMockRecorder + isgomock struct{} } // SystemStoreMockRecorder is the mock recorder for SystemStore. @@ -40,6 +43,21 @@ func (m *SystemStore) EXPECT() *SystemStoreMockRecorder { return m.recorder } +// CountLedgersInBucket mocks base method. +func (m *SystemStore) CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountLedgersInBucket", ctx, bucketName) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountLedgersInBucket indicates an expected call of CountLedgersInBucket. +func (mr *SystemStoreMockRecorder) CountLedgersInBucket(ctx, bucketName any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountLedgersInBucket", reflect.TypeOf((*SystemStore)(nil).CountLedgersInBucket), ctx, bucketName) +} + // CreateLedger mocks base method. func (m *SystemStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index a5419e294c..65a5130fd0 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -85,7 +85,7 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() if ledgerFilter != "" { - return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + return q.Where(ledgerFilter, ledgerArgs...) } return q }) diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 7d2d77556a..1d9e0fe166 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -164,13 +164,13 @@ func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) } -// getLedgerFilterSQL returns the SQL fragment and arguments for ledger filtering in inline SQL. +// getLedgerFilterSQL returns the SQL condition (without conjunction) and arguments for ledger filtering. // Returns empty string and nil args if single-ledger optimization is enabled. func (store *Store) getLedgerFilterSQL() (string, []any) { if store.isSingleLedger() { return "", nil } - return "and ledger = ?", []any{store.ledger.Name} + return "ledger = ?", []any{store.ledger.Name} } // UpdateSingleLedgerState checks if the bucket contains only one ledger and updates the cache accordingly. diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index e41e1a9697..209776145d 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -210,7 +210,7 @@ func (store *Store) RevertTransaction(ctx context.Context, id int, at time.Time) query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() if ledgerFilter != "" { - return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + return q.Where(ledgerFilter, ledgerArgs...) } return q }) @@ -249,7 +249,7 @@ func (store *Store) UpdateTransactionMetadata(ctx context.Context, id int, m met updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() if ledgerFilter != "" { - return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + return q.Where(ledgerFilter, ledgerArgs...) } return q }) @@ -284,7 +284,7 @@ func (store *Store) DeleteTransactionMetadata(ctx context.Context, id int, key s updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() if ledgerFilter != "" { - return q.Where(ledgerFilter[4:], ledgerArgs...) // Skip "and " prefix + return q.Where(ledgerFilter, ledgerArgs...) } return q }) From c8a67acb1fff35ece511fec9c5c60f732bccf9b3 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Wed, 29 Oct 2025 18:22:39 +0100 Subject: [PATCH 3/5] refactor: update functions to accept context and improve ledger filtering --- internal/storage/driver/driver.go | 10 ---- internal/storage/driver/module.go | 22 +++++-- internal/storage/ledger/accounts.go | 2 +- internal/storage/ledger/logs.go | 2 +- internal/storage/ledger/resource.go | 30 +++++----- internal/storage/ledger/resource_accounts.go | 21 +++---- .../ledger/resource_aggregated_balances.go | 18 +++--- internal/storage/ledger/resource_logs.go | 11 ++-- .../storage/ledger/resource_transactions.go | 15 ++--- internal/storage/ledger/resource_volumes.go | 17 +++--- internal/storage/ledger/store.go | 57 +++++++------------ internal/storage/ledger/transactions.go | 8 +-- 12 files changed, 103 insertions(+), 110 deletions(-) diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 5b8b34d606..796fe33926 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -84,11 +84,6 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto return nil, postgres.ResolveError(err) } - // Update single-ledger optimization state - if err := ret.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { - logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) - } - return ret, nil } @@ -101,11 +96,6 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) - // Update single-ledger optimization state - if err := store.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { - logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) - } - return store, ret, err } diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index f0be31b16a..efce0029e9 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -31,12 +31,18 @@ func NewFXModule() fx.Option { &ledger.Ledger{}, ) }), + fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory { + return systemstore.NewStoreFactory(systemstore.WithTracer( + tracerProvider.Tracer("SystemStore"), + )) + }), fx.Provide(func(params struct { fx.In - DB *bun.DB - TracerProvider trace.TracerProvider `optional:"true"` - MeterProvider metric.MeterProvider `optional:"true"` + DB *bun.DB + SystemStoreFactory systemstore.StoreFactory + TracerProvider trace.TracerProvider `optional:"true"` + MeterProvider metric.MeterProvider `optional:"true"` }) ledgerstore.Factory { options := make([]ledgerstore.Option, 0) if params.TracerProvider != nil { @@ -45,21 +51,25 @@ func NewFXModule() fx.Option { if params.MeterProvider != nil { options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store"))) } + options = append(options, ledgerstore.WithCountLedgersInBucketFunc( + func(ctx context.Context, bucketName string) (int, error) { + return params.SystemStoreFactory.Create(params.DB).CountLedgersInBucket(ctx, bucketName) + }, + )) return ledgerstore.NewFactory(params.DB, options...) }), fx.Provide(func( db *bun.DB, bucketFactory bucket.Factory, ledgerStoreFactory ledgerstore.Factory, + systemStoreFactory systemstore.StoreFactory, tracerProvider trace.TracerProvider, ) (*Driver, error) { return New( db, ledgerStoreFactory, bucketFactory, - systemstore.NewStoreFactory(systemstore.WithTracer( - tracerProvider.Tracer("SystemStore"), - )), + systemStoreFactory, WithTracer(tracerProvider.Tracer("StorageDriver")), ), nil }), diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 65a5130fd0..209ad93b21 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -83,7 +83,7 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri Set("metadata = metadata - ?", key). Where("address = ?", account) query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { - ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) if ledgerFilter != "" { return q.Where(ledgerFilter, ledgerArgs...) } diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 71347f6b25..1268c732ec 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -125,7 +125,7 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) ( Column("*"). Where("idempotency_key = ?", key). Limit(1) - query = store.applyLedgerFilter(query, "logs") + query = store.applyLedgerFilter(ctx, query, "logs") if err := query.Scan(ctx); err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/resource.go b/internal/storage/ledger/resource.go index 6e734472a9..6b36035ce8 100644 --- a/internal/storage/ledger/resource.go +++ b/internal/storage/ledger/resource.go @@ -87,10 +87,10 @@ func (ctx repositoryHandlerBuildContext[Opts]) useFilter(v string, matchers ...f type repositoryHandler[Opts any] interface { filters() []filter - buildDataset(store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error) - resolveFilter(store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error) - project(store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) - expand(store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error) + buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error) + resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error) + project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) + expand(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error) } type resourceRepository[ResourceType, OptionsType any] struct { @@ -151,14 +151,14 @@ func (r *resourceRepository[ResourceType, OptionsType]) validateFilters(builder return ret, nil } -func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { +func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(ctx context.Context, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { filters, err := r.validateFilters(q.Builder) if err != nil { return nil, err } - dataset, err := r.resourceHandler.buildDataset(r.store, repositoryHandlerBuildContext[OptionsType]{ + dataset, err := r.resourceHandler.buildDataset(ctx, r.store, repositoryHandlerBuildContext[OptionsType]{ ResourceQuery: q, filters: filters, }) @@ -172,7 +172,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l if q.Builder != nil { // Convert filters to where clause where, args, err := q.Builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - return r.resourceHandler.resolveFilter(r.store, q, operator, key, value) + return r.resourceHandler.resolveFilter(ctx, r.store, q, operator, key, value) })) if err != nil { return nil, err @@ -184,10 +184,10 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l } } - return r.resourceHandler.project(r.store, q, dataset) + return r.resourceHandler.project(ctx, r.store, q, dataset) } -func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { +func (r *resourceRepository[ResourceType, OptionsType]) expand(ctx context.Context, dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { dataset = r.store.db.NewSelect(). With("dataset", dataset). ModelTableExpr("dataset"). @@ -196,7 +196,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele slices.Sort(q.Expand) for i, expand := range q.Expand { - selectQuery, joinCondition, err := r.resourceHandler.expand(r.store, q, expand) + selectQuery, joinCondition, err := r.resourceHandler.expand(ctx, r.store, q, expand) if err != nil { return nil, err } @@ -222,12 +222,12 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (*ResourceType, error) { - finalQuery, err := r.buildFilteredDataset(query) + finalQuery, err := r.buildFilteredDataset(ctx, query) if err != nil { return nil, err } - finalQuery, err = r.expand(finalQuery, query) + finalQuery, err = r.expand(ctx, finalQuery, query) if err != nil { return nil, err } @@ -248,7 +248,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Conte func (r *resourceRepository[ResourceType, OptionsType]) Count(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (int, error) { - finalQuery, err := r.buildFilteredDataset(query) + finalQuery, err := r.buildFilteredDataset(ctx, query) if err != nil { return 0, err } @@ -289,7 +289,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT panic("should not happen") } - finalQuery, err := r.buildFilteredDataset(resourceQuery) + finalQuery, err := r.buildFilteredDataset(ctx, resourceQuery) if err != nil { return nil, fmt.Errorf("building filtered dataset: %w", err) } @@ -299,7 +299,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT return nil, fmt.Errorf("paginating request: %w", err) } - finalQuery, err = r.expand(finalQuery, resourceQuery) + finalQuery, err = r.expand(ctx, finalQuery, resourceQuery) if err != nil { return nil, fmt.Errorf("expanding results: %w", err) } diff --git a/internal/storage/ledger/resource_accounts.go b/internal/storage/ledger/resource_accounts.go index eba9fd9c42..3c88683e35 100644 --- a/internal/storage/ledger/resource_accounts.go +++ b/internal/storage/ledger/resource_accounts.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "fmt" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -48,14 +49,14 @@ func (h accountsResourceHandler) filters() []filter { } } -func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { +func (h accountsResourceHandler) buildDataset(ctx context.Context, store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := store.db.NewSelect() // Build the query ret = ret. ModelTableExpr(store.GetPrefixedRelationName("accounts")). Column("address", "address_array", "first_usage", "insertion_date", "updated_at") - ret = store.applyLedgerFilter(ret, "accounts") + ret = store.applyLedgerFilter(ctx, ret, "accounts") if opts.PIT != nil && !opts.PIT.IsZero() { ret = ret.Where("accounts.first_usage <= ?", opts.PIT) @@ -68,7 +69,7 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl Column("accounts_address"). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("date <= ?", opts.PIT) - selectDistinctAccountMetadataHistories = store.applyLedgerFilter(selectDistinctAccountMetadataHistories, "accounts_metadata") + selectDistinctAccountMetadataHistories = store.applyLedgerFilter(ctx, selectDistinctAccountMetadataHistories, "accounts_metadata") ret = ret. Join( @@ -83,7 +84,7 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl return ret, nil } -func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h accountsResourceHandler) resolveFilter(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "address": return filterAccountAddress(value.(string), "address"), nil, nil @@ -103,12 +104,12 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll DistinctOn("asset"). ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance"). Where("effective_date <= ?", opts.PIT) - selectBalance = store.applyLedgerFilter(selectBalance, "moves") + selectBalance = store.applyLedgerFilter(ctx, selectBalance, "moves") } else { selectBalance = selectBalance. ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). ColumnExpr("input - output as balance") - selectBalance = store.applyLedgerFilter(selectBalance, "accounts_volumes") + selectBalance = store.applyLedgerFilter(ctx, selectBalance, "accounts_volumes") } if balanceRegex.MatchString(property) { @@ -133,11 +134,11 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll } } -func (h accountsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h accountsResourceHandler) project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } -func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h accountsResourceHandler) expand(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { switch property { case "volumes": if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { @@ -156,7 +157,7 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). Column("accounts_address", "asset") - selectRowsQuery = store.applyLedgerFilter(selectRowsQuery, "moves") + selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "moves") if property == "volumes" { selectRowsQuery = selectRowsQuery. ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes"). @@ -171,7 +172,7 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") - selectRowsQuery = store.applyLedgerFilter(selectRowsQuery, "accounts_volumes") + selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "accounts_volumes") } return store.db.NewSelect(). diff --git a/internal/storage/ledger/resource_aggregated_balances.go b/internal/storage/ledger/resource_aggregated_balances.go index 370165e62c..df3baed020 100644 --- a/internal/storage/ledger/resource_aggregated_balances.go +++ b/internal/storage/ledger/resource_aggregated_balances.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledger "github.com/formancehq/ledger/internal" @@ -46,14 +47,14 @@ func (h aggregatedBalancesResourceRepositoryHandler) filters() []filter { } } -func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) { +func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) { if query.UsePIT() { ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). Column("accounts_address", "asset") - ret = store.applyLedgerFilter(ret, "moves") + ret = store.applyLedgerFilter(ctx, ret, "moves") if query.Opts.UseInsertionDate { if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) @@ -77,7 +78,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). Where("accounts.address = accounts_address") - subQuery = store.applyLedgerFilter(subQuery, "accounts") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") ret = ret. ColumnExpr("accounts.address_array as accounts_address_array"). @@ -91,7 +92,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("accounts_metadata.accounts_address = moves.accounts_address"). Where("date <= ?", query.PIT) - subQuery = store.applyLedgerFilter(subQuery, "accounts_metadata") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts_metadata") ret = ret. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -104,14 +105,14 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") - ret = store.applyLedgerFilter(ret, "accounts_volumes") + ret = store.applyLedgerFilter(ctx, ret, "accounts_volumes") if query.useFilter("metadata") || query.useFilter("address", isPartialAddress) { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). Where("accounts.address = accounts_address") - subQuery = store.applyLedgerFilter(subQuery, "accounts") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") if query.useFilter("address") { subQuery = subQuery.ColumnExpr("address_array as accounts_address_array") @@ -130,7 +131,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, } } -func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], operator, property string, value any) (string, []any, error) { +func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], operator, property string, value any) (string, []any, error) { switch { case property == "address": return filterAccountAddress(value.(string), "accounts_address"), nil, nil @@ -149,11 +150,12 @@ func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(store *Store, } } -func (h aggregatedBalancesResourceRepositoryHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h aggregatedBalancesResourceRepositoryHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expand available for aggregated balances") } func (h aggregatedBalancesResourceRepositoryHandler) project( + ctx context.Context, store *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], selectQuery *bun.SelectQuery, diff --git a/internal/storage/ledger/resource_logs.go b/internal/storage/ledger/resource_logs.go index d21694d96d..9abcfce7e7 100644 --- a/internal/storage/ledger/resource_logs.go +++ b/internal/storage/ledger/resource_logs.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -21,15 +22,15 @@ func (h logsResourceHandler) filters() []filter { } } -func (h logsResourceHandler) buildDataset(store *Store, _ repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { +func (h logsResourceHandler) buildDataset(ctx context.Context, store *Store, _ repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("logs")). ColumnExpr("*") - ret = store.applyLedgerFilter(ret, "logs") + ret = store.applyLedgerFilter(ctx, ret, "logs") return ret, nil } -func (h logsResourceHandler) resolveFilter(_ *Store, _ ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h logsResourceHandler) resolveFilter(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "date" || property == "id": return fmt.Sprintf("%s %s ?", property, convertOperatorToSQL(operator)), []any{value}, nil @@ -38,11 +39,11 @@ func (h logsResourceHandler) resolveFilter(_ *Store, _ ledgercontroller.Resource } } -func (h logsResourceHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[any], _ string) (*bun.SelectQuery, *joinCondition, error) { +func (h logsResourceHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[any], _ string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expand supported") } -func (h logsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h logsResourceHandler) project(_ context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } diff --git a/internal/storage/ledger/resource_transactions.go b/internal/storage/ledger/resource_transactions.go index cbb86a1881..d1f2d70e61 100644 --- a/internal/storage/ledger/resource_transactions.go +++ b/internal/storage/ledger/resource_transactions.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "fmt" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -68,7 +69,7 @@ func (h transactionsResourceHandler) filters() []filter { } } -func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { +func (h transactionsResourceHandler) buildDataset(ctx context.Context, store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("transactions")). Column( @@ -84,7 +85,7 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH "sources_arrays", "destinations_arrays", ) - ret = store.applyLedgerFilter(ret, "transactions") + ret = store.applyLedgerFilter(ctx, ret, "transactions") if slices.Contains(opts.Expand, "volumes") { ret = ret.Column("post_commit_volumes") @@ -101,7 +102,7 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH Column("transactions_id", "metadata"). Order("transactions_id", "revision desc"). Where("date <= ?", opts.PIT) - selectDistinctTransactionMetadataHistories = store.applyLedgerFilter(selectDistinctTransactionMetadataHistories, "transactions_metadata") + selectDistinctTransactionMetadataHistories = store.applyLedgerFilter(ctx, selectDistinctTransactionMetadataHistories, "transactions_metadata") ret = ret. Join( @@ -122,7 +123,7 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH return ret, nil } -func (h transactionsResourceHandler) resolveFilter(store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h transactionsResourceHandler) resolveFilter(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "id": return fmt.Sprintf("id %s ?", convertOperatorToSQL(operator)), []any{value}, nil @@ -154,11 +155,11 @@ func (h transactionsResourceHandler) resolveFilter(store *Store, opts ledgercont } } -func (h transactionsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h transactionsResourceHandler) project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } -func (h transactionsResourceHandler) expand(store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h transactionsResourceHandler) expand(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { if property != "effectiveVolumes" { return nil, nil, nil } @@ -169,7 +170,7 @@ func (h transactionsResourceHandler) expand(store *Store, opts ledgercontroller. Column("transactions_id", "accounts_address", "asset"). ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). Where("transactions_id in (select id from dataset)") - movesSubquery = store.applyLedgerFilter(movesSubquery, "moves") + movesSubquery = store.applyLedgerFilter(ctx, movesSubquery, "moves") ret := store.db.NewSelect(). TableExpr( diff --git a/internal/storage/ledger/resource_volumes.go b/internal/storage/ledger/resource_volumes.go index 03a76a4e53..706f73dee3 100644 --- a/internal/storage/ledger/resource_volumes.go +++ b/internal/storage/ledger/resource_volumes.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledger "github.com/formancehq/ledger/internal" @@ -54,7 +55,7 @@ func (h volumesResourceHandler) filters() []filter { } } -func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetVolumesOptions]) (*bun.SelectQuery, error) { +func (h volumesResourceHandler) buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetVolumesOptions]) (*bun.SelectQuery, error) { var selectVolumes *bun.SelectQuery @@ -66,14 +67,14 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ColumnExpr("accounts_address as account"). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Order("accounts_address", "asset") - selectVolumes = store.applyLedgerFilter(selectVolumes, "accounts_volumes") + selectVolumes = store.applyLedgerFilter(ctx, selectVolumes, "accounts_volumes") if query.useFilter("metadata") || needAddressSegments { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). Where("accounts.address = accounts_address") - subQuery = store.applyLedgerFilter(subQuery, "accounts") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") if needAddressSegments { subQuery = subQuery.ColumnExpr("address_array as account_array") @@ -101,7 +102,7 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ModelTableExpr(store.GetPrefixedRelationName("moves")). GroupExpr("accounts_address, asset"). Order("accounts_address", "asset") - selectVolumes = store.applyLedgerFilter(selectVolumes, "moves") + selectVolumes = store.applyLedgerFilter(ctx, selectVolumes, "moves") dateFilterColumn := "effective_date" if query.Opts.UseInsertionDate { @@ -121,7 +122,7 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). Where("accounts.address = accounts_address") - subQuery = store.applyLedgerFilter(subQuery, "accounts") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") selectVolumes. ColumnExpr("(array_agg(accounts.address_array))[1] as account_array"). @@ -134,7 +135,7 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("accounts_metadata.accounts_address = moves.accounts_address") - subQuery = store.applyLedgerFilter(subQuery, "accounts_metadata") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts_metadata") selectVolumes = selectVolumes. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -146,6 +147,7 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl } func (h volumesResourceHandler) resolveFilter( + ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], operator, property string, @@ -184,6 +186,7 @@ func (h volumesResourceHandler) resolveFilter( } func (h volumesResourceHandler) project( + ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], selectQuery *bun.SelectQuery, @@ -208,7 +211,7 @@ func (h volumesResourceHandler) project( GroupExpr("account, asset"), nil } -func (h volumesResourceHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h volumesResourceHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expansion available") } diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 1d9e0fe166..dad55b0cc0 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "sync" "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/migrations" @@ -22,17 +21,12 @@ import ( "github.com/uptrace/bun" ) -type singleLedgerOptimization struct { - mu sync.RWMutex - enabled bool -} - type Store struct { db bun.IDB bucket bucket.Bucket ledger ledger.Ledger - singleLedgerCache *singleLedgerOptimization + countLedgersInBucket func(ctx context.Context, bucketName string) (int, error) tracer trace.Tracer meter metric.Meter @@ -146,19 +140,24 @@ func (store *Store) GetPrefixedRelationName(v string) string { // isSingleLedger returns true if the bucket optimization is enabled for single-ledger scenarios. // This allows queries to skip the WHERE ledger = ? clause when there's only one ledger in the bucket. -func (store *Store) isSingleLedger() bool { - if store.singleLedgerCache == nil { +// isSingleLedger checks in real-time if the bucket contains only one ledger. +// This query is fast since the ledgers table has very few rows. +func (store *Store) isSingleLedger(ctx context.Context) bool { + if store.countLedgersInBucket == nil { + return false + } + count, err := store.countLedgersInBucket(ctx, store.ledger.Bucket) + if err != nil { + // On error, be conservative and assume multi-ledger return false } - store.singleLedgerCache.mu.RLock() - defer store.singleLedgerCache.mu.RUnlock() - return store.singleLedgerCache.enabled + return count == 1 } // applyLedgerFilter conditionally applies the WHERE ledger = ? clause to a query. // If the bucket contains only one ledger, the filter is skipped for performance optimization. -func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { - if store.isSingleLedger() { +func (store *Store) applyLedgerFilter(ctx context.Context, query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { + if store.isSingleLedger(ctx) { return query } return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) @@ -166,33 +165,13 @@ func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) // getLedgerFilterSQL returns the SQL condition (without conjunction) and arguments for ledger filtering. // Returns empty string and nil args if single-ledger optimization is enabled. -func (store *Store) getLedgerFilterSQL() (string, []any) { - if store.isSingleLedger() { +func (store *Store) getLedgerFilterSQL(ctx context.Context) (string, []any) { + if store.isSingleLedger(ctx) { return "", nil } return "ledger = ?", []any{store.ledger.Name} } -// UpdateSingleLedgerState checks if the bucket contains only one ledger and updates the cache accordingly. -// This should be called during store initialization. -func (store *Store) UpdateSingleLedgerState(ctx context.Context, countFunc func(ctx context.Context, bucketName string) (int, error)) error { - if store.singleLedgerCache == nil { - store.singleLedgerCache = &singleLedgerOptimization{} - } - - count, err := countFunc(ctx, store.ledger.Bucket) - if err != nil { - return err - } - - store.singleLedgerCache.mu.Lock() - defer store.singleLedgerCache.mu.Unlock() - - store.singleLedgerCache.enabled = (count == 1) - - return nil -} - func validateAddressFilter(ledger ledger.Ledger, operator string, value any) error { if operator != "$match" { return fmt.Errorf("'address' column can only be used with $match, operator used is: %s", operator) @@ -362,6 +341,12 @@ func WithTracer(tracer trace.Tracer) Option { } } +func WithCountLedgersInBucketFunc(countFunc func(ctx context.Context, bucketName string) (int, error)) Option { + return func(s *Store) { + s.countLedgersInBucket = countFunc + } +} + var defaultOptions = []Option{ WithMeter(noopmetrics.Meter{}), WithTracer(nooptracer.Tracer{}), diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 209776145d..c1ada8c14e 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -182,7 +182,7 @@ func (store *Store) updateTxWithRetrieve(ctx context.Context, id int, query *bun ColumnExpr("*, false as modified"). Where("id = ?", id). Limit(1) - return store.applyLedgerFilter(query, "transactions") + return store.applyLedgerFilter(ctx, query, "transactions") }(), ), ). @@ -208,7 +208,7 @@ func (store *Store) RevertTransaction(ctx context.Context, id int, at time.Time) Where("reverted_at is null"). Returning("*") query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { - ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) if ledgerFilter != "" { return q.Where(ledgerFilter, ledgerArgs...) } @@ -247,7 +247,7 @@ func (store *Store) UpdateTransactionMetadata(ctx context.Context, id int, m met Where("not (metadata @> ?)", m). Returning("*") updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { - ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) if ledgerFilter != "" { return q.Where(ledgerFilter, ledgerArgs...) } @@ -282,7 +282,7 @@ func (store *Store) DeleteTransactionMetadata(ctx context.Context, id int, key s Where("metadata -> ? is not null", key). Returning("*") updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { - ledgerFilter, ledgerArgs := store.getLedgerFilterSQL() + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) if ledgerFilter != "" { return q.Where(ledgerFilter, ledgerArgs...) } From 6844d4f375734dda0f30a2de2111184e5d20d444 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 30 Oct 2025 09:17:15 +0100 Subject: [PATCH 4/5] fix: address Geoffrey's review comments - Handle errors properly in isSingleLedger by returning (bool, error) - Record errors in trace span instead of silently ignoring them - Add explanatory comment for SystemStoreFactory provider - Apply conservative behavior (use filter) when counting fails Addresses review comments from PR #1090 --- internal/storage/driver/module.go | 2 ++ internal/storage/ledger/store.go | 27 ++++++++++++++++++++------- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index efce0029e9..66e7fed8bc 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -31,6 +31,8 @@ func NewFXModule() fx.Option { &ledger.Ledger{}, ) }), + // SystemStoreFactory is provided separately to be used both by the Driver + // and by the ledger store factory for counting ledgers in buckets fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory { return systemstore.NewStoreFactory(systemstore.WithTracer( tracerProvider.Tracer("SystemStore"), diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index dad55b0cc0..37f147b42b 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -142,22 +142,28 @@ func (store *Store) GetPrefixedRelationName(v string) string { // This allows queries to skip the WHERE ledger = ? clause when there's only one ledger in the bucket. // isSingleLedger checks in real-time if the bucket contains only one ledger. // This query is fast since the ledgers table has very few rows. -func (store *Store) isSingleLedger(ctx context.Context) bool { +func (store *Store) isSingleLedger(ctx context.Context) (bool, error) { if store.countLedgersInBucket == nil { - return false + return false, nil } count, err := store.countLedgersInBucket(ctx, store.ledger.Bucket) if err != nil { - // On error, be conservative and assume multi-ledger - return false + return false, fmt.Errorf("failed to count ledgers in bucket: %w", err) } - return count == 1 + return count == 1, nil } // applyLedgerFilter conditionally applies the WHERE ledger = ? clause to a query. // If the bucket contains only one ledger, the filter is skipped for performance optimization. +// On error, conservatively applies the filter. func (store *Store) applyLedgerFilter(ctx context.Context, query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { - if store.isSingleLedger(ctx) { + singleLedger, err := store.isSingleLedger(ctx) + if err != nil { + // Log error but continue with conservative behavior (apply filter) + trace.SpanFromContext(ctx).RecordError(err) + return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) + } + if singleLedger { return query } return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) @@ -165,8 +171,15 @@ func (store *Store) applyLedgerFilter(ctx context.Context, query *bun.SelectQuer // getLedgerFilterSQL returns the SQL condition (without conjunction) and arguments for ledger filtering. // Returns empty string and nil args if single-ledger optimization is enabled. +// On error, conservatively returns the filter. func (store *Store) getLedgerFilterSQL(ctx context.Context) (string, []any) { - if store.isSingleLedger(ctx) { + singleLedger, err := store.isSingleLedger(ctx) + if err != nil { + // Log error but continue with conservative behavior (return filter) + trace.SpanFromContext(ctx).RecordError(err) + return "ledger = ?", []any{store.ledger.Name} + } + if singleLedger { return "", nil } return "ledger = ?", []any{store.ledger.Name} From 908b6c46d2476627c4d283e7d306d2c20da4a48c Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 30 Oct 2025 10:16:20 +0100 Subject: [PATCH 5/5] refactor: remove unused fields from mock structs in generated test files --- internal/storage/driver/buckets_generated_test.go | 12 ++++-------- internal/storage/driver/ledger_generated_test.go | 3 --- internal/storage/driver/system_generated_test.go | 3 --- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index 2f0f14c95b..7331e1912c 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../bucket/bucket.go -destination buckets_generated_test.go -package driver --mock_names Factory=BucketFactory . Factory -// - package driver import ( @@ -22,7 +20,6 @@ import ( type MockBucket struct { ctrl *gomock.Controller recorder *MockBucketMockRecorder - isgomock struct{} } // MockBucketMockRecorder is the mock recorder for MockBucket. @@ -43,17 +40,17 @@ func (m *MockBucket) EXPECT() *MockBucketMockRecorder { } // AddLedger mocks base method. -func (m *MockBucket) AddLedger(ctx context.Context, db bun.IDB, arg2 ledger.Ledger) error { +func (m *MockBucket) AddLedger(ctx context.Context, db bun.IDB, ledger ledger.Ledger) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddLedger", ctx, db, arg2) + ret := m.ctrl.Call(m, "AddLedger", ctx, db, ledger) ret0, _ := ret[0].(error) return ret0 } // AddLedger indicates an expected call of AddLedger. -func (mr *MockBucketMockRecorder) AddLedger(ctx, db, arg2 any) *gomock.Call { +func (mr *MockBucketMockRecorder) AddLedger(ctx, db, ledger any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, ledger) } // GetLastVersion mocks base method. @@ -154,7 +151,6 @@ func (mr *MockBucketMockRecorder) Migrate(ctx, db any, opts ...any) *gomock.Call type BucketFactory struct { ctrl *gomock.Controller recorder *BucketFactoryMockRecorder - isgomock struct{} } // BucketFactoryMockRecorder is the mock recorder for BucketFactory. diff --git a/internal/storage/driver/ledger_generated_test.go b/internal/storage/driver/ledger_generated_test.go index b940e41b32..fb2f8a6ab1 100644 --- a/internal/storage/driver/ledger_generated_test.go +++ b/internal/storage/driver/ledger_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../ledger/factory.go -destination ledger_generated_test.go -package driver --mock_names Factory=LedgerStoreFactory . Factory -// - package driver import ( @@ -20,7 +18,6 @@ import ( type LedgerStoreFactory struct { ctrl *gomock.Controller recorder *LedgerStoreFactoryMockRecorder - isgomock struct{} } // LedgerStoreFactoryMockRecorder is the mock recorder for LedgerStoreFactory. diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index 90d8befcaf..1f79df3e9e 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -3,8 +3,6 @@ // Generated by this command: // // mockgen -write_source_comment=false -write_package_comment=false -source ../system/store.go -destination system_generated_test.go -package driver --mock_names Store=SystemStore . Store -// - package driver import ( @@ -23,7 +21,6 @@ import ( type SystemStore struct { ctrl *gomock.Controller recorder *SystemStoreMockRecorder - isgomock struct{} } // SystemStoreMockRecorder is the mock recorder for SystemStore.