From e86b177188908f978c128df50aa5823abe329bef Mon Sep 17 00:00:00 2001 From: Ameet Deshpande Date: Mon, 30 Mar 2026 01:21:00 +0530 Subject: [PATCH] Add revenue flows plugin and update onix build Adds the computerevenueflows plugin for beckn v2 demand flexibility, which injects revenue flow data into beckn responses. Includes cache, config, middleware, and test files. Updates Dockerfile and build script to compile the new plugin with the locally built onix. Co-Authored-By: Claude Opus 4.6 (1M context) --- build/Dockerfile.onix-adapter-deg | 4 +- build/build-multiarch.sh | 7 +- plugins/go.mod | 13 +- plugins/go.sum | 36 ++-- plugins/revenueflows/cache.go | 133 +++++++++++++++ plugins/revenueflows/cmd/plugin.go | 19 +++ plugins/revenueflows/config.go | 133 +++++++++++++++ plugins/revenueflows/inject.go | 99 +++++++++++ plugins/revenueflows/middleware.go | 166 ++++++++++++++++++ plugins/revenueflows/revenueflows.go | 152 +++++++++++++++++ plugins/revenueflows/revenueflows_test.go | 195 ++++++++++++++++++++++ 11 files changed, 931 insertions(+), 26 deletions(-) create mode 100644 plugins/revenueflows/cache.go create mode 100644 plugins/revenueflows/cmd/plugin.go create mode 100644 plugins/revenueflows/config.go create mode 100644 plugins/revenueflows/inject.go create mode 100644 plugins/revenueflows/middleware.go create mode 100644 plugins/revenueflows/revenueflows.go create mode 100644 plugins/revenueflows/revenueflows_test.go diff --git a/build/Dockerfile.onix-adapter-deg b/build/Dockerfile.onix-adapter-deg index 15cb6db9..0240736b 100644 --- a/build/Dockerfile.onix-adapter-deg +++ b/build/Dockerfile.onix-adapter-deg @@ -45,8 +45,8 @@ WORKDIR /workspace/deg-plugins # Build DEG ledger recorder plugin RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/degledgerrecorder.so ./degledgerrecorder/cmd/plugin.go -# # Build policy enforcer plugin -# RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/policyenforcer.so ./policyenforcer/cmd/plugin.go +# Build DEG revenue flows plugin +RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/revenueflows.so ./revenueflows/cmd/plugin.go # Create minimal runtime image # Using debian-slim (glibc) for Go plugin (.so) compatibility with the bullseye builder. diff --git a/build/build-multiarch.sh b/build/build-multiarch.sh index 29d267ee..928743fa 100755 --- a/build/build-multiarch.sh +++ b/build/build-multiarch.sh @@ -1,8 +1,9 @@ #!/bin/bash -# Multi-Architecture Docker Build Script for DEG Ledger Recorder Plugin +# Multi-Architecture Docker Build Script for DEG Plugins # ====================================================================== -# Builds onix-adapter with DEG plugins for linux/amd64 and linux/arm64 +# Builds onix-adapter with DEG plugins (degledgerrecorder, revenueflows) +# for linux/amd64 and linux/arm64 # # Prerequisites: # - Docker Desktop or Docker Engine with buildx support @@ -119,7 +120,7 @@ else fi echo "============================================" -echo "Multi-Arch Build: DEG Ledger Recorder" +echo "Multi-Arch Build: DEG Plugins (ledgerrecorder + revenueflows)" echo "============================================" echo "DEG Root: $DEG_ROOT" echo "Beckn-ONIX Root: $BECKN_ONIX_ROOT" diff --git a/plugins/go.mod b/plugins/go.mod index f9eec617..f37117e7 100644 --- a/plugins/go.mod +++ b/plugins/go.mod @@ -45,12 +45,14 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/yashtewari/glob-intersection v0.2.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.39.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect + go.opentelemetry.io/otel/log v0.16.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.16.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/sync v0.19.0 // indirect @@ -58,5 +60,6 @@ require ( google.golang.org/protobuf v1.36.11 // indirect gopkg.in/ini.v1 v1.67.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/plugins/go.sum b/plugins/go.sum index 64fceb57..4f32a4fe 100644 --- a/plugins/go.sum +++ b/plugins/go.sum @@ -49,8 +49,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -135,8 +135,8 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM= @@ -145,14 +145,18 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 h1:Ckwye go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0/go.mod h1:teIFJh5pW2y+AN7riv6IBPX2DuesS3HgP39mwOspKwU= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4= +go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI= +go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -178,10 +182,10 @@ golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/plugins/revenueflows/cache.go b/plugins/revenueflows/cache.go new file mode 100644 index 00000000..f0c7e42a --- /dev/null +++ b/plugins/revenueflows/cache.go @@ -0,0 +1,133 @@ +package revenueflows + +import ( + "context" + "fmt" + "io" + "net/http" + "sync" + "time" + + "github.com/open-policy-agent/opa/v1/ast" + "github.com/open-policy-agent/opa/v1/rego" +) + +// cacheEntry holds a compiled OPA query and its metadata. +type cacheEntry struct { + pq rego.PreparedEvalQuery + query string + fetchedAt time.Time +} + +// PolicyCache is a TTL-based LRU cache for compiled rego policies. +// Keyed by policy URL. +type PolicyCache struct { + mu sync.RWMutex + entries map[string]*cacheEntry + maxSize int + ttl time.Duration + + fetchTimeout time.Duration + maxFileSize int64 +} + +// NewPolicyCache creates a new cache. +func NewPolicyCache(maxSize int, ttl, fetchTimeout time.Duration, maxFileSize int64) *PolicyCache { + return &PolicyCache{ + entries: make(map[string]*cacheEntry), + maxSize: maxSize, + ttl: ttl, + fetchTimeout: fetchTimeout, + maxFileSize: maxFileSize, + } +} + +// GetOrCompile returns a compiled query for the given policy URL and OPA query path. +// Fetches and compiles on cache miss or TTL expiry. +func (c *PolicyCache) GetOrCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) { + c.mu.RLock() + entry, ok := c.entries[url] + c.mu.RUnlock() + + if ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl { + return entry.pq, nil + } + + // Cache miss or expired — fetch and compile + return c.fetchAndCompile(ctx, url, query) +} + +func (c *PolicyCache) fetchAndCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) { + c.mu.Lock() + defer c.mu.Unlock() + + // Double-check after acquiring write lock + if entry, ok := c.entries[url]; ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl { + return entry.pq, nil + } + + // Fetch rego source from URL + source, err := c.fetchPolicy(ctx, url) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("fetch %s: %w", url, err) + } + + // Compile + compiler, err := ast.CompileModulesWithOpt(map[string]string{"policy.rego": source}, ast.CompileOpts{}) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("compile %s: %w", url, err) + } + + pq, err := rego.New( + rego.Query(query), + rego.Compiler(compiler), + ).PrepareForEval(ctx) + if err != nil { + return rego.PreparedEvalQuery{}, fmt.Errorf("prepare query %s: %w", query, err) + } + + // Evict oldest if at capacity + if len(c.entries) >= c.maxSize { + var oldestKey string + var oldestTime time.Time + for k, v := range c.entries { + if oldestKey == "" || v.fetchedAt.Before(oldestTime) { + oldestKey = k + oldestTime = v.fetchedAt + } + } + if oldestKey != "" { + delete(c.entries, oldestKey) + } + } + + c.entries[url] = &cacheEntry{pq: pq, query: query, fetchedAt: time.Now()} + return pq, nil +} + +func (c *PolicyCache) fetchPolicy(ctx context.Context, url string) (string, error) { + reqCtx, cancel := context.WithTimeout(ctx, c.fetchTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("HTTP %d from %s", resp.StatusCode, url) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, c.maxFileSize)) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/plugins/revenueflows/cmd/plugin.go b/plugins/revenueflows/cmd/plugin.go new file mode 100644 index 00000000..4356c1a3 --- /dev/null +++ b/plugins/revenueflows/cmd/plugin.go @@ -0,0 +1,19 @@ +// Package main provides the plugin entry point for the RevenueFlows middleware. +// Compiled as a Go plugin (.so) and loaded by beckn-onix at runtime. +package main + +import ( + "context" + "net/http" + + revenueflows "github.com/beckn-one/deg/plugins/revenueflows" +) + +type provider struct{} + +func (p provider) New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) { + return revenueflows.NewMiddleware(cfg) +} + +// Provider is the exported symbol that beckn-onix plugin manager looks up. +var Provider = provider{} diff --git a/plugins/revenueflows/config.go b/plugins/revenueflows/config.go new file mode 100644 index 00000000..7594e62f --- /dev/null +++ b/plugins/revenueflows/config.go @@ -0,0 +1,133 @@ +package revenueflows + +import ( + "strconv" + "strings" + "time" +) + +// Config holds configuration for the RevenueFlows plugin. +type Config struct { + // Enabled controls whether the plugin is active. + Enabled bool + + // Actions is the list of beckn actions that trigger revenue flow computation. + // Default: ["on_status"] + Actions []string + + // CacheTTL is how long a compiled rego policy is cached before re-fetch. + // Default: 5 minutes. + CacheTTL time.Duration + + // MaxCacheEntries is the LRU bound on cached compiled policies. + // Default: 50. + MaxCacheEntries int + + // PolicyFetchTimeout is the HTTP timeout for fetching rego from a URL. + // Default: 30 seconds. + PolicyFetchTimeout time.Duration + + // MaxPolicySize is the maximum rego file size in bytes. + // Default: 1 MB. + MaxPolicySize int64 + + // DebugLogging enables verbose logging. + DebugLogging bool + + // AllowedDomains restricts which domains rego can be fetched from. + // Empty = allow all. Comma-separated list. + AllowedDomains []string +} + +// DefaultConfig returns a Config with sensible defaults. +func DefaultConfig() *Config { + return &Config{ + Enabled: true, + Actions: []string{"on_status"}, + CacheTTL: 5 * time.Minute, + MaxCacheEntries: 50, + PolicyFetchTimeout: 30 * time.Second, + MaxPolicySize: 1 << 20, // 1 MB + DebugLogging: false, + } +} + +// ParseConfig parses the plugin configuration map. +func ParseConfig(cfg map[string]string) (*Config, error) { + config := DefaultConfig() + + if enabled, ok := cfg["enabled"]; ok { + config.Enabled = enabled == "true" || enabled == "1" + } + + if actions, ok := cfg["actions"]; ok && actions != "" { + list := strings.Split(actions, ",") + config.Actions = make([]string, 0, len(list)) + for _, a := range list { + a = strings.TrimSpace(a) + if a != "" { + config.Actions = append(config.Actions, a) + } + } + } + + if ttl, ok := cfg["cacheTTL"]; ok && ttl != "" { + seconds, err := strconv.Atoi(ttl) + if err != nil { + d, err2 := time.ParseDuration(ttl) + if err2 != nil { + return nil, err + } + config.CacheTTL = d + } else { + config.CacheTTL = time.Duration(seconds) * time.Second + } + } + + if max, ok := cfg["maxCacheEntries"]; ok && max != "" { + n, err := strconv.Atoi(max) + if err != nil { + return nil, err + } + config.MaxCacheEntries = n + } + + if debug, ok := cfg["debugLogging"]; ok { + config.DebugLogging = debug == "true" || debug == "1" + } + + if domains, ok := cfg["allowedDomains"]; ok && domains != "" { + for _, d := range strings.Split(domains, ",") { + d = strings.TrimSpace(d) + if d != "" { + config.AllowedDomains = append(config.AllowedDomains, d) + } + } + } + + return config, nil +} + +// IsActionEnabled checks if the given action is in the configured list. +func (c *Config) IsActionEnabled(action string) bool { + for _, a := range c.Actions { + if a == action { + return true + } + } + return false +} + +// IsDomainAllowed checks if the URL domain is in the allowed list. +// Returns true if no domain restriction is configured. +func (c *Config) IsDomainAllowed(url string) bool { + if len(c.AllowedDomains) == 0 { + return true + } + for _, d := range c.AllowedDomains { + if strings.Contains(url, d) { + return true + } + } + return false +} diff --git a/plugins/revenueflows/inject.go b/plugins/revenueflows/inject.go new file mode 100644 index 00000000..143355ef --- /dev/null +++ b/plugins/revenueflows/inject.go @@ -0,0 +1,99 @@ +package revenueflows + +import ( + "encoding/json" + "fmt" + "strings" +) + +// PolicyRef holds the policy URL and OPA query path extracted from a message. +type PolicyRef struct { + URL string + QueryPath string +} + +// ExtractPolicyRef reads contractAttributes.policy.url and .queryPath from +// the message body. Returns nil if not present. +func ExtractPolicyRef(body []byte) *PolicyRef { + var envelope struct { + Message struct { + Contract struct { + ContractAttributes struct { + Policy struct { + URL string `json:"url"` + QueryPath string `json:"queryPath"` + } `json:"policy"` + } `json:"contractAttributes"` + } `json:"contract"` + } `json:"message"` + } + + if err := json.Unmarshal(body, &envelope); err != nil { + return nil + } + + url := envelope.Message.Contract.ContractAttributes.Policy.URL + qp := envelope.Message.Contract.ContractAttributes.Policy.QueryPath + if url == "" || qp == "" { + return nil + } + + return &PolicyRef{URL: url, QueryPath: qp} +} + +// ExtractAction reads the beckn action from the URL path or context.action. +func ExtractAction(urlPath string, body []byte) string { + // Try URL path first (e.g., /bpp/caller/on_status → on_status) + parts := strings.Split(strings.TrimRight(urlPath, "/"), "/") + if len(parts) > 0 { + action := parts[len(parts)-1] + if action != "" && action != "caller" && action != "receiver" { + return action + } + } + + // Fallback: parse context.action from body + var envelope struct { + Context struct { + Action string `json:"action"` + } `json:"context"` + } + if err := json.Unmarshal(body, &envelope); err == nil && envelope.Context.Action != "" { + return envelope.Context.Action + } + + return "" +} + +// InjectRevenueFlows sets message.contract.contractAttributes.revenueFlows +// in the JSON body and returns the modified bytes. +// Uses json.Number to preserve numeric precision. +func InjectRevenueFlows(body []byte, flows interface{}) ([]byte, error) { + // Use Decoder with UseNumber to preserve numeric precision + dec := json.NewDecoder(strings.NewReader(string(body))) + dec.UseNumber() + + var payload map[string]interface{} + if err := dec.Decode(&payload); err != nil { + return nil, fmt.Errorf("failed to decode body: %w", err) + } + + // Navigate: message → contract → contractAttributes + message, ok := payload["message"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("message not found or not an object") + } + contract, ok := message["contract"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("message.contract not found or not an object") + } + attrs, ok := contract["contractAttributes"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("contractAttributes not found or not an object") + } + + // Inject + attrs["revenueFlows"] = flows + + return json.Marshal(payload) +} diff --git a/plugins/revenueflows/middleware.go b/plugins/revenueflows/middleware.go new file mode 100644 index 00000000..c40b6157 --- /dev/null +++ b/plugins/revenueflows/middleware.go @@ -0,0 +1,166 @@ +package revenueflows + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/open-policy-agent/opa/v1/rego" +) + +// NewMiddleware returns an HTTP middleware that computes revenue flows +// and injects them into the request body before passing to the next handler. +func NewMiddleware(cfg map[string]string) (func(http.Handler) http.Handler, error) { + config, err := ParseConfig(cfg) + if err != nil { + return nil, fmt.Errorf("revenueflows: config: %w", err) + } + + cache := NewPolicyCache( + config.MaxCacheEntries, + config.CacheTTL, + config.PolicyFetchTimeout, + config.MaxPolicySize, + ) + + fmt.Printf("[RevenueFlows] Middleware enabled=%v, actions=%v, cacheTTL=%s\n", + config.Enabled, config.Actions, config.CacheTTL) + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !config.Enabled { + next.ServeHTTP(w, r) + return + } + + // Read body + body, err := io.ReadAll(r.Body) + if err != nil { + next.ServeHTTP(w, r) + return + } + + // Check action + action := extractActionFromPathAndBody(r.URL.Path, body) + if !config.IsActionEnabled(action) { + // Pass through unchanged + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Extract policy reference + ref := ExtractPolicyRef(body) + if ref == nil { + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + if !config.IsDomainAllowed(ref.URL) { + fmt.Printf("[RevenueFlows] WARN: policy URL domain not allowed: %s\n", ref.URL) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + if config.DebugLogging { + fmt.Printf("[RevenueFlows] Evaluating %s with query %s\n", ref.URL, ref.QueryPath) + } + + // Get or compile policy + pq, err := cache.GetOrCompile(r.Context(), ref.URL, ref.QueryPath) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: failed to load policy: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Parse as OPA input + var input interface{} + if err := json.Unmarshal(body, &input); err != nil { + fmt.Printf("[RevenueFlows] WARN: failed to parse body: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Evaluate + rs, err := pq.Eval(r.Context(), regoEvalInput(input)) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: rego eval failed: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + flows := extractFlowsFromResultSet(rs) + if flows == nil { + if config.DebugLogging { + fmt.Printf("[RevenueFlows] No revenue_flows in result, skipping\n") + } + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + // Inject into body + modified, err := InjectRevenueFlows(body, flows) + if err != nil { + fmt.Printf("[RevenueFlows] WARN: inject failed: %v\n", err) + r.Body = io.NopCloser(bytes.NewReader(body)) + r.ContentLength = int64(len(body)) + next.ServeHTTP(w, r) + return + } + + fmt.Printf("[RevenueFlows] Injected %d revenue flow(s) for action %s\n", len(flows), action) + + // Pass modified body to next handler + r.Body = io.NopCloser(bytes.NewReader(modified)) + r.ContentLength = int64(len(modified)) + next.ServeHTTP(w, r) + }) + }, nil +} + +// regoEvalInput wraps input for OPA evaluation. +func regoEvalInput(input interface{}) rego.EvalOption { + return rego.EvalInput(input) +} + +// extractFlowsFromResultSet pulls revenue_flows from OPA result. +func extractFlowsFromResultSet(rs rego.ResultSet) []interface{} { + return extractFlows(rs) +} + +// extractActionFromPathAndBody extracts action from URL path or body. +func extractActionFromPathAndBody(urlPath string, body []byte) string { + parts := strings.Split(strings.TrimRight(urlPath, "/"), "/") + if len(parts) > 0 { + action := parts[len(parts)-1] + if action != "" && action != "caller" && action != "receiver" { + return action + } + } + var envelope struct { + Context struct { + Action string `json:"action"` + } `json:"context"` + } + if err := json.Unmarshal(body, &envelope); err == nil && envelope.Context.Action != "" { + return envelope.Context.Action + } + return "" +} diff --git a/plugins/revenueflows/revenueflows.go b/plugins/revenueflows/revenueflows.go new file mode 100644 index 00000000..7f1bad50 --- /dev/null +++ b/plugins/revenueflows/revenueflows.go @@ -0,0 +1,152 @@ +// Package revenueflows is an onix Step plugin that computes revenue flows +// from a rego policy embedded in the contract and injects them into the message. +// +// It reads the policy URL and query path from +// message.contract.contractAttributes.policy, evaluates the rego against +// the full message, and writes the resulting revenue_flows array back into +// contractAttributes.revenueFlows. +// +// Soft failure: if anything goes wrong (fetch, compile, eval), the message +// passes through unmodified with a warning log. Never blocks delivery. +package revenueflows + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/beckn-one/beckn-onix/pkg/log" + "github.com/beckn-one/beckn-onix/pkg/model" + "github.com/open-policy-agent/opa/v1/rego" +) + +// RevenueFlows is a Step plugin that computes and injects revenue flows. +type RevenueFlows struct { + config *Config + cache *PolicyCache +} + +// New creates a new RevenueFlows plugin instance. +func New(cfg map[string]string) (*RevenueFlows, error) { + config, err := ParseConfig(cfg) + if err != nil { + return nil, fmt.Errorf("revenueflows: config: %w", err) + } + + cache := NewPolicyCache( + config.MaxCacheEntries, + config.CacheTTL, + config.PolicyFetchTimeout, + config.MaxPolicySize, + ) + + fmt.Printf("[RevenueFlows] Enabled=%v, actions=%v, cacheTTL=%s\n", + config.Enabled, config.Actions, config.CacheTTL) + + return &RevenueFlows{config: config, cache: cache}, nil +} + +// Run implements the Step interface. +func (rf *RevenueFlows) Run(ctx *model.StepContext) error { + if !rf.config.Enabled { + return nil + } + + // Check action + action := ExtractAction(ctx.Request.URL.Path, ctx.Body) + if !rf.config.IsActionEnabled(action) { + if rf.config.DebugLogging { + log.Debugf(ctx, "RevenueFlows: action '%s' not enabled, skipping", action) + } + return nil + } + + // Extract policy reference from the message + ref := ExtractPolicyRef(ctx.Body) + if ref == nil { + if rf.config.DebugLogging { + log.Debug(ctx, "RevenueFlows: no contractAttributes.policy in message, skipping") + } + return nil + } + + // Check domain allowlist + if !rf.config.IsDomainAllowed(ref.URL) { + log.Warnf(ctx, "RevenueFlows: policy URL domain not allowed: %s", ref.URL) + return nil + } + + if rf.config.DebugLogging { + log.Debugf(ctx, "RevenueFlows: evaluating %s with query %s", ref.URL, ref.QueryPath) + } + + // Get or compile the policy + pq, err := rf.cache.GetOrCompile(context.Background(), ref.URL, ref.QueryPath) + if err != nil { + log.Warnf(ctx, "RevenueFlows: failed to load policy: %v", err) + return nil // soft failure + } + + // Parse message as OPA input + var input interface{} + if err := json.Unmarshal(ctx.Body, &input); err != nil { + log.Warnf(ctx, "RevenueFlows: failed to parse message body: %v", err) + return nil + } + + // Evaluate + rs, err := pq.Eval(context.Background(), rego.EvalInput(input)) + if err != nil { + log.Warnf(ctx, "RevenueFlows: rego evaluation failed: %v", err) + return nil // soft failure + } + + // Extract revenue_flows from result + flows := extractFlows(rs) + if flows == nil { + if rf.config.DebugLogging { + log.Debug(ctx, "RevenueFlows: no revenue_flows in rego result, skipping injection") + } + return nil + } + + // Inject into message body + modified, err := InjectRevenueFlows(ctx.Body, flows) + if err != nil { + log.Warnf(ctx, "RevenueFlows: failed to inject revenue_flows: %v", err) + return nil // soft failure + } + + ctx.Body = modified + log.Infof(ctx, "RevenueFlows: injected %d revenue flow(s)", len(flows)) + return nil +} + +// Close is a no-op cleanup function. +func (rf *RevenueFlows) Close() {} + +// extractFlows pulls revenue_flows from the OPA result set. +// The query evaluates to the full package object; we look for the +// "revenue_flows" key within it. +func extractFlows(rs rego.ResultSet) []interface{} { + if len(rs) == 0 || len(rs[0].Expressions) == 0 { + return nil + } + + val := rs[0].Expressions[0].Value + + // If the query returns the full package, result is a map + if m, ok := val.(map[string]interface{}); ok { + if flows, ok := m["revenue_flows"].([]interface{}); ok { + return flows + } + return nil + } + + // If the query targets revenue_flows directly, result is an array + if flows, ok := val.([]interface{}); ok { + return flows + } + + return nil +} diff --git a/plugins/revenueflows/revenueflows_test.go b/plugins/revenueflows/revenueflows_test.go new file mode 100644 index 00000000..e375b6a7 --- /dev/null +++ b/plugins/revenueflows/revenueflows_test.go @@ -0,0 +1,195 @@ +package revenueflows + +import ( + "encoding/json" + "testing" +) + +// --------------------------------------------------------------------------- +// ExtractPolicyRef tests +// --------------------------------------------------------------------------- + +func TestExtractPolicyRef_Present(t *testing.T) { + body := []byte(`{ + "message": { + "contract": { + "contractAttributes": { + "policy": { + "url": "https://example.com/policy.rego", + "queryPath": "data.test.violations" + } + } + } + } + }`) + + ref := ExtractPolicyRef(body) + if ref == nil { + t.Fatal("expected non-nil PolicyRef") + } + if ref.URL != "https://example.com/policy.rego" { + t.Errorf("URL = %q, want %q", ref.URL, "https://example.com/policy.rego") + } + if ref.QueryPath != "data.test.violations" { + t.Errorf("QueryPath = %q, want %q", ref.QueryPath, "data.test.violations") + } +} + +func TestExtractPolicyRef_Missing(t *testing.T) { + body := []byte(`{"message": {"contract": {}}}`) + ref := ExtractPolicyRef(body) + if ref != nil { + t.Errorf("expected nil PolicyRef, got %+v", ref) + } +} + +func TestExtractPolicyRef_PartialMissing(t *testing.T) { + body := []byte(`{ + "message": { + "contract": { + "contractAttributes": { + "policy": { "url": "https://example.com/policy.rego" } + } + } + } + }`) + ref := ExtractPolicyRef(body) + if ref != nil { + t.Errorf("expected nil PolicyRef when queryPath missing, got %+v", ref) + } +} + +// --------------------------------------------------------------------------- +// ExtractAction tests +// --------------------------------------------------------------------------- + +func TestExtractAction_FromPath(t *testing.T) { + action := ExtractAction("/bpp/caller/on_status", nil) + if action != "on_status" { + t.Errorf("action = %q, want %q", action, "on_status") + } +} + +func TestExtractAction_FromBody(t *testing.T) { + body := []byte(`{"context": {"action": "on_confirm"}}`) + action := ExtractAction("/bpp/caller", body) + if action != "on_confirm" { + t.Errorf("action = %q, want %q", action, "on_confirm") + } +} + +// --------------------------------------------------------------------------- +// InjectRevenueFlows tests +// --------------------------------------------------------------------------- + +func TestInjectRevenueFlows(t *testing.T) { + body := []byte(`{ + "context": {"action": "on_status"}, + "message": { + "contract": { + "contractAttributes": { + "@type": "DEGContract", + "policy": {"url": "test", "queryPath": "test"} + } + } + } + }`) + + flows := []interface{}{ + map[string]interface{}{"role": "buyer", "value": -525.0, "currency": "INR"}, + map[string]interface{}{"role": "seller", "value": 525.0, "currency": "INR"}, + } + + result, err := InjectRevenueFlows(body, flows) + if err != nil { + t.Fatalf("InjectRevenueFlows failed: %v", err) + } + + // Verify the result has revenueFlows + var payload map[string]interface{} + if err := json.Unmarshal(result, &payload); err != nil { + t.Fatalf("failed to parse result: %v", err) + } + + msg := payload["message"].(map[string]interface{}) + contract := msg["contract"].(map[string]interface{}) + attrs := contract["contractAttributes"].(map[string]interface{}) + rf, ok := attrs["revenueFlows"].([]interface{}) + if !ok { + t.Fatal("revenueFlows not found or wrong type") + } + if len(rf) != 2 { + t.Errorf("len(revenueFlows) = %d, want 2", len(rf)) + } + + // Verify existing fields preserved + if attrs["@type"] != "DEGContract" { + t.Errorf("@type lost after injection") + } +} + +func TestInjectRevenueFlows_NoContract(t *testing.T) { + body := []byte(`{"message": {}}`) + _, err := InjectRevenueFlows(body, []interface{}{}) + if err == nil { + t.Error("expected error when contract missing") + } +} + +// --------------------------------------------------------------------------- +// Config tests +// --------------------------------------------------------------------------- + +func TestParseConfig_Defaults(t *testing.T) { + cfg, err := ParseConfig(map[string]string{}) + if err != nil { + t.Fatalf("ParseConfig failed: %v", err) + } + if !cfg.Enabled { + t.Error("expected Enabled=true by default") + } + if len(cfg.Actions) != 1 || cfg.Actions[0] != "on_status" { + t.Errorf("Actions = %v, want [on_status]", cfg.Actions) + } +} + +func TestParseConfig_Custom(t *testing.T) { + cfg, err := ParseConfig(map[string]string{ + "actions": "on_status,on_confirm", + "cacheTTL": "600", + "maxCacheEntries": "100", + "debugLogging": "true", + "allowedDomains": "raw.githubusercontent.com,schema.beckn.io", + }) + if err != nil { + t.Fatalf("ParseConfig failed: %v", err) + } + if len(cfg.Actions) != 2 { + t.Errorf("Actions = %v, want 2 items", cfg.Actions) + } + if cfg.MaxCacheEntries != 100 { + t.Errorf("MaxCacheEntries = %d, want 100", cfg.MaxCacheEntries) + } + if !cfg.DebugLogging { + t.Error("expected DebugLogging=true") + } + if len(cfg.AllowedDomains) != 2 { + t.Errorf("AllowedDomains = %v, want 2 items", cfg.AllowedDomains) + } +} + +func TestIsDomainAllowed(t *testing.T) { + cfg := &Config{AllowedDomains: []string{"raw.githubusercontent.com"}} + if !cfg.IsDomainAllowed("https://raw.githubusercontent.com/beckn/DEG/policy.rego") { + t.Error("expected allowed") + } + if cfg.IsDomainAllowed("https://evil.com/policy.rego") { + t.Error("expected blocked") + } + + // Empty = allow all + cfg2 := &Config{} + if !cfg2.IsDomainAllowed("https://anything.com/policy.rego") { + t.Error("expected allowed when no restrictions") + } +}