diff --git a/build/Dockerfile.onix-adapter-deg b/build/Dockerfile.onix-adapter-deg index 15cb6db9..0240736b 100644 --- a/build/Dockerfile.onix-adapter-deg +++ b/build/Dockerfile.onix-adapter-deg @@ -45,8 +45,8 @@ WORKDIR /workspace/deg-plugins # Build DEG ledger recorder plugin RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/degledgerrecorder.so ./degledgerrecorder/cmd/plugin.go -# # Build policy enforcer plugin -# RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/policyenforcer.so ./policyenforcer/cmd/plugin.go +# Build DEG revenue flows plugin +RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/revenueflows.so ./revenueflows/cmd/plugin.go # Create minimal runtime image # Using debian-slim (glibc) for Go plugin (.so) compatibility with the bullseye builder. diff --git a/build/build-multiarch.sh b/build/build-multiarch.sh index 29d267ee..928743fa 100755 --- a/build/build-multiarch.sh +++ b/build/build-multiarch.sh @@ -1,8 +1,9 @@ #!/bin/bash -# Multi-Architecture Docker Build Script for DEG Ledger Recorder Plugin +# Multi-Architecture Docker Build Script for DEG Plugins # ====================================================================== -# Builds onix-adapter with DEG plugins for linux/amd64 and linux/arm64 +# Builds onix-adapter with DEG plugins (degledgerrecorder, revenueflows) +# for linux/amd64 and linux/arm64 # # Prerequisites: # - Docker Desktop or Docker Engine with buildx support @@ -119,7 +120,7 @@ else fi echo "============================================" -echo "Multi-Arch Build: DEG Ledger Recorder" +echo "Multi-Arch Build: DEG Plugins (ledgerrecorder + revenueflows)" echo "============================================" echo "DEG Root: $DEG_ROOT" echo "Beckn-ONIX Root: $BECKN_ONIX_ROOT" diff --git a/plugins/go.mod b/plugins/go.mod index f9eec617..f37117e7 100644 --- a/plugins/go.mod +++ b/plugins/go.mod @@ -45,12 +45,14 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/yashtewari/glob-intersection v0.2.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.39.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect + go.opentelemetry.io/otel/log v0.16.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.16.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/sync v0.19.0 // indirect @@ -58,5 +60,6 @@ require ( google.golang.org/protobuf v1.36.11 // indirect gopkg.in/ini.v1 v1.67.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/plugins/go.sum b/plugins/go.sum index 64fceb57..4f32a4fe 100644 --- a/plugins/go.sum +++ b/plugins/go.sum @@ -49,8 +49,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -135,8 +135,8 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM= @@ -145,14 +145,18 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 h1:Ckwye go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0/go.mod h1:teIFJh5pW2y+AN7riv6IBPX2DuesS3HgP39mwOspKwU= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4= +go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI= +go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -178,10 +182,10 @@ golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/plugins/revenueflows/cache.go b/plugins/revenueflows/cache.go new file mode 100644 index 00000000..f0c7e42a --- /dev/null +++ b/plugins/revenueflows/cache.go @@ -0,0 +1,133 @@ +package revenueflows + +import ( + "context" + "fmt" + "io" + "net/http" + "sync" + "time" + + "github.com/open-policy-agent/opa/v1/ast" + "github.com/open-policy-agent/opa/v1/rego" +) + +// cacheEntry holds a compiled OPA query and its metadata. +type cacheEntry struct { + pq rego.PreparedEvalQuery + query string + fetchedAt time.Time +} + +// PolicyCache is a TTL-based LRU cache for compiled rego policies. +// Keyed by policy URL. +type PolicyCache struct { + mu sync.RWMutex + entries map[string]*cacheEntry + maxSize int + ttl time.Duration + + fetchTimeout time.Duration + maxFileSize int64 +} + +// NewPolicyCache creates a new cache. +func NewPolicyCache(maxSize int, ttl, fetchTimeout time.Duration, maxFileSize int64) *PolicyCache { + return &PolicyCache{ + entries: make(map[string]*cacheEntry), + maxSize: maxSize, + ttl: ttl, + fetchTimeout: fetchTimeout, + maxFileSize: maxFileSize, + } +} + +// GetOrCompile returns a compiled query for the given policy URL and OPA query path. +// Fetches and compiles on cache miss or TTL expiry. +func (c *PolicyCache) GetOrCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) { + c.mu.RLock() + entry, ok := c.entries[url] + c.mu.RUnlock() + + if ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl { + return entry.pq, nil + } + + // Cache miss or expired — fetch and compile + return c.fetchAndCompile(ctx, url, query) +} + +func (c *PolicyCache) fetchAndCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) { + c.mu.Lock() + defer c.mu.Unlock() + + // Double-check after acquiring write lock + if entry, ok := c.entries[url]; ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl { + return entry.pq, nil + } + + // Fetch rego source from URL + source, err := c.fetchPolicy(ctx, url) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("fetch %s: %w", url, err) + } + + // Compile + compiler, err := ast.CompileModulesWithOpt(map[string]string{"policy.rego": source}, ast.CompileOpts{}) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("compile %s: %w", url, err) + } + + pq, err := rego.New( + rego.Query(query), + rego.Compiler(compiler), + ).PrepareForEval(ctx) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("prepare query %s: %w", query, err) + } + + // Evict oldest if at capacity + if len(c.entries) >= c.maxSize { + var oldestKey string + var oldestTime time.Time + for k, v := range c.entries { + if oldestKey == "" || v.fetchedAt.Before(oldestTime) { + oldestKey = k + oldestTime = v.fetchedAt + } + } + if oldestKey != "" { + delete(c.entries, oldestKey) + } + } + + c.entries[url] = &cacheEntry{pq: pq, query: query, fetchedAt: time.Now()} + return pq, nil +} + +func (c *PolicyCache) fetchPolicy(ctx context.Context, url string) (string, error) { + reqCtx, cancel := context.WithTimeout(ctx, c.fetchTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("HTTP %d from %s", resp.StatusCode, url) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, c.maxFileSize)) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/plugins/revenueflows/cmd/plugin.go b/plugins/revenueflows/cmd/plugin.go new file mode 100644 index 00000000..4356c1a3 --- /dev/null +++ b/plugins/revenueflows/cmd/plugin.go @@ -0,0 +1,19 @@ +// Package main provides the plugin entry point for the RevenueFlows middleware. +// Compiled as a Go plugin (.so) and loaded by beckn-onix at runtime. +package main + +import ( + "context" + "net/http" + + revenueflows "github.com/beckn-one/deg/plugins/revenueflows" +) + +type provider struct{} + +func (p provider) New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) { + return revenueflows.NewMiddleware(cfg) +} + +// Provider is the exported symbol that beckn-onix plugin manager looks up. +var Provider = provider{} diff --git a/plugins/revenueflows/config.go b/plugins/revenueflows/config.go new file mode 100644 index 00000000..7594e62f --- /dev/null +++ b/plugins/revenueflows/config.go @@ -0,0 +1,133 @@ +package revenueflows + +import ( + "strconv" + "strings" + "time" +) + +// Config holds configuration for the RevenueFlows plugin. +type Config struct { + // Enabled controls whether the plugin is active. + Enabled bool + + // Actions is the list of beckn actions that trigger revenue flow computation. + // Default: ["on_status"] + Actions []string + + // CacheTTL is how long a compiled rego policy is cached before re-fetch. + // Default: 5 minutes. + CacheTTL time.Duration + + // MaxCacheEntries is the LRU bound on cached compiled policies. + // Default: 50. + MaxCacheEntries int + + // PolicyFetchTimeout is the HTTP timeout for fetching rego from a URL. + // Default: 30 seconds. + PolicyFetchTimeout time.Duration + + // MaxPolicySize is the maximum rego file size in bytes. + // Default: 1 MB. + MaxPolicySize int64 + + // DebugLogging enables verbose logging. + DebugLogging bool + + // AllowedDomains restricts which domains rego can be fetched from. + // Empty = allow all. Comma-separated list. + AllowedDomains []string +} + +// DefaultConfig returns a Config with sensible defaults. +func DefaultConfig() *Config { + return &Config{ + Enabled: true, + Actions: []string{"on_status"}, + CacheTTL: 5 * time.Minute, + MaxCacheEntries: 50, + PolicyFetchTimeout: 30 * time.Second, + MaxPolicySize: 1 << 20, // 1 MB + DebugLogging: false, + } +} + +// ParseConfig parses the plugin configuration map. +func ParseConfig(cfg map[string]string) (*Config, error) { + config := DefaultConfig() + + if enabled, ok := cfg["enabled"]; ok { + config.Enabled = enabled == "true" || enabled == "1" + } + + if actions, ok := cfg["actions"]; ok && actions != "" { + list := strings.Split(actions, ",") + config.Actions = make([]string, 0, len(list)) + for _, a := range list { + a = strings.TrimSpace(a) + if a != "" { + config.Actions = append(config.Actions, a) + } + } + } + + if ttl, ok := cfg["cacheTTL"]; ok && ttl != "" { + seconds, err := strconv.Atoi(ttl) + if err != nil { + d, err2 := time.ParseDuration(ttl) + if err2 != nil { + return nil, err + } + config.CacheTTL = d + } else { + config.CacheTTL = time.Duration(seconds) * time.Second + } + } + + if max, ok := cfg["maxCacheEntries"]; ok && max != "" { + n, err := strconv.Atoi(max) + if err != nil { + return nil, err + } + config.MaxCacheEntries = n + } + + if debug, ok := cfg["debugLogging"]; ok { + config.DebugLogging = debug == "true" || debug == "1" + } + + if domains, ok := cfg["allowedDomains"]; ok && domains != "" { + for _, d := range strings.Split(domains, ",") { + d = strings.TrimSpace(d) + if d != "" { + config.AllowedDomains = append(config.AllowedDomains, d) + } + } + } + + return config, nil +} + +// IsActionEnabled checks if the given action is in the configured list. +func (c *Config) IsActionEnabled(action string) bool { + for _, a := range c.Actions { + if a == action { + return true + } + } + return false +} + +// IsDomainAllowed checks if the URL domain is in the allowed list. +// Returns true if no domain restriction is configured. +func (c *Config) IsDomainAllowed(url string) bool { + if len(c.AllowedDomains) == 0 { + return true + } + for _, d := range c.AllowedDomains { + if strings.Contains(url, d) { + return true + } + } + return false +} diff --git a/plugins/revenueflows/inject.go b/plugins/revenueflows/inject.go new file mode 100644 index 00000000..143355ef --- /dev/null +++ b/plugins/revenueflows/inject.go @@ -0,0 +1,99 @@ +package revenueflows + +import ( + "encoding/json" + "fmt" + "strings" +) + +// PolicyRef holds the policy URL and OPA query path extracted from a message. +type PolicyRef struct { + URL string + QueryPath string +} + +// ExtractPolicyRef reads contractAttributes.policy.url and .queryPath from +// the message body. Returns nil if not present. +func ExtractPolicyRef(body []byte) *PolicyRef { + var envelope struct { + Message struct { + Contract struct { + ContractAttributes struct { + Policy struct { + URL string `json:"url"` + QueryPath string `json:"queryPath"` + } `json:"policy"` + } `json:"contractAttributes"` + } `json:"contract"` + } `json:"message"` + } + + if err := json.Unmarshal(body, &envelope); err != nil { + return nil + } + + url := envelope.Message.Contract.ContractAttributes.Policy.URL + qp := envelope.Message.Contract.ContractAttributes.Policy.QueryPath + if url == "" || qp == "" { + return nil + } + + return &PolicyRef{URL: url, QueryPath: qp} +} + +// ExtractAction reads the beckn action from the URL path or context.action. +func ExtractAction(urlPath string, body []byte) string { + // Try URL path first (e.g., /bpp/caller/on_status → on_status) + parts := strings.Split(strings.TrimRight(urlPath, "/"), "/") + if len(parts) > 0 { + action := parts[len(parts)-1] + if action != "" && action != "caller" && action != "receiver" { + return action + } + } + + // Fallback: parse context.action from body + var envelope struct { + Context struct { + Action string `json:"action"` + } `json:"context"` + } + if err := json.Unmarshal(body, &envelope); err == nil && envelope.Context.Action != "" { + return envelope.Context.Action + } + + return "" +} + +// InjectRevenueFlows sets message.contract.contractAttributes.revenueFlows +// in the JSON body and returns the modified bytes. +// Uses json.Number to preserve numeric precision. +func InjectRevenueFlows(body []byte, flows interface{}) ([]byte, error) { + // Use Decoder with UseNumber to preserve numeric precision + dec := json.NewDecoder(strings.NewReader(string(body))) + dec.UseNumber() + + var payload map[string]interface{} + if err := dec.Decode(&payload); err != nil { + return nil, fmt.Errorf("failed to decode body: %w", err) + } + + // Navigate: message → contract → contractAttributes + message, ok := payload["message"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("message not found or not an object") + } + contract, ok := message["contract"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("message.contract not found or not an object") + } + attrs, ok := contract["contractAttributes"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("contractAttributes not found or not an object") + } + + // Inject + attrs["revenueFlows"] = flows + + return json.Marshal(payload) +} diff --git a/plugins/revenueflows/middleware.go b/plugins/revenueflows/middleware.go new file mode 100644 index 00000000..c40b6157 --- /dev/null +++ b/plugins/revenueflows/middleware.go @@ -0,0 +1,166 @@ +package revenueflows + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/open-policy-agent/opa/v1/rego" +) + +// NewMiddleware returns an HTTP middleware that computes revenue flows +// and injects them into the request body before passing to the next handler. +func NewMiddleware(cfg map[string]string) (func(http.Handler) http.Handler, error) { + config, err := ParseConfig(cfg) + if err != nil { + return nil, fmt.Errorf("revenueflows: config: %w", err) + } + + cache := NewPolicyCache( + config.MaxCacheEntries, + config.CacheTTL, + config.PolicyFetchTimeout, + config.MaxPolicySize, + ) + + fmt.Printf("[RevenueFlows] Middleware enabled=%v, actions=%v, cacheTTL=%s\n", + config.Enabled, config.Actions, config.CacheTTL) + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !config.Enabled { + next.ServeHTTP(w, r) + return + } + + // Read body + body, err := io.ReadAll(r.Body) + if err != nil { + next.ServeHTTP(w, r) + return + } + + // Check action + action := extractActionFromPathAndBody(r.URL.Path, body) + if !config.IsActionEnabled(action) { + // Pass through unchanged + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Extract policy reference + ref := ExtractPolicyRef(body) + if ref == nil { + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + if !config.IsDomainAllowed(ref.URL) { + fmt.Printf("[RevenueFlows] WARN: policy URL domain not allowed: %s\n", ref.URL) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + if config.DebugLogging { + fmt.Printf("[RevenueFlows] Evaluating %s with query %s\n", ref.URL, ref.QueryPath) + } + + // Get or compile policy + pq, err := cache.GetOrCompile(r.Context(), ref.URL, ref.QueryPath) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: failed to load policy: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Parse as OPA input + var input interface{} + if err := json.Unmarshal(body, &input); err != nil { + fmt.Printf("[RevenueFlows] WARN: failed to parse body: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Evaluate + rs, err := pq.Eval(r.Context(), regoEvalInput(input)) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: rego eval failed: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + flows := extractFlowsFromResultSet(rs) + if flows == nil { + if config.DebugLogging { + fmt.Printf("[RevenueFlows] No revenue_flows in result, skipping\n") + } + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Inject into body + modified, err := InjectRevenueFlows(body, flows) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: inject failed: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + fmt.Printf("[RevenueFlows] Injected %d revenue flow(s) for action %s\n", len(flows), action) + + // Pass modified body to next handler + r.Body = io.NopCloser(bytes.NewReader(modified)) + r.ContentLength = int64(len(modified)) + next.ServeHTTP(w, r) + }) + }, nil +} + +// regoEvalInput wraps input for OPA evaluation. +func regoEvalInput(input interface{}) rego.EvalOption { + return rego.EvalInput(input) +} + +// extractFlowsFromResultSet pulls revenue_flows from OPA result. +func extractFlowsFromResultSet(rs rego.ResultSet) []interface{} { + return extractFlows(rs) +} + +// extractActionFromPathAndBody extracts action from URL path or body. +func extractActionFromPathAndBody(urlPath string, body []byte) string { + parts := strings.Split(strings.TrimRight(urlPath, "/"), "/") + if len(parts) > 0 { + action := parts[len(parts)-1] + if action != "" && action != "caller" && action != "receiver" { + return action + } + } + var envelope struct { + Context struct { + Action string `json:"action"` + } `json:"context"` + } + if err := json.Unmarshal(body, &envelope); err == nil && envelope.Context.Action != "" { + return envelope.Context.Action + } + return "" +} diff --git a/plugins/revenueflows/revenueflows.go b/plugins/revenueflows/revenueflows.go new file mode 100644 index 00000000..7f1bad50 --- /dev/null +++ b/plugins/revenueflows/revenueflows.go @@ -0,0 +1,152 @@ +// Package revenueflows is an onix Step plugin that computes revenue flows +// from a rego policy embedded in the contract and injects them into the message. +// +// It reads the policy URL and query path from +// message.contract.contractAttributes.policy, evaluates the rego against +// the full message, and writes the resulting revenue_flows array back into +// contractAttributes.revenueFlows. +// +// Soft failure: if anything goes wrong (fetch, compile, eval), the message +// passes through unmodified with a warning log. Never blocks delivery. +package revenueflows + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/open-policy-agent/opa/v1/rego" +) + +// RevenueFlows is a Step plugin that computes and injects revenue flows. +type RevenueFlows struct { + config *Config + cache *PolicyCache +} + +// New creates a new RevenueFlows plugin instance. +func New(cfg map[string]string) (*RevenueFlows, error) { + config, err := ParseConfig(cfg) + if err != nil { + return nil, fmt.Errorf("revenueflows: config: %w", err) + } + + cache := NewPolicyCache( + config.MaxCacheEntries, + config.CacheTTL, + config.PolicyFetchTimeout, + config.MaxPolicySize, + ) + + fmt.Printf("[RevenueFlows] Enabled=%v, actions=%v, cacheTTL=%s\n", + config.Enabled, config.Actions, config.CacheTTL) + + return &RevenueFlows{config: config, cache: cache}, nil +} + +// Run implements the Step interface. +func (rf *RevenueFlows) Run(ctx *model.StepContext) error { + if !rf.config.Enabled { + return nil + } + + // Check action + action := ExtractAction(ctx.Request.URL.Path, ctx.Body) + if !rf.config.IsActionEnabled(action) { + if rf.config.DebugLogging { + log.Debugf(ctx, "RevenueFlows: action '%s' not enabled, skipping", action) + } + return nil + } + + // Extract policy reference from the message + ref := ExtractPolicyRef(ctx.Body) + if ref == nil { + if rf.config.DebugLogging { + log.Debug(ctx, "RevenueFlows: no contractAttributes.policy in message, skipping") + } + return nil + } + + // Check domain allowlist + if !rf.config.IsDomainAllowed(ref.URL) { + log.Warnf(ctx, "RevenueFlows: policy URL domain not allowed: %s", ref.URL) + return nil + } + + if rf.config.DebugLogging { + log.Debugf(ctx, "RevenueFlows: evaluating %s with query %s", ref.URL, ref.QueryPath) + } + + // Get or compile the policy + pq, err := rf.cache.GetOrCompile(context.Background(), ref.URL, ref.QueryPath) + if err != nil { + log.Warnf(ctx, "RevenueFlows: failed to load policy: %v", err) + return nil // soft failure + } + + // Parse message as OPA input + var input interface{} + if err := json.Unmarshal(ctx.Body, &input); err != nil { + log.Warnf(ctx, "RevenueFlows: failed to parse message body: %v", err) + return nil + } + + // Evaluate + rs, err := pq.Eval(context.Background(), rego.EvalInput(input)) + if err != nil { + log.Warnf(ctx, "RevenueFlows: rego evaluation failed: %v", err) + return nil // soft failure + } + + // Extract revenue_flows from result + flows := extractFlows(rs) + if flows == nil { + if rf.config.DebugLogging { + log.Debug(ctx, "RevenueFlows: no revenue_flows in rego result, skipping injection") + } + return nil + } + + // Inject into message body + modified, err := InjectRevenueFlows(ctx.Body, flows) + if err != nil { + log.Warnf(ctx, "RevenueFlows: failed to inject revenue_flows: %v", err) + return nil // soft failure + } + + ctx.Body = modified + log.Infof(ctx, "RevenueFlows: injected %d revenue flow(s)", len(flows)) + return nil +} + +// Close is a no-op cleanup function. +func (rf *RevenueFlows) Close() {} + +// extractFlows pulls revenue_flows from the OPA result set. +// The query evaluates to the full package object; we look for the +// "revenue_flows" key within it. +func extractFlows(rs rego.ResultSet) []interface{} { + if len(rs) == 0 || len(rs[0].Expressions) == 0 { + return nil + } + + val := rs[0].Expressions[0].Value + + // If the query returns the full package, result is a map + if m, ok := val.(map[string]interface{}); ok { + if flows, ok := m["revenue_flows"].([]interface{}); ok { + return flows + } + return nil + } + + // If the query targets revenue_flows directly, result is an array + if flows, ok := val.([]interface{}); ok { + return flows + } + + return nil +} diff --git a/plugins/revenueflows/revenueflows_test.go b/plugins/revenueflows/revenueflows_test.go new file mode 100644 index 00000000..e375b6a7 --- /dev/null +++ b/plugins/revenueflows/revenueflows_test.go @@ -0,0 +1,195 @@ +package revenueflows + +import ( + "encoding/json" + "testing" +) + +// --------------------------------------------------------------------------- +// ExtractPolicyRef tests +// --------------------------------------------------------------------------- + +func TestExtractPolicyRef_Present(t *testing.T) { + body := []byte(`{ + "message": { + "contract": { + "contractAttributes": { + "policy": { + "url": "https://example.com/policy.rego", + "queryPath": "data.test.violations" + } + } + } + } + }`) + + ref := ExtractPolicyRef(body) + if ref == nil { + t.Fatal("expected non-nil PolicyRef") + } + if ref.URL != "https://example.com/policy.rego" { + t.Errorf("URL = %q, want %q", ref.URL, "https://example.com/policy.rego") + } + if ref.QueryPath != "data.test.violations" { + t.Errorf("QueryPath = %q, want %q", ref.QueryPath, "data.test.violations") + } +} + +func TestExtractPolicyRef_Missing(t *testing.T) { + body := []byte(`{"message": {"contract": {}}}`) + ref := ExtractPolicyRef(body) + if ref != nil { + t.Errorf("expected nil PolicyRef, got %+v", ref) + } +} + +func TestExtractPolicyRef_PartialMissing(t *testing.T) { + body := []byte(`{ + "message": { + "contract": { + "contractAttributes": { + "policy": { "url": "https://example.com/policy.rego" } + } + } + } + }`) + ref := ExtractPolicyRef(body) + if ref != nil { + t.Errorf("expected nil PolicyRef when queryPath missing, got %+v", ref) + } +} + +// --------------------------------------------------------------------------- +// ExtractAction tests +// --------------------------------------------------------------------------- + +func TestExtractAction_FromPath(t *testing.T) { + action := ExtractAction("/bpp/caller/on_status", nil) + if action != "on_status" { + t.Errorf("action = %q, want %q", action, "on_status") + } +} + +func TestExtractAction_FromBody(t *testing.T) { + body := []byte(`{"context": {"action": "on_confirm"}}`) + action := ExtractAction("/bpp/caller", body) + if action != "on_confirm" { + t.Errorf("action = %q, want %q", action, "on_confirm") + } +} + +// --------------------------------------------------------------------------- +// InjectRevenueFlows tests +// --------------------------------------------------------------------------- + +func TestInjectRevenueFlows(t *testing.T) { + body := []byte(`{ + "context": {"action": "on_status"}, + "message": { + "contract": { + "contractAttributes": { + "@type": "DEGContract", + "policy": {"url": "test", "queryPath": "test"} + } + } + } + }`) + + flows := []interface{}{ + map[string]interface{}{"role": "buyer", "value": -525.0, "currency": "INR"}, + map[string]interface{}{"role": "seller", "value": 525.0, "currency": "INR"}, + } + + result, err := InjectRevenueFlows(body, flows) + if err != nil { + t.Fatalf("InjectRevenueFlows failed: %v", err) + } + + // Verify the result has revenueFlows + var payload map[string]interface{} + if err := json.Unmarshal(result, &payload); err != nil { + t.Fatalf("failed to parse result: %v", err) + } + + msg := payload["message"].(map[string]interface{}) + contract := msg["contract"].(map[string]interface{}) + attrs := contract["contractAttributes"].(map[string]interface{}) + rf, ok := attrs["revenueFlows"].([]interface{}) + if !ok { + t.Fatal("revenueFlows not found or wrong type") + } + if len(rf) != 2 { + t.Errorf("len(revenueFlows) = %d, want 2", len(rf)) + } + + // Verify existing fields preserved + if attrs["@type"] != "DEGContract" { + t.Errorf("@type lost after injection") + } +} + +func TestInjectRevenueFlows_NoContract(t *testing.T) { + body := []byte(`{"message": {}}`) + _, err := InjectRevenueFlows(body, []interface{}{}) + if err == nil { + t.Error("expected error when contract missing") + } +} + +// --------------------------------------------------------------------------- +// Config tests +// --------------------------------------------------------------------------- + +func TestParseConfig_Defaults(t *testing.T) { + cfg, err := ParseConfig(map[string]string{}) + if err != nil { + t.Fatalf("ParseConfig failed: %v", err) + } + if !cfg.Enabled { + t.Error("expected Enabled=true by default") + } + if len(cfg.Actions) != 1 || cfg.Actions[0] != "on_status" { + t.Errorf("Actions = %v, want [on_status]", cfg.Actions) + } +} + +func TestParseConfig_Custom(t *testing.T) { + cfg, err := ParseConfig(map[string]string{ + "actions": "on_status,on_confirm", + "cacheTTL": "600", + "maxCacheEntries": "100", + "debugLogging": "true", + "allowedDomains": "raw.githubusercontent.com,schema.beckn.io", + }) + if err != nil { + t.Fatalf("ParseConfig failed: %v", err) + } + if len(cfg.Actions) != 2 { + t.Errorf("Actions = %v, want 2 items", cfg.Actions) + } + if cfg.MaxCacheEntries != 100 { + t.Errorf("MaxCacheEntries = %d, want 100", cfg.MaxCacheEntries) + } + if !cfg.DebugLogging { + t.Error("expected DebugLogging=true") + } + if len(cfg.AllowedDomains) != 2 { + t.Errorf("AllowedDomains = %v, want 2 items", cfg.AllowedDomains) + } +} + +func TestIsDomainAllowed(t *testing.T) { + cfg := &Config{AllowedDomains: []string{"raw.githubusercontent.com"}} + if !cfg.IsDomainAllowed("https://raw.githubusercontent.com/beckn/DEG/policy.rego") { + t.Error("expected allowed") + } + if cfg.IsDomainAllowed("https://evil.com/policy.rego") { + t.Error("expected blocked") + } + + // Empty = allow all + cfg2 := &Config{} + if !cfg2.IsDomainAllowed("https://anything.com/policy.rego") { + t.Error("expected allowed when no restrictions") + } +}