From b2dd29b443efe83eadf432a36a637e094dca781f Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Wed, 11 Mar 2026 16:38:34 -0500 Subject: [PATCH] pkg/settings/limits: add RangeLimiter --- pkg/settings/cresettings/README.md | 3 +- pkg/settings/cresettings/defaults.json | 3 +- pkg/settings/cresettings/defaults.toml | 1 + pkg/settings/cresettings/settings.go | 6 +- pkg/settings/limits/errors.go | 24 +++ pkg/settings/limits/factory.go | 10 + pkg/settings/limits/range.go | 245 +++++++++++++++++++++++++ pkg/settings/limits/range_test.go | 136 ++++++++++++++ pkg/settings/range.go | 40 ++++ pkg/settings/settings.go | 13 +- 10 files changed, 477 insertions(+), 4 deletions(-) create mode 100644 pkg/settings/limits/range.go create mode 100644 pkg/settings/limits/range_test.go create mode 100644 pkg/settings/range.go diff --git a/pkg/settings/cresettings/README.md b/pkg/settings/cresettings/README.md index 6ffbcdd5ab..66ac96edb9 100644 --- a/pkg/settings/cresettings/README.md +++ b/pkg/settings/cresettings/README.md @@ -113,8 +113,9 @@ flowchart PerWorkflow.ExecutionResponseLimit{{PerWorkflow.ExecutionResponseLimit}}:::bound PerWorkflow.ExecutionTimestampsEnabled[/PerWorkflow.ExecutionTimestampsEnabled\]:::gate PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt[/PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt\]:::gate + PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod[/PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod\]:::gate - PerWorkflow.ExecutionTimestampsEnabled-->PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt-->PerWorkflow.ExecutionTimeout-->PerWorkflow.ExecutionResponseLimit + PerWorkflow.ExecutionTimestampsEnabled-->PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod-->PerWorkflow.ExecutionTimeout-->PerWorkflow.ExecutionResponseLimit end subgraph ExecutionHelper.GetSecrets diff --git a/pkg/settings/cresettings/defaults.json b/pkg/settings/cresettings/defaults.json index 0d85a96a93..d3add960cd 100644 --- a/pkg/settings/cresettings/defaults.json +++ b/pkg/settings/cresettings/defaults.json @@ -109,6 +109,7 @@ "Secrets": { "CallLimit": "5" }, - "FeatureMultiTriggerExecutionIDsActiveAt": "2100-01-01 00:00:00 +0000 UTC" + "FeatureMultiTriggerExecutionIDsActiveAt": "2100-01-01 00:00:00 +0000 UTC", + "FeatureMultiTriggerExecutionIDsActivePeriod": "[2100-01-01 00:00:00 +0000 UTC,2101-01-01 00:00:00 +0000 UTC]" } } \ No newline at end of file diff --git a/pkg/settings/cresettings/defaults.toml b/pkg/settings/cresettings/defaults.toml index 868b22f0c5..eff679a474 100644 --- a/pkg/settings/cresettings/defaults.toml +++ b/pkg/settings/cresettings/defaults.toml @@ -51,6 +51,7 @@ WASMSecretsSizeLimit = '1mb' LogLineLimit = '1kb' LogEventLimit = '1000' FeatureMultiTriggerExecutionIDsActiveAt = '2100-01-01 00:00:00 +0000 UTC' +FeatureMultiTriggerExecutionIDsActivePeriod = '[2100-01-01 00:00:00 +0000 UTC,2101-01-01 00:00:00 +0000 UTC]' [PerWorkflow.ChainAllowed] Default = 'false' diff --git a/pkg/settings/cresettings/settings.go b/pkg/settings/cresettings/settings.go index a520e59525..f506be7ed5 100644 --- a/pkg/settings/cresettings/settings.go +++ b/pkg/settings/cresettings/settings.go @@ -205,6 +205,9 @@ var Default = Schema{ }, FeatureMultiTriggerExecutionIDsActiveAt: Time(time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC)), + FeatureMultiTriggerExecutionIDsActivePeriod: TimeRange( + time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2101, 1, 1, 0, 0, 0, 0, time.UTC)), }, } @@ -288,7 +291,8 @@ type Workflows struct { ConfidentialHTTP confidentialHTTP Secrets secrets - FeatureMultiTriggerExecutionIDsActiveAt Setting[config.Timestamp] + FeatureMultiTriggerExecutionIDsActiveAt Setting[config.Timestamp] // Deprecated + FeatureMultiTriggerExecutionIDsActivePeriod Setting[Range[config.Timestamp]] } type cronTrigger struct { diff --git a/pkg/settings/limits/errors.go b/pkg/settings/limits/errors.go index 0aede8454e..55fd371a6a 100644 --- a/pkg/settings/limits/errors.go +++ b/pkg/settings/limits/errors.go @@ -120,6 +120,30 @@ func (e ErrorBoundLimited[N]) Error() string { return fmt.Sprintf("%slimited%s: cannot use %v, limit is %v", which, who, e.Amount, e.Limit) } +type ErrorRangeLimited[N Number] struct { + Key string + + Scope settings.Scope + Tenant string + + Limit settings.Range[N] + Amount N +} + +func (e ErrorRangeLimited[N]) GRPCStatus() *status.Status { + return status.New(codes.ResourceExhausted, e.Error()) +} + +func (e ErrorRangeLimited[N]) Is(target error) bool { + _, ok := target.(ErrorRangeLimited[N]) //nolint:errcheck // implementing errors.Is + return ok +} + +func (e ErrorRangeLimited[N]) Error() string { + which, who := errArgs(e.Key, e.Scope, e.Tenant) + return fmt.Sprintf("%slimited%s: cannot use %v, limited to range %v", which, who, e.Amount, e.Limit) +} + type ErrorQueueFull struct { Key string diff --git a/pkg/settings/limits/factory.go b/pkg/settings/limits/factory.go index 5a4aebe07f..4dca12e0da 100644 --- a/pkg/settings/limits/factory.go +++ b/pkg/settings/limits/factory.go @@ -100,6 +100,16 @@ func MakeLowerBoundLimiter[N Number](f Factory, bound settings.IsSetting[N]) (Bo return newBoundLimiter(f, bound.GetSpec(), true) } +// MakeRangeLimiter returns a RangeLimiter for the given bound and configured by the Factory. +// If Meter is set, the following metrics will be emitted +// - range.*.lower.limit - gauge +// - range.*.upper.limit - gauge +// - range.*.usage - histogram +// - range.*.denied - histogram +func MakeRangeLimiter[N Number](f Factory, bound settings.IsSetting[settings.Range[N]]) (RangeLimiter[N], error) { + return newRangeLimiter(f, bound.GetSpec()) +} + // MakeQueueLimiter returns a QueueLimiter for the given limit and configured by the Factory. // If Meter is set, the following metrics will be emitted // - queue.*.limit - int gauge diff --git a/pkg/settings/limits/range.go b/pkg/settings/limits/range.go new file mode 100644 index 0000000000..f3ee21dc20 --- /dev/null +++ b/pkg/settings/limits/range.go @@ -0,0 +1,245 @@ +package limits + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/contexts" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings" +) + +// BoundLimiter is a limiter for simple bounds checks. +type RangeLimiter[N Number] interface { + Limiter[settings.Range[N]] + // Check returns ErrorBoundLimited if the value is above the limit. + Check(context.Context, N) error +} + +// NewRangeLimiter returns a RangeLimiter with the given lower bounds. +func NewRangeLimiter[N Number](bounds settings.Range[N]) RangeLimiter[N] { + return &simpleRangeLimiter[N]{bounds: bounds} +} + +var _ RangeLimiter[int64] = &simpleRangeLimiter[int64]{} + +type simpleRangeLimiter[N Number] struct { + bounds settings.Range[N] + closed atomic.Bool +} + +func (s *simpleRangeLimiter[N]) Close() error { s.closed.Store(true); return nil } + +func (s *simpleRangeLimiter[N]) Check(ctx context.Context, n N) error { + if s.closed.Load() { + return errors.New("closed") + } + if !s.bounds.Contains(n) { + return ErrorRangeLimited[N]{Limit: s.bounds, Amount: n} + } + return nil +} +func (s *simpleRangeLimiter[N]) Limit(ctx context.Context) (settings.Range[N], error) { + return s.bounds, nil +} + +func newRangeLimiter[N Number](f Factory, bound settings.SettingSpec[settings.Range[N]]) (RangeLimiter[N], error) { + b := &rangeLimiter[N]{ + updater: newUpdater[settings.Range[N]](nil, func(ctx context.Context) (settings.Range[N], error) { + return bound.GetOrDefault(ctx, f.Settings) + }, nil), + key: bound.GetKey(), + scope: bound.GetScope(), + } + b.updater.recordLimit = func(ctx context.Context, n settings.Range[N]) { b.recordBound(ctx, n) } + + if f.Meter != nil { + if b.key == "" { + return nil, errors.New("metrics require Key to be set") + } + newGauge, newHist := metricConstructors[N](f.Meter, bound.GetUnit()) + + key := bound.GetKey() + lowerLimitGauge, err := newGauge("range." + key + ".lower.limit") + if err != nil { + return nil, err + } + upperLimitGauge, err := newGauge("range." + key + ".upper.limit") + if err != nil { + return nil, err + } + b.recordBound = func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption) { + lowerLimitGauge.Record(ctx, value.Lower, options...) + upperLimitGauge.Record(ctx, value.Upper, options...) + } + usageHist, err := newHist("range." + key + ".usage") + if err != nil { + return nil, err + } + b.recordUsage = func(ctx context.Context, value N, options ...metric.RecordOption) { + usageHist.Record(ctx, value, options...) + } + deniedHist, err := newHist("range." + key + ".denied") + if err != nil { + return nil, err + } + b.recordDenied = func(ctx context.Context, value N, options ...metric.RecordOption) { + deniedHist.Record(ctx, value, options...) + } + } else { + b.recordBound = func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption) {} + b.recordUsage = func(ctx context.Context, value N, options ...metric.RecordOption) {} + b.recordDenied = func(ctx context.Context, value N, options ...metric.RecordOption) {} + } + + if f.Logger != nil { + b.lggr = logger.Sugared(f.Logger).Named("RangeLimiter").With("key", bound.GetKey()) + } + + if f.Settings != nil { + if r, ok := f.Settings.(settings.Registry); ok { + b.subFn = func(ctx context.Context) (<-chan settings.Update[settings.Range[N]], func()) { + return bound.Subscribe(ctx, r) + } + } + } + + if bound.GetScope() == settings.ScopeGlobal { + go b.updateLoop(context.Background()) + } + + return b, nil +} + +type rangeLimiter[N Number] struct { + *updater[settings.Range[N]] + + key string // optional + scope settings.Scope + + recordBound func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption) + recordUsage func(ctx context.Context, value N, options ...metric.RecordOption) + recordDenied func(ctx context.Context, value N, options ...metric.RecordOption) + + // opt: reap after period of non-use + updaters sync.Map // map[string]*updater[Range[N]] + wg services.WaitGroup // tracks and blocks updaters background routines +} + +func (b *rangeLimiter[N]) Close() (err error) { + b.wg.Wait() + + // cleanup + if b.scope == settings.ScopeGlobal { + return b.updater.Close() + } else { + b.updaters.Range(func(key, value any) bool { + // opt: parallelize + err = errors.Join(err, value.(*updater[settings.Range[N]]).Close()) + return true + }) + } + return +} + +// Deprecated: use TryCleanup +func (b *rangeLimiter[N]) EvictTenant(tenant string) error { + v, loaded := b.updaters.LoadAndDelete(tenant) + if !loaded { + return nil + } + return v.(*updater[settings.Range[N]]).Close() +} + +func (b *rangeLimiter[N]) cleanup(ctx context.Context) { + tenant := b.scope.Value(ctx) + if tenant == "" { + b.lggr.Warnw("Unable to cleanup scoped bounds limiter due to missing tenant", "scope", b.scope) + return + } + v, loaded := b.updaters.LoadAndDelete(tenant) + if !loaded { + return + } + if err := v.(*updater[settings.Range[N]]).Close(); err != nil { + b.lggr.Errorw("Failed to close bounds limiter", "tenant", tenant, "err", err) + } +} + +func (b *rangeLimiter[N]) Check(ctx context.Context, amount N) error { + if err := b.wg.TryAdd(1); err != nil { + return err + } + defer b.wg.Done() + + tenant, bound, err := b.get(ctx) + if err != nil { + return err + } + if tenant == "" && b.scope != settings.ScopeGlobal { + return nil // fail open + } + if !bound.Contains(amount) { + b.recordDenied(ctx, amount, withScope(ctx, b.scope)) + return ErrorRangeLimited[N]{Key: b.key, Scope: b.scope, Tenant: tenant, Limit: bound, Amount: amount} + } + + b.recordUsage(ctx, amount, withScope(ctx, b.scope)) + return nil +} + +func (b *rangeLimiter[N]) Limit(ctx context.Context) (settings.Range[N], error) { + var zero settings.Range[N] + if err := b.wg.TryAdd(1); err != nil { + return zero, err + } + defer b.wg.Done() + + tenant, bound, err := b.get(ctx) + if err != nil { + return zero, err + } + if tenant == "" && b.scope != settings.ScopeGlobal { + return zero, nil // fail open + } + + return bound, nil +} + +func (b *rangeLimiter[N]) get(ctx context.Context) (tenant string, bound settings.Range[N], err error) { + if b.scope != settings.ScopeGlobal { + tenant = b.scope.Value(ctx) + if tenant == "" { + if !b.scope.IsTenantRequired() { + kvs := contexts.CREValue(ctx).LoggerKVs() + b.lggr.Warnw("Unable to get scoped bounds limit due to missing tenant: failing open", append([]any{"scope", b.scope}, kvs...)...) + return + } + err = fmt.Errorf("unable to get scoped bounds limit due to missing tenant for scope: %s", b.scope) + return + } + + u := newUpdater(b.lggr, b.getLimitFn, b.subFn) + actual, loaded := b.updaters.LoadOrStore(tenant, u) + creCtx := contexts.WithCRE(ctx, b.scope.RoundCRE(contexts.CREValue(ctx))) + if !loaded { + go u.updateLoop(creCtx) + } else { + u = actual.(*updater[settings.Range[N]]) + u.updateCtx(creCtx) + } + } + + bound, err = b.getLimitFn(ctx) + if err != nil { + b.lggr.Errorw("Failed to get limit. Using default value", "default", bound, "err", err) + } + b.recordBound(ctx, bound, withScope(ctx, b.scope)) + return +} diff --git a/pkg/settings/limits/range_test.go b/pkg/settings/limits/range_test.go new file mode 100644 index 0000000000..74ac61ebb0 --- /dev/null +++ b/pkg/settings/limits/range_test.go @@ -0,0 +1,136 @@ +package limits + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/contexts" + "github.com/smartcontractkit/chainlink-common/pkg/settings" +) + +func ExampleRangeLimiter_Check() { + bl := NewRangeLimiter(settings.Range[int]{Lower: 1, Upper: 10}) + fn := func(n int) { + if err := bl.Check(context.Background(), n); err != nil { + fmt.Println(err) + return + } + fmt.Println("used", n) + } + fn(11) + fn(4) + fn(10) + // Output: + // limited: cannot use 11, limited to range [1,10] + // used 4 + // used 10 +} + +func TestMakeRangeLimiter(t *testing.T) { + t.Parallel() + + for _, tt := range []struct { + scope settings.Scope + cre contexts.CRE + }{ + {settings.ScopeGlobal, contexts.CRE{}}, + {settings.ScopeOwner, contexts.CRE{Owner: "ow-id"}}, + } { + t.Run(tt.scope.String(), func(t *testing.T) { + t.Parallel() + mc := newMetricsChecker(t) + f := Factory{Meter: mc.Meter(t.Name())} + limit := settings.TimeRange( + time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC).Add(10*time.Hour)) + limit.Key = "foo.bar" + limit.Scope = tt.scope + bl, err := MakeRangeLimiter[config.Timestamp](f, limit) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, bl.Close()) }) + + ctx := t.Context() + ctx = contexts.WithCRE(ctx, tt.cre) + + denied := config.Timestamp(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC).Add(11 * time.Hour).Unix()) + var errBound ErrorRangeLimited[config.Timestamp] + if assert.ErrorAs(t, bl.Check(ctx, denied), &errBound) { + assert.Equal(t, "foo.bar", errBound.Key) + assert.Equal(t, tt.scope, errBound.Scope) + assert.Equal(t, limit.DefaultValue, errBound.Limit) + assert.Equal(t, denied, errBound.Amount) + } + for _, v := range []time.Time{ + time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC).Add(5 * time.Hour), + } { + assert.NoError(t, bl.Check(ctx, config.Timestamp(v.Unix()))) + } + + ms := mc.lastResourceFirstScopeMetric(t) + redactHistogramVals[int64](t, ms, "range.foo.bar.usage") + redactHistogramVals[int64](t, ms, "range.foo.bar.denied") + + attrs := attribute.NewSet(kvsFromScope(ctx, tt.scope)...) + + require.Equal(t, metrics{ + { + Name: "range.foo.bar.lower.limit", + Unit: "s", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attrs, Value: int64(limit.DefaultValue.Lower)}, + }, + }, + }, + { + Name: "range.foo.bar.upper.limit", + Unit: "s", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attrs, Value: int64(limit.DefaultValue.Upper)}, + }, + }, + }, + { + Name: "range.foo.bar.usage", + Unit: "s", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attrs, + Count: 2, + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "range.foo.bar.denied", + Unit: "s", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attrs, + Count: 1, + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + }, ms) + }) + } +} diff --git a/pkg/settings/range.go b/pkg/settings/range.go new file mode 100644 index 0000000000..1b6f4b0489 --- /dev/null +++ b/pkg/settings/range.go @@ -0,0 +1,40 @@ +package settings + +import ( + "cmp" + "errors" + "fmt" + "strings" +) + +// Range represents a closed (inclusive) interval of type N. +type Range[N cmp.Ordered] struct { + Lower, Upper N +} + +func (r Range[N]) Contains(n N) bool { + return r.Lower <= n && n <= r.Upper +} + +func (r Range[N]) String() string { + return fmt.Sprintf("[%v,%v]", r.Lower, r.Upper) +} + +// ParseRangeFn return a func for parsing a Range of type N. +func ParseRangeFn[N cmp.Ordered](parseFn func(string) (N, error)) func(s string) (Range[N], error) { + return func(s string) (Range[N], error) { + s = strings.TrimPrefix(s, "[") + s = strings.TrimSuffix(s, "]") + parts := strings.Split(s, ",") + if len(parts) != 2 { + return Range[N]{}, fmt.Errorf("invalid range: must have two comma separated values: %q", s) + } + lower, lerr := parseFn(parts[0]) + upper, uerr := parseFn(parts[1]) + err := errors.Join(lerr, uerr) + if err != nil { + return Range[N]{}, err + } + return Range[N]{Lower: lower, Upper: upper}, nil + } +} diff --git a/pkg/settings/settings.go b/pkg/settings/settings.go index 9ea4bc4870..a2b494aac8 100644 --- a/pkg/settings/settings.go +++ b/pkg/settings/settings.go @@ -117,7 +117,18 @@ func Duration(defaultValue time.Duration) Setting[time.Duration] { } func Time(defaultValue time.Time) Setting[config.Timestamp] { - return NewSetting(config.Timestamp(defaultValue.Unix()), config.ParseTimestamp) + s := NewSetting(config.Timestamp(defaultValue.Unix()), config.ParseTimestamp) + s.Unit = "s" + return s +} + +func TimeRange(lower, upper time.Time) Setting[Range[config.Timestamp]] { + s := NewSetting(Range[config.Timestamp]{ + Lower: config.Timestamp(lower.Unix()), + Upper: config.Timestamp(upper.Unix()), + }, ParseRangeFn(config.ParseTimestamp)) + s.Unit = "s" + return s } func URL(defaultValue *url.URL) Setting[*url.URL] {