Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/Dockerfile.onix-adapter-deg
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ WORKDIR /workspace/deg-plugins
# Build DEG ledger recorder plugin
RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/degledgerrecorder.so ./degledgerrecorder/cmd/plugin.go

# # Build policy enforcer plugin
# RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/policyenforcer.so ./policyenforcer/cmd/plugin.go
# Build DEG revenue flows plugin
RUN go build -buildmode=plugin -o /workspace/beckn-onix/plugins/revenueflows.so ./revenueflows/cmd/plugin.go

# Create minimal runtime image
# Using debian-slim (glibc) for Go plugin (.so) compatibility with the bullseye builder.
Expand Down
7 changes: 4 additions & 3 deletions build/build-multiarch.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#!/bin/bash

# Multi-Architecture Docker Build Script for DEG Ledger Recorder Plugin
# Multi-Architecture Docker Build Script for DEG Plugins
# ======================================================================
# Builds onix-adapter with DEG plugins for linux/amd64 and linux/arm64
# Builds onix-adapter with DEG plugins (degledgerrecorder, revenueflows)
# for linux/amd64 and linux/arm64
#
# Prerequisites:
# - Docker Desktop or Docker Engine with buildx support
Expand Down Expand Up @@ -119,7 +120,7 @@ else
fi

echo "============================================"
echo "Multi-Arch Build: DEG Ledger Recorder"
echo "Multi-Arch Build: DEG Plugins (ledgerrecorder + revenueflows)"
echo "============================================"
echo "DEG Root: $DEG_ROOT"
echo "Beckn-ONIX Root: $BECKN_ONIX_ROOT"
Expand Down
13 changes: 8 additions & 5 deletions plugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,21 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/yashtewari/glob-intersection v0.2.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/otel/log v0.16.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/ini.v1 v1.67.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)
36 changes: 20 additions & 16 deletions plugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down Expand Up @@ -135,8 +135,8 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM=
Expand All @@ -145,14 +145,18 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 h1:Ckwye
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0/go.mod h1:teIFJh5pW2y+AN7riv6IBPX2DuesS3HgP39mwOspKwU=
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ=
go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4=
go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI=
go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand All @@ -178,10 +182,10 @@ golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA=
golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc=
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls=
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
Expand Down
133 changes: 133 additions & 0 deletions plugins/revenueflows/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package revenueflows

import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/open-policy-agent/opa/v1/ast"
"github.com/open-policy-agent/opa/v1/rego"
)

// cacheEntry holds a compiled OPA query and its metadata.
type cacheEntry struct {
pq rego.PreparedEvalQuery
query string
fetchedAt time.Time
}

// PolicyCache is a TTL-based LRU cache for compiled rego policies.
// Keyed by policy URL.
type PolicyCache struct {
mu sync.RWMutex
entries map[string]*cacheEntry
maxSize int
ttl time.Duration

fetchTimeout time.Duration
maxFileSize int64
}

// NewPolicyCache creates a new cache.
func NewPolicyCache(maxSize int, ttl, fetchTimeout time.Duration, maxFileSize int64) *PolicyCache {
return &PolicyCache{
entries: make(map[string]*cacheEntry),
maxSize: maxSize,
ttl: ttl,
fetchTimeout: fetchTimeout,
maxFileSize: maxFileSize,
}
}

// GetOrCompile returns a compiled query for the given policy URL and OPA query path.
// Fetches and compiles on cache miss or TTL expiry.
func (c *PolicyCache) GetOrCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) {
c.mu.RLock()
entry, ok := c.entries[url]
c.mu.RUnlock()

if ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl {
return entry.pq, nil
}

// Cache miss or expired — fetch and compile
return c.fetchAndCompile(ctx, url, query)
}

func (c *PolicyCache) fetchAndCompile(ctx context.Context, url, query string) (rego.PreparedEvalQuery, error) {
c.mu.Lock()
defer c.mu.Unlock()

// Double-check after acquiring write lock
if entry, ok := c.entries[url]; ok && entry.query == query && time.Since(entry.fetchedAt) < c.ttl {
return entry.pq, nil
}

// Fetch rego source from URL
source, err := c.fetchPolicy(ctx, url)
if err != nil {
return rego.PreparedEvalQuery{}, fmt.Errorf("fetch %s: %w", url, err)
}

// Compile
compiler, err := ast.CompileModulesWithOpt(map[string]string{"policy.rego": source}, ast.CompileOpts{})
if err != nil {
return rego.PreparedEvalQuery{}, fmt.Errorf("compile %s: %w", url, err)
}

pq, err := rego.New(
rego.Query(query),
rego.Compiler(compiler),
).PrepareForEval(ctx)
if err != nil {
return rego.PreparedEvalQuery{}, fmt.Errorf("prepare query %s: %w", query, err)
}

// Evict oldest if at capacity
if len(c.entries) >= c.maxSize {
var oldestKey string
var oldestTime time.Time
for k, v := range c.entries {
if oldestKey == "" || v.fetchedAt.Before(oldestTime) {
oldestKey = k
oldestTime = v.fetchedAt
}
}
if oldestKey != "" {
delete(c.entries, oldestKey)
}
}

c.entries[url] = &cacheEntry{pq: pq, query: query, fetchedAt: time.Now()}
return pq, nil
}

func (c *PolicyCache) fetchPolicy(ctx context.Context, url string) (string, error) {
reqCtx, cancel := context.WithTimeout(ctx, c.fetchTimeout)
defer cancel()

req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
if err != nil {
return "", err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("HTTP %d from %s", resp.StatusCode, url)
}

body, err := io.ReadAll(io.LimitReader(resp.Body, c.maxFileSize))
if err != nil {
return "", err
}

return string(body), nil
}
19 changes: 19 additions & 0 deletions plugins/revenueflows/cmd/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package main provides the plugin entry point for the RevenueFlows middleware.
// Compiled as a Go plugin (.so) and loaded by beckn-onix at runtime.
package main

import (
"context"
"net/http"

revenueflows "github.com/beckn-one/deg/plugins/revenueflows"
)

type provider struct{}

func (p provider) New(ctx context.Context, cfg map[string]string) (func(http.Handler) http.Handler, error) {
return revenueflows.NewMiddleware(cfg)
}

// Provider is the exported symbol that beckn-onix plugin manager looks up.
var Provider = provider{}
Loading