diff --git a/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go b/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go index 5f06559c8d..47eb1ebfc3 100644 --- a/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go +++ b/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go @@ -24,6 +24,7 @@ import ( "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/irqtuning" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/strategygroup" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/tmo" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/userwatermark" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" ) @@ -32,6 +33,7 @@ type DynamicOptions struct { *tmo.TransparentMemoryOffloadingOptions *strategygroup.StrategyGroupOptions *irqtuning.IRQTuningOptions + *userwatermark.UserWatermarkOptions } func NewDynamicOptions() *DynamicOptions { @@ -40,6 +42,7 @@ func NewDynamicOptions() *DynamicOptions { TransparentMemoryOffloadingOptions: tmo.NewTransparentMemoryOffloadingOptions(), StrategyGroupOptions: strategygroup.NewStrategyGroupOptions(), IRQTuningOptions: irqtuning.NewIRQTuningOptions(), + UserWatermarkOptions: userwatermark.NewUserWatermarkOptions(), } } @@ -48,6 +51,7 @@ func (o *DynamicOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.TransparentMemoryOffloadingOptions.AddFlags(fss) o.StrategyGroupOptions.AddFlags(fss) o.IRQTuningOptions.AddFlags(fss) + o.UserWatermarkOptions.AddFlags(fss) } func (o *DynamicOptions) ApplyTo(c *dynamic.Configuration) error { @@ -56,5 +60,6 @@ func (o *DynamicOptions) ApplyTo(c *dynamic.Configuration) error { errList = append(errList, o.TransparentMemoryOffloadingOptions.ApplyTo(c.TransparentMemoryOffloadingConfiguration)) errList = append(errList, o.StrategyGroupOptions.ApplyTo(c.StrategyGroupConfiguration)) errList = append(errList, o.IRQTuningOptions.ApplyTo(c.IRQTuningConfiguration)) + errList = append(errList, o.UserWatermarkOptions.ApplyTo(c.UserWatermarkConfiguration)) return errors.NewAggregate(errList) } diff --git a/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base.go b/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base.go new file mode 100644 index 0000000000..e06e9982e6 --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base.go @@ -0,0 +1,58 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + cliflag "k8s.io/component-base/cli/flag" + + defaultOptions "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" +) + +type UserWatermarkOptions struct { + EnableReclaimer bool + ReconcileInterval int64 + ServiceLabel string + *defaultOptions.DefaultOptions +} + +func NewUserWatermarkOptions() *UserWatermarkOptions { + return &UserWatermarkOptions{ + DefaultOptions: defaultOptions.NewDefaultOptions(), + } +} + +func (o *UserWatermarkOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("user-watermark") + + fs.BoolVar(&o.EnableReclaimer, "enable-user-watermark-reclaimer", o.EnableReclaimer, + "whether to enable memory reclaimer") + fs.Int64Var(&o.ReconcileInterval, "user-watermark-reconcile-interval", o.ReconcileInterval, + "the interval to reconcile memory") + fs.StringVar(&o.ServiceLabel, "user-watermark-pod-service-label", o.ServiceLabel, + "the service label to filter") + + o.DefaultOptions.AddFlags(fss) +} + +func (o *UserWatermarkOptions) ApplyTo(c *userwatermark.UserWatermarkConfiguration) error { + c.EnableReclaimer = o.EnableReclaimer + c.ReconcileInterval = o.ReconcileInterval + c.ServiceLabel = o.ServiceLabel + + return o.DefaultOptions.ApplyTo(c.DefaultConfig) +} diff --git a/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base_test.go b/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base_test.go new file mode 100644 index 0000000000..4f6fcafcf6 --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/userwatermark/user_watermark_base_test.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "testing" + + "github.com/stretchr/testify/assert" + cliflag "k8s.io/component-base/cli/flag" + + dynamicuserwm "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" +) + +func TestNewUserWatermarkOptions(t *testing.T) { + t.Parallel() + + opts := NewUserWatermarkOptions() + assert.NotNil(t, opts) + assert.NotNil(t, opts.DefaultOptions) +} + +func TestUserWatermarkOptions_AddFlags(t *testing.T) { + t.Parallel() + + opts := NewUserWatermarkOptions() + fss := &cliflag.NamedFlagSets{} + + opts.AddFlags(fss) + fs := fss.FlagSet("user-watermark") + assert.NotNil(t, fs.Lookup("enable-user-watermark-reclaimer")) + assert.NotNil(t, fs.Lookup("user-watermark-reconcile-interval")) + assert.NotNil(t, fs.Lookup("user-watermark-pod-service-label")) +} + +func TestUserWatermarkOptions_ApplyTo(t *testing.T) { + t.Parallel() + + opts := &UserWatermarkOptions{ + EnableReclaimer: true, + ReconcileInterval: 10, + ServiceLabel: "service-label", + DefaultOptions: NewUserWatermarkOptions().DefaultOptions, + } + + conf := dynamicuserwm.NewUserWatermarkConfiguration() + err := opts.ApplyTo(conf) + + assert.NoError(t, err) + assert.True(t, conf.EnableReclaimer) + assert.Equal(t, int64(10), conf.ReconcileInterval) + assert.Equal(t, "service-label", conf.ServiceLabel) + assert.NotNil(t, conf.DefaultConfig) +} diff --git a/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config.go b/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config.go new file mode 100644 index 0000000000..b080f1e709 --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config.go @@ -0,0 +1,94 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermarkdefault + +import ( + "time" + + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" +) + +type DefaultOptions struct { + EnableMemoryReclaim bool + ReclaimInterval int64 + + ScaleFactor uint64 + SingleReclaimFactor float64 + // SingleReclaimSize is the max memory reclaim size in one reclaim cycle + SingleReclaimSize uint64 + BackoffDuration time.Duration + FeedbackPolicy string + ReclaimFailedThreshold uint64 + FailureFreezePeriod time.Duration + + PsiAvg60Threshold float64 + ReclaimAccuracyTarget float64 + ReclaimScanEfficiencyTarget float64 +} + +func NewDefaultOptions() *DefaultOptions { + return &DefaultOptions{} +} + +func (o *DefaultOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("user-watermark") + + fs.BoolVar(&o.EnableMemoryReclaim, "enable-user-watermark-memory-reclaim", o.EnableMemoryReclaim, + "whether to enable memory reclaim") + fs.Int64Var(&o.ReclaimInterval, "user-watermark-reclaim-interval", o.ReclaimInterval, + "the interval to reclaim memory") + fs.Uint64Var(&o.ScaleFactor, "user-watermark-scale-factor", o.ScaleFactor, + "the scale factor to reclaim memory") + fs.Uint64Var(&o.SingleReclaimSize, "user-watermark-single-reclaim-size", o.SingleReclaimSize, + "the max memory reclaim size in one reclaim cycle") + fs.Float64Var(&o.SingleReclaimFactor, "user-watermark-single-reclaim-factor", o.SingleReclaimFactor, + "the factor to reclaim memory") + fs.DurationVar(&o.BackoffDuration, "user-watermark-backoff-duration", o.BackoffDuration, + "the duration to backoff after reclaim failed") + fs.StringVar(&o.FeedbackPolicy, "user-watermark-feedback-policy", o.FeedbackPolicy, + "the feedback policy to reclaim memory") + fs.Uint64Var(&o.ReclaimFailedThreshold, "user-watermark-reclaim-failed-threshold", o.ReclaimFailedThreshold, + "the threshold to trigger reclaim failed") + fs.DurationVar(&o.FailureFreezePeriod, "user-watermark-failure-freeze-period", o.FailureFreezePeriod, + "the period to freeze reclaim after trigger reclaim failed") + fs.Float64Var(&o.PsiAvg60Threshold, "user-watermark-psi-avg60-threshold", o.PsiAvg60Threshold, + "the threshold to trigger reclaim failed") + fs.Float64Var(&o.ReclaimAccuracyTarget, "user-watermark-reclaim-accuracy-target", o.ReclaimAccuracyTarget, + "the target reclaim accuracy") + fs.Float64Var(&o.ReclaimScanEfficiencyTarget, "user-watermark-reclaim-scan-efficiency-target", o.ReclaimScanEfficiencyTarget, + "the target reclaim scan efficiency") +} + +func (o *DefaultOptions) ApplyTo(c *userwatermark.UserWatermarkDefaultConfiguration) error { + c.EnableMemoryReclaim = o.EnableMemoryReclaim + c.ReclaimInterval = o.ReclaimInterval + + c.ScaleFactor = o.ScaleFactor + c.SingleReclaimSize = o.SingleReclaimSize + c.SingleReclaimFactor = o.SingleReclaimFactor + c.BackoffDuration = o.BackoffDuration + c.FeedbackPolicy = v1alpha1.UserWatermarkPolicyName(o.FeedbackPolicy) + c.ReclaimFailedThreshold = o.ReclaimFailedThreshold + c.FailureFreezePeriod = o.FailureFreezePeriod + c.PsiAvg60Threshold = o.PsiAvg60Threshold + c.ReclaimAccuracyTarget = o.ReclaimAccuracyTarget + c.ReclaimScanEfficiencyTarget = o.ReclaimScanEfficiencyTarget + return nil +} diff --git a/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config_test.go b/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config_test.go new file mode 100644 index 0000000000..02cc2a92a1 --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/userwatermark/userwatermarkdefault/user_watermark_default_config_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermarkdefault + +import ( + "testing" + + "github.com/stretchr/testify/assert" + cliflag "k8s.io/component-base/cli/flag" + + v1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + dynamicuserwm "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" +) + +func TestNewDefaultOptions(t *testing.T) { + t.Parallel() + + opts := NewDefaultOptions() + assert.NotNil(t, opts) +} + +func TestDefaultOptions_AddFlags(t *testing.T) { + t.Parallel() + + opts := NewDefaultOptions() + fss := &cliflag.NamedFlagSets{} + + opts.AddFlags(fss) + + fs := fss.FlagSet("user-watermark") + + assert.NotNil(t, fs.Lookup("enable-user-watermark-memory-reclaim")) + assert.NotNil(t, fs.Lookup("user-watermark-reclaim-interval")) + assert.NotNil(t, fs.Lookup("user-watermark-scale-factor")) + assert.NotNil(t, fs.Lookup("user-watermark-single-reclaim-size")) + assert.NotNil(t, fs.Lookup("user-watermark-single-reclaim-factor")) + assert.NotNil(t, fs.Lookup("user-watermark-backoff-duration")) + + assert.NotNil(t, fs.Lookup("user-watermark-feedback-policy")) + assert.NotNil(t, fs.Lookup("user-watermark-reclaim-failed-threshold")) + assert.NotNil(t, fs.Lookup("user-watermark-failure-freeze-period")) + assert.NotNil(t, fs.Lookup("user-watermark-psi-avg60-threshold")) + assert.NotNil(t, fs.Lookup("user-watermark-reclaim-accuracy-target")) + assert.NotNil(t, fs.Lookup("user-watermark-reclaim-scan-efficiency-target")) +} + +func TestDefaultOptions_ApplyTo(t *testing.T) { + t.Parallel() + + opts := &DefaultOptions{ + EnableMemoryReclaim: true, + ReclaimInterval: 5, + ScaleFactor: 200, + SingleReclaimFactor: 0.5, + SingleReclaimSize: 1 << 20, + BackoffDuration: 10, + FeedbackPolicy: string(v1alpha1.UserWatermarkPolicyNamePSI), + + ReclaimFailedThreshold: 3, + FailureFreezePeriod: 20, + PsiAvg60Threshold: 1.0, + ReclaimAccuracyTarget: 0.8, + ReclaimScanEfficiencyTarget: 0.5, + } + + conf := &dynamicuserwm.UserWatermarkDefaultConfiguration{} + err := opts.ApplyTo(conf) + + assert.NoError(t, err) + assert.True(t, conf.EnableMemoryReclaim) + assert.Equal(t, int64(5), conf.ReclaimInterval) + assert.Equal(t, uint64(200), conf.ScaleFactor) + assert.Equal(t, 0.5, conf.SingleReclaimFactor) + assert.Equal(t, uint64(1<<20), conf.SingleReclaimSize) + assert.Equal(t, opts.BackoffDuration, conf.BackoffDuration) + assert.Equal(t, v1alpha1.UserWatermarkPolicyNamePSI, conf.FeedbackPolicy) + assert.Equal(t, uint64(3), conf.ReclaimFailedThreshold) + assert.Equal(t, opts.FailureFreezePeriod, conf.FailureFreezePeriod) + assert.Equal(t, opts.PsiAvg60Threshold, conf.PsiAvg60Threshold) + assert.Equal(t, opts.ReclaimAccuracyTarget, conf.ReclaimAccuracyTarget) + assert.Equal(t, opts.ReclaimScanEfficiencyTarget, conf.ReclaimScanEfficiencyTarget) +} diff --git a/go.mod b/go.mod index 07c83545b7..f51080715b 100644 --- a/go.mod +++ b/go.mod @@ -175,6 +175,7 @@ require ( ) replace ( + github.com/kubewharf/katalyst-api => github.com/JulyWindK/katalyst-api v1.6.9 k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index 62b18af565..834b9597f8 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/Djarvur/go-err113 v0.0.0-20200511133814-5174e21577d5/go.mod h1:4UJr5H github.com/GoogleCloudPlatform/k8s-cloud-provider v1.16.1-0.20210702024009-ea6160c1d0e3/go.mod h1:8XasY4ymP2V/tn2OOV9ZadmiTE1FIB/h3W+yNlPttKw= github.com/HdrHistogram/hdrhistogram-go v1.0.0/go.mod h1:YzE1EgsuAz8q9lfGdlxBZo2Ma655+PfKp2mlzcAqIFw= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= +github.com/JulyWindK/katalyst-api v1.6.9 h1:NRQOSQmeg6A+EuNpc8WgzMYceFKeqVlv4OnKHvhmKZY= +github.com/JulyWindK/katalyst-api v1.6.9/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd/go.mod h1:64YHyfSL2R96J44Nlwm39UHepQbyR5q10x7iYa1ks2E= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= @@ -574,8 +576,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.5.9-0.20260108125536-85e136f5902c h1:ohKHA5TOlW9487menKnKH2M14LeIq1xQ1yW4xp8x9o8= -github.com/kubewharf/katalyst-api v0.5.9-0.20260108125536-85e136f5902c/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k= github.com/kubewharf/kubelet v1.24.6-kubewharf-pre.1 h1:pzU37yZWrOBosNX+Laay9Ess0Bff/rsWanBxbdXnHnM= github.com/kubewharf/kubelet v1.24.6-kubewharf-pre.1/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/qrm-plugins/advisorsvc/advisor_svc.pb.go b/pkg/agent/qrm-plugins/advisorsvc/advisor_svc.pb.go index 76a9e1120f..7a6902c4fe 100644 --- a/pkg/agent/qrm-plugins/advisorsvc/advisor_svc.pb.go +++ b/pkg/agent/qrm-plugins/advisorsvc/advisor_svc.pb.go @@ -20,18 +20,19 @@ package advisorsvc import ( context "context" fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" + _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - io "io" v1alpha1 "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - math "math" - math_bits "math/bits" - reflect "reflect" - strings "strings" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go index 3fb7d3bb49..6e71c1deb0 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go @@ -27,6 +27,7 @@ const ( ControlKnobKeySwapMax MemoryControlKnobName = "swap_max" ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading" ControlKnobKeyMemoryNUMAHeadroom MemoryControlKnobName = "memory_numa_headroom" + ControlKnobKeyMemoryReclaim MemoryControlKnobName = "memory_reclaim" ) type MemoryNUMAHeadroom map[int]int64 diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index a4e0dcff75..e88c580532 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -43,6 +43,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" memoryreactor "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/reactor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/fragmem" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/hostwatermark" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/logcache" @@ -57,6 +58,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" "github.com/kubewharf/katalyst-core/pkg/util/metric" @@ -146,6 +148,7 @@ type DynamicPolicy struct { enableSettingSockMem bool enableSettingFragMem bool enableSettingHostWatermark bool + enableUserWatermark bool enableMemoryAdvisor bool getAdviceInterval time.Duration memoryAdvisorSocketAbsPath string @@ -225,6 +228,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration enableSettingFragMem: conf.EnableSettingFragMem, enableSettingHostWatermark: conf.EnableSettingHostWatermark, enableMemoryAdvisor: conf.EnableMemoryAdvisor, + enableUserWatermark: conf.EnableUserWatermark, getAdviceInterval: conf.GetAdviceInterval, memoryAdvisorSocketAbsPath: conf.MemoryAdvisorSocketAbsPath, memoryPluginSocketAbsPath: conf.MemoryPluginSocketAbsPath, @@ -464,6 +468,13 @@ func (p *DynamicPolicy) Start() (err error) { } } + if p.enableUserWatermark && common.CheckCgroup2UnifiedMode() { + general.Infof("setUserWatermark enabled") + + userWatermarkReclaimManager := userwatermark.NewUserWatermarkReclaimManager(p.qosConfig, p.dynamicConf, p.emitter, p.metaServer) + go userWatermarkReclaimManager.Run(p.stopCh) + } + go wait.Until(func() { periodicalhandler.ReadyToStartHandlersByGroup(qrm.QRMMemoryPluginPeriodicalHandlerGroupName) }, 5*time.Second, p.stopCh) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator.go new file mode 100644 index 0000000000..c51c6997de --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator.go @@ -0,0 +1,111 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "bufio" + "math" + "os" + "strconv" + "strings" + + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +type WatermarkCalculator struct { + SwapEnabled bool + CGroupPath string + WatermarkScaleFactor uint64 + SingleReclaimFactor float64 + SingleReclaimSize uint64 +} + +func NewMemoryWatermarkCalculator(cgroupPath string, watermarkScaleFactor uint64, singleReclaimFactor float64, singleReclaimSize uint64) *WatermarkCalculator { + return &WatermarkCalculator{ + SwapEnabled: isSwapEnabled(), + CGroupPath: cgroupPath, + WatermarkScaleFactor: watermarkScaleFactor, + SingleReclaimFactor: singleReclaimFactor, + SingleReclaimSize: singleReclaimSize, + } +} + +func (wmc *WatermarkCalculator) GetWatermark(capacity uint64) (uint64, uint64) { + return wmc.GetLowWatermark(capacity), wmc.GetHighWatermark(capacity) +} + +func (wmc *WatermarkCalculator) GetLowWatermark(capacity uint64) uint64 { + return uint64(float64(capacity * wmc.WatermarkScaleFactor / 10000)) +} + +func (wmc *WatermarkCalculator) GetHighWatermark(capacity uint64) uint64 { + return uint64(float64(capacity * 2 * wmc.WatermarkScaleFactor / 10000)) +} + +func (wmc *WatermarkCalculator) GetReclaimTarget(memLimit, memUsage uint64, reclaimableMax uint64) uint64 { + highWatermark := wmc.GetHighWatermark(memLimit) + reclaimTarget := highWatermark - (memLimit - memUsage) + + // calculate the reclaimTarget for the current container + return general.MinUInt64(reclaimableMax, reclaimTarget) +} + +func (wmc *WatermarkCalculator) GetReclaimSingleStepMax(memStats common.MemoryStats) uint64 { + return general.MinUInt64(wmc.SingleReclaimSize, uint64(math.Ceil(wmc.SingleReclaimFactor*float64(memStats.FileCache)))) +} + +func (wmc *WatermarkCalculator) GetReclaimMax(memStats common.MemoryStats) uint64 { + reclaimable := memStats.InactiveFile + memStats.ActiveFile + + if wmc.SwapEnabled { + reclaimable += memStats.InactiveAnno + memStats.ActiveAnno + } + + return reclaimable +} + +// TODO 优化 +func isSwapEnabled() bool { + file, err := os.Open("/proc/swaps") + if err != nil { + return false + } + defer file.Close() + + scanner := bufio.NewScanner(file) + + headerLine := true + for scanner.Scan() { + if headerLine { + headerLine = false + continue + } + + fields := strings.Fields(scanner.Text()) + if len(fields) < 3 { + continue + } + + sizeKiB, err := strconv.ParseUint(fields[2], 10, 64) + if err == nil && sizeKiB > 0 { + return true + } + } + + return false +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator_test.go new file mode 100644 index 0000000000..ffc872b1de --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/calculator_test.go @@ -0,0 +1,146 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" +) + +const ( + TestCGroupPath = "/sys/fs/cgroup/test" + + TestWatermarkScaleFactor = 100 + TestSingleReclaimFactor = 0.25 + TestSingleReclaimSize = 4096 +) + +var calculatorMutex sync.Mutex + +func NewDefaultMemoryWatermarkCalculator() *WatermarkCalculator { + return NewMemoryWatermarkCalculator(TestCGroupPath, TestWatermarkScaleFactor, TestSingleReclaimFactor, TestSingleReclaimSize) +} + +func TestNewMemoryWatermarkCalculator(t *testing.T) { + t.Parallel() + + calc := NewMemoryWatermarkCalculator(TestCGroupPath, TestWatermarkScaleFactor, TestSingleReclaimFactor, TestSingleReclaimSize) + assert.Equal(t, TestCGroupPath, calc.CGroupPath) + assert.Equal(t, uint64(TestWatermarkScaleFactor), calc.WatermarkScaleFactor) + assert.Equal(t, TestSingleReclaimFactor, calc.SingleReclaimFactor) + assert.Equal(t, uint64(TestSingleReclaimSize), calc.SingleReclaimSize) +} + +func TestWatermarkCalculator_GetLowAndHighWatermark(t *testing.T) { + t.Parallel() + + wmc := NewDefaultMemoryWatermarkCalculator() + capacity := uint64(1024 * 1024 * 1024) // 1GiB + low := wmc.GetLowWatermark(capacity) + high := wmc.GetHighWatermark(capacity) + + expectedLow := uint64(float64(capacity * wmc.WatermarkScaleFactor / 10000)) + expectedHigh := uint64(float64(capacity * 2 * wmc.WatermarkScaleFactor / 10000)) + + assert.Equal(t, expectedLow, low) + assert.Equal(t, expectedHigh, high) + assert.True(t, high >= low) +} + +func TestWatermarkCalculator_GetReclaimTarget(t *testing.T) { + t.Parallel() + + wmc := NewDefaultMemoryWatermarkCalculator() + memLimit := uint64(1000) + + // case 1: reclaimTarget > reclaimableMax + memUsage := uint64(950) + reclaimableMax := uint64(300) + + high := wmc.GetHighWatermark(memLimit) + free := memLimit - memUsage + expected := high - free + + if expected > reclaimableMax { + expected = reclaimableMax + } + got := wmc.GetReclaimTarget(memLimit, memUsage, reclaimableMax) + assert.Equal(t, expected, got) + + // case 2: reclaimTarget < reclaimableMax + memUsage = 900 + reclaimableMax = 999 + + high = wmc.GetHighWatermark(memLimit) + free = memLimit - memUsage + expected = high - free + + if expected > reclaimableMax { + expected = reclaimableMax + } + + got = wmc.GetReclaimTarget(memLimit, memUsage, reclaimableMax) + assert.Equal(t, expected, got) + + // case 3: reclaimTarget == reclaimableMax + memUsage = 800 + reclaimableMax = 900 + + high = wmc.GetHighWatermark(memLimit) + free = memLimit - memUsage + expected = high - free + + if expected > reclaimableMax { + expected = reclaimableMax + } + + got = wmc.GetReclaimTarget(memLimit, memUsage, reclaimableMax) + assert.Equal(t, expected, got) +} + +func TestWatermarkCalculator_GetWatermark(t *testing.T) { + t.Parallel() + + wmc := &WatermarkCalculator{WatermarkScaleFactor: 100} + + capacity := uint64(1024) + + low, high := wmc.GetWatermark(capacity) + assert.Equal(t, wmc.GetLowWatermark(capacity), low) + assert.Equal(t, wmc.GetHighWatermark(capacity), high) +} + +func TestWatermarkCalculator_GetReclaimMax(t *testing.T) { + t.Parallel() + + memStats := common.MemoryStats{ + InactiveFile: 100, + ActiveFile: 50, + InactiveAnno: 200, + ActiveAnno: 70, + } + + wmc := &WatermarkCalculator{SwapEnabled: false} + assert.Equal(t, uint64(150), wmc.GetReclaimMax(memStats)) + + wmc.SwapEnabled = true + assert.Equal(t, uint64(420), wmc.GetReclaimMax(memStats)) +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/const.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/const.go new file mode 100644 index 0000000000..141d281c5d --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/const.go @@ -0,0 +1,40 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +const ( + MetricNameUserWatermarkReclaimEnabled = "user_watermark_reclaim_enabled" + MetricNameUserWatermarkReclaimResult = "user_watermark_reclaim_result" + MetricNameUserWatermarkReclaimFailedCount = "user_watermark_reclaim_failed_count" + MetricNameUserWatermarkReclaimCost = "user_watermark_reclaim_cost" + MetricNameUserWatermarkReclaimStats = "user_watermark_reclaim_stats" + + MetricTagKeyCGroupPath = "cgroup_path" + MetricTagKeyPodName = "pod_name" + MetricTagKeyContainerName = "container_name" + MetricTagKeySuccess = "success" + MetricTagKeyReason = "reason" + MetricTagKeyHighWaterMark = "high_watermark" + MetricTagKeyLowWaterMark = "low_watermark" + MetricTagKeyReclaimTarget = "reclaim_target" + MetricTagKeyReclaimedSize = "reclaimed_size" + MetricTagKeyMemoryPSI = "psi_avg_60" + MetricTagKeyMemoryFree = "memory_free" + MetricTagKeyReclaimRefault = "reclaim_refault" + MetricTagKeyReclaimAccuracyRatio = "reclaim_accuracy_ratio" + MetricTagKeyReclaimScanEfficiencyRatio = "reclaim_scan_efficiency_ratio" +) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback.go new file mode 100644 index 0000000000..70014f4e97 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback.go @@ -0,0 +1,145 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "fmt" + "sync" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +func init() { + RegisterFeedbackPolicy(v1alpha1.UserWatermarkPolicyNamePSI, memoryPSIPolicy) + RegisterFeedbackPolicy(v1alpha1.UserWatermarkPolicyNameRefault, refaultPolicy) + RegisterFeedbackPolicy(v1alpha1.UserWatermarkPolicyNameIntegrated, integratedPolicy) +} + +var feedbackPolicies sync.Map + +func RegisterFeedbackPolicy(name v1alpha1.UserWatermarkPolicyName, plugin FeedbackPolicy) { + feedbackPolicies.Store(name, plugin) +} + +func GetRegisteredFeedbackPolices() map[v1alpha1.UserWatermarkPolicyName]FeedbackPolicy { + res := make(map[v1alpha1.UserWatermarkPolicyName]FeedbackPolicy) + feedbackPolicies.Range(func(key, value interface{}) bool { + res[key.(v1alpha1.UserWatermarkPolicyName)] = value.(FeedbackPolicy) + return true + }) + return res +} + +type FeedbackResult struct { + Abnormal bool + Reason string +} + +type FeedbackManager struct { + policyName v1alpha1.UserWatermarkPolicyName + feedbackPolicies map[v1alpha1.UserWatermarkPolicyName]FeedbackPolicy +} +type FeedbackPolicy func(lastStats ReclaimStats, currStats ReclaimStats, conf *userwatermark.ReclaimConfigDetail, emitter metrics.MetricEmitter) FeedbackResult + +func NewFeedbackManager() *FeedbackManager { + return &FeedbackManager{ + policyName: v1alpha1.UserWatermarkPolicyNameIntegrated, + feedbackPolicies: GetRegisteredFeedbackPolices(), + } +} + +func (f *FeedbackManager) UpdateFeedbackPolicy(policyName v1alpha1.UserWatermarkPolicyName) { + f.policyName = policyName + general.InfofV(5, "Update feedback policy success, policyName:%v", f.policyName) +} + +func (f *FeedbackManager) FeedbackResult(lastStats ReclaimStats, currStats ReclaimStats, conf *userwatermark.ReclaimConfigDetail, emitter metrics.MetricEmitter) (FeedbackResult, error) { + switch f.policyName { + case v1alpha1.UserWatermarkPolicyNamePSI: + return memoryPSIPolicy(lastStats, currStats, conf, emitter), nil + case v1alpha1.UserWatermarkPolicyNameRefault: + return refaultPolicy(lastStats, currStats, conf, emitter), nil + case v1alpha1.UserWatermarkPolicyNameIntegrated: + return integratedPolicy(lastStats, currStats, conf, emitter), nil + default: + return FeedbackResult{}, fmt.Errorf("feedback policy %v not registered", f.policyName) + } +} + +func integratedPolicy(lastStats ReclaimStats, currStats ReclaimStats, conf *userwatermark.ReclaimConfigDetail, emitter metrics.MetricEmitter) FeedbackResult { + result := memoryPSIPolicy(lastStats, currStats, conf, emitter) + if result.Abnormal { + return result + } + + result = refaultPolicy(lastStats, currStats, conf, emitter) + if result.Abnormal { + return result + } + + return result +} + +func memoryPSIPolicy(lastStats ReclaimStats, currStats ReclaimStats, conf *userwatermark.ReclaimConfigDetail, emitter metrics.MetricEmitter) FeedbackResult { + result := FeedbackResult{} + if currStats.memPsiAvg60 >= conf.PsiAvg60Threshold { + result.Abnormal = true + result.Reason = fmt.Sprintf("memPsiAvg60 %.4f >= threshold %.4f", currStats.memPsiAvg60, conf.PsiAvg60Threshold) + } + + return result +} + +func refaultPolicy(lastStats ReclaimStats, currStats ReclaimStats, conf *userwatermark.ReclaimConfigDetail, emitter metrics.MetricEmitter) FeedbackResult { + result := FeedbackResult{} + + reclaimAccuracyRatio := getReclaimAccuracyRatio(lastStats, currStats) + reclaimScanEfficiencyRatio := getReclaimScanEfficiencyRatio(lastStats, currStats) + + if reclaimAccuracyRatio < conf.RefaultPolicyConf.ReclaimAccuracyTarget || reclaimScanEfficiencyRatio < conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget { + result.Abnormal = true + result.Reason = fmt.Sprintf("reclaimAccuracyRatio %.4f < target %.4f || reclaimScanEfficiencyRatio %.4f < target %.4f", + reclaimAccuracyRatio, conf.RefaultPolicyConf.ReclaimAccuracyTarget, reclaimScanEfficiencyRatio, conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget) + } + + return result +} + +func getReclaimAccuracyRatio(lastStats ReclaimStats, currStats ReclaimStats) float64 { + pgstealDelta := currStats.pgsteal - lastStats.pgsteal + refaultDelta := currStats.refaultActivate - lastStats.refaultActivate + reclaimAccuracyRatio := 1.0 + if pgstealDelta > 0 { + reclaimAccuracyRatio = 1 - refaultDelta/pgstealDelta + } + + return reclaimAccuracyRatio +} + +func getReclaimScanEfficiencyRatio(lastStats ReclaimStats, currStats ReclaimStats) float64 { + pgscanDelta := currStats.pgscan - lastStats.pgscan + pgstealDelta := currStats.pgsteal - lastStats.pgsteal + reclaimScanEfficiencyRatio := 1.0 + if pgscanDelta > 0 && pgstealDelta > 0 { + reclaimScanEfficiencyRatio = pgstealDelta / pgscanDelta + } + + return reclaimScanEfficiencyRatio +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback_test.go new file mode 100644 index 0000000000..f9f67c15b2 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/feedback_test.go @@ -0,0 +1,181 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + v1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + dynamicuserwm "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func TestNewFeedbackManager_DefaultPolicyAndRegistry(t *testing.T) { + t.Parallel() + + mgr := NewFeedbackManager() + assert.Equal(t, v1alpha1.UserWatermarkPolicyNameIntegrated, mgr.policyName) + + policies := mgr.feedbackPolicies + + // basic sanity: three builtin policies should be registered + _, hasPSI := policies[v1alpha1.UserWatermarkPolicyNamePSI] + _, hasRefault := policies[v1alpha1.UserWatermarkPolicyNameRefault] + _, hasIntegrated := policies[v1alpha1.UserWatermarkPolicyNameIntegrated] + + assert.True(t, hasPSI) + assert.True(t, hasRefault) + assert.True(t, hasIntegrated) +} + +func TestFeedbackManager_UpdateAndDispatch(t *testing.T) { + t.Parallel() + + mgr := NewFeedbackManager() + + conf := dynamicuserwm.NewReclaimConfigDetail(dynamicuserwm.NewUserWatermarkDefaultConfiguration()) + conf.PsiAvg60Threshold = 1.0 + conf.RefaultPolicyConf.ReclaimAccuracyTarget = 0.0 + conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget = 0.0 + emitter := metrics.DummyMetrics{} + + last := ReclaimStats{} + curr := ReclaimStats{} + + // PSI policy + mgr.UpdateFeedbackPolicy(v1alpha1.UserWatermarkPolicyNamePSI) + res, err := mgr.FeedbackResult(last, curr, conf, emitter) + assert.NoError(t, err) + assert.False(t, res.Abnormal) + + // Refault policy + mgr.UpdateFeedbackPolicy(v1alpha1.UserWatermarkPolicyNameRefault) + res, err = mgr.FeedbackResult(last, curr, conf, emitter) + assert.NoError(t, err) + assert.False(t, res.Abnormal) + + // Integrated policy + mgr.UpdateFeedbackPolicy(v1alpha1.UserWatermarkPolicyNameIntegrated) + res, err = mgr.FeedbackResult(last, curr, conf, emitter) + assert.NoError(t, err) + assert.False(t, res.Abnormal) + + // Unknown policy should return error + mgr.UpdateFeedbackPolicy("unknown-policy") + _, err = mgr.FeedbackResult(last, curr, conf, emitter) + assert.Error(t, err) +} + +func TestIntegratedPolicy_Order(t *testing.T) { + t.Parallel() + + conf := dynamicuserwm.NewReclaimConfigDetail(dynamicuserwm.NewUserWatermarkDefaultConfiguration()) + + emitter := metrics.DummyMetrics{} + last := ReclaimStats{} + curr := ReclaimStats{} + + // when PSI policy reports abnormal, integrated should stop early + conf.PsiAvg60Threshold = 0.5 + curr.memPsiAvg60 = 1.0 + + res := integratedPolicy(last, curr, conf, emitter) + assert.True(t, res.Abnormal) + assert.Contains(t, res.Reason, "memPsiAvg60") + + // when PSI is normal but refault policy is abnormal + curr.memPsiAvg60 = 0.0 + conf.RefaultPolicyConf = &dynamicuserwm.RefaultPolicyConf{ + ReclaimAccuracyTarget: 0.8, + ReclaimScanEfficiencyTarget: 0.5, + } + last = ReclaimStats{pgsteal: 100, pgscan: 200} + curr = ReclaimStats{pgsteal: 200, pgscan: 400, refaultActivate: 90} + + res = integratedPolicy(last, curr, conf, emitter) + assert.True(t, res.Abnormal) + assert.Contains(t, res.Reason, "reclaimAccuracyRatio") +} + +func TestMemoryPSIPolicy(t *testing.T) { + t.Parallel() + + conf := dynamicuserwm.NewReclaimConfigDetail(dynamicuserwm.NewUserWatermarkDefaultConfiguration()) + emitter := metrics.DummyMetrics{} + + last := ReclaimStats{} + curr := ReclaimStats{} + conf.PsiAvg60Threshold = 1.0 + + // below threshold + curr.memPsiAvg60 = 0.5 + res := memoryPSIPolicy(last, curr, conf, emitter) + assert.False(t, res.Abnormal) + + // equal to threshold should be abnormal + curr.memPsiAvg60 = 1.0 + res = memoryPSIPolicy(last, curr, conf, emitter) + assert.True(t, res.Abnormal) + assert.Contains(t, res.Reason, "memPsiAvg60") +} + +func TestRefaultPolicy(t *testing.T) { + t.Parallel() + + conf := dynamicuserwm.NewReclaimConfigDetail(dynamicuserwm.NewUserWatermarkDefaultConfiguration()) + conf.RefaultPolicyConf = &dynamicuserwm.RefaultPolicyConf{ + ReclaimAccuracyTarget: 0.8, + ReclaimScanEfficiencyTarget: 0.5, + } + emitter := metrics.DummyMetrics{} + + last := ReclaimStats{pgsteal: 100, pgscan: 200} + curr := ReclaimStats{pgsteal: 200, pgscan: 400, refaultActivate: 10} + + // good accuracy and efficiency -> normal + res := refaultPolicy(last, curr, conf, emitter) + assert.False(t, res.Abnormal) + + // bad accuracy -> abnormal + curr.refaultActivate = 90 + res = refaultPolicy(last, curr, conf, emitter) + assert.True(t, res.Abnormal) + assert.Contains(t, res.Reason, "reclaimAccuracyRatio") +} + +func TestGetReclaimAccuracyAndScanEfficiencyRatio(t *testing.T) { + t.Parallel() + + last := ReclaimStats{pgsteal: 100, pgscan: 200} + curr := ReclaimStats{pgsteal: 200, pgscan: 400, refaultActivate: 50} + + accuracy := getReclaimAccuracyRatio(last, curr) + scanEff := getReclaimScanEfficiencyRatio(last, curr) + + // pgstealDelta=100, refaultDelta=50 => 1 - 50/100 = 0.5 + assert.InDelta(t, 0.5, accuracy, 1e-6) + // pgscanDelta=200, pgstealDelta=100 => 100/200 = 0.5 + assert.InDelta(t, 0.5, scanEff, 1e-6) + // guard branches when denominators are zero + accuracy = getReclaimAccuracyRatio(ReclaimStats{}, ReclaimStats{}) + assert.InDelta(t, 1.0, accuracy, 1e-6) + + scanEff = getReclaimScanEfficiencyRatio(ReclaimStats{}, ReclaimStats{}) + assert.InDelta(t, 1.0, scanEff, 1e-6) +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager.go new file mode 100644 index 0000000000..c1b4ea3040 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager.go @@ -0,0 +1,193 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/helper" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + katalystcoreconsts "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +// UserWatermarkReclaimManager is a memory reclamation manager based on the container's memory watermark at the user-space level. +type UserWatermarkReclaimManager struct { + qosConfig *generic.QoSConfiguration + emitter metrics.MetricEmitter + metaServer *metaserver.MetaServer + dynamicConf *dynamicconfig.DynamicAgentConfiguration + + started map[string]bool + containerReclaimer map[katalystcoreconsts.PodContainerName]Reclaimer + cgroupPathReclaimer map[string]Reclaimer +} + +// NewUserWatermarkReclaimManager creates a new instance of UserWatermarkReclaimManager. +func NewUserWatermarkReclaimManager(qosConfig *generic.QoSConfiguration, dynamicConf *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, +) *UserWatermarkReclaimManager { + return &UserWatermarkReclaimManager{ + emitter: emitter, + metaServer: metaServer, + qosConfig: qosConfig, + dynamicConf: dynamicConf, + + started: make(map[string]bool), + containerReclaimer: make(map[katalystcoreconsts.PodContainerName]Reclaimer), + cgroupPathReclaimer: make(map[string]Reclaimer), + } +} + +// Run starts the UserWatermarkReclaimManager periodically. +func (m *UserWatermarkReclaimManager) Run(stopCh chan struct{}) { + wait.Until(m.reconcile, time.Duration(m.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.ReconcileInterval)*time.Second, stopCh) +} + +func (m *UserWatermarkReclaimManager) reconcile() { + enabled := m.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.EnableReclaimer + if !enabled { + general.Warningf("UserWatermarkReclaimManager is disabled") + _ = m.emitter.StoreInt64(MetricNameUserWatermarkReclaimEnabled, 0, metrics.MetricTypeNameRaw) + return + } + general.InfofV(5, "UserWatermarkReclaimManage start to reconcile") + _ = m.emitter.StoreInt64(MetricNameUserWatermarkReclaimEnabled, 1, metrics.MetricTypeNameRaw) + + containerNamesMap := make(map[katalystcoreconsts.PodContainerName]bool) + podList, err := m.metaServer.GetPodList(context.Background(), native.PodIsActive) + if err != nil { + general.Errorf("Failed to get pod list: %v", err) + return + } + + // iterate through all running containers to create a reclaimer instance for them + for _, pod := range podList { + if pod == nil { + general.Errorf("Get nil pod from meta server.") + continue + } + // calculate the pod qos level + qos, err := m.qosConfig.GetQoSLevel(pod, map[string]string{}) + if err != nil { + general.Errorf("Failed to get qos level for pod: %v, err: %v/%v", pod.Name, pod.Namespace, err) + if helper.PodIsDaemonSet(pod) { + qos = katalystapiconsts.PodAnnotationQoSLevelSystemCores + general.Infof("DaemonSet pod %v is considered as system_cores qos level", pod.Name) + } + } + // wrap container info + for _, containerStatus := range pod.Status.ContainerStatuses { + containerInfo := &types.ContainerInfo{ + PodUID: string(pod.UID), + PodName: pod.Name, + ContainerName: containerStatus.Name, + Labels: pod.Labels, + Annotations: pod.Annotations, + QoSLevel: qos, + } + containerID, err := m.metaServer.GetContainerID(containerInfo.PodUID, containerInfo.ContainerName) + if err != nil || containerID == "" { + general.Warningf("Failed to get container id for pod: %v, container name: %s, err: %v", pod.Name, containerInfo.ContainerName, err) + continue + } + // get the container absolute cgroup path + cgpath, err := GetContainerCgroupPath(containerInfo.PodName, containerID) + if err != nil { + general.Infof("Failed to get cgroup path for pod: %v, container name: %s, err: %v", pod.Name, containerInfo.ContainerName, err) + continue + } + + instanceInfo := ReclaimInstance{ + ContainerInfo: containerInfo, + CgroupPath: cgpath, + } + + podContainerName := native.GeneratePodContainerName(containerInfo.PodName, containerInfo.ContainerName) + containerNamesMap[podContainerName] = true + _, exist := m.containerReclaimer[podContainerName] + if !exist { + m.containerReclaimer[podContainerName] = NewUserWatermarkReclaimer(instanceInfo, m.metaServer, m.emitter, m.dynamicConf) + general.InfofV(5, "Create UserWatermarkReclaimer for pod: %v, container name: %s", pod.Name, containerInfo.ContainerName) + } + } + } + + // update tmo config for specified cgroup paths + for cgpath := range m.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.CgroupConfig { + general.Infof("Get Cgroup config for specific cgroup path %v", cgpath) + if _, exist := m.cgroupPathReclaimer[cgpath]; !exist { + instanceInfo := ReclaimInstance{ + ContainerInfo: &types.ContainerInfo{}, + CgroupPath: cgpath, + } + m.cgroupPathReclaimer[cgpath] = NewUserWatermarkReclaimer(instanceInfo, m.metaServer, m.emitter, m.dynamicConf) + general.InfofV(5, "Create UserWatermarkReclaimer for cgroup path: %v", cgpath) + } + } + + // delete user watermark reclaimer for not existed containers + for podContainerName, reclaimer := range m.containerReclaimer { + if _, exist := containerNamesMap[podContainerName]; !exist { + reclaimer.Stop() + delete(m.started, string(podContainerName)) + delete(m.containerReclaimer, podContainerName) + general.Infof("Stop UserWatermarkReclaimer for pod container name: %v", podContainerName) + } + } + + // delete user watermark reclaimer for not existed cgroups + for cgpath, reclaimer := range m.cgroupPathReclaimer { + if _, exist := m.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.CgroupConfig[cgpath]; !exist { + reclaimer.Stop() + delete(m.started, cgpath) + delete(m.cgroupPathReclaimer, cgpath) + general.Infof("Stop UserWatermarkReclaimer for cgroup path: %v", cgpath) + } + } + + // start reclaim reclaimer for each container + for podContainerName, reclaimer := range m.containerReclaimer { + if !m.started[string(podContainerName)] { + go reclaimer.Run() + m.started[string(podContainerName)] = true + + general.Infof("Start UserWatermarkReclaimer for pod container name: %v", podContainerName) + } + general.InfofV(5, "UserWatermarkReclaimer for pod container name: %v has started", podContainerName) + } + + // start reclaim reclaimer for each cgroups + for cgpath, reclaimer := range m.cgroupPathReclaimer { + if !m.started[cgpath] { + go reclaimer.Run() + m.started[cgpath] = true + + general.Infof("Start UserWatermarkReclaimer for cgroup path: %v", cgpath) + } + general.InfofV(5, "UserWatermarkReclaimer for cgroup path: %v has started", cgpath) + } +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager_test.go new file mode 100644 index 0000000000..b02e78e136 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/manager_test.go @@ -0,0 +1,231 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "fmt" + "sync" + "testing" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + userwmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + katalystcoreconsts "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + metapod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +var managerMutex sync.Mutex + +func generateUserWatermarkTestMetaServer(pods []*v1.Pod) *metaserver.MetaServer { + podFetcher := &metapod.PodFetcherStub{PodList: pods} + + return &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + PodFetcher: podFetcher, + }, + } +} + +func TestNewUserWatermarkReclaimManager(t *testing.T) { + t.Parallel() + + qosConfig := generic.NewQoSConfiguration() + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + m := NewUserWatermarkReclaimManager(qosConfig, dynamicConf, metrics.DummyMetrics{}, (*metaserver.MetaServer)(nil)) + assert.NotNil(t, m) + assert.Equal(t, qosConfig, m.qosConfig) + assert.Equal(t, dynamicConf, m.dynamicConf) + assert.NotNil(t, m.started) + assert.NotNil(t, m.containerReclaimer) + assert.NotNil(t, m.cgroupPathReclaimer) +} + +func TestUserWatermarkReclaimManager_Reconcile_Disabled(t *testing.T) { + t.Parallel() + + qosConfig := generic.NewQoSConfiguration() + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + conf := dynamicConf.GetDynamicConfiguration() + conf.UserWatermarkConfiguration.EnableReclaimer = false + + m := NewUserWatermarkReclaimManager(qosConfig, dynamicConf, metrics.DummyMetrics{}, (*metaserver.MetaServer)(nil)) + + // pre-populate started/container maps to ensure they are not touched when disabled + m.started["test"] = true + m.containerReclaimer[katalystcoreconsts.PodContainerName("pod,container")] = &userWatermarkReclaimer{} + + m.reconcile() + assert.True(t, m.started["test"]) + _, exist := m.containerReclaimer[katalystcoreconsts.PodContainerName("pod,container")] + assert.True(t, exist) +} + +func TestUserWatermarkReclaimManager_Reconcile_CreateReclaimers(t *testing.T) { + t.Parallel() + managerMutex.Lock() + defer managerMutex.Unlock() + + defer mockey.UnPatchAll() + qosConfig := generic.NewQoSConfiguration() + + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + conf := dynamicConf.GetDynamicConfiguration() + conf.UserWatermarkConfiguration.EnableReclaimer = true + conf.UserWatermarkConfiguration.ServiceLabel = "svc-label" + + // add a cgroup level configuration + cgPath := "/sys/fs/cgroup/memory/custom" + conf.UserWatermarkConfiguration.CgroupConfig[cgPath] = userwmconfig.NewReclaimConfigDetail(conf.UserWatermarkConfiguration.DefaultConfig) + + podObj := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "pod-uid-1", + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + katalystapiconsts.PodAnnotationQoSLevelKey: katalystapiconsts.PodAnnotationQoSLevelSharedCores, + }, + Labels: map[string]string{ + "svc-label": "svc-a", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "c1", + ContainerID: "containerd://cid-1", + Ready: true, + }, + }, + }, + } + + ms := generateUserWatermarkTestMetaServer([]*v1.Pod{podObj}) + + // patch container cgroup path helper to avoid touching real cgroups + mockey.Mock(common.GetContainerAbsCgroupPath). + To(func(_ string, podUID, containerID string) (string, error) { + return "/sys/fs/cgroup/memory/test-cgroup", nil + }).Build() + + m := NewUserWatermarkReclaimManager(qosConfig, dynamicConf, metrics.DummyMetrics{}, ms) + m.reconcile() + + // container based reclaimer + podContainerName := native.GeneratePodContainerName(podObj.Name, podObj.Status.ContainerStatuses[0].Name) + _, exist := m.containerReclaimer[podContainerName] + assert.True(t, exist) + assert.True(t, m.started[string(podContainerName)]) + + // cgroup based reclaimer + _, exist = m.cgroupPathReclaimer[cgPath] + assert.True(t, exist) + assert.True(t, m.started[cgPath]) +} + +func TestUserWatermarkReclaimManager_Reconcile_GetPodListError(t *testing.T) { + t.Parallel() + managerMutex.Lock() + defer managerMutex.Unlock() + + defer mockey.UnPatchAll() + + qosConfig := generic.NewQoSConfiguration() + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + conf := dynamicConf.GetDynamicConfiguration() + conf.UserWatermarkConfiguration.EnableReclaimer = true + + // meta server with stub pod fetcher + pods := []*v1.Pod{{}} + ms := generateUserWatermarkTestMetaServer(pods) + + // force PodFetcherStub.GetPodList to return error + mockErr := fmt.Errorf("list error") + mockey.Mock((*metapod.PodFetcherStub).GetPodList).Return(nil, mockErr).Build() + + m := NewUserWatermarkReclaimManager(qosConfig, dynamicConf, metrics.DummyMetrics{}, ms) + m.reconcile() + + // should early return and create no reclaimers + assert.Empty(t, m.containerReclaimer) + assert.Empty(t, m.cgroupPathReclaimer) +} + +type fakeReclaimer struct { + stopped bool +} + +func (f *fakeReclaimer) Run() {} +func (f *fakeReclaimer) Stop() { f.stopped = true } +func (f *fakeReclaimer) LoadConfig() {} +func (f *fakeReclaimer) GetConfig() *userwmconfig.ReclaimConfigDetail { return nil } + +func TestUserWatermarkReclaimManager_Reconcile_CleanupStaleReclaimers(t *testing.T) { + t.Parallel() + + qosConfig := generic.NewQoSConfiguration() + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + conf := dynamicConf.GetDynamicConfiguration() + conf.UserWatermarkConfiguration.EnableReclaimer = true + + // meta server with no running pods + ms := generateUserWatermarkTestMetaServer(nil) + + m := NewUserWatermarkReclaimManager(qosConfig, dynamicConf, metrics.DummyMetrics{}, ms) + + podContainerKey := katalystcoreconsts.PodContainerName("pod-1,container-1") + cgPath := "/sys/fs/cgroup/memory/old" + + frContainer := &fakeReclaimer{} + frCgroup := &fakeReclaimer{} + + m.containerReclaimer[podContainerKey] = frContainer + m.cgroupPathReclaimer[cgPath] = frCgroup + + m.started[string(podContainerKey)] = true + m.started[cgPath] = true + + // no CgroupConfig for cgPath, and no pods => both reclaimers should be cleaned + m.reconcile() + + _, exist := m.containerReclaimer[podContainerKey] + assert.False(t, exist) + + _, exist = m.cgroupPathReclaimer[cgPath] + assert.False(t, exist) + + _, exist = m.started[string(podContainerKey)] + assert.False(t, exist) + + _, exist = m.started[cgPath] + assert.False(t, exist) + + assert.True(t, frContainer.stopped) + assert.True(t, frCgroup.stopped) +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer.go new file mode 100644 index 0000000000..cdea099b92 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer.go @@ -0,0 +1,540 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/helper" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" + "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +type ReclaimInstance struct { + ContainerInfo *types.ContainerInfo + CgroupPath string +} + +type ReclaimInfo struct { + CGroupPath string + LowWaterMark uint64 + HighWaterMark uint64 + // ReclaimTarget is the target memory size that can be reclaimed. The default unit is byte. + ReclaimTarget uint64 + // SingleReclaimMax is the max memory size that can be reclaimed in one reclaim cycle.The default unit is byte. + SingleReclaimMax uint64 +} + +type ReclaimResult struct { + Success bool + Reason string + ReclaimedSize uint64 +} + +type ReclaimStats struct { + obj string + qosLevel string + memUsage float64 + memInactive float64 + memPsiAvg60 float64 + pgscan float64 + pgsteal float64 + refault float64 + refaultActivate float64 + cache float64 + mapped float64 + reclaimTargetSize float64 +} + +type Reclaimer interface { + Run() + Stop() + LoadConfig() + GetConfig() *userwatermark.ReclaimConfigDetail +} + +type userWatermarkReclaimer struct { + mutex sync.RWMutex + stopCh chan struct{} + emitter metrics.MetricEmitter + metaServer *metaserver.MetaServer + dynamicConf *dynamicconfig.DynamicAgentConfiguration + + // absolute path of cgroup + cgroupPath string + containerInfo *types.ContainerInfo + serviceLabel string + reclaimConf *userwatermark.ReclaimConfigDetail + + failedCount uint64 + feedbackManager *FeedbackManager +} + +// NewUserWatermarkReclaimer creates a new instance of userWatermarkReclaimer. +func NewUserWatermarkReclaimer(instanceInfo ReclaimInstance, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter, dynamicConf *dynamicconfig.DynamicAgentConfiguration) *userWatermarkReclaimer { + reclaimer := &userWatermarkReclaimer{ + stopCh: make(chan struct{}), + emitter: emitter, + metaServer: metaServer, + dynamicConf: dynamicConf, + + serviceLabel: dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.ServiceLabel, + containerInfo: &types.ContainerInfo{}, + reclaimConf: userwatermark.NewReclaimConfigDetail(dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.DefaultConfig), + feedbackManager: NewFeedbackManager(), + } + + reclaimer.cgroupPath = instanceInfo.CgroupPath + if instanceInfo.ContainerInfo != nil { + reclaimer.containerInfo = instanceInfo.ContainerInfo + } + + return reclaimer +} + +func (r *userWatermarkReclaimer) Run() { + _ = wait.PollUntil(time.Duration(r.reclaimConf.ReclaimInterval)*time.Second, r.run, r.stopCh) +} + +func (r *userWatermarkReclaimer) Stop() { + close(r.stopCh) +} + +// The memory reclamation of the instance is performed in a loop through the following steps: +// 1. Load the latest configuration; +// 2. Check for frequent failures to avoid invalid recycling; +// 3. Determine if recycling is required (memory watermark not reached); +// 4. Perform a recycling and perform an appropriate amount of sleep based on the recovery state results. +func (r *userWatermarkReclaimer) run() (done bool, err error) { + // 1. load the latest configuration + r.LoadConfig() + r.feedbackManager.UpdateFeedbackPolicy(r.reclaimConf.FeedbackPolicy) + + if !r.reclaimConf.EnableMemoryReclaim { + general.Warningf("Memory reclamation is disabled for podcgroup %s", r.cgroupPath) + return false, nil + } + + // 2. check for frequent failures to avoid invalid recycling + if r.failedCount >= r.reclaimConf.ReclaimFailedThreshold { + general.Warningf("The number of memory watermark reclamation failures(%v) has reached a threshold(%v), freeze for %v seconds.", + r.failedCount, r.reclaimConf.ReclaimFailedThreshold, r.reclaimConf.FailureFreezePeriod) + time.Sleep(r.reclaimConf.FailureFreezePeriod) + + return false, nil + } + + // get memory limit and usage + memLimit, memUsage, err := GetCGroupMemoryLimitAndUsage(r.cgroupPath) + if err != nil { + general.Warningf("Get cgroup(%s) memory limit and usage failed, err: %v", r.cgroupPath, err) + return false, nil + } + if memLimit == 0 || memUsage >= memLimit { + general.Warningf("Memory watermark reclaimer skip cgroup %s due to invalid limit(%d) or usage(%d)", r.cgroupPath, memLimit, memUsage) + return false, nil + } + // calculate memory watermark + mwc := NewMemoryWatermarkCalculator(r.cgroupPath, r.reclaimConf.ScaleFactor, r.reclaimConf.SingleReclaimFactor, r.reclaimConf.SingleReclaimSize) + lowWatermark, highWatermark := mwc.GetWatermark(memLimit) + + // 3. determine if memory reclamation is necessary + free := memLimit - memUsage + if free >= highWatermark { + general.Warningf("Memory watermark reclaimer skip cgroup %s due to free(%d) >= highWatermark(%d)", r.cgroupPath, free, highWatermark) + return false, nil + } + + // warp the reclaim info + memStats, err := GetCGroupMemoryStats(r.cgroupPath) + if err != nil { + general.Warningf("Get cgroup(%s) memory stats failed, err: %v", r.cgroupPath, err) + return false, nil + } + reclaimableMax := mwc.GetReclaimMax(memStats) + reclaimTarget := mwc.GetReclaimTarget(memLimit, memUsage, reclaimableMax) + singleReclaimMax := mwc.GetReclaimSingleStepMax(memStats) + reclaimInfo := &ReclaimInfo{ + CGroupPath: r.cgroupPath, + LowWaterMark: lowWatermark, + HighWaterMark: highWatermark, + ReclaimTarget: reclaimTarget, + SingleReclaimMax: singleReclaimMax, + } + + // 4. perform memory reclamation and hibernate to a certain extent depending on the reclaim state + result, err := r.Reclaim(reclaimInfo) + if err != nil || !result.Success { + r.failedCount++ + + general.Warningf("Memory reclaim failed, podName:%v containerName:%v cgroupPath: %s, result: %v", r.containerInfo.PodName, r.containerInfo.ContainerName, r.cgroupPath, result) + time.Sleep(r.reclaimConf.BackoffDuration) + } else { + r.failedCount = 0 + } + + r.emitMetric(MetricNameUserWatermarkReclaimResult, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: MetricTagKeySuccess, Val: fmt.Sprintf("%v", result.Success)}, + metrics.MetricTag{Key: MetricTagKeyReason, Val: result.Reason}, + metrics.MetricTag{Key: MetricTagKeyHighWaterMark, Val: fmt.Sprintf("%v", reclaimInfo.HighWaterMark)}, + metrics.MetricTag{Key: MetricTagKeyLowWaterMark, Val: fmt.Sprintf("%v", reclaimInfo.LowWaterMark)}, + metrics.MetricTag{Key: MetricTagKeyReclaimTarget, Val: fmt.Sprintf("%v", reclaimInfo.ReclaimTarget)}, + metrics.MetricTag{Key: MetricTagKeyReclaimedSize, Val: fmt.Sprintf("%v", result.ReclaimedSize)}) + + r.emitMetric(MetricNameUserWatermarkReclaimFailedCount, int64(r.failedCount), metrics.MetricTypeNameRaw) + + return false, nil +} + +// Reclaim performs memory reclamation based on the provided reclaimInfo. +// It returns a ReclaimResult indicating the success or failure of the reclamation operation, +// along with an error if any occurred. +func (r *userWatermarkReclaimer) Reclaim(reclaimInfo *ReclaimInfo) (ReclaimResult, error) { + result := ReclaimResult{} + general.InfofV(5, "Memory watermark reclaimer start to reclaim, reclaimInfo: %v", reclaimInfo) + + originReclaimStats, err := r.getMemoryReclaimStats() + if err != nil { + result.Reason = fmt.Sprintf("Get memory reclaimStats failed: %v", err) + return result, fmt.Errorf(result.Reason) + } + + _, mems, err := cgroupmgr.GetEffectiveCPUSetWithAbsolutePath(r.cgroupPath) + if err != nil { + result.Reason = fmt.Sprintf("Get effective CPUSet with absolute path(%s) failed: %v", r.cgroupPath, err) + return result, fmt.Errorf(result.Reason) + } + + var reclaimed uint64 + remaining := reclaimInfo.ReclaimTarget + start := time.Now() + + for remaining > 0 { + step := general.MinUInt64(reclaimInfo.SingleReclaimMax, remaining) + general.InfofV(5, "Memory watermark reclaimer ready to reclaim, size: %v", step) + if err = cgroupmgr.MemoryOffloadingWithAbsolutePath(context.Background(), r.cgroupPath, int64(step), mems); err != nil { + result.Reason = fmt.Sprintf("Memory offloading with absolute path(%s) failed, err: %v", r.cgroupPath, err) + result.ReclaimedSize = reclaimed + return result, fmt.Errorf(result.Reason) + } + reclaimed += step + remaining -= step + result.ReclaimedSize = reclaimed + + memLimit, memUsage, err := GetCGroupMemoryLimitAndUsage(r.cgroupPath) + if err != nil { + result.Reason = fmt.Sprintf("Get cgroup(%s) memory limit and usage failed, err: %v", r.cgroupPath, err) + return result, fmt.Errorf(result.Reason) + } + free := memLimit - memUsage + if reachedHighWatermark(free, reclaimInfo.HighWaterMark) { + break + } + + currReclaimStats, err := r.getMemoryReclaimStats() + if err != nil { + result.Reason = fmt.Sprintf("getMemoryReclaimStats failed: %v", err) + return result, fmt.Errorf(result.Reason) + } + + // reclaim state info metrics emit + reclaimAccuracyRatio := getReclaimAccuracyRatio(originReclaimStats, currReclaimStats) + reclaimScanEfficiencyRatio := getReclaimScanEfficiencyRatio(originReclaimStats, currReclaimStats) + r.emitMetric(MetricNameUserWatermarkReclaimStats, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: MetricTagKeyMemoryFree, Val: fmt.Sprintf("%v", free)}, + metrics.MetricTag{Key: MetricTagKeyMemoryPSI, Val: fmt.Sprintf("%v", currReclaimStats.memPsiAvg60)}, + metrics.MetricTag{Key: MetricTagKeyReclaimRefault, Val: fmt.Sprintf("%v", currReclaimStats.refault)}, + metrics.MetricTag{Key: MetricTagKeyReclaimAccuracyRatio, Val: fmt.Sprintf("%v", reclaimAccuracyRatio)}, + metrics.MetricTag{Key: MetricTagKeyReclaimScanEfficiencyRatio, Val: fmt.Sprintf("%v", reclaimScanEfficiencyRatio)}, + ) + + if feedbackResult, err := r.feedbackManager.FeedbackResult(originReclaimStats, currReclaimStats, r.reclaimConf, r.emitter); err != nil { + result.Reason = fmt.Sprintf("get feedback result failed: %v", err) + return result, fmt.Errorf(result.Reason) + } else if feedbackResult.Abnormal { + result.Reason = fmt.Sprintf("feedback indicates an anomaly: %v", feedbackResult.Reason) + return result, fmt.Errorf(result.Reason) + } + } + duration := time.Since(start) + // update reclaim result + result.Success = true + + r.emitMetric(MetricNameUserWatermarkReclaimCost, duration.Microseconds(), metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: MetricTagKeySuccess, Val: fmt.Sprintf("%v", result.Success)}) + general.InfoS("Memory watermark reclaim finished", "podName", r.containerInfo.PodName, "containerName", + r.containerInfo.ContainerName, "cgroupPath", r.cgroupPath, "duration", duration, "reclaimedBytes", reclaimed, + "reclaimTarget", reclaimInfo.ReclaimTarget, "highWatermark", reclaimInfo.HighWaterMark) + + return result, nil +} + +func (r *userWatermarkReclaimer) GetConfig() *userwatermark.ReclaimConfigDetail { + return r.reclaimConf +} + +func (r *userWatermarkReclaimer) LoadConfig() { + userWatermarkDynamicConf := r.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration + if userWatermarkDynamicConf == nil { + general.Warningf("Get user watermark dynamic conf is nil") + return + } + + if r.containerInfo != nil && r.containerInfo.PodUID != "" { + // load QoS level config + if helper.IsValidQosLevel(r.containerInfo.QoSLevel) { + if qosReclaimConfig, exist := userWatermarkDynamicConf.QoSLevelConfig[katalystapiconsts.QoSLevel(r.containerInfo.QoSLevel)]; exist { + r.loadConfig(qosReclaimConfig) + } + } + + // load service config + if serviceName, exist := r.containerInfo.Labels[r.serviceLabel]; exist { + if serviceReclaimConfig, exist := userWatermarkDynamicConf.ServiceConfig[serviceName]; exist { + r.loadConfig(serviceReclaimConfig) + } + } + return + } + + // load cgroup config + if cgroupReclaimConfig, exist := r.dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.CgroupConfig[r.cgroupPath]; exist { + r.loadConfig(cgroupReclaimConfig) + } +} + +func (r *userWatermarkReclaimer) loadConfig(config *userwatermark.ReclaimConfigDetail) { + if config == nil { + general.Warningf("Load config detail failed, config is nil, podName:%v containerName:%v cgroupPath: %s qosLevel:%v", + r.containerInfo.PodName, r.containerInfo.ContainerName, r.cgroupPath, r.containerInfo.QoSLevel) + return + } + + r.reclaimConf.EnableMemoryReclaim = config.EnableMemoryReclaim + r.reclaimConf.ReclaimInterval = config.ReclaimInterval + r.reclaimConf.SingleReclaimSize = config.SingleReclaimSize + r.reclaimConf.ScaleFactor = config.ScaleFactor + r.reclaimConf.BackoffDuration = config.BackoffDuration + r.reclaimConf.FeedbackPolicy = config.FeedbackPolicy + r.reclaimConf.ReclaimFailedThreshold = config.ReclaimFailedThreshold + r.reclaimConf.FailureFreezePeriod = config.FailureFreezePeriod + + if config.PSIPolicyConf != nil { + r.reclaimConf.PsiAvg60Threshold = config.PSIPolicyConf.PsiAvg60Threshold + } + if config.RefaultPolicyConf != nil { + r.reclaimConf.ReclaimAccuracyTarget = config.RefaultPolicyConf.ReclaimAccuracyTarget + r.reclaimConf.ReclaimScanEfficiencyTarget = config.RefaultPolicyConf.ReclaimScanEfficiencyTarget + } + + general.InfofV(5, "Load config detail success, podName:%v containerName:%v cgroupPath: %s conf:%v", + r.containerInfo.PodName, r.containerInfo.ContainerName, r.cgroupPath, r.reclaimConf) +} + +func (r *userWatermarkReclaimer) getMemoryReclaimStats() (ReclaimStats, error) { + reclaimStats := &ReclaimStats{} + var err error + getCgroupMetrics := func(metaserver *metaserver.MetaServer, absPath string) error { + relativePath, err := filepath.Rel(common.CgroupFSMountPoint, absPath) + if err != nil { + return err + } + // make sure the relative path with prefix '/' has already been added to GeneralRelativeCgroupPaths, + // otherwise the MalachiteMetricsProvisioner will not fetch and store the metrics for these cgroup paths. + relativePath = "/" + relativePath + psiAvg60, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPsiAvg60Cgroup) + if err != nil { + return err + } + pgsteal, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgstealCgroup) + if err != nil { + return err + } + pgscan, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgscanCgroup) + if err != nil { + return err + } + refault, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemWorkingsetRefaultCgroup) + if err != nil { + return err + } + refaultActivate, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemWorkingsetActivateCgroup) + if err != nil { + return err + } + memUsage, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemUsageCgroup) + if err != nil { + return err + } + memInactiveAnon, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveAnonCgroup) + if err != nil { + return err + } + memInactiveFile, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveFileCgroup) + if err != nil { + return err + } + memCache, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemCacheCgroup) + if err != nil { + return err + } + memMappedFile, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemMappedCgroup) + if err != nil { + return err + } + reclaimStats.memUsage = memUsage.Value + reclaimStats.memInactive = memInactiveFile.Value + memInactiveAnon.Value + reclaimStats.memPsiAvg60 = psiAvg60.Value + reclaimStats.pgsteal = pgsteal.Value + reclaimStats.pgscan = pgscan.Value + reclaimStats.refault = refault.Value + reclaimStats.refaultActivate = refaultActivate.Value + reclaimStats.cache = memCache.Value + reclaimStats.mapped = memMappedFile.Value + general.Infof("Memory Usage of Cgroup %s, memUsage: %v, cache: %v, mapped: %v", r.cgroupPath, memUsage.Value, memCache.Value, memMappedFile.Value) + return nil + } + getContainerMetrics := func(metaserver *metaserver.MetaServer, podUID string, containerName string) error { + psiAvg60, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPsiAvg60Container) + if err != nil { + return err + } + pgsteal, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgstealContainer) + if err != nil { + return err + } + pgscan, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgscanContainer) + if err != nil { + return err + } + refault, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetRefaultContainer) + if err != nil { + return err + } + refaultActivate, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetActivateContainer) + if err != nil { + return err + } + memUsage, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemUsageContainer) + if err != nil { + return err + } + memInactiveAnon, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveAnonContainer) + if err != nil { + return err + } + memInactiveFile, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveFileContainer) + if err != nil { + return err + } + memCache, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemCacheContainer) + if err != nil { + return err + } + memMappedFile, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemMappedContainer) + if err != nil { + return err + } + reclaimStats.memUsage = memUsage.Value + reclaimStats.memInactive = memInactiveFile.Value + memInactiveAnon.Value + reclaimStats.memPsiAvg60 = psiAvg60.Value + reclaimStats.pgsteal = pgsteal.Value + reclaimStats.pgscan = pgscan.Value + reclaimStats.refault = refault.Value + reclaimStats.refaultActivate = refaultActivate.Value + reclaimStats.cache = memCache.Value + reclaimStats.mapped = memMappedFile.Value + general.Infof("Memory Usage of Pod %v, Container %v, memUsage: %v, cache: %v, mapped: %v", podUID, containerName, memUsage.Value, memCache.Value, memMappedFile.Value) + return nil + } + + // if it's a container, get it from the container metric, otherwise from the cgroup metric + if r.containerInfo != nil && r.containerInfo.PodUID != "" { + err = getContainerMetrics(r.metaServer, r.containerInfo.PodUID, r.containerInfo.ContainerName) + reclaimStats.obj = strings.Join([]string{r.containerInfo.PodNamespace, r.containerInfo.PodName}, "/") + reclaimStats.qosLevel = r.containerInfo.QoSLevel + } else { + err = getCgroupMetrics(r.metaServer, r.cgroupPath) + reclaimStats.obj = r.cgroupPath + reclaimStats.qosLevel = r.cgroupPath + } + + return *reclaimStats, err +} + +func (r *userWatermarkReclaimer) emitMetric(metricName string, val int64, emitType metrics.MetricTypeName, tags ...metrics.MetricTag) { + baseTags := []metrics.MetricTag{ + {Key: MetricTagKeyCGroupPath, Val: r.cgroupPath}, + } + if r.containerInfo != nil { + baseTags = append(baseTags, metrics.MetricTag{Key: MetricTagKeyPodName, Val: r.containerInfo.PodName}) + baseTags = append(baseTags, metrics.MetricTag{Key: MetricTagKeyContainerName, Val: r.containerInfo.ContainerName}) + } + tags = append(baseTags, tags...) + + _ = r.emitter.StoreInt64(metricName, val, emitType, tags...) +} + +func GetContainerCgroupPath(podUID, containerId string) (string, error) { + return common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId) +} + +func GetCGroupMemoryLimitAndUsage(cgroupPath string) (uint64, uint64, error) { + ms, err := cgroupmgr.GetMemoryWithAbsolutePath(cgroupPath) + if err != nil || ms == nil { + return 0, 0, fmt.Errorf("failed to get memory usage for cgroup path %s: %v", cgroupPath, err) + } + + return ms.Limit, ms.Usage, nil +} + +func GetCGroupMemoryStats(cgroupPath string) (common.MemoryStats, error) { + stats := common.MemoryStats{} + ms, err := cgroupmgr.GetMemoryStatsWithAbsolutePath(cgroupPath) + if err != nil || ms == nil { + return common.MemoryStats{}, fmt.Errorf("failed to get memory usage for cgroup path %s: %v", cgroupPath, err) + } + + stats.Limit = ms.Limit + stats.Usage = ms.Usage + stats.FileCache = ms.FileCache + stats.ActiveFile = ms.ActiveFile + stats.ActiveAnno = ms.ActiveAnno + stats.InactiveFile = ms.InactiveFile + stats.InactiveAnno = ms.InactiveAnno + + return stats, nil +} + +func reachedHighWatermark(free, high uint64) bool { + return high > 0 && free >= high +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer_test.go new file mode 100644 index 0000000000..32d1f8b01d --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/userwatermark/reclaimer_test.go @@ -0,0 +1,465 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" + + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + userwmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +var reclaimMutex = sync.Mutex{} + +func newTestDynamicConf() *dynamicconfig.DynamicAgentConfiguration { + // NewDynamicAgentConfiguration already initializes a non-nil + // UserWatermarkConfiguration via NewConfiguration(). + return dynamicconfig.NewDynamicAgentConfiguration() +} + +func TestNewUserWatermarkReclaimer_Defaults(t *testing.T) { + t.Parallel() + + dynamicConf := newTestDynamicConf() + cfg := dynamicConf.GetDynamicConfiguration() + cfg.UserWatermarkConfiguration.ServiceLabel = "service-label" + + instance := ReclaimInstance{CgroupPath: "/sys/fs/cgroup/memory/test"} + + r := NewUserWatermarkReclaimer(instance, (*metaserver.MetaServer)(nil), metrics.DummyMetrics{}, dynamicConf) + assert.NotNil(t, r) + assert.Equal(t, "/sys/fs/cgroup/memory/test", r.cgroupPath) + assert.Equal(t, "service-label", r.serviceLabel) + assert.NotNil(t, r.containerInfo) + assert.NotNil(t, r.reclaimConf) + assert.NotNil(t, r.feedbackManager) +} + +func TestNewUserWatermarkReclaimer_WithContainerInfo(t *testing.T) { + t.Parallel() + + dynamicConf := newTestDynamicConf() + cfg := dynamicConf.GetDynamicConfiguration() + cfg.UserWatermarkConfiguration.ServiceLabel = "svc" + + cInfo := &types.ContainerInfo{ + PodUID: "pod-uid-1", + PodNamespace: "default", + PodName: "pod-1", + ContainerName: "c1", + QoSLevel: string(katalystapiconsts.QoSLevelSharedCores), + Labels: map[string]string{"svc": "service-a"}, + } + + instance := ReclaimInstance{ + ContainerInfo: cInfo, + CgroupPath: "/sys/fs/cgroup/memory/test", + } + + r := NewUserWatermarkReclaimer(instance, (*metaserver.MetaServer)(nil), metrics.DummyMetrics{}, dynamicConf) + assert.Equal(t, cInfo, r.containerInfo) +} + +func TestUserWatermarkReclaimer_GetConfigAndLoadConfig_QoSAndService(t *testing.T) { + t.Parallel() + + dynamicConf := newTestDynamicConf() + cfg := dynamicConf.GetDynamicConfiguration() + + serviceLabel := "service-key" + serviceName := "svc-a" + cfg.UserWatermarkConfiguration.ServiceLabel = serviceLabel + + // prepare default & per-qos/service configs + defaultConf := cfg.UserWatermarkConfiguration.DefaultConfig + qosConf := userwmconfig.NewReclaimConfigDetail(defaultConf) + qosConf.EnableMemoryReclaim = true + qosConf.ScaleFactor = 200 + + serviceConf := userwmconfig.NewReclaimConfigDetail(defaultConf) + serviceConf.BackoffDuration = 10 * time.Second + serviceConf.PSIPolicyConf = &userwmconfig.PSIPolicyConf{PsiAvg60Threshold: 0.5} + serviceConf.RefaultPolicyConf = &userwmconfig.RefaultPolicyConf{ + ReclaimAccuracyTarget: 0.8, + ReclaimScanEfficiencyTarget: 0.7, + } + + cfg.UserWatermarkConfiguration.QoSLevelConfig[katalystapiconsts.QoSLevelSharedCores] = qosConf + cfg.UserWatermarkConfiguration.ServiceConfig[serviceName] = serviceConf + + cInfo := &types.ContainerInfo{ + PodUID: "pod-uid-1", + PodNamespace: "default", + PodName: "pod-1", + ContainerName: "c1", + QoSLevel: string(katalystapiconsts.QoSLevelSharedCores), + Labels: map[string]string{serviceLabel: serviceName}, + } + + instance := ReclaimInstance{ + ContainerInfo: cInfo, + CgroupPath: "/sys/fs/cgroup/memory/test", + } + + r := NewUserWatermarkReclaimer(instance, (*metaserver.MetaServer)(nil), metrics.DummyMetrics{}, dynamicConf) + // Load QoS + service specific config: service-level config overrides qos-level + r.LoadConfig() + + conf := r.GetConfig() + assert.True(t, conf.EnableMemoryReclaim) + // ScaleFactor should follow service config (which currently keeps default value). + assert.Equal(t, serviceConf.ScaleFactor, conf.ScaleFactor) + assert.Equal(t, serviceConf.BackoffDuration, conf.BackoffDuration) + assert.Equal(t, serviceConf.PsiAvg60Threshold, conf.PsiAvg60Threshold) + assert.Equal(t, serviceConf.RefaultPolicyConf.ReclaimAccuracyTarget, conf.ReclaimAccuracyTarget) + assert.Equal(t, serviceConf.RefaultPolicyConf.ReclaimScanEfficiencyTarget, conf.ReclaimScanEfficiencyTarget) +} + +func TestUserWatermarkReclaimer_LoadConfig_NilDynamicConf(t *testing.T) { + t.Parallel() + + // construct a DynamicAgentConfiguration whose Configuration has nil UserWatermarkConfiguration + dynamicConf := dynamicconfig.NewDynamicAgentConfiguration() + dynamicConf.SetDynamicConfiguration(&dynamicconfig.Configuration{}) + + // build reclaimer manually to avoid panic in NewUserWatermarkReclaimer + defaultConf := userwmconfig.NewUserWatermarkDefaultConfiguration() + r := &userWatermarkReclaimer{ + dynamicConf: dynamicConf, + reclaimConf: userwmconfig.NewReclaimConfigDetail(defaultConf), + } + + before := *r.reclaimConf + + r.LoadConfig() + after := *r.reclaimConf + + assert.Equal(t, before, after, "LoadConfig should be noop when dynamic conf is nil") +} + +func TestUserWatermarkReclaimer_LoadConfig_CgroupConfig(t *testing.T) { + t.Parallel() + + dynamicConf := newTestDynamicConf() + cfg := dynamicConf.GetDynamicConfiguration() + + cgroupPath := "/sys/fs/cgroup/memory/test-cgroup" + defaultConf := cfg.UserWatermarkConfiguration.DefaultConfig + cgConf := userwmconfig.NewReclaimConfigDetail(defaultConf) + cgConf.ScaleFactor = 300 + cgConf.ReclaimFailedThreshold = 10 + + cfg.UserWatermarkConfiguration.CgroupConfig[cgroupPath] = cgConf + + instance := ReclaimInstance{CgroupPath: cgroupPath} + + r := NewUserWatermarkReclaimer(instance, (*metaserver.MetaServer)(nil), metrics.DummyMetrics{}, dynamicConf) + r.LoadConfig() + + conf := r.GetConfig() + assert.Equal(t, uint64(300), conf.ScaleFactor) + assert.Equal(t, uint64(10), conf.ReclaimFailedThreshold) +} + +func TestUserWatermarkReclaimer_loadConfig_NilConfig(t *testing.T) { + t.Parallel() + + dynamicConf := newTestDynamicConf() + instance := ReclaimInstance{CgroupPath: "/sys/fs/cgroup/memory/test"} + r := NewUserWatermarkReclaimer(instance, (*metaserver.MetaServer)(nil), metrics.DummyMetrics{}, dynamicConf) + + before := *r.reclaimConf + r.loadConfig(nil) + after := *r.reclaimConf + + assert.Equal(t, before, after) +} + +func TestGetContainerCgroupPath_SuccessAndError(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + expected := "/sys/fs/cgroup/memory/pod/container" + mockey.Mock(common.GetContainerAbsCgroupPath).Return(expected, nil).Build() + + path, err := GetContainerCgroupPath("pod", "container") + assert.NoError(t, err) + assert.Equal(t, expected, path) + }) + + t.Run("error", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockErr := fmt.Errorf("cgroup error") + mockey.Mock(common.GetContainerAbsCgroupPath).Return("", mockErr).Build() + + path, err := GetContainerCgroupPath("pod", "container") + assert.Error(t, err) + assert.Empty(t, path) + }) +} + +func TestGetCGroupMemoryLimitAndUsage(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockStats := &common.MemoryStats{Limit: 2048, Usage: 1024} + mockey.Mock(cgroupmgr.GetMemoryWithAbsolutePath).Return(mockStats, nil).Build() + + limit, usage, err := GetCGroupMemoryLimitAndUsage("/sys/fs/cgroup/memory/test") + assert.NoError(t, err) + assert.Equal(t, uint64(2048), limit) + assert.Equal(t, uint64(1024), usage) + }) + + t.Run("error", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockey.Mock(cgroupmgr.GetMemoryWithAbsolutePath).Return((*common.MemoryStats)(nil), fmt.Errorf("read error")).Build() + + limit, usage, err := GetCGroupMemoryLimitAndUsage("/sys/fs/cgroup/memory/test") + assert.Error(t, err) + assert.Equal(t, uint64(0), limit) + assert.Equal(t, uint64(0), usage) + }) + + t.Run("nilStats", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockey.Mock(cgroupmgr.GetMemoryWithAbsolutePath).Return((*common.MemoryStats)(nil), nil).Build() + + limit, usage, err := GetCGroupMemoryLimitAndUsage("/sys/fs/cgroup/memory/test") + assert.Error(t, err) + assert.Equal(t, uint64(0), limit) + assert.Equal(t, uint64(0), usage) + }) +} + +func TestGetCGroupMemoryStats(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockStats := &common.MemoryStats{ + Limit: 4096, + Usage: 2048, + FileCache: 100, + ActiveFile: 10, + ActiveAnno: 20, + InactiveFile: 30, + InactiveAnno: 40, + } + mockey.Mock(cgroupmgr.GetMemoryStatsWithAbsolutePath).Return(mockStats, nil).Build() + + stats, err := GetCGroupMemoryStats("/sys/fs/cgroup/memory/test") + assert.NoError(t, err) + assert.Equal(t, uint64(4096), stats.Limit) + assert.Equal(t, uint64(2048), stats.Usage) + assert.Equal(t, mockStats.FileCache, stats.FileCache) + assert.Equal(t, mockStats.ActiveFile, stats.ActiveFile) + assert.Equal(t, mockStats.ActiveAnno, stats.ActiveAnno) + assert.Equal(t, mockStats.InactiveFile, stats.InactiveFile) + assert.Equal(t, mockStats.InactiveAnno, stats.InactiveAnno) + }) + + t.Run("error", func(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + mockey.Mock(cgroupmgr.GetMemoryStatsWithAbsolutePath).Return((*common.MemoryStats)(nil), fmt.Errorf("read error")).Build() + + stats, err := GetCGroupMemoryStats("/sys/fs/cgroup/memory/test") + assert.Error(t, err) + assert.Equal(t, uint64(0), stats.Limit) + assert.Equal(t, uint64(0), stats.Usage) + assert.Equal(t, uint64(0), stats.FileCache) + assert.Equal(t, uint64(0), stats.ActiveFile) + assert.Equal(t, uint64(0), stats.ActiveAnno) + assert.Equal(t, uint64(0), stats.InactiveFile) + assert.Equal(t, uint64(0), stats.InactiveAnno) + }) +} + +func TestReachedHighWatermark(t *testing.T) { + t.Parallel() + + assert.False(t, reachedHighWatermark(10, 0)) + assert.False(t, reachedHighWatermark(10, 20)) + assert.True(t, reachedHighWatermark(30, 10)) +} + +func TestUserWatermarkReclaimer_ReclaimSuccess(t *testing.T) { + t.Parallel() + calculatorMutex.Lock() + defer calculatorMutex.Unlock() + managerMutex.Lock() + defer managerMutex.Unlock() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + dynamicConf := newTestDynamicConf() + defaultConf := dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.DefaultConfig + + r := &userWatermarkReclaimer{ + stopCh: make(chan struct{}), + emitter: metrics.DummyMetrics{}, + dynamicConf: dynamicConf, + cgroupPath: "/sys/fs/cgroup/memory/test", + containerInfo: &types.ContainerInfo{ + PodName: "pod-1", + ContainerName: "c1", + }, + reclaimConf: userwmconfig.NewReclaimConfigDetail(defaultConf), + feedbackManager: NewFeedbackManager(), + } + + // origin and current reclaim stats + callCount := 0 + mockey.Mock((*userWatermarkReclaimer).getMemoryReclaimStats). + To(func(_ *userWatermarkReclaimer) (ReclaimStats, error) { + callCount++ + switch callCount { + case 1: + return ReclaimStats{pgsteal: 100, pgscan: 200}, nil + default: + return ReclaimStats{pgsteal: 200, pgscan: 400, refaultActivate: 10}, nil + } + }).Build() + + mockey.Mock(cgroupmgr.GetEffectiveCPUSetWithAbsolutePath). + Return(machine.NewCPUSet(0), machine.NewCPUSet(0), nil).Build() + + var offloadCalls []int64 + mockey.Mock(cgroupmgr.MemoryOffloadingWithAbsolutePath). + To(func(_ context.Context, path string, nbytes int64, _ machine.CPUSet) error { + offloadCalls = append(offloadCalls, nbytes) + return nil + }).Build() + + memCall := 0 + mockey.Mock(GetCGroupMemoryLimitAndUsage). + To(func(_ string) (uint64, uint64, error) { + memCall++ + switch memCall { + case 1: + return 200, 190, nil // free 10 < high watermark + default: + return 200, 150, nil // free 50 >= high watermark + } + }).Build() + + mockey.Mock((*FeedbackManager).FeedbackResult). + Return(FeedbackResult{Abnormal: false}, nil).Build() + + reclaimInfo := &ReclaimInfo{ + CGroupPath: r.cgroupPath, + LowWaterMark: 10, + HighWaterMark: 30, + ReclaimTarget: 100, + SingleReclaimMax: 60, + } + + result, err := r.Reclaim(reclaimInfo) + assert.NoError(t, err) + assert.True(t, result.Success) + assert.Equal(t, uint64(100), result.ReclaimedSize) + assert.Equal(t, []int64{60, 40}, offloadCalls) +} + +func TestUserWatermarkReclaimer_ReclaimGetStatsFailed(t *testing.T) { + t.Parallel() + reclaimMutex.Lock() + defer reclaimMutex.Unlock() + + defer mockey.UnPatchAll() + + dynamicConf := newTestDynamicConf() + defaultConf := dynamicConf.GetDynamicConfiguration().UserWatermarkConfiguration.DefaultConfig + + r := &userWatermarkReclaimer{ + stopCh: make(chan struct{}), + emitter: metrics.DummyMetrics{}, + dynamicConf: dynamicConf, + cgroupPath: "/sys/fs/cgroup/memory/test", + reclaimConf: userwmconfig.NewReclaimConfigDetail(defaultConf), + } + + mockErr := fmt.Errorf("stats error") + mockey.Mock((*userWatermarkReclaimer).getMemoryReclaimStats). + Return(ReclaimStats{}, mockErr).Build() + + reclaimInfo := &ReclaimInfo{ + CGroupPath: r.cgroupPath, + LowWaterMark: 10, + HighWaterMark: 30, + ReclaimTarget: 64, + SingleReclaimMax: 32, + } + + result, err := r.Reclaim(reclaimInfo) + assert.Error(t, err) + assert.Contains(t, result.Reason, "Get memory reclaimStats failed") + assert.Equal(t, uint64(0), result.ReclaimedSize) +} diff --git a/pkg/config/agent/dynamic/crd/dynamic_crd.go b/pkg/config/agent/dynamic/crd/dynamic_crd.go index e8b16e7a57..1ef734282f 100644 --- a/pkg/config/agent/dynamic/crd/dynamic_crd.go +++ b/pkg/config/agent/dynamic/crd/dynamic_crd.go @@ -37,6 +37,7 @@ type DynamicConfigCRD struct { TransparentMemoryOffloadingConfiguration *v1alpha1.TransparentMemoryOffloadingConfiguration StrategyGroup *v1alpha1.StrategyGroup IRQTuningConfiguration *v1alpha1.IRQTuningConfiguration + UserWatermarkConfiguration *v1alpha1.UserWatermarkConfiguration } var ( @@ -50,4 +51,6 @@ var ( StrategyGroupGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameStrategyGroups)) // IRQTuningConfigurationGVR is the group version resource for IRQTuningConfiguration IRQTuningConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameIRQTuningConfigurations)) + // UserWatermarkConfigurationGVR is the group version resource for UserWatermarkConfiguration + UserWatermarkConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameUserWatermarkConfigurations)) ) diff --git a/pkg/config/agent/dynamic/dynamic_base.go b/pkg/config/agent/dynamic/dynamic_base.go index 5c72188938..5e80ff419e 100644 --- a/pkg/config/agent/dynamic/dynamic_base.go +++ b/pkg/config/agent/dynamic/dynamic_base.go @@ -26,6 +26,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/metricthreshold" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/strategygroup" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/userwatermark" ) type DynamicAgentConfiguration struct { @@ -60,6 +61,7 @@ type Configuration struct { *strategygroup.StrategyGroupConfiguration *metricthreshold.MetricThresholdConfiguration *irqtuning.IRQTuningConfiguration + *userwatermark.UserWatermarkConfiguration } func NewConfiguration() *Configuration { @@ -70,6 +72,7 @@ func NewConfiguration() *Configuration { StrategyGroupConfiguration: strategygroup.NewStrategyGroupConfiguration(), MetricThresholdConfiguration: metricthreshold.NewMetricThresholdConfiguration(), IRQTuningConfiguration: irqtuning.NewIRQTuningConfiguration(), + UserWatermarkConfiguration: userwatermark.NewUserWatermarkConfiguration(), } } @@ -80,4 +83,5 @@ func (c *Configuration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { c.StrategyGroupConfiguration.ApplyConfiguration(conf) c.MetricThresholdConfiguration.ApplyConfiguration(conf) c.IRQTuningConfiguration.ApplyConfiguration(conf) + c.UserWatermarkConfiguration.ApplyConfiguration(conf) } diff --git a/pkg/config/agent/dynamic/userwatermark/user_watermark_base.go b/pkg/config/agent/dynamic/userwatermark/user_watermark_base.go new file mode 100644 index 0000000000..089cade9cb --- /dev/null +++ b/pkg/config/agent/dynamic/userwatermark/user_watermark_base.go @@ -0,0 +1,217 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package userwatermark + +import ( + "time" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd" +) + +const ( + DefaultReconcileInterval = 1 + DefaultBackoffDuration = 3 * time.Second + DefaultSingleReclaimSize = 1 * 1024 * 1024 * 1024 + DefaultScaleFactor = 100 + DefaultSingleReclaimFactor = 0.5 + DefaultReclaimFailedThreshold = 3 + DefaultFailureFreezePeriod = 5 * time.Second +) + +type UserWatermarkDefaultConfiguration struct { + EnableMemoryReclaim bool + ReclaimInterval int64 + + ScaleFactor uint64 + SingleReclaimFactor float64 + // SingleReclaimSize is the max memory reclaim size in one reclaim cycle + SingleReclaimSize uint64 + + BackoffDuration time.Duration + FeedbackPolicy v1alpha1.UserWatermarkPolicyName + ReclaimFailedThreshold uint64 + FailureFreezePeriod time.Duration + + PsiAvg60Threshold float64 + ReclaimAccuracyTarget float64 + ReclaimScanEfficiencyTarget float64 +} + +func NewUserWatermarkDefaultConfiguration() *UserWatermarkDefaultConfiguration { + return &UserWatermarkDefaultConfiguration{ + EnableMemoryReclaim: true, + ReclaimInterval: DefaultReconcileInterval, + ScaleFactor: DefaultScaleFactor, + SingleReclaimFactor: DefaultSingleReclaimFactor, + SingleReclaimSize: DefaultSingleReclaimSize, + BackoffDuration: DefaultBackoffDuration, + FeedbackPolicy: v1alpha1.UserWatermarkPolicyNameIntegrated, + ReclaimFailedThreshold: DefaultReclaimFailedThreshold, + FailureFreezePeriod: DefaultFailureFreezePeriod, + } +} + +type PSIPolicyConf struct { + PsiAvg60Threshold float64 +} + +type RefaultPolicyConf struct { + ReclaimAccuracyTarget float64 + ReclaimScanEfficiencyTarget float64 +} + +type ReclaimConfigDetail struct { + EnableMemoryReclaim bool + ReclaimInterval int64 + + ScaleFactor uint64 + SingleReclaimFactor float64 + // SingleReclaimSize is the max memory reclaim size in one reclaim cycle + SingleReclaimSize uint64 + + BackoffDuration time.Duration + FeedbackPolicy v1alpha1.UserWatermarkPolicyName + + ReclaimFailedThreshold uint64 + FailureFreezePeriod time.Duration + + *PSIPolicyConf + *RefaultPolicyConf +} + +func NewReclaimConfigDetail(defaultConfigs *UserWatermarkDefaultConfiguration) *ReclaimConfigDetail { + detail := &ReclaimConfigDetail{ + EnableMemoryReclaim: defaultConfigs.EnableMemoryReclaim, + ReclaimInterval: defaultConfigs.ReclaimInterval, + ScaleFactor: defaultConfigs.ScaleFactor, + SingleReclaimFactor: defaultConfigs.SingleReclaimFactor, + SingleReclaimSize: defaultConfigs.SingleReclaimSize, + + BackoffDuration: defaultConfigs.BackoffDuration, + FeedbackPolicy: defaultConfigs.FeedbackPolicy, + ReclaimFailedThreshold: defaultConfigs.ReclaimFailedThreshold, + FailureFreezePeriod: defaultConfigs.FailureFreezePeriod, + PSIPolicyConf: &PSIPolicyConf{}, + RefaultPolicyConf: &RefaultPolicyConf{}, + } + detail.PSIPolicyConf.PsiAvg60Threshold = defaultConfigs.PsiAvg60Threshold + detail.RefaultPolicyConf.ReclaimAccuracyTarget = defaultConfigs.ReclaimAccuracyTarget + detail.RefaultPolicyConf.ReclaimScanEfficiencyTarget = defaultConfigs.ReclaimScanEfficiencyTarget + + return detail +} + +type UserWatermarkConfiguration struct { + EnableReclaimer bool + ReconcileInterval int64 + ServiceLabel string + DefaultConfig *UserWatermarkDefaultConfiguration + ServiceConfig map[string]*ReclaimConfigDetail + QoSLevelConfig map[consts.QoSLevel]*ReclaimConfigDetail + CgroupConfig map[string]*ReclaimConfigDetail +} + +func NewUserWatermarkConfiguration() *UserWatermarkConfiguration { + return &UserWatermarkConfiguration{ + EnableReclaimer: false, + ReconcileInterval: DefaultReconcileInterval, + DefaultConfig: NewUserWatermarkDefaultConfiguration(), + ServiceConfig: map[string]*ReclaimConfigDetail{}, + QoSLevelConfig: map[consts.QoSLevel]*ReclaimConfigDetail{}, + CgroupConfig: map[string]*ReclaimConfigDetail{}, + } +} + +func ApplyReclaimConfigDetail(detail *ReclaimConfigDetail, configDetail v1alpha1.ReclaimConfigDetail) { + if configDetail.EnableMemoryReclaim != nil { + detail.EnableMemoryReclaim = *configDetail.EnableMemoryReclaim + } + if configDetail.ReclaimInterval != nil { + detail.ReclaimInterval = *configDetail.ReclaimInterval + } + if configDetail.ScaleFactor != nil { + detail.ScaleFactor = *configDetail.ScaleFactor + } + if configDetail.SingleReclaimFactor != nil { + detail.SingleReclaimFactor = *configDetail.SingleReclaimFactor + } + if configDetail.SingleReclaimSize != nil { + detail.SingleReclaimSize = *configDetail.SingleReclaimSize + } + if configDetail.BackoffDuration != nil { + detail.BackoffDuration = configDetail.BackoffDuration.Duration + } + if configDetail.ReclaimFailedThreshold != nil { + detail.ReclaimFailedThreshold = *configDetail.ReclaimFailedThreshold + } + if configDetail.FailureFreezePeriod != nil { + detail.FailureFreezePeriod = configDetail.FailureFreezePeriod.Duration + } + + if psiPolicyConfDynamic := configDetail.PSIPolicyConf; psiPolicyConfDynamic != nil { + if psiPolicyConfDynamic.PSIAvg60Threshold != nil { + detail.PSIPolicyConf.PsiAvg60Threshold = *psiPolicyConfDynamic.PSIAvg60Threshold + } + } + if refaultPolicyConfDynamic := configDetail.RefaultPolicConf; refaultPolicyConfDynamic != nil { + if refaultPolicyConfDynamic.ReclaimAccuracyTarget != nil { + detail.RefaultPolicyConf.ReclaimAccuracyTarget = *refaultPolicyConfDynamic.ReclaimAccuracyTarget + } + if refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget != nil { + detail.RefaultPolicyConf.ReclaimScanEfficiencyTarget = *refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget + } + } +} + +func (c *UserWatermarkConfiguration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { + if uwc := conf.UserWatermarkConfiguration; uwc != nil { + if uwc.Spec.Config.EnableReclaimer != nil { + c.EnableReclaimer = *uwc.Spec.Config.EnableReclaimer + } + if uwc.Spec.Config.ReconcileInterval != nil { + c.ReconcileInterval = *uwc.Spec.Config.ReconcileInterval + } + if len(uwc.Spec.Config.ServiceLabel) > 0 { + c.ServiceLabel = uwc.Spec.Config.ServiceLabel + } + + if uwc.Spec.Config.ServiceConfig != nil { + for _, serviceConfig := range uwc.Spec.Config.ServiceConfig { + configDetail := NewReclaimConfigDetail(c.DefaultConfig) + ApplyReclaimConfigDetail(configDetail, serviceConfig.ConfigDetail) + c.ServiceConfig[serviceConfig.ServiceName] = configDetail + } + } + if uwc.Spec.Config.QoSLevelConfig != nil { + for _, qosLevelConfig := range uwc.Spec.Config.QoSLevelConfig { + configDetail := NewReclaimConfigDetail(c.DefaultConfig) + ApplyReclaimConfigDetail(configDetail, qosLevelConfig.ConfigDetail) + c.QoSLevelConfig[qosLevelConfig.QoSLevel] = configDetail + } + } + + if uwc.Spec.Config.CgroupConfig != nil { + for _, cgroupConfig := range uwc.Spec.Config.CgroupConfig { + configDetail := NewReclaimConfigDetail(c.DefaultConfig) + ApplyReclaimConfigDetail(configDetail, cgroupConfig.ConfigDetail) + c.CgroupConfig[cgroupConfig.CgroupPath] = configDetail + } + } + } +} diff --git a/pkg/config/agent/qrm/memory_plugin.go b/pkg/config/agent/qrm/memory_plugin.go index 25d936fa4b..c3211b0eb2 100644 --- a/pkg/config/agent/qrm/memory_plugin.go +++ b/pkg/config/agent/qrm/memory_plugin.go @@ -35,6 +35,8 @@ type MemoryQRMPluginConfig struct { EnableSettingMemoryMigrate bool // EnableMemoryAdvisor indicates whether to enable sys-advisor module to calculate memory resources EnableMemoryAdvisor bool + // EnableUserWatermark: enable user memory watermark + EnableUserWatermark bool // GetAdviceInterval is the interval at which we get advice from sys-advisor GetAdviceInterval time.Duration // ExtraControlKnobConfigFile: the absolute path of extra control knob config file diff --git a/pkg/util/cgroup/common/types.go b/pkg/util/cgroup/common/types.go index d92f4cef10..d0d7acd606 100644 --- a/pkg/util/cgroup/common/types.go +++ b/pkg/util/cgroup/common/types.go @@ -177,8 +177,13 @@ func (iocmd *IOCostModelData) String() string { // MemoryStats get cgroup memory data type MemoryStats struct { - Limit uint64 - Usage uint64 + Limit uint64 + Usage uint64 + FileCache uint64 + InactiveAnno uint64 + ActiveAnno uint64 + InactiveFile uint64 + ActiveFile uint64 } // CPUStats get cgroup cpu data diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 3c0ce8ee72..2faf57c0b0 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -200,6 +200,15 @@ func GetMemoryWithAbsolutePath(absCgroupPath string) (*common.MemoryStats, error return GetManager().GetMemory(absCgroupPath) } +func GetMemoryStatsWithRelativePath(relCgroupPath string) (*common.MemoryStats, error) { + absCgroupPath := common.GetAbsCgroupPath("memory", relCgroupPath) + return GetManager().GetMemoryStats(absCgroupPath) +} + +func GetMemoryStatsWithAbsolutePath(absCgroupPath string) (*common.MemoryStats, error) { + return GetManager().GetMemoryStats(absCgroupPath) +} + func GetMemoryPressureWithAbsolutePath(absCgroupPath string, t common.PressureType) (*common.MemoryPressure, error) { return GetManager().GetMemoryPressure(absCgroupPath, t) } diff --git a/pkg/util/cgroup/manager/fake_manager.go b/pkg/util/cgroup/manager/fake_manager.go index cb646715c6..b311273628 100644 --- a/pkg/util/cgroup/manager/fake_manager.go +++ b/pkg/util/cgroup/manager/fake_manager.go @@ -56,6 +56,10 @@ func (f *FakeCgroupManager) GetMemory(absCgroupPath string) (*common.MemoryStats return nil, nil } +func (f *FakeCgroupManager) GetMemoryStats(absCgroupPath string) (*common.MemoryStats, error) { + return nil, nil +} + func (f *FakeCgroupManager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { return nil, nil } diff --git a/pkg/util/cgroup/manager/manager.go b/pkg/util/cgroup/manager/manager.go index 9adbb4b0c1..df39bd8f52 100644 --- a/pkg/util/cgroup/manager/manager.go +++ b/pkg/util/cgroup/manager/manager.go @@ -44,6 +44,7 @@ type Manager interface { ApplyUnifiedData(absCgroupPath, cgroupFileName, data string) error GetMemory(absCgroupPath string) (*common.MemoryStats, error) + GetMemoryStats(absCgroupPath string) (*common.MemoryStats, error) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) GetMemoryPressure(absCgroupPath string, t common.PressureType) (*common.MemoryPressure, error) GetCPU(absCgroupPath string) (*common.CPUStats, error) diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index 62913e961c..2add801394 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -246,6 +246,50 @@ func (m *manager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { return memoryStats, nil } +func (m *manager) GetMemoryStats(absCgroupPath string) (*common.MemoryStats, error) { + fileName := "memory.stat" + + memoryStats, err := m.GetMemory(absCgroupPath) + if err != nil { + return nil, fmt.Errorf("get memory limit and usage failed with error: %v", err) + } + + content, err := libcgroups.ReadFile(absCgroupPath, fileName) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", fileName, err) + } + lines := strings.Split(content, "\n") + for _, line := range lines { + cols := strings.Fields(line) + if len(cols) != 2 { + continue + } + + key := cols[0] + valueStr := cols[1] + + value, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse %s, %v", valueStr, err) + } + + switch key { + case "cache": + memoryStats.FileCache = value + case "inactive_anon": + memoryStats.InactiveAnno = value + case "active_anon": + memoryStats.ActiveAnno = value + case "inactive_file": + memoryStats.InactiveFile = value + case "active_file": + memoryStats.ActiveFile = value + } + } + + return memoryStats, nil +} + func (m *manager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { const fileName = "memory.numa_stat" content, err := libcgroups.ReadFile(absCgroupPath, fileName) diff --git a/pkg/util/cgroup/manager/v1/fs_unsupported.go b/pkg/util/cgroup/manager/v1/fs_unsupported.go index e103f81a67..c5c3438fdc 100644 --- a/pkg/util/cgroup/manager/v1/fs_unsupported.go +++ b/pkg/util/cgroup/manager/v1/fs_unsupported.go @@ -68,6 +68,10 @@ func (m *unsupportedManager) GetMemory(_ string) (*common.MemoryStats, error) { return nil, fmt.Errorf("unsupported manager v1") } +func (m *unsupportedManager) GetMemoryStats(_ string) (*common.MemoryStats, error) { + return nil, fmt.Errorf("unsupported manager v1") +} + func (m *unsupportedManager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { return nil, fmt.Errorf("unsupported manager v1") } diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index 7dea750c0e..ba6e12aaf6 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -289,6 +289,50 @@ func (m *manager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { return memoryStats, nil } +func (m *manager) GetMemoryStats(absCgroupPath string) (*common.MemoryStats, error) { + fileName := "memory.stat" + + memoryStats, err := m.GetMemory(absCgroupPath) + if err != nil { + return nil, fmt.Errorf("get memory limit and usage failed with error: %v", err) + } + + content, err := libcgroups.ReadFile(absCgroupPath, fileName) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", fileName, err) + } + lines := strings.Split(content, "\n") + for _, line := range lines { + cols := strings.Fields(line) + if len(cols) != 2 { + continue + } + + key := cols[0] + valueStr := cols[1] + + value, err := strconv.ParseUint(valueStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse %s, %v", valueStr, err) + } + + switch key { + case "file": + memoryStats.FileCache = value + case "inactive_anon": + memoryStats.InactiveAnno = value + case "active_anon": + memoryStats.ActiveAnno = value + case "inactive_file": + memoryStats.InactiveFile = value + case "active_file": + memoryStats.ActiveFile = value + } + } + + return memoryStats, nil +} + func (m *manager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { const fileName = "memory.numa_stat" content, err := libcgroups.ReadFile(absCgroupPath, fileName) diff --git a/pkg/util/cgroup/manager/v2/fs_unsupported.go b/pkg/util/cgroup/manager/v2/fs_unsupported.go index a3dc3492b3..33bfddf146 100644 --- a/pkg/util/cgroup/manager/v2/fs_unsupported.go +++ b/pkg/util/cgroup/manager/v2/fs_unsupported.go @@ -68,6 +68,10 @@ func (m *unsupportedManager) GetMemory(_ string) (*common.MemoryStats, error) { return nil, fmt.Errorf("unsupported manager v2") } +func (m *unsupportedManager) GetMemoryStats(_ string) (*common.MemoryStats, error) { + return nil, fmt.Errorf("unsupported manager v2") +} + func (m *unsupportedManager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { return nil, fmt.Errorf("unsupported manager v2") }