Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/settings/cresettings/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ flowchart
PerWorkflow.ExecutionResponseLimit{{PerWorkflow.ExecutionResponseLimit}}:::bound
PerWorkflow.ExecutionTimestampsEnabled[/PerWorkflow.ExecutionTimestampsEnabled\]:::gate
PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt[/PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt\]:::gate
PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod[/PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod\]:::gate

PerWorkflow.ExecutionTimestampsEnabled-->PerWorkflow.FeatureMultiTriggerExecutionIDsActiveAt-->PerWorkflow.ExecutionTimeout-->PerWorkflow.ExecutionResponseLimit
PerWorkflow.ExecutionTimestampsEnabled-->PerWorkflow.FeatureMultiTriggerExecutionIDsActivePeriod-->PerWorkflow.ExecutionTimeout-->PerWorkflow.ExecutionResponseLimit
end

subgraph ExecutionHelper.GetSecrets
Expand Down
3 changes: 2 additions & 1 deletion pkg/settings/cresettings/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"Secrets": {
"CallLimit": "5"
},
"FeatureMultiTriggerExecutionIDsActiveAt": "2100-01-01 00:00:00 +0000 UTC"
"FeatureMultiTriggerExecutionIDsActiveAt": "2100-01-01 00:00:00 +0000 UTC",
"FeatureMultiTriggerExecutionIDsActivePeriod": "[2100-01-01 00:00:00 +0000 UTC,2101-01-01 00:00:00 +0000 UTC]"
}
}
1 change: 1 addition & 0 deletions pkg/settings/cresettings/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ WASMSecretsSizeLimit = '1mb'
LogLineLimit = '1kb'
LogEventLimit = '1000'
FeatureMultiTriggerExecutionIDsActiveAt = '2100-01-01 00:00:00 +0000 UTC'
FeatureMultiTriggerExecutionIDsActivePeriod = '[2100-01-01 00:00:00 +0000 UTC,2101-01-01 00:00:00 +0000 UTC]'

[PerWorkflow.ChainAllowed]
Default = 'false'
Expand Down
6 changes: 5 additions & 1 deletion pkg/settings/cresettings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ var Default = Schema{
},

FeatureMultiTriggerExecutionIDsActiveAt: Time(time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC)),
FeatureMultiTriggerExecutionIDsActivePeriod: TimeRange(
time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2101, 1, 1, 0, 0, 0, 0, time.UTC)),
},
}

Expand Down Expand Up @@ -288,7 +291,8 @@ type Workflows struct {
ConfidentialHTTP confidentialHTTP
Secrets secrets

FeatureMultiTriggerExecutionIDsActiveAt Setting[config.Timestamp]
FeatureMultiTriggerExecutionIDsActiveAt Setting[config.Timestamp] // Deprecated
FeatureMultiTriggerExecutionIDsActivePeriod Setting[Range[config.Timestamp]]
}

type cronTrigger struct {
Expand Down
24 changes: 24 additions & 0 deletions pkg/settings/limits/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,30 @@ func (e ErrorBoundLimited[N]) Error() string {
return fmt.Sprintf("%slimited%s: cannot use %v, limit is %v", which, who, e.Amount, e.Limit)
}

type ErrorRangeLimited[N Number] struct {
Key string

Scope settings.Scope
Tenant string

Limit settings.Range[N]
Amount N
}

func (e ErrorRangeLimited[N]) GRPCStatus() *status.Status {
return status.New(codes.ResourceExhausted, e.Error())
}

func (e ErrorRangeLimited[N]) Is(target error) bool {
_, ok := target.(ErrorRangeLimited[N]) //nolint:errcheck // implementing errors.Is
return ok
}

func (e ErrorRangeLimited[N]) Error() string {
which, who := errArgs(e.Key, e.Scope, e.Tenant)
return fmt.Sprintf("%slimited%s: cannot use %v, limited to range %v", which, who, e.Amount, e.Limit)
}

type ErrorQueueFull struct {
Key string

Expand Down
10 changes: 10 additions & 0 deletions pkg/settings/limits/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func MakeLowerBoundLimiter[N Number](f Factory, bound settings.IsSetting[N]) (Bo
return newBoundLimiter(f, bound.GetSpec(), true)
}

// MakeRangeLimiter returns a RangeLimiter for the given bound and configured by the Factory.
// If Meter is set, the following metrics will be emitted
// - range.*.lower.limit - gauge
// - range.*.upper.limit - gauge
// - range.*.usage - histogram
// - range.*.denied - histogram
func MakeRangeLimiter[N Number](f Factory, bound settings.IsSetting[settings.Range[N]]) (RangeLimiter[N], error) {
return newRangeLimiter(f, bound.GetSpec())
}

// MakeQueueLimiter returns a QueueLimiter for the given limit and configured by the Factory.
// If Meter is set, the following metrics will be emitted
// - queue.*.limit - int gauge
Expand Down
245 changes: 245 additions & 0 deletions pkg/settings/limits/range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package limits

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/contexts"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/settings"
)

// BoundLimiter is a limiter for simple bounds checks.
type RangeLimiter[N Number] interface {
Limiter[settings.Range[N]]
// Check returns ErrorBoundLimited if the value is above the limit.
Check(context.Context, N) error
}

// NewRangeLimiter returns a RangeLimiter with the given lower bounds.
func NewRangeLimiter[N Number](bounds settings.Range[N]) RangeLimiter[N] {
return &simpleRangeLimiter[N]{bounds: bounds}
}

var _ RangeLimiter[int64] = &simpleRangeLimiter[int64]{}

type simpleRangeLimiter[N Number] struct {
bounds settings.Range[N]
closed atomic.Bool
}

func (s *simpleRangeLimiter[N]) Close() error { s.closed.Store(true); return nil }

func (s *simpleRangeLimiter[N]) Check(ctx context.Context, n N) error {
if s.closed.Load() {
return errors.New("closed")
}
if !s.bounds.Contains(n) {
return ErrorRangeLimited[N]{Limit: s.bounds, Amount: n}
}
return nil
}
func (s *simpleRangeLimiter[N]) Limit(ctx context.Context) (settings.Range[N], error) {
return s.bounds, nil
}

func newRangeLimiter[N Number](f Factory, bound settings.SettingSpec[settings.Range[N]]) (RangeLimiter[N], error) {
b := &rangeLimiter[N]{
updater: newUpdater[settings.Range[N]](nil, func(ctx context.Context) (settings.Range[N], error) {
return bound.GetOrDefault(ctx, f.Settings)
}, nil),
key: bound.GetKey(),
scope: bound.GetScope(),
}
b.updater.recordLimit = func(ctx context.Context, n settings.Range[N]) { b.recordBound(ctx, n) }

if f.Meter != nil {
if b.key == "" {
return nil, errors.New("metrics require Key to be set")
}
newGauge, newHist := metricConstructors[N](f.Meter, bound.GetUnit())

key := bound.GetKey()
lowerLimitGauge, err := newGauge("range." + key + ".lower.limit")
if err != nil {
return nil, err
}
upperLimitGauge, err := newGauge("range." + key + ".upper.limit")
if err != nil {
return nil, err
}
b.recordBound = func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption) {
lowerLimitGauge.Record(ctx, value.Lower, options...)
upperLimitGauge.Record(ctx, value.Upper, options...)
}
usageHist, err := newHist("range." + key + ".usage")
if err != nil {
return nil, err
}
b.recordUsage = func(ctx context.Context, value N, options ...metric.RecordOption) {
usageHist.Record(ctx, value, options...)
}
deniedHist, err := newHist("range." + key + ".denied")
if err != nil {
return nil, err
}
b.recordDenied = func(ctx context.Context, value N, options ...metric.RecordOption) {
deniedHist.Record(ctx, value, options...)
}
} else {
b.recordBound = func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption) {}
b.recordUsage = func(ctx context.Context, value N, options ...metric.RecordOption) {}
b.recordDenied = func(ctx context.Context, value N, options ...metric.RecordOption) {}
}

if f.Logger != nil {
b.lggr = logger.Sugared(f.Logger).Named("RangeLimiter").With("key", bound.GetKey())
}

if f.Settings != nil {
if r, ok := f.Settings.(settings.Registry); ok {
b.subFn = func(ctx context.Context) (<-chan settings.Update[settings.Range[N]], func()) {
return bound.Subscribe(ctx, r)
}
}
}

if bound.GetScope() == settings.ScopeGlobal {
go b.updateLoop(context.Background())
}

return b, nil
}

type rangeLimiter[N Number] struct {
*updater[settings.Range[N]]

key string // optional
scope settings.Scope

recordBound func(ctx context.Context, value settings.Range[N], options ...metric.RecordOption)
recordUsage func(ctx context.Context, value N, options ...metric.RecordOption)
recordDenied func(ctx context.Context, value N, options ...metric.RecordOption)

// opt: reap after period of non-use
updaters sync.Map // map[string]*updater[Range[N]]
wg services.WaitGroup // tracks and blocks updaters background routines
}

func (b *rangeLimiter[N]) Close() (err error) {
b.wg.Wait()

// cleanup
if b.scope == settings.ScopeGlobal {
return b.updater.Close()
} else {
b.updaters.Range(func(key, value any) bool {
// opt: parallelize
err = errors.Join(err, value.(*updater[settings.Range[N]]).Close())
return true
})
}
return
}

// Deprecated: use TryCleanup
func (b *rangeLimiter[N]) EvictTenant(tenant string) error {
v, loaded := b.updaters.LoadAndDelete(tenant)
if !loaded {
return nil
}
return v.(*updater[settings.Range[N]]).Close()
}

func (b *rangeLimiter[N]) cleanup(ctx context.Context) {
tenant := b.scope.Value(ctx)
if tenant == "" {
b.lggr.Warnw("Unable to cleanup scoped bounds limiter due to missing tenant", "scope", b.scope)
return
}
v, loaded := b.updaters.LoadAndDelete(tenant)
if !loaded {
return
}
if err := v.(*updater[settings.Range[N]]).Close(); err != nil {
b.lggr.Errorw("Failed to close bounds limiter", "tenant", tenant, "err", err)
}
}

func (b *rangeLimiter[N]) Check(ctx context.Context, amount N) error {
if err := b.wg.TryAdd(1); err != nil {
return err
}
defer b.wg.Done()

tenant, bound, err := b.get(ctx)
if err != nil {
return err
}
if tenant == "" && b.scope != settings.ScopeGlobal {
return nil // fail open
}
if !bound.Contains(amount) {
b.recordDenied(ctx, amount, withScope(ctx, b.scope))
return ErrorRangeLimited[N]{Key: b.key, Scope: b.scope, Tenant: tenant, Limit: bound, Amount: amount}
}

b.recordUsage(ctx, amount, withScope(ctx, b.scope))
return nil
}

func (b *rangeLimiter[N]) Limit(ctx context.Context) (settings.Range[N], error) {
var zero settings.Range[N]
if err := b.wg.TryAdd(1); err != nil {
return zero, err
}
defer b.wg.Done()

tenant, bound, err := b.get(ctx)
if err != nil {
return zero, err
}
if tenant == "" && b.scope != settings.ScopeGlobal {
return zero, nil // fail open
}

return bound, nil
}

func (b *rangeLimiter[N]) get(ctx context.Context) (tenant string, bound settings.Range[N], err error) {
if b.scope != settings.ScopeGlobal {
tenant = b.scope.Value(ctx)
if tenant == "" {
if !b.scope.IsTenantRequired() {
kvs := contexts.CREValue(ctx).LoggerKVs()
b.lggr.Warnw("Unable to get scoped bounds limit due to missing tenant: failing open", append([]any{"scope", b.scope}, kvs...)...)
return
}
err = fmt.Errorf("unable to get scoped bounds limit due to missing tenant for scope: %s", b.scope)
return
}

u := newUpdater(b.lggr, b.getLimitFn, b.subFn)
actual, loaded := b.updaters.LoadOrStore(tenant, u)
creCtx := contexts.WithCRE(ctx, b.scope.RoundCRE(contexts.CREValue(ctx)))
if !loaded {
go u.updateLoop(creCtx)
} else {
u = actual.(*updater[settings.Range[N]])
u.updateCtx(creCtx)
}
}

bound, err = b.getLimitFn(ctx)
if err != nil {
b.lggr.Errorw("Failed to get limit. Using default value", "default", bound, "err", err)
}
b.recordBound(ctx, bound, withScope(ctx, b.scope))
return
}
Loading
Loading