Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions openmeter/subscription/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/samber/lo"
"go.opentelemetry.io/otel/attribute"

"github.com/openmeterio/openmeter/openmeter/customer"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
Expand Down Expand Up @@ -98,6 +99,11 @@ func (s *service) lockCustomer(ctx context.Context, customerId string) error {

func (s *service) Create(ctx context.Context, namespace string, spec subscription.SubscriptionSpec) (subscription.Subscription, error) {
ctx = subscription.NewSubscriptionOperationContext(ctx)
setSpanAttrs(ctx,
attribute.String("subscription.namespace", namespace),
attribute.String("subscription.operation", "create"),
)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.input", spec)...)

def := subscription.Subscription{}

Expand Down Expand Up @@ -193,6 +199,12 @@ func (s *service) Create(ctx context.Context, namespace string, spec subscriptio

func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID, newSpec subscription.SubscriptionSpec) (subscription.Subscription, error) {
ctx = subscription.NewSubscriptionOperationContext(ctx)
setSpanAttrs(ctx,
attribute.String("subscription.namespace", subscriptionID.Namespace),
attribute.String("subscription.id", subscriptionID.ID),
attribute.String("subscription.operation", "update"),
)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.input", newSpec)...)

var def subscription.Subscription

Expand All @@ -201,6 +213,8 @@ func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID
if err != nil {
return def, fmt.Errorf("failed to get view: %w", err)
}
setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.current", view)...)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.current", view.Spec)...)

if err := s.validateUpdate(ctx, view, newSpec); err != nil {
return def, err
Expand All @@ -227,6 +241,8 @@ func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID
if err != nil {
return subs, err
}
setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.updated", updatedView)...)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.updated", updatedView.Spec)...)

err = errors.Join(lo.Map(s.Hooks, func(v subscription.SubscriptionCommandHook, _ int) error {
return v.AfterUpdate(ctx, updatedView)
Expand Down
71 changes: 70 additions & 1 deletion openmeter/subscription/service/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/samber/lo"
"go.opentelemetry.io/otel/attribute"

"github.com/openmeterio/openmeter/openmeter/entitlement"
"github.com/openmeterio/openmeter/openmeter/subscription"
Expand All @@ -26,8 +27,19 @@ import (
// TODO: localize error so phase and item keys are always included (alongside subscription reference)
// TODO (OM-1074): clean up this control flow
func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, newSpec subscription.SubscriptionSpec) (subscription.Subscription, error) {
setSpanAttrs(ctx,
attribute.String("subscription.namespace", view.Subscription.Namespace),
attribute.String("subscription.id", view.Subscription.ID),
attribute.String("subscription.sync.operation", "spec_sync"),
)
setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.before", view)...)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.before", view.Spec)...)
setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.target", newSpec)...)

return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.Subscription, error) {
var def subscription.Subscription
var phaseDeleted, phaseCreated int
var itemDeleted, itemCreated int

// Some sanity checks for good measure
if view.Subscription.CustomerId != newSpec.CustomerId {
Expand Down Expand Up @@ -63,6 +75,12 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if err := s.deletePhase(ctx, currentPhaseView); err != nil {
return def, fmt.Errorf("failed to delete phase: %w", err)
}
phaseDeleted++
addSpanEvent(ctx, "subscription.sync.phase.delete",
attribute.String("phase.key", currentPhaseView.SubscriptionPhase.Key),
attribute.String("phase.id", currentPhaseView.SubscriptionPhase.ID),
attribute.String("reason", "removed"),
)

dirty.mark(subscription.NewPhasePath(currentPhaseView.SubscriptionPhase.Key))

Expand Down Expand Up @@ -99,6 +117,12 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if err := s.deletePhase(ctx, currentPhaseView); err != nil {
return def, fmt.Errorf("failed to delete phase: %w", err)
}
phaseDeleted++
addSpanEvent(ctx, "subscription.sync.phase.delete",
attribute.String("phase.key", currentPhaseView.SubscriptionPhase.Key),
attribute.String("phase.id", currentPhaseView.SubscriptionPhase.ID),
attribute.String("reason", "changed"),
)

dirty.mark(subscription.NewPhasePath(currentPhaseView.SubscriptionPhase.Key))

Expand All @@ -124,6 +148,13 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if err := s.deleteItem(ctx, currentItemView); err != nil {
return def, fmt.Errorf("failed to delete item: %w", err)
}
itemDeleted++
addSpanEvent(ctx, "subscription.sync.item.delete",
attribute.String("phase.key", currentItemView.Spec.PhaseKey),
attribute.String("item.key", currentItemView.Spec.ItemKey),
attribute.String("item.id", currentItemView.SubscriptionItem.ID),
attribute.String("reason", "key_removed"),
)

dirty.mark(subscription.NewItemPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey))
}
Expand All @@ -140,6 +171,14 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if err := s.deleteItem(ctx, currentItemView); err != nil {
return def, fmt.Errorf("failed to delete item: %w", err)
}
itemDeleted++
addSpanEvent(ctx, "subscription.sync.item.delete",
attribute.String("phase.key", currentItemView.Spec.PhaseKey),
attribute.String("item.key", currentItemView.Spec.ItemKey),
attribute.String("item.id", currentItemView.SubscriptionItem.ID),
attribute.Int("item.version", currentItemIdx),
attribute.String("reason", "version_removed"),
)

dirty.mark(subscription.NewItemVersionPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey, currentItemIdx))

Expand Down Expand Up @@ -191,6 +230,14 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if err := s.deleteItem(ctx, currentItemView); err != nil {
return def, fmt.Errorf("failed to delete item: %w", err)
}
itemDeleted++
addSpanEvent(ctx, "subscription.sync.item.delete",
attribute.String("phase.key", currentItemView.Spec.PhaseKey),
attribute.String("item.key", currentItemView.Spec.ItemKey),
attribute.String("item.id", currentItemView.SubscriptionItem.ID),
attribute.Int("item.version", currentItemIdx),
attribute.String("reason", "changed"),
)

dirty.mark(subscription.NewItemVersionPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey, currentItemIdx))

Expand Down Expand Up @@ -228,6 +275,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if _, err := s.createPhase(ctx, view.Customer, *matchingPhaseFromNewSpec, view.Subscription, newPhaseCadence); err != nil {
return def, fmt.Errorf("failed to create phase: %w", err)
}
phaseCreated++

// There's nothing more to be done for this phase, so lets skip to the next one
continue
Expand Down Expand Up @@ -265,6 +313,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
}); err != nil {
return def, fmt.Errorf("failed to create item: %w", err)
}
itemCreated++

// There's nothing more to be done for this item, so lets skip to the next one
continue
Expand Down Expand Up @@ -294,6 +343,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
if _, err := s.createPhase(ctx, view.Customer, *phase, view.Subscription, phaseCadence); err != nil {
return def, fmt.Errorf("failed to create phase: %w", err)
}
phaseCreated++
continue
}

Expand All @@ -318,6 +368,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
}); err != nil {
return def, fmt.Errorf("failed to create item: %w", err)
}
itemCreated++

// There's nothing left to do for this item
continue
Expand All @@ -335,13 +386,31 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView,
}); err != nil {
return def, fmt.Errorf("failed to create item: %w", err)
}
itemCreated++
}
}
}
}

// 4. Finally we're done with syncing everything, we should just re-fetch the subscription
return s.Get(ctx, view.Subscription.NamespacedID)
setSpanAttrs(ctx,
attribute.Int("subscription.sync.touched_paths.count", len(dirty)),
attribute.Int("subscription.sync.phases.deleted", phaseDeleted),
attribute.Int("subscription.sync.phases.created", phaseCreated),
attribute.Int("subscription.sync.items.deleted", itemDeleted),
attribute.Int("subscription.sync.items.created", itemCreated),
)

sub, err := s.Get(ctx, view.Subscription.NamespacedID)
if err != nil {
return def, err
}
setSpanAttrs(ctx,
attribute.String("subscription.sync.result_id", sub.ID),
attribute.String("subscription.sync.result_namespace", sub.Namespace),
)

return sub, nil
})
}

Expand Down
101 changes: 101 additions & 0 deletions openmeter/subscription/service/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package service

import (
"context"
"slices"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/openmeterio/openmeter/openmeter/subscription"
)

func setSpanAttrs(ctx context.Context, attrs ...attribute.KeyValue) {
span := trace.SpanFromContext(ctx)
if span == nil {
return
}

span.SetAttributes(attrs...)
}

func addSpanEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) {
span := trace.SpanFromContext(ctx)
if span == nil {
return
}

span.AddEvent(name, trace.WithAttributes(attrs...))
}

func addSpecAttrs(attrs []attribute.KeyValue, prefix string, spec subscription.SubscriptionSpec) []attribute.KeyValue {
phaseKeys := make([]string, 0, len(spec.Phases))
itemKeySet := make(map[string]struct{})
itemVersions := 0
for k, phase := range spec.Phases {
phaseKeys = append(phaseKeys, k)
for ik, items := range phase.ItemsByKey {
itemKeySet[ik] = struct{}{}
itemVersions += len(items)
}
}
Comment on lines +31 to +41
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against nil phase entries to avoid panics.
SubscriptionSpec.Phases stores pointers; if a phase is nil (e.g., during patching/invalid input), phase.ItemsByKey will panic before validation kicks in. A small nil check keeps tracing from crashing the operation.

🛠️ Suggested guard
 for k, phase := range spec.Phases {
+	if phase == nil {
+		continue
+	}
 	phaseKeys = append(phaseKeys, k)
 	for ik, items := range phase.ItemsByKey {
 		itemKeySet[ik] = struct{}{}
 		itemVersions += len(items)
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func addSpecAttrs(attrs []attribute.KeyValue, prefix string, spec subscription.SubscriptionSpec) []attribute.KeyValue {
phaseKeys := make([]string, 0, len(spec.Phases))
itemKeySet := make(map[string]struct{})
itemVersions := 0
for k, phase := range spec.Phases {
phaseKeys = append(phaseKeys, k)
for ik, items := range phase.ItemsByKey {
itemKeySet[ik] = struct{}{}
itemVersions += len(items)
}
}
func addSpecAttrs(attrs []attribute.KeyValue, prefix string, spec subscription.SubscriptionSpec) []attribute.KeyValue {
phaseKeys := make([]string, 0, len(spec.Phases))
itemKeySet := make(map[string]struct{})
itemVersions := 0
for k, phase := range spec.Phases {
if phase == nil {
continue
}
phaseKeys = append(phaseKeys, k)
for ik, items := range phase.ItemsByKey {
itemKeySet[ik] = struct{}{}
itemVersions += len(items)
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/subscription/service/trace.go` around lines 31 - 41, The loop in
addSpecAttrs reads phase.ItemsByKey but spec.Phases contains pointers, so if a
phase is nil this will panic; update the loop over spec.Phases in addSpecAttrs
to check if phase == nil and skip/continue that entry before accessing
phase.ItemsByKey (i.e., guard the use of phase.ItemsByKey and phase-related
logic with a nil check) to prevent trace crashes on malformed/partial specs.

slices.Sort(phaseKeys)

itemKeys := make([]string, 0, len(itemKeySet))
for k := range itemKeySet {
itemKeys = append(itemKeys, k)
}
slices.Sort(itemKeys)

attrs = append(attrs,
attribute.String(prefix+".customer_id", spec.CustomerId),
attribute.Int(prefix+".phases.count", len(spec.Phases)),
attribute.StringSlice(prefix+".phase_keys", phaseKeys),
attribute.Int(prefix+".item_keys.count", len(itemKeySet)),
attribute.StringSlice(prefix+".item_keys", itemKeys),
attribute.Int(prefix+".item_versions.count", itemVersions),
attribute.Bool(prefix+".has_billables", spec.HasBillables()),
attribute.Bool(prefix+".has_metered_billables", spec.HasMeteredBillables()),
attribute.Bool(prefix+".has_entitlements", spec.HasEntitlements()),
)

if spec.Plan != nil {
attrs = append(attrs,
attribute.String(prefix+".plan.id", spec.Plan.Id),
attribute.String(prefix+".plan.key", spec.Plan.Key),
attribute.Int(prefix+".plan.version", spec.Plan.Version),
)
}

return attrs
}

func addViewAttrs(attrs []attribute.KeyValue, prefix string, view subscription.SubscriptionView) []attribute.KeyValue {
phaseKeys := make([]string, 0, len(view.Phases))
itemKeySet := make(map[string]struct{})
itemVersions := 0
for _, phase := range view.Phases {
phaseKeys = append(phaseKeys, phase.SubscriptionPhase.Key)
for ik, items := range phase.ItemsByKey {
itemKeySet[ik] = struct{}{}
itemVersions += len(items)
}
}
slices.Sort(phaseKeys)

itemKeys := make([]string, 0, len(itemKeySet))
for k := range itemKeySet {
itemKeys = append(itemKeys, k)
}
slices.Sort(itemKeys)

return append(attrs,
attribute.String(prefix+".subscription_id", view.Subscription.ID),
attribute.String(prefix+".customer_id", view.Subscription.CustomerId),
attribute.Int(prefix+".phases.count", len(view.Phases)),
attribute.StringSlice(prefix+".phase_keys", phaseKeys),
attribute.Int(prefix+".item_keys.count", len(itemKeySet)),
attribute.StringSlice(prefix+".item_keys", itemKeys),
attribute.Int(prefix+".item_versions.count", itemVersions),
)
}
Loading
Loading