Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions internal/ent/eventqueue/gala_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,25 @@ type MutationGalaMetadata struct {
// NewMutationGalaMetadata builds metadata for Gala mutation envelopes from mutation payload data
func NewMutationGalaMetadata(eventID string, payload MutationGalaPayload) MutationGalaMetadata {
properties := mutationMetadataProperties(payload)

entityID := strings.TrimSpace(payload.EntityID)
if entityID != "" {

if entityID != "" || payload.Operation != "" || payload.MutationType != "" {
if properties == nil {
properties = map[string]string{}
}
properties[MutationPropertyEntityID] = entityID

if entityID != "" {
properties[MutationPropertyEntityID] = entityID
}

if payload.Operation != "" {
properties[MutationPropertyOperation] = payload.Operation
}

if payload.MutationType != "" {
properties[MutationPropertyMutationType] = payload.MutationType
}
}

return MutationGalaMetadata{
Expand All @@ -81,9 +94,20 @@ func NewGalaHeadersFromMutationMetadata(metadata MutationGalaMetadata) gala.Head
properties := normalizeMutationMetadataProperties(metadata.Properties)
eventID := strings.TrimSpace(metadata.EventID)

var tags []string

if mt := metadata.Properties[MutationPropertyMutationType]; mt != "" {
tags = append(tags, mt)
}

if op := metadata.Properties[MutationPropertyOperation]; op != "" {
tags = append(tags, op)
}

return gala.Headers{
IdempotencyKey: eventID,
Properties: properties,
Tags: tags,
}
}

Expand Down
72 changes: 36 additions & 36 deletions internal/ent/eventqueue/gala_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/theopenlane/iam/auth"
"github.com/theopenlane/utils/contextx"

Expand All @@ -24,11 +24,11 @@ func TestNewMutationGalaEnvelope(t *testing.T) {
runtime, err := gala.NewGala(context.Background(), gala.Config{
DispatchMode: gala.DispatchModeInMemory,
})
require.NoError(t, err)
assert.NoError(t, err)

t.Cleanup(func() { _ = runtime.Close() })

require.NoError(t, runtime.ContextManager().Register(
assert.NoError(t, runtime.ContextManager().Register(
gala.NewTypedContextCodec[galaAdapterTestActor]("adapter_actor"),
))

Expand All @@ -37,7 +37,7 @@ func TestNewMutationGalaEnvelope(t *testing.T) {
Topic: topic,
Codec: gala.JSONCodec[MutationGalaPayload]{},
})
require.NoError(t, err)
assert.NoError(t, err)

payload := MutationGalaPayload{
MutationType: "organization",
Expand All @@ -62,34 +62,34 @@ func TestNewMutationGalaEnvelope(t *testing.T) {

metadata := NewMutationGalaMetadata("evt_123", payload)
envelope, err := NewMutationGalaEnvelope(emitCtx, runtime, topic, payload, metadata)
require.NoError(t, err)

require.Equal(t, gala.EventID("evt_123"), envelope.ID)
require.Equal(t, topic.Name, envelope.Topic)
require.Equal(t, "evt_123", envelope.Headers.IdempotencyKey)
require.Equal(t, "name", envelope.Headers.Properties["mutation_field"])
require.Equal(t, "7", envelope.Headers.Properties["count"])
require.Equal(t, payload.EntityID, envelope.Headers.Properties[MutationPropertyEntityID])
require.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowBypass])
require.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowAllowEventEmission])
require.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("adapter_actor"))
require.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("durable"))
assert.NoError(t, err)

assert.Equal(t, gala.EventID("evt_123"), envelope.ID)
assert.Equal(t, topic.Name, envelope.Topic)
assert.Equal(t, "evt_123", envelope.Headers.IdempotencyKey)
assert.Equal(t, "name", envelope.Headers.Properties["mutation_field"])
assert.Equal(t, "7", envelope.Headers.Properties["count"])
assert.Equal(t, payload.EntityID, envelope.Headers.Properties[MutationPropertyEntityID])
assert.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowBypass])
assert.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowAllowEventEmission])
assert.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("adapter_actor"))
assert.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("durable"))

restoredContext, err := runtime.ContextManager().Restore(context.Background(), envelope.ContextSnapshot)
require.NoError(t, err)
assert.NoError(t, err)

restoredUser, err := auth.GetAuthenticatedUserFromContext(restoredContext)
require.NoError(t, err)
require.Equal(t, "subject_123", restoredUser.SubjectID)
require.Equal(t, "org_123", restoredUser.OrganizationID)
assert.NoError(t, err)
assert.Equal(t, "subject_123", restoredUser.SubjectID)
assert.Equal(t, "org_123", restoredUser.OrganizationID)

decodedAny, err := runtime.Registry().DecodePayload(topic.Name, envelope.Payload)
require.NoError(t, err)
assert.NoError(t, err)

decoded, ok := decodedAny.(MutationGalaPayload)
require.True(t, ok)
require.Equal(t, payload.EntityID, decoded.EntityID)
require.Equal(t, payload.Operation, decoded.Operation)
assert.True(t, ok)
assert.Equal(t, payload.EntityID, decoded.EntityID)
assert.Equal(t, payload.Operation, decoded.Operation)
}

// TestNewGalaHeadersFromMutationMetadata verifies property normalization for gala headers
Expand All @@ -105,11 +105,11 @@ func TestNewGalaHeadersFromMutationMetadata(t *testing.T) {
},
})

require.Equal(t, "evt_456", headers.IdempotencyKey)
require.Equal(t, "true", headers.Properties["active"])
require.Equal(t, "5", headers.Properties["count"])
assert.Equal(t, "evt_456", headers.IdempotencyKey)
assert.Equal(t, "true", headers.Properties["active"])
assert.Equal(t, "5", headers.Properties["count"])
_, exists := headers.Properties[""]
require.False(t, exists)
assert.False(t, exists)
}

// TestMutationGalaPayloadChangeSetRoundTrip verifies payload change-set projections preserve values and clone maps/slices
Expand All @@ -135,15 +135,15 @@ func TestMutationGalaPayloadChangeSetRoundTrip(t *testing.T) {
changeSet.AddedIDs["controls"][0] = "mutated"
changeSet.ProposedChanges["status"] = "mutated"

require.Equal(t, "status", payload.ChangedFields[0])
require.Equal(t, "one", payload.AddedIDs["controls"][0])
require.Equal(t, "approved", payload.ProposedChanges["status"])
assert.Equal(t, "status", payload.ChangedFields[0])
assert.Equal(t, "one", payload.AddedIDs["controls"][0])
assert.Equal(t, "approved", payload.ProposedChanges["status"])

var roundTrip MutationGalaPayload
roundTrip.SetChangeSet(payload.ChangeSet())
require.Equal(t, payload.ChangedFields, roundTrip.ChangedFields)
require.Equal(t, payload.ChangedEdges, roundTrip.ChangedEdges)
require.Equal(t, payload.AddedIDs, roundTrip.AddedIDs)
require.Equal(t, payload.RemovedIDs, roundTrip.RemovedIDs)
require.Equal(t, payload.ProposedChanges, roundTrip.ProposedChanges)
assert.Equal(t, payload.ChangedFields, roundTrip.ChangedFields)
assert.Equal(t, payload.ChangedEdges, roundTrip.ChangedEdges)
assert.Equal(t, payload.AddedIDs, roundTrip.AddedIDs)
assert.Equal(t, payload.RemovedIDs, roundTrip.RemovedIDs)
assert.Equal(t, payload.ProposedChanges, roundTrip.ProposedChanges)
}
15 changes: 11 additions & 4 deletions internal/ent/eventqueue/handler_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ import (
"github.com/theopenlane/core/pkg/gala"
)

// ClientFromHandler resolves the ent client from Gala listener dependencies
func ClientFromHandler(ctx gala.HandlerContext) (*generated.Client, bool) {
// ClientFromHandler resolves the ent client from the Gala injector and seeds it into
// HandlerContext.Context so that ent interceptors relying on generated.FromContext work
// correctly when the context was reconstructed from a durable snapshot.
// The restored snapshot carries auth claims and log fields but not the ent client,
// which is a live runtime dependency. Without it, interceptors such as InterceptorModules
// cannot resolve the FGA authz client and incorrectly report features as disabled.
func ClientFromHandler(ctx gala.HandlerContext) (gala.HandlerContext, *generated.Client, bool) {
client, err := do.Invoke[*generated.Client](ctx.Injector)
if err != nil || client == nil {
return nil, false
return ctx, nil, false
}

return client, true
ctx.Context = generated.NewContext(ctx.Context, client)

return ctx, client, true
}
4 changes: 4 additions & 0 deletions internal/ent/eventqueue/mutation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
const (
// MutationPropertyEntityID is the standard mutation metadata key used for entity identifiers
MutationPropertyEntityID = "ID"
// MutationPropertyOperation is the mutation metadata key used for the operation type
MutationPropertyOperation = "operation"
// MutationPropertyMutationType is the mutation metadata key used for the ent schema type
MutationPropertyMutationType = "mutation_type"
// SoftDeleteOne is a synthetic operation used for soft-delete hooks
SoftDeleteOne = "SoftDeleteOne"
)
Expand Down
39 changes: 25 additions & 14 deletions internal/ent/eventqueue/mutation_helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package eventqueue

import (
"context"
"testing"

"github.com/samber/do/v2"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"

"github.com/theopenlane/core/internal/ent/generated"
"github.com/theopenlane/core/pkg/gala"
Expand All @@ -24,12 +25,12 @@ func TestMutationStringValueOrProperty(t *testing.T) {
}

got := MutationStringValueOrProperty(payload, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "proposed@example.com", got)
assert.Equal(t, "proposed@example.com", got)
})

t.Run("falls back to properties when proposed value is missing", func(t *testing.T) {
got := MutationStringValueOrProperty(MutationGalaPayload{}, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "header@example.com", got)
assert.Equal(t, "header@example.com", got)
})

t.Run("falls back to properties when proposed value is blank", func(t *testing.T) {
Expand All @@ -40,7 +41,7 @@ func TestMutationStringValueOrProperty(t *testing.T) {
}

got := MutationStringValueOrProperty(payload, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "header@example.com", got)
assert.Equal(t, "header@example.com", got)
})
}

Expand All @@ -58,12 +59,12 @@ func TestMutationStringValuePreferPayload(t *testing.T) {
}

got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "proposed@example.com", got)
assert.Equal(t, "proposed@example.com", got)
})

t.Run("falls back to properties when proposed value is missing", func(t *testing.T) {
got := MutationStringValuePreferPayload(MutationGalaPayload{}, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "header@example.com", got)
assert.Equal(t, "header@example.com", got)
})

t.Run("does not fall back when proposed value is blank", func(t *testing.T) {
Expand All @@ -74,7 +75,7 @@ func TestMutationStringValuePreferPayload(t *testing.T) {
}

got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field)
require.Empty(t, got)
assert.Empty(t, got)
})

t.Run("preserves non-string proposed value conversion semantics", func(t *testing.T) {
Expand All @@ -85,7 +86,7 @@ func TestMutationStringValuePreferPayload(t *testing.T) {
}

got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field)
require.Equal(t, "[invalid]", got)
assert.Equal(t, "[invalid]", got)
})
}

Expand All @@ -98,14 +99,24 @@ func TestClientFromHandler(t *testing.T) {
client := &generated.Client{}
do.ProvideValue(injector, client)

got, ok := ClientFromHandler(gala.HandlerContext{Injector: injector})
require.True(t, ok)
require.Same(t, client, got)
_, got, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: injector})
assert.True(t, ok)
assert.Same(t, client, got)
})

t.Run("seeds client into handler context", func(t *testing.T) {
injector := do.New()
client := &generated.Client{}
do.ProvideValue(injector, client)

enriched, _, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: injector})
assert.True(t, ok)
assert.Same(t, client, generated.FromContext(enriched.Context))
})

t.Run("returns false without injected client", func(t *testing.T) {
got, ok := ClientFromHandler(gala.HandlerContext{Injector: do.New()})
require.False(t, ok)
require.Nil(t, got)
_, got, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: do.New()})
assert.False(t, ok)
assert.Nil(t, got)
})
}
5 changes: 3 additions & 2 deletions internal/ent/hooks/listeners_entitlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func softDeleteAllowContext(ctx context.Context) context.Context {

// newEntitlementInvocation gathers prerequisites for entitlement mutation handling.
func newEntitlementInvocation(handlerCtx gala.HandlerContext, payload eventqueue.MutationGalaPayload, allow func(context.Context) context.Context) (*entitlementInvocation, bool) {
client, ok := eventqueue.ClientFromHandler(handlerCtx)
handlerCtx, client, ok := eventqueue.ClientFromHandler(handlerCtx)
if !ok || client.EntitlementManager == nil {
return nil, false
}
Expand All @@ -222,7 +222,8 @@ func newEntitlementInvocation(handlerCtx gala.HandlerContext, payload eventqueue
if strings.TrimSpace(payload.MutationType) == entgen.TypeOrganizationSetting {
setting, err := client.OrganizationSetting.Get(allowCtx, entityID)
if err != nil {
logx.FromContext(handlerCtx.Context).Err(err).Str("organization_setting_id", entityID).Msg("failed to resolve organization from organization setting")
logx.FromContext(handlerCtx.Context).Error().Err(err).Str("organization_setting_id", entityID).Msg("failed to resolve organization from organization setting")

return nil, false
}

Expand Down
Loading