Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions internal/storage/driver/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,20 @@ func NewFXModule() fx.Option {
&ledger.Ledger{},
)
}),
// SystemStoreFactory is provided separately to be used both by the Driver
// and by the ledger store factory for counting ledgers in buckets
fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to recreate the system store.
The system store is already existing in the *Driver service.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Justification :

  • The SystemStoreFactory must be injected into the ledgerstore.Factory to provide the CountLedgersInBucket function
  • The Create(params.DB) call in the closure does not really "recreate" a heavy store - it is just a wrapper struct around the DB
  • The context needed for CountLedgersInBucket is only available at runtime, so the closure is necessary

return systemstore.NewStoreFactory(systemstore.WithTracer(
tracerProvider.Tracer("SystemStore"),
))
}),
fx.Provide(func(params struct {
fx.In

DB *bun.DB
TracerProvider trace.TracerProvider `optional:"true"`
MeterProvider metric.MeterProvider `optional:"true"`
DB *bun.DB
SystemStoreFactory systemstore.StoreFactory
TracerProvider trace.TracerProvider `optional:"true"`
MeterProvider metric.MeterProvider `optional:"true"`
}) ledgerstore.Factory {
options := make([]ledgerstore.Option, 0)
if params.TracerProvider != nil {
Expand All @@ -45,21 +53,25 @@ func NewFXModule() fx.Option {
if params.MeterProvider != nil {
options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store")))
}
options = append(options, ledgerstore.WithCountLedgersInBucketFunc(
func(ctx context.Context, bucketName string) (int, error) {
return params.SystemStoreFactory.Create(params.DB).CountLedgersInBucket(ctx, bucketName)
},
))
return ledgerstore.NewFactory(params.DB, options...)
}),
fx.Provide(func(
db *bun.DB,
bucketFactory bucket.Factory,
ledgerStoreFactory ledgerstore.Factory,
systemStoreFactory systemstore.StoreFactory,
tracerProvider trace.TracerProvider,
) (*Driver, error) {
return New(
db,
ledgerStoreFactory,
bucketFactory,
systemstore.NewStoreFactory(systemstore.WithTracer(
tracerProvider.Tracer("SystemStore"),
)),
systemStoreFactory,
WithTracer(tracerProvider.Tracer("StorageDriver")),
), nil
}),
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/driver/system_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
. "github.com/formancehq/go-libs/v2/collectionutils"
"github.com/formancehq/ledger/internal/tracing"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"regexp"
Expand Down Expand Up @@ -77,12 +78,18 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri
store.tracer,
store.deleteAccountMetadataHistogram,
tracing.NoResult(func(ctx context.Context) error {
_, err := store.db.NewUpdate().
query := store.db.NewUpdate().
ModelTableExpr(store.GetPrefixedRelationName("accounts")).
Set("metadata = metadata - ?", key).
Where("address = ?", account).
Where("ledger = ?", store.ledger.Name).
Exec(ctx)
Where("address = ?", account)
query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery {
ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx)
if ledgerFilter != "" {
return q.Where(ledgerFilter, ledgerArgs...)
}
return q
})
_, err := query.Exec(ctx)
return postgres.ResolveError(err)
}),
)
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) (
store.readLogWithIdempotencyKeyHistogram,
func(ctx context.Context) (*ledger.Log, error) {
ret := &Log{}
if err := store.db.NewSelect().
query := store.db.NewSelect().
Model(ret).
ModelTableExpr(store.GetPrefixedRelationName("logs")).
Column("*").
Where("idempotency_key = ?", key).
Where("ledger = ?", store.ledger.Name).
Limit(1).
Scan(ctx); err != nil {
Limit(1)
query = store.applyLedgerFilter(ctx, query, "logs")
if err := query.Scan(ctx); err != nil {
return nil, postgres.ResolveError(err)
}

Expand Down
30 changes: 15 additions & 15 deletions internal/storage/ledger/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ func (ctx repositoryHandlerBuildContext[Opts]) useFilter(v string, matchers ...f

type repositoryHandler[Opts any] interface {
filters() []filter
buildDataset(store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error)
resolveFilter(store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error)
project(store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error)
expand(store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error)
buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error)
resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error)
project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error)
expand(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error)
}

type resourceRepository[ResourceType, OptionsType any] struct {
Expand Down Expand Up @@ -151,14 +151,14 @@ func (r *resourceRepository[ResourceType, OptionsType]) validateFilters(builder
return ret, nil
}

func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(ctx context.Context, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {

filters, err := r.validateFilters(q.Builder)
if err != nil {
return nil, err
}

dataset, err := r.resourceHandler.buildDataset(r.store, repositoryHandlerBuildContext[OptionsType]{
dataset, err := r.resourceHandler.buildDataset(ctx, r.store, repositoryHandlerBuildContext[OptionsType]{
ResourceQuery: q,
filters: filters,
})
Expand All @@ -172,7 +172,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l
if q.Builder != nil {
// Convert filters to where clause
where, args, err := q.Builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
return r.resourceHandler.resolveFilter(r.store, q, operator, key, value)
return r.resourceHandler.resolveFilter(ctx, r.store, q, operator, key, value)
}))
if err != nil {
return nil, err
Expand All @@ -184,10 +184,10 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l
}
}

return r.resourceHandler.project(r.store, q, dataset)
return r.resourceHandler.project(ctx, r.store, q, dataset)
}

func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
func (r *resourceRepository[ResourceType, OptionsType]) expand(ctx context.Context, dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
dataset = r.store.db.NewSelect().
With("dataset", dataset).
ModelTableExpr("dataset").
Expand All @@ -196,7 +196,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele
slices.Sort(q.Expand)

for i, expand := range q.Expand {
selectQuery, joinCondition, err := r.resourceHandler.expand(r.store, q, expand)
selectQuery, joinCondition, err := r.resourceHandler.expand(ctx, r.store, q, expand)
if err != nil {
return nil, err
}
Expand All @@ -222,12 +222,12 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele

func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (*ResourceType, error) {

finalQuery, err := r.buildFilteredDataset(query)
finalQuery, err := r.buildFilteredDataset(ctx, query)
if err != nil {
return nil, err
}

finalQuery, err = r.expand(finalQuery, query)
finalQuery, err = r.expand(ctx, finalQuery, query)
if err != nil {
return nil, err
}
Expand All @@ -248,7 +248,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Conte

func (r *resourceRepository[ResourceType, OptionsType]) Count(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (int, error) {

finalQuery, err := r.buildFilteredDataset(query)
finalQuery, err := r.buildFilteredDataset(ctx, query)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT
panic("should not happen")
}

finalQuery, err := r.buildFilteredDataset(resourceQuery)
finalQuery, err := r.buildFilteredDataset(ctx, resourceQuery)
if err != nil {
return nil, fmt.Errorf("building filtered dataset: %w", err)
}
Expand All @@ -299,7 +299,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT
return nil, fmt.Errorf("paginating request: %w", err)
}

finalQuery, err = r.expand(finalQuery, resourceQuery)
finalQuery, err = r.expand(ctx, finalQuery, resourceQuery)
if err != nil {
return nil, fmt.Errorf("expanding results: %w", err)
}
Expand Down
28 changes: 15 additions & 13 deletions internal/storage/ledger/resource_accounts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ledger

import (
"context"
"fmt"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
Expand Down Expand Up @@ -48,14 +49,14 @@ func (h accountsResourceHandler) filters() []filter {
}
}

func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
func (h accountsResourceHandler) buildDataset(ctx context.Context, store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
ret := store.db.NewSelect()

// Build the query
ret = ret.
ModelTableExpr(store.GetPrefixedRelationName("accounts")).
Column("address", "address_array", "first_usage", "insertion_date", "updated_at").
Where("ledger = ?", store.ledger.Name)
Column("address", "address_array", "first_usage", "insertion_date", "updated_at")
ret = store.applyLedgerFilter(ctx, ret, "accounts")

if opts.PIT != nil && !opts.PIT.IsZero() {
ret = ret.Where("accounts.first_usage <= ?", opts.PIT)
Expand All @@ -65,10 +66,10 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl
selectDistinctAccountMetadataHistories := store.db.NewSelect().
DistinctOn("accounts_address").
ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")).
Where("ledger = ?", store.ledger.Name).
Column("accounts_address").
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
Where("date <= ?", opts.PIT)
selectDistinctAccountMetadataHistories = store.applyLedgerFilter(ctx, selectDistinctAccountMetadataHistories, "accounts_metadata")

ret = ret.
Join(
Expand All @@ -83,7 +84,7 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl
return ret, nil
}

func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) {
func (h accountsResourceHandler) resolveFilter(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) {
switch {
case property == "address":
return filterAccountAddress(value.(string), "address"), nil, nil
Expand All @@ -92,8 +93,7 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll
case balanceRegex.MatchString(property) || property == "balance":

selectBalance := store.db.NewSelect().
Where("accounts_address = dataset.address").
Where("ledger = ?", store.ledger.Name)
Where("accounts_address = dataset.address")

if opts.PIT != nil && !opts.PIT.IsZero() {
if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
Expand All @@ -104,10 +104,12 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll
DistinctOn("asset").
ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance").
Where("effective_date <= ?", opts.PIT)
selectBalance = store.applyLedgerFilter(ctx, selectBalance, "moves")
} else {
selectBalance = selectBalance.
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
ColumnExpr("input - output as balance")
selectBalance = store.applyLedgerFilter(ctx, selectBalance, "accounts_volumes")
}

if balanceRegex.MatchString(property) {
Expand All @@ -132,11 +134,11 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll
}
}

func (h accountsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) {
func (h accountsResourceHandler) project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) {
return selectQuery.ColumnExpr("*"), nil
}

func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) {
func (h accountsResourceHandler) expand(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) {
switch property {
case "volumes":
if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
Expand All @@ -154,8 +156,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso
selectRowsQuery = selectRowsQuery.
ModelTableExpr(store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
Where("ledger = ?", store.ledger.Name)
Column("accounts_address", "asset")
selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "moves")
if property == "volumes" {
selectRowsQuery = selectRowsQuery.
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
Expand All @@ -169,8 +171,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso
selectRowsQuery = selectRowsQuery.
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_address").
ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", store.ledger.Name)
ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes")
selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "accounts_volumes")
}

return store.db.NewSelect().
Expand Down
Loading
Loading