diff --git a/internal/ent/eventqueue/gala_adapter.go b/internal/ent/eventqueue/gala_adapter.go index 7491729f8..5b6cbb0ba 100644 --- a/internal/ent/eventqueue/gala_adapter.go +++ b/internal/ent/eventqueue/gala_adapter.go @@ -62,12 +62,25 @@ type MutationGalaMetadata struct { // NewMutationGalaMetadata builds metadata for Gala mutation envelopes from mutation payload data func NewMutationGalaMetadata(eventID string, payload MutationGalaPayload) MutationGalaMetadata { properties := mutationMetadataProperties(payload) + entityID := strings.TrimSpace(payload.EntityID) - if entityID != "" { + + if entityID != "" || payload.Operation != "" || payload.MutationType != "" { if properties == nil { properties = map[string]string{} } - properties[MutationPropertyEntityID] = entityID + + if entityID != "" { + properties[MutationPropertyEntityID] = entityID + } + + if payload.Operation != "" { + properties[MutationPropertyOperation] = payload.Operation + } + + if payload.MutationType != "" { + properties[MutationPropertyMutationType] = payload.MutationType + } } return MutationGalaMetadata{ @@ -81,9 +94,20 @@ func NewGalaHeadersFromMutationMetadata(metadata MutationGalaMetadata) gala.Head properties := normalizeMutationMetadataProperties(metadata.Properties) eventID := strings.TrimSpace(metadata.EventID) + var tags []string + + if mt := metadata.Properties[MutationPropertyMutationType]; mt != "" { + tags = append(tags, mt) + } + + if op := metadata.Properties[MutationPropertyOperation]; op != "" { + tags = append(tags, op) + } + return gala.Headers{ IdempotencyKey: eventID, Properties: properties, + Tags: tags, } } diff --git a/internal/ent/eventqueue/gala_adapter_test.go b/internal/ent/eventqueue/gala_adapter_test.go index b77c458f0..e95f76979 100644 --- a/internal/ent/eventqueue/gala_adapter_test.go +++ b/internal/ent/eventqueue/gala_adapter_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/theopenlane/iam/auth" "github.com/theopenlane/utils/contextx" @@ -24,11 +24,11 @@ func TestNewMutationGalaEnvelope(t *testing.T) { runtime, err := gala.NewGala(context.Background(), gala.Config{ DispatchMode: gala.DispatchModeInMemory, }) - require.NoError(t, err) + assert.NoError(t, err) t.Cleanup(func() { _ = runtime.Close() }) - require.NoError(t, runtime.ContextManager().Register( + assert.NoError(t, runtime.ContextManager().Register( gala.NewTypedContextCodec[galaAdapterTestActor]("adapter_actor"), )) @@ -37,7 +37,7 @@ func TestNewMutationGalaEnvelope(t *testing.T) { Topic: topic, Codec: gala.JSONCodec[MutationGalaPayload]{}, }) - require.NoError(t, err) + assert.NoError(t, err) payload := MutationGalaPayload{ MutationType: "organization", @@ -62,34 +62,34 @@ func TestNewMutationGalaEnvelope(t *testing.T) { metadata := NewMutationGalaMetadata("evt_123", payload) envelope, err := NewMutationGalaEnvelope(emitCtx, runtime, topic, payload, metadata) - require.NoError(t, err) - - require.Equal(t, gala.EventID("evt_123"), envelope.ID) - require.Equal(t, topic.Name, envelope.Topic) - require.Equal(t, "evt_123", envelope.Headers.IdempotencyKey) - require.Equal(t, "name", envelope.Headers.Properties["mutation_field"]) - require.Equal(t, "7", envelope.Headers.Properties["count"]) - require.Equal(t, payload.EntityID, envelope.Headers.Properties[MutationPropertyEntityID]) - require.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowBypass]) - require.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowAllowEventEmission]) - require.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("adapter_actor")) - require.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("durable")) + assert.NoError(t, err) + + assert.Equal(t, gala.EventID("evt_123"), envelope.ID) + assert.Equal(t, topic.Name, envelope.Topic) + assert.Equal(t, "evt_123", envelope.Headers.IdempotencyKey) + assert.Equal(t, "name", envelope.Headers.Properties["mutation_field"]) + assert.Equal(t, "7", envelope.Headers.Properties["count"]) + assert.Equal(t, payload.EntityID, envelope.Headers.Properties[MutationPropertyEntityID]) + assert.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowBypass]) + assert.Equal(t, true, envelope.ContextSnapshot.Flags[gala.ContextFlagWorkflowAllowEventEmission]) + assert.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("adapter_actor")) + assert.Contains(t, envelope.ContextSnapshot.Values, gala.ContextKey("durable")) restoredContext, err := runtime.ContextManager().Restore(context.Background(), envelope.ContextSnapshot) - require.NoError(t, err) + assert.NoError(t, err) restoredUser, err := auth.GetAuthenticatedUserFromContext(restoredContext) - require.NoError(t, err) - require.Equal(t, "subject_123", restoredUser.SubjectID) - require.Equal(t, "org_123", restoredUser.OrganizationID) + assert.NoError(t, err) + assert.Equal(t, "subject_123", restoredUser.SubjectID) + assert.Equal(t, "org_123", restoredUser.OrganizationID) decodedAny, err := runtime.Registry().DecodePayload(topic.Name, envelope.Payload) - require.NoError(t, err) + assert.NoError(t, err) decoded, ok := decodedAny.(MutationGalaPayload) - require.True(t, ok) - require.Equal(t, payload.EntityID, decoded.EntityID) - require.Equal(t, payload.Operation, decoded.Operation) + assert.True(t, ok) + assert.Equal(t, payload.EntityID, decoded.EntityID) + assert.Equal(t, payload.Operation, decoded.Operation) } // TestNewGalaHeadersFromMutationMetadata verifies property normalization for gala headers @@ -105,11 +105,11 @@ func TestNewGalaHeadersFromMutationMetadata(t *testing.T) { }, }) - require.Equal(t, "evt_456", headers.IdempotencyKey) - require.Equal(t, "true", headers.Properties["active"]) - require.Equal(t, "5", headers.Properties["count"]) + assert.Equal(t, "evt_456", headers.IdempotencyKey) + assert.Equal(t, "true", headers.Properties["active"]) + assert.Equal(t, "5", headers.Properties["count"]) _, exists := headers.Properties[""] - require.False(t, exists) + assert.False(t, exists) } // TestMutationGalaPayloadChangeSetRoundTrip verifies payload change-set projections preserve values and clone maps/slices @@ -135,15 +135,15 @@ func TestMutationGalaPayloadChangeSetRoundTrip(t *testing.T) { changeSet.AddedIDs["controls"][0] = "mutated" changeSet.ProposedChanges["status"] = "mutated" - require.Equal(t, "status", payload.ChangedFields[0]) - require.Equal(t, "one", payload.AddedIDs["controls"][0]) - require.Equal(t, "approved", payload.ProposedChanges["status"]) + assert.Equal(t, "status", payload.ChangedFields[0]) + assert.Equal(t, "one", payload.AddedIDs["controls"][0]) + assert.Equal(t, "approved", payload.ProposedChanges["status"]) var roundTrip MutationGalaPayload roundTrip.SetChangeSet(payload.ChangeSet()) - require.Equal(t, payload.ChangedFields, roundTrip.ChangedFields) - require.Equal(t, payload.ChangedEdges, roundTrip.ChangedEdges) - require.Equal(t, payload.AddedIDs, roundTrip.AddedIDs) - require.Equal(t, payload.RemovedIDs, roundTrip.RemovedIDs) - require.Equal(t, payload.ProposedChanges, roundTrip.ProposedChanges) + assert.Equal(t, payload.ChangedFields, roundTrip.ChangedFields) + assert.Equal(t, payload.ChangedEdges, roundTrip.ChangedEdges) + assert.Equal(t, payload.AddedIDs, roundTrip.AddedIDs) + assert.Equal(t, payload.RemovedIDs, roundTrip.RemovedIDs) + assert.Equal(t, payload.ProposedChanges, roundTrip.ProposedChanges) } diff --git a/internal/ent/eventqueue/handler_helpers.go b/internal/ent/eventqueue/handler_helpers.go index 488b417cc..b90da0b70 100644 --- a/internal/ent/eventqueue/handler_helpers.go +++ b/internal/ent/eventqueue/handler_helpers.go @@ -7,12 +7,19 @@ import ( "github.com/theopenlane/core/pkg/gala" ) -// ClientFromHandler resolves the ent client from Gala listener dependencies -func ClientFromHandler(ctx gala.HandlerContext) (*generated.Client, bool) { +// ClientFromHandler resolves the ent client from the Gala injector and seeds it into +// HandlerContext.Context so that ent interceptors relying on generated.FromContext work +// correctly when the context was reconstructed from a durable snapshot. +// The restored snapshot carries auth claims and log fields but not the ent client, +// which is a live runtime dependency. Without it, interceptors such as InterceptorModules +// cannot resolve the FGA authz client and incorrectly report features as disabled. +func ClientFromHandler(ctx gala.HandlerContext) (gala.HandlerContext, *generated.Client, bool) { client, err := do.Invoke[*generated.Client](ctx.Injector) if err != nil || client == nil { - return nil, false + return ctx, nil, false } - return client, true + ctx.Context = generated.NewContext(ctx.Context, client) + + return ctx, client, true } diff --git a/internal/ent/eventqueue/mutation_helpers.go b/internal/ent/eventqueue/mutation_helpers.go index fba3db489..2753421b0 100644 --- a/internal/ent/eventqueue/mutation_helpers.go +++ b/internal/ent/eventqueue/mutation_helpers.go @@ -10,6 +10,10 @@ import ( const ( // MutationPropertyEntityID is the standard mutation metadata key used for entity identifiers MutationPropertyEntityID = "ID" + // MutationPropertyOperation is the mutation metadata key used for the operation type + MutationPropertyOperation = "operation" + // MutationPropertyMutationType is the mutation metadata key used for the ent schema type + MutationPropertyMutationType = "mutation_type" // SoftDeleteOne is a synthetic operation used for soft-delete hooks SoftDeleteOne = "SoftDeleteOne" ) diff --git a/internal/ent/eventqueue/mutation_helpers_test.go b/internal/ent/eventqueue/mutation_helpers_test.go index c59cf3fa0..4d1427db4 100644 --- a/internal/ent/eventqueue/mutation_helpers_test.go +++ b/internal/ent/eventqueue/mutation_helpers_test.go @@ -1,10 +1,11 @@ package eventqueue import ( + "context" "testing" "github.com/samber/do/v2" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/theopenlane/core/internal/ent/generated" "github.com/theopenlane/core/pkg/gala" @@ -24,12 +25,12 @@ func TestMutationStringValueOrProperty(t *testing.T) { } got := MutationStringValueOrProperty(payload, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "proposed@example.com", got) + assert.Equal(t, "proposed@example.com", got) }) t.Run("falls back to properties when proposed value is missing", func(t *testing.T) { got := MutationStringValueOrProperty(MutationGalaPayload{}, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "header@example.com", got) + assert.Equal(t, "header@example.com", got) }) t.Run("falls back to properties when proposed value is blank", func(t *testing.T) { @@ -40,7 +41,7 @@ func TestMutationStringValueOrProperty(t *testing.T) { } got := MutationStringValueOrProperty(payload, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "header@example.com", got) + assert.Equal(t, "header@example.com", got) }) } @@ -58,12 +59,12 @@ func TestMutationStringValuePreferPayload(t *testing.T) { } got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "proposed@example.com", got) + assert.Equal(t, "proposed@example.com", got) }) t.Run("falls back to properties when proposed value is missing", func(t *testing.T) { got := MutationStringValuePreferPayload(MutationGalaPayload{}, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "header@example.com", got) + assert.Equal(t, "header@example.com", got) }) t.Run("does not fall back when proposed value is blank", func(t *testing.T) { @@ -74,7 +75,7 @@ func TestMutationStringValuePreferPayload(t *testing.T) { } got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field) - require.Empty(t, got) + assert.Empty(t, got) }) t.Run("preserves non-string proposed value conversion semantics", func(t *testing.T) { @@ -85,7 +86,7 @@ func TestMutationStringValuePreferPayload(t *testing.T) { } got := MutationStringValuePreferPayload(payload, map[string]string{field: "header@example.com"}, field) - require.Equal(t, "[invalid]", got) + assert.Equal(t, "[invalid]", got) }) } @@ -98,14 +99,24 @@ func TestClientFromHandler(t *testing.T) { client := &generated.Client{} do.ProvideValue(injector, client) - got, ok := ClientFromHandler(gala.HandlerContext{Injector: injector}) - require.True(t, ok) - require.Same(t, client, got) + _, got, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: injector}) + assert.True(t, ok) + assert.Same(t, client, got) + }) + + t.Run("seeds client into handler context", func(t *testing.T) { + injector := do.New() + client := &generated.Client{} + do.ProvideValue(injector, client) + + enriched, _, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: injector}) + assert.True(t, ok) + assert.Same(t, client, generated.FromContext(enriched.Context)) }) t.Run("returns false without injected client", func(t *testing.T) { - got, ok := ClientFromHandler(gala.HandlerContext{Injector: do.New()}) - require.False(t, ok) - require.Nil(t, got) + _, got, ok := ClientFromHandler(gala.HandlerContext{Context: context.Background(), Injector: do.New()}) + assert.False(t, ok) + assert.Nil(t, got) }) } diff --git a/internal/ent/hooks/listeners_entitlements.go b/internal/ent/hooks/listeners_entitlements.go index 1641591e9..2bead01a1 100644 --- a/internal/ent/hooks/listeners_entitlements.go +++ b/internal/ent/hooks/listeners_entitlements.go @@ -201,7 +201,7 @@ func softDeleteAllowContext(ctx context.Context) context.Context { // newEntitlementInvocation gathers prerequisites for entitlement mutation handling. func newEntitlementInvocation(handlerCtx gala.HandlerContext, payload eventqueue.MutationGalaPayload, allow func(context.Context) context.Context) (*entitlementInvocation, bool) { - client, ok := eventqueue.ClientFromHandler(handlerCtx) + handlerCtx, client, ok := eventqueue.ClientFromHandler(handlerCtx) if !ok || client.EntitlementManager == nil { return nil, false } @@ -222,7 +222,8 @@ func newEntitlementInvocation(handlerCtx gala.HandlerContext, payload eventqueue if strings.TrimSpace(payload.MutationType) == entgen.TypeOrganizationSetting { setting, err := client.OrganizationSetting.Get(allowCtx, entityID) if err != nil { - logx.FromContext(handlerCtx.Context).Err(err).Str("organization_setting_id", entityID).Msg("failed to resolve organization from organization setting") + logx.FromContext(handlerCtx.Context).Error().Err(err).Str("organization_setting_id", entityID).Msg("failed to resolve organization from organization setting") + return nil, false } diff --git a/internal/ent/hooks/listeners_trustcenter_cache.go b/internal/ent/hooks/listeners_trustcenter_cache.go index e9cf76827..d305f5c9e 100644 --- a/internal/ent/hooks/listeners_trustcenter_cache.go +++ b/internal/ent/hooks/listeners_trustcenter_cache.go @@ -83,7 +83,7 @@ func RegisterGalaTrustCenterCacheListeners(registry *gala.Registry) ([]gala.List // handleTrustCenterDocMutationGala processes TrustCenterDoc mutations and invalidates cache when needed. func handleTrustCenterDocMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -123,9 +123,13 @@ func handleTrustCenterDocMutationGala(ctx gala.HandlerContext, payload eventqueu Where(trustcenterdoc.ID(docID)). Select(trustcenterdoc.FieldTrustCenterID). Only(ctx.Context) - if err == nil && doc != nil { - trustCenterID = doc.TrustCenterID + if err != nil || doc == nil { + logx.FromContext(ctx.Context).Warn().Err(err).Str("doc_id", docID).Msg("failed to query trust center doc for cache invalidation") + + return nil } + + trustCenterID = doc.TrustCenterID } } @@ -138,7 +142,7 @@ func handleTrustCenterDocMutationGala(ctx gala.HandlerContext, payload eventqueu // handleNoteMutationGala processes Note mutations and invalidates cache when trust center linkage changes. func handleNoteMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -177,7 +181,7 @@ func handleNoteMutationGala(ctx gala.HandlerContext, payload eventqueue.Mutation // handleTrustCenterEntityMutationGala processes TrustCenterEntity mutations and invalidates cache. func handleTrustCenterEntityMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -202,7 +206,7 @@ func handleTrustCenterEntityMutationGala(ctx gala.HandlerContext, payload eventq // handleTrustCenterSubprocessorMutationGala processes TrustCenterSubprocessor mutations and invalidates cache. func handleTrustCenterSubprocessorMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -212,9 +216,12 @@ func handleTrustCenterSubprocessorMutationGala(ctx gala.HandlerContext, payload entityID, ok := eventqueue.MutationEntityID(payload, ctx.Envelope.Headers.Properties) if ok && entityID != "" { entity, err := client.TrustCenterSubprocessor.Get(ctx.Context, entityID) - if err == nil && entity != nil { - trustCenterID = entity.TrustCenterID + if err != nil || entity == nil { + logx.FromContext(ctx.Context).Warn().Err(err).Str("subprocessor_id", entityID).Msg("failed to query trust center subprocessor for cache invalidation") + return nil } + + trustCenterID = entity.TrustCenterID } } @@ -227,7 +234,7 @@ func handleTrustCenterSubprocessorMutationGala(ctx gala.HandlerContext, payload // handleTrustCenterComplianceMutationGala processes TrustCenterCompliance mutations and invalidates cache. func handleTrustCenterComplianceMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -237,9 +244,13 @@ func handleTrustCenterComplianceMutationGala(ctx gala.HandlerContext, payload ev entityID, ok := eventqueue.MutationEntityID(payload, ctx.Envelope.Headers.Properties) if ok && entityID != "" { entity, err := client.TrustCenterCompliance.Get(ctx.Context, entityID) - if err == nil && entity != nil { - trustCenterID = entity.TrustCenterID + if err != nil || entity == nil { + logx.FromContext(ctx.Context).Warn().Err(err).Str("compliance_id", entityID).Msg("failed to query trust center compliance for cache invalidation") + + return nil } + + trustCenterID = entity.TrustCenterID } } @@ -252,7 +263,7 @@ func handleTrustCenterComplianceMutationGala(ctx gala.HandlerContext, payload ev // handleSubprocessorMutationGala processes Subprocessor mutations and invalidates related trust center cache. func handleSubprocessorMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -272,6 +283,7 @@ func handleSubprocessorMutationGala(ctx gala.HandlerContext, payload eventqueue. All(ctx.Context) if err != nil { logx.FromContext(ctx.Context).Warn().Err(err).Str("subprocessor_id", subprocessorID).Msg("failed to query trust center subprocessors") + return nil } @@ -294,7 +306,7 @@ func handleSubprocessorMutationGala(ctx gala.HandlerContext, payload eventqueue. // handleStandardMutationGala processes Standard mutations and invalidates related trust center cache. func handleStandardMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -314,6 +326,7 @@ func handleStandardMutationGala(ctx gala.HandlerContext, payload eventqueue.Muta All(ctx.Context) if err != nil { logx.FromContext(ctx.Context).Warn().Err(err).Str("standard_id", standardID).Msg("failed to query trust center docs") + return nil } @@ -336,7 +349,7 @@ func handleStandardMutationGala(ctx gala.HandlerContext, payload eventqueue.Muta // handleTrustCenterSettingMutationGala processes TrustCenterSetting mutations and refreshes cache. func handleTrustCenterSettingMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } @@ -346,9 +359,12 @@ func handleTrustCenterSettingMutationGala(ctx gala.HandlerContext, payload event settingID, ok := eventqueue.MutationEntityID(payload, ctx.Envelope.Headers.Properties) if ok && settingID != "" { setting, err := client.TrustCenterSetting.Get(ctx.Context, settingID) - if err == nil && setting != nil { - trustCenterID = setting.TrustCenterID + if err != nil || setting == nil { + logx.FromContext(ctx.Context).Warn().Err(err).Str("setting_id", settingID).Msg("failed to query trust center setting for cache invalidation") + return nil } + + trustCenterID = setting.TrustCenterID } } @@ -361,7 +377,7 @@ func handleTrustCenterSettingMutationGala(ctx gala.HandlerContext, payload event // handleTrustCenterMutationGala processes TrustCenter mutations and refreshes cache. func handleTrustCenterMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return nil } diff --git a/internal/ent/hooks/listeners_workflow.go b/internal/ent/hooks/listeners_workflow.go index c86b922e4..fcbf6d0ad 100644 --- a/internal/ent/hooks/listeners_workflow.go +++ b/internal/ent/hooks/listeners_workflow.go @@ -207,7 +207,7 @@ func handleWorkflowInstanceCompletedGala(ctx gala.HandlerContext, payload gala.W // workflowListenersFromGala resolves workflow listener dependencies from the gala injector // and enriches the handler context so the ent client is available to interceptors func workflowListenersFromGala(handlerCtx gala.HandlerContext) (gala.HandlerContext, *engine.WorkflowListeners, bool) { - client, ok := eventqueue.ClientFromHandler(handlerCtx) + handlerCtx, client, ok := eventqueue.ClientFromHandler(handlerCtx) if !ok { return handlerCtx, nil, false } @@ -222,11 +222,5 @@ func workflowListenersFromGala(handlerCtx gala.HandlerContext) (gala.HandlerCont return handlerCtx, nil, false } - // Ensure the ent client is in context for interceptors and privacy checks. - // In durable dispatch the context is reconstructed from a snapshot that does - // not include the ent client, so interceptors like FGA auth would get a nil - // client from generated.FromContext. - handlerCtx.Context = generated.NewContext(handlerCtx.Context, client) - return handlerCtx, engine.NewWorkflowListeners(client, wfEngine, runtime), true } diff --git a/internal/ent/notifications/events.go b/internal/ent/notifications/events.go index f76a13096..d7d0d45af 100644 --- a/internal/ent/notifications/events.go +++ b/internal/ent/notifications/events.go @@ -70,7 +70,7 @@ type documentNotificationInput struct { // handleTaskMutation processes task mutations and creates notifications when assignee changes or mentions are added. func handleTaskMutation(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return ErrFailedToGetClient } @@ -229,7 +229,7 @@ func handleDocumentNeedsApproval(ctx gala.HandlerContext, payload eventqueue.Mut return nil } - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return ErrFailedToGetClient } diff --git a/internal/ent/notifications/exports.go b/internal/ent/notifications/exports.go index bd0853bed..c6adde426 100644 --- a/internal/ent/notifications/exports.go +++ b/internal/ent/notifications/exports.go @@ -26,7 +26,7 @@ type exportFields struct { // handleExportMutation processes export mutations and creates notifications when status changes to READY or FAILED. func handleExportMutation(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return ErrFailedToGetClient } diff --git a/internal/ent/notifications/mentions.go b/internal/ent/notifications/mentions.go index da5f25e7f..9809a7567 100644 --- a/internal/ent/notifications/mentions.go +++ b/internal/ent/notifications/mentions.go @@ -74,7 +74,7 @@ type mentionNotificationInput struct { // handleNoteMutation processes note mutations and creates notifications for mentioned users func handleNoteMutation(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return ErrFailedToGetClient } @@ -496,7 +496,7 @@ type oldDocumentDetails struct { // handleObjectMentions checks mentions in object details fields (task/risk/procedure/policy). func handleObjectMentions(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error { - client, ok := eventqueue.ClientFromHandler(ctx) + ctx, client, ok := eventqueue.ClientFromHandler(ctx) if !ok { return ErrFailedToGetClient } diff --git a/pkg/gala/dispatcher.go b/pkg/gala/dispatcher.go index 35631c8bf..f47f603a5 100644 --- a/pkg/gala/dispatcher.go +++ b/pkg/gala/dispatcher.go @@ -85,6 +85,18 @@ func NewRiverDispatchWorker(galaProvider Provider) *RiverDispatchWorker { return &RiverDispatchWorker{galaProvider: galaProvider} } +// riverJobMetadata is the JSON structure attached to River jobs for UI visibility +type riverJobMetadata struct { + // Topic is the gala topic name + Topic string `json:"topic"` + // EventID is the gala event identifier + EventID string `json:"event_id"` + // Listeners are the registered listener names for the topic + Listeners []string `json:"listeners,omitempty"` + // Properties contains envelope header properties (entity_id, operation, mutation_type, etc.) + Properties map[string]string `json:"properties,omitempty"` +} + // Dispatch dispatches an envelope to River for processing by a Worker func (d *RiverDispatcher) Dispatch(ctx context.Context, envelope Envelope) error { args, err := NewRiverDispatchArgs(envelope) @@ -99,12 +111,25 @@ func (d *RiverDispatcher) Dispatch(ctx context.Context, envelope Envelope) error insertOpts := &river.InsertOpts{ Queue: queueName, + Tags: envelope.Headers.Tags, } if envelope.Headers.MaxAttempts > 0 { insertOpts.MaxAttempts = envelope.Headers.MaxAttempts } + meta, err := json.Marshal(riverJobMetadata{ + Topic: string(envelope.Topic), + EventID: string(envelope.ID), + Listeners: envelope.Headers.Listeners, + Properties: envelope.Headers.Properties, + }) + if err != nil { + return ErrRiverEnvelopeEncodeFailed + } + + insertOpts.Metadata = meta + if _, err = d.jobClient.Insert(ctx, args, insertOpts); err != nil { return ErrRiverDispatchInsertFailed } diff --git a/pkg/gala/gala.go b/pkg/gala/gala.go index e12e0b9ae..4114a4009 100644 --- a/pkg/gala/gala.go +++ b/pkg/gala/gala.go @@ -277,6 +277,8 @@ func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any return EmitReceipt{EventID: envelope.ID, Err: ErrDispatcherRequired} } + envelope.Headers.Listeners = g.registry.listenerNamesForTopic(topic) + if err := g.dispatcher.Dispatch(ctx, envelope); err != nil { logx.FromContext(ctx).Debug().Err(err).Str("event_id", string(envelope.ID)).Str("topic", string(topic)).Msg("gala event dispatch failed") @@ -309,6 +311,8 @@ func (g *Gala) EmitEnvelope(ctx context.Context, envelope Envelope) error { envelope.ContextSnapshot = snapshot } + envelope.Headers.Listeners = g.registry.listenerNamesForTopic(envelope.Topic) + return g.dispatcher.Dispatch(ctx, envelope) } @@ -353,7 +357,7 @@ func (g *Gala) DispatchEnvelope(ctx context.Context, envelope Envelope) error { } } - logx.FromContext(restoredContext).Debug().Str("event_id", string(envelope.ID)).Str("topic", string(envelope.Topic)).Str("operation", operation).Int("listener_count", len(listeners)).Msg("gala event processed") + logx.FromContext(restoredContext).Info().Str("event_id", string(envelope.ID)).Str("topic", string(envelope.Topic)).Str("operation", operation).Int("listener_count", len(listeners)).Msg("gala event processed") return nil } diff --git a/pkg/gala/registry.go b/pkg/gala/registry.go index 01fa423ac..923d07bc2 100644 --- a/pkg/gala/registry.go +++ b/pkg/gala/registry.go @@ -142,6 +142,21 @@ func (r *Registry) DecodePayload(topic TopicName, payload []byte) (any, error) { return registration.decode(payload) } +// listenerNamesForTopic returns the registered listener names for a topic +func (r *Registry) listenerNamesForTopic(topic TopicName) []string { + listeners := r.registeredListeners(topic) + if len(listeners) == 0 { + return nil + } + + names := make([]string, len(listeners)) + for i, l := range listeners { + names[i] = l.name + } + + return names +} + // registeredListeners returns a snapshot of listeners for one topic. func (r *Registry) registeredListeners(topic TopicName) []registeredListener { r.mu.RLock() diff --git a/pkg/gala/types.go b/pkg/gala/types.go index 5f89166dc..fa2358955 100644 --- a/pkg/gala/types.go +++ b/pkg/gala/types.go @@ -11,6 +11,10 @@ type Headers struct { IdempotencyKey string `json:"idempotency_key,omitempty"` // Properties stores additional typed metadata projected to string values Properties map[string]string `json:"properties,omitempty"` + // Tags are low-cardinality labels forwarded to the transport layer (e.g. River job tags) + Tags []string `json:"tags,omitempty"` + // Listeners are the registered listener names for the topic, populated at dispatch time + Listeners []string `json:"listeners,omitempty"` // Queue optionally overrides the River queue used for dispatch. Queue string `json:"queue,omitempty"` // MaxAttempts optionally overrides River max attempts for this envelope.