Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions app/config/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type BillingConfiguration struct {
MaxParallelQuantitySnapshots int
Worker BillingWorkerConfiguration
FeatureSwitches BillingFeatureSwitchesConfiguration
Charges BillingChargesConfiguration
}

func (c BillingConfiguration) Validate() error {
Expand All @@ -33,6 +34,14 @@ func (c BillingConfiguration) Validate() error {
return errors.Join(errs...)
}

type BillingChargesConfiguration struct {
Enabled bool
}

func (c BillingChargesConfiguration) Validate() error {
return nil
}

type BillingFeatureSwitchesConfiguration struct {
NamespaceLockdown []string
}
Expand All @@ -50,4 +59,5 @@ func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) {
_ = v.BindPFlag("billing.advancementStrategy", flags.Lookup("billing-advancement-strategy"))
v.SetDefault("billing.advancementStrategy", billing.ForegroundAdvancementStrategy)
v.SetDefault("billing.maxParallelQuantitySnapshots", 4)
v.SetDefault("billing.charges.enabled", false)
}
4 changes: 4 additions & 0 deletions openmeter/billing/gatheringinvoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,10 @@ func (g GatheringLine) AsInvoiceLine() InvoiceLine {
}
}

func (g GatheringLine) AsLineOrHierarchy() (LineOrHierarchy, error) {
return NewLineOrHierarchy(g), nil
}

func (g GatheringLine) Equal(other GatheringLine) bool {
return g.GatheringLineBase.Equal(other.GatheringLineBase)
}
Expand Down
1 change: 1 addition & 0 deletions openmeter/billing/invoiceline.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type GenericInvoiceLineReader interface {

Validate() error
AsInvoiceLine() InvoiceLine
AsLineOrHierarchy() (LineOrHierarchy, error)
GetRateCardDiscounts() Discounts
GetSubscriptionReference() *SubscriptionReference
GetSplitLineGroupID() *string
Expand Down
9 changes: 9 additions & 0 deletions openmeter/billing/stdinvoiceline.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ func (i StandardLine) AsInvoiceLine() InvoiceLine {
}
}

func (i StandardLine) AsLineOrHierarchy() (LineOrHierarchy, error) {
cloned, err := i.Clone()
if err != nil {
return LineOrHierarchy{}, err
}

return NewLineOrHierarchy(cloned), nil
}

func (i StandardLine) GetQuantity() *alpacadecimal.Decimal {
if i.UsageBased == nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package persistedstate

import (
"errors"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/billing/charges"
"github.com/openmeterio/openmeter/pkg/timeutil"
)

type EntityType string

const (
EntityTypeLineOrHierarchy EntityType = "line_or_hierarchy"
EntityTypeCharge EntityType = "charge"
)

var ErrEntityTypeMismatch = errors.New("entity type mismatch")

type Entity interface {
IsFlatFee() bool

GetServicePeriod() timeutil.ClosedPeriod
GetChildUniqueReferenceID() *string
GetType() EntityType
AsLineOrHierarchy() (billing.LineOrHierarchy, error)
AsCharge() (charges.Charge, error)
}

// Implementations

// Line
var _ Entity = (*LineEntity)(nil)

type LineEntity struct {
line billing.GenericInvoiceLine
}

func (e LineEntity) GetType() EntityType {
return EntityTypeLineOrHierarchy
}

func (e LineEntity) AsLineOrHierarchy() (billing.LineOrHierarchy, error) {
return e.line.AsLineOrHierarchy()
}

// Hierarchy

var _ Entity = (*HierarchyEntity)(nil)

type HierarchyEntity struct {
hierarchy *billing.SplitLineHierarchy
}

func (e HierarchyEntity) GetType() EntityType {
return EntityTypeLineOrHierarchy
}

func (e HierarchyEntity) AsLineOrHierarchy() (billing.LineOrHierarchy, error) {
return billing.NewLineOrHierarchy(e.hierarchy), nil
}

// Charge

var _ Entity = (*ChargeEntity)(nil)

type ChargeEntity struct {
charge charges.Charge
}

func (e ChargeEntity) GetType() EntityType {
return EntityTypeCharge
}

func (e ChargeEntity) GetServicePeriod() timeutil.ClosedPeriod {
return e.charge.GetServicePeriod()
}

func (e ChargeEntity) GetChildUniqueReferenceID() *string {
return e.charge.GetChildUniqueReferenceID()
}

func (e ChargeEntity) AsLineOrHierarchy() (billing.LineOrHierarchy, error) {
return billing.LineOrHierarchy{}, ErrEntityTypeMismatch
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/samber/lo"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/billing/charges"
"github.com/openmeterio/openmeter/openmeter/customer"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/subscription"
Expand All @@ -21,15 +22,26 @@ type billingService interface {

type Loader struct {
billingService billingService
chargesService charges.Service
}

func NewLoader(billingService billingService) Loader {
func NewLoader(billingService billingService, chargesService charges.Service) Loader {
return Loader{
billingService: billingService,
chargesService: chargesService,
}
}

func (l Loader) LoadForSubscription(ctx context.Context, subs subscription.Subscription) (State, error) {
lines, err := l.loadLinesForSubscription(ctx, subs)
if err != nil {
return State{}, fmt.Errorf("loading lines for subscription: %w", err)
}

return lines, nil
}

func (l Loader) loadLinesForSubscription(ctx context.Context, subs subscription.Subscription) (State, error) {
lines, err := l.billingService.GetLinesForSubscription(ctx, billing.GetLinesForSubscriptionInput{
Namespace: subs.Namespace,
SubscriptionID: subs.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type State struct {
Lines []billing.LineOrHierarchy
ByUniqueID map[string]billing.LineOrHierarchy
ByUniqueID map[string]Entity
}

func (s State) Validate() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/persistedstate"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/reconciler"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/targetstate"
"github.com/openmeterio/openmeter/openmeter/subscription"
Expand All @@ -16,8 +15,7 @@ func (s *Service) buildSyncPlan(ctx context.Context, subsView subscription.Subsc
span := tracex.Start[*reconciler.Plan](ctx, s.tracer, "billing.worker.subscription.sync.buildSyncPlan")

return span.Wrap(func(ctx context.Context) (*reconciler.Plan, error) {
persistedLoader := persistedstate.NewLoader(s.billingService)
persisted, err := persistedLoader.LoadForSubscription(ctx, subsView.Subscription)
persisted, err := s.persistedStateLoader.LoadForSubscription(ctx, subsView.Subscription)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,14 @@ type GetInvoicePatchesInput struct {
type Patch interface {
Operation() PatchOperation
UniqueReferenceID() string
}

type InvoicePatch interface {
Patch
GetInvoicePatches(input GetInvoicePatchesInput) ([]invoiceupdater.Patch, error)
}

type ChargePatch interface {
Patch
TodoPatchMechanism()
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,57 @@
package reconciler

import (
"errors"
"fmt"

"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/reconciler/invoiceupdater"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/targetstate"
)

type CreatePatch struct {
type NewCreatePatchInput struct {
UniqueID string
Target targetstate.SubscriptionItemWithPeriods
}

func (p CreatePatch) Operation() PatchOperation {
func (i NewCreatePatchInput) Validate() error {
var errs []error
if i.UniqueID == "" {
errs = append(errs, errors.New("unique id is required"))
}

if err := i.Target.Validate(); err != nil {
errs = append(errs, fmt.Errorf("target: %w", err))
}

return errors.Join(errs...)
}

func (s *Service) NewCreatePatch(input NewCreatePatchInput) (Patch, error) {
if err := input.Validate(); err != nil {
return nil, fmt.Errorf("new create patch: %w", err)
}

// TODO: use the service's field to decide if it should create a line or charge
return LineCreatePatch{
UniqueID: input.UniqueID,
Target: input.Target,
}, nil
}

type LineCreatePatch struct {
UniqueID string
Target targetstate.SubscriptionItemWithPeriods
}

func (p LineCreatePatch) Operation() PatchOperation {
return PatchOperationCreate
}

func (p CreatePatch) UniqueReferenceID() string {
func (p LineCreatePatch) UniqueReferenceID() string {
return p.UniqueID
}

func (p CreatePatch) GetInvoicePatches(input GetInvoicePatchesInput) ([]invoiceupdater.Patch, error) {
func (p LineCreatePatch) GetInvoicePatches(input GetInvoicePatchesInput) ([]invoiceupdater.Patch, error) {
line, err := p.Target.GetExpectedLine(input.Subscription, input.Currency)
if err != nil {
return nil, fmt.Errorf("generating line from subscription item [%s]: %w", p.Target.SubscriptionItem.ID, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
package reconciler

import (
"errors"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/persistedstate"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync/service/reconciler/invoiceupdater"
"github.com/samber/lo"
)

type DeletePatch struct {
UniqueID string
Existing persistedstate.Entity
}

func (s *Service) NewDeletePatch(existing persistedstate.Entity) (Patch, error) {
if existing == nil {
return nil, errors.New("new delete patch: existing entity is required")
}

return newFromEntity(newFromEntityInput{
Entity: existing,
NewInvoicePatch: func(lineOrHierarchy billing.LineOrHierarchy) (Patch, error) {
return LineDeletePatch{
Existing: lineOrHierarchy,
}, nil
},
})
}

type LineDeletePatch struct {
Existing billing.LineOrHierarchy
}

func (p DeletePatch) Operation() PatchOperation {
func (p LineDeletePatch) Operation() PatchOperation {
return PatchOperationDelete
}

func (p DeletePatch) UniqueReferenceID() string {
return p.UniqueID
func (p LineDeletePatch) UniqueReferenceID() string {
return lo.FromPtr(p.Existing.ChildUniqueReferenceID())
}

func (p DeletePatch) GetInvoicePatches(input GetInvoicePatchesInput) ([]invoiceupdater.Patch, error) {
func (p LineDeletePatch) GetInvoicePatches(input GetInvoicePatchesInput) ([]invoiceupdater.Patch, error) {
return invoiceupdater.GetDeletePatchesForLine(p.Existing)
}
Loading
Loading