Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
)

const (
defaultQoSAwareSyncPeriod = 5 * time.Second
defaultQoSAwareSyncPeriod = 5 * time.Second
defaultEnableDyingMemcgReclaim = false
)

// QoSAwarePluginOptions holds the configurations for qos aware plugin.
type QoSAwarePluginOptions struct {
SyncPeriod time.Duration
SyncPeriod time.Duration
EnableDyingMemcgReclaim bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to put this configuration in MemoryAdvisorOptions?


*resource.ResourceAdvisorOptions
*server.QRMServerOptions
Expand All @@ -46,11 +48,12 @@ type QoSAwarePluginOptions struct {
// NewQoSAwarePluginOptions creates a new Options with a default config.
func NewQoSAwarePluginOptions() *QoSAwarePluginOptions {
return &QoSAwarePluginOptions{
SyncPeriod: defaultQoSAwareSyncPeriod,
ResourceAdvisorOptions: resource.NewResourceAdvisorOptions(),
QRMServerOptions: server.NewQRMServerOptions(),
ReporterOptions: reporter.NewReporterOptions(),
ModelOptions: model.NewModelOptions(),
SyncPeriod: defaultQoSAwareSyncPeriod,
EnableDyingMemcgReclaim: defaultEnableDyingMemcgReclaim,
ResourceAdvisorOptions: resource.NewResourceAdvisorOptions(),
QRMServerOptions: server.NewQRMServerOptions(),
ReporterOptions: reporter.NewReporterOptions(),
ModelOptions: model.NewModelOptions(),
}
}

Expand All @@ -59,6 +62,7 @@ func (o *QoSAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("qos_aware_plugin")

fs.DurationVar(&o.SyncPeriod, "qos-aware-sync-period", o.SyncPeriod, "Period for QoS aware plugin to sync")
fs.BoolVar(&o.EnableDyingMemcgReclaim, "qos-aware-enable-dying-memcg-reclaim", o.EnableDyingMemcgReclaim, "Enable to reclaim dying memcg")

o.ResourceAdvisorOptions.AddFlags(fs)
o.QRMServerOptions.AddFlags(fs)
Expand All @@ -69,6 +73,7 @@ func (o *QoSAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
// ApplyTo fills up config with options
func (o *QoSAwarePluginOptions) ApplyTo(c *qosaware.QoSAwarePluginConfiguration) error {
c.SyncPeriod = o.SyncPeriod
c.EnableDyingMemcgReclaim = o.EnableDyingMemcgReclaim

var errList []error
errList = append(errList, o.ResourceAdvisorOptions.ApplyTo(c.ResourceAdvisorConfiguration))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,18 @@ const (
ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = "balance_numa_memory"
ControlKnobKeySwapMax MemoryControlKnobName = "swap_max"
ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading"
ControlKnowKeyDyingMemcgReclaim MemoryControlKnobName = "dying_memcg_reclaim"
ControlKnobKeyMemoryNUMAHeadroom MemoryControlKnobName = "memory_numa_headroom"
)

const (
KubePodsCgroupPath = "kubepods"
OfflineBestEffortPrefix = "offline-besteffort-" // follows some number, such as kubepods/offline-besteffort-0
OnlineBurstableCgroupPath = "kubepods/burstable"
)

const (
MemCgReclaimDefaultIntervalSeconds = 300
)

type MemoryNUMAHeadroom map[int]int64
3 changes: 3 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
memoryPluginAsyncWorkTopicSetExtraCGMemLimit = "qrm_memory_plugin_set_extra_mem_limit"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"
memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload"
memoryPluginAsyncWorkTopicDyingMemcgReclaim = "qrm_memory_plugin_dying_memcg_reclaim"

dropCacheTimeoutSeconds = 30
setExtraCGMemLimitTimeoutSeconds = 60
Expand Down Expand Up @@ -284,6 +285,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryNUMAHeadroom,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryNUMAHeadroom))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyDyingMemcgReclaim,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorDyingMemcgReclaim))

if policyImplement.enableEvictingLogCache {
policyImplement.logCacheEvictionManager = logcache.NewManager(conf, agentCtx.MetaServer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,64 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration,
})...)
return nil
}

// handleAdvisorDyingMemcgReclaim handles dying memcg reclaim from memory-advisor
func (p *DynamicPolicy) handleAdvisorDyingMemcgReclaim(_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
emitter metrics.MetricEmitter,
metaServer *metaserver.MetaServer,
entryName, subEntryName string,
calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries,
) error {
var absCGPath string
var dyingMemcgReclaimWorkName string

if calculationInfo.CgroupPath != "" {
dyingMemcgReclaimWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicDyingMemcgReclaim)
absCGPath = calculationInfo.CgroupPath
} else {
return fmt.Errorf("cgroup path is empty")
}

// set swap max before trigger memory offloading
swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)]
if swapMax == consts.ControlKnobON {
err := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath)
if err != nil {
general.Infof("Failed to set swap max, err: %v", err)
}
} else {
err := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath)
if err != nil {
general.Infof("Failed to disable swap, err: %v", err)
}
}

isEnableDyingMemcgReclaim := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim)] == consts.ControlKnobON
if !isEnableDyingMemcgReclaim {
return nil
}

_, mems, err := cgroupmgr.GetEffectiveCPUSetWithAbsolutePath(absCGPath)
if err != nil {
return fmt.Errorf("GetEffectiveCPUSetWithAbsolutePath failed with error: %v", err)
}
general.Infof("dyingMemcgReclaimWithAbsolutePath mems: %v", mems)

// start an asynchronous work to execute dying memcg reclaim
err = p.defaultAsyncLimitedWorkers.AddWork(
&asyncworker.Work{
Name: dyingMemcgReclaimWorkName,
UID: uuid.NewUUID(),
Fn: cgroupmgr.DyingMemcgReclaimWithAbsolutePath,
Params: []interface{}{absCGPath, emitter, entryName, subEntryName, mems},
DeliveredAt: time.Now(),
}, asyncworker.DuplicateWorkPolicyOverride,
)
if err != nil {
return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %v", dyingMemcgReclaimWorkName, entryName, subEntryName, absCGPath, err)
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicpolicy

import (
"reflect"
"sync"
"testing"

"github.com/bytedance/mockey"
"github.com/stretchr/testify/assert"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

var dyingMemcgReclaimTestMutex sync.Mutex

func TestDynamicPolicy_handleAdvisorDyingMemcgReclaim(t *testing.T) {
t.Parallel()
t.Run("empty cgroup path", func(t *testing.T) {
t.Parallel()
dyingMemcgReclaimTestMutex.Lock()
defer dyingMemcgReclaimTestMutex.Unlock()
p := &DynamicPolicy{}
calculationInfo := &advisorsvc.CalculationInfo{
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{},
},
}

err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, metrics.DummyMetrics{}, nil, "pod", "container", calculationInfo, nil)
assert.Error(t, err)
})

t.Run("disabled dying memcg reclaim", func(t *testing.T) {
t.Parallel()
dyingMemcgReclaimTestMutex.Lock()
defer dyingMemcgReclaimTestMutex.Unlock()
defer mockey.UnPatchAll()

p := &DynamicPolicy{
defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers("test", 1, metrics.DummyMetrics{}),
}

disableSwapCalled := false
addWorkCalled := false
getEffectiveCalled := false

mockey.Mock(cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive).IncludeCurrentGoRoutine().To(func(_ string) error {
disableSwapCalled = true
return nil
}).Build()
mockey.Mock((*asyncworker.AsyncLimitedWorkers).AddWork).IncludeCurrentGoRoutine().To(func(_ *asyncworker.AsyncLimitedWorkers, _ *asyncworker.Work, _ asyncworker.DuplicateWorkPolicy) error {
addWorkCalled = true
return nil
}).Build()
mockey.Mock(cgroupmgr.GetEffectiveCPUSetWithAbsolutePath).IncludeCurrentGoRoutine().To(func(_ string) (machine.CPUSet, machine.CPUSet, error) {
getEffectiveCalled = true
return machine.NewCPUSet(0), machine.NewCPUSet(0), nil
}).Build()

calculationInfo := &advisorsvc.CalculationInfo{
CgroupPath: "/sys/fs/cgroup/kubepods/burstable",
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeySwapMax): consts.ControlKnobOFF,
string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobOFF,
},
},
}

err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, metrics.DummyMetrics{}, nil, "pod", "container", calculationInfo, nil)
assert.NoError(t, err)
assert.True(t, disableSwapCalled)
assert.False(t, addWorkCalled)
assert.False(t, getEffectiveCalled)
})

t.Run("enabled dying memcg reclaim", func(t *testing.T) {
t.Parallel()
dyingMemcgReclaimTestMutex.Lock()
defer dyingMemcgReclaimTestMutex.Unlock()
defer mockey.UnPatchAll()

p := &DynamicPolicy{
defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers("test", 1, metrics.DummyMetrics{}),
}

setSwapCalled := false
var capturedWork *asyncworker.Work
var capturedPolicy asyncworker.DuplicateWorkPolicy
mems := machine.NewCPUSet(0, 1)

mockey.Mock(cgroupmgr.SetSwapMaxWithAbsolutePathRecursive).IncludeCurrentGoRoutine().To(func(_ string) error {
setSwapCalled = true
return nil
}).Build()
mockey.Mock(cgroupmgr.GetEffectiveCPUSetWithAbsolutePath).IncludeCurrentGoRoutine().To(func(_ string) (machine.CPUSet, machine.CPUSet, error) {
return machine.NewCPUSet(0), mems, nil
}).Build()
mockey.Mock((*asyncworker.AsyncLimitedWorkers).AddWork).IncludeCurrentGoRoutine().To(func(_ *asyncworker.AsyncLimitedWorkers, work *asyncworker.Work, policy asyncworker.DuplicateWorkPolicy) error {
capturedWork = work
capturedPolicy = policy
return nil
}).Build()

calculationInfo := &advisorsvc.CalculationInfo{
CgroupPath: "/sys/fs/cgroup/kubepods/burstable",
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeySwapMax): consts.ControlKnobON,
string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobON,
},
},
}

emitter := metrics.DummyMetrics{}
err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, emitter, nil, "pod", "container", calculationInfo, nil)
assert.NoError(t, err)
assert.True(t, setSwapCalled)
assert.NotNil(t, capturedWork)

expectedAbsPath := "/sys/fs/cgroup/kubepods/burstable"
expectedWorkName := util.GetCgroupAsyncWorkName("/sys/fs/cgroup/kubepods/burstable", memoryPluginAsyncWorkTopicDyingMemcgReclaim)
assert.Equal(t, expectedWorkName, capturedWork.Name)
assert.Equal(t, asyncworker.DuplicateWorkPolicy(asyncworker.DuplicateWorkPolicyOverride), capturedPolicy)
assert.Equal(t, expectedAbsPath, capturedWork.Params[0])
assert.Equal(t, emitter, capturedWork.Params[1])
assert.Equal(t, "pod", capturedWork.Params[2])
assert.Equal(t, "container", capturedWork.Params[3])
assert.Equal(t, mems, capturedWork.Params[4])
assert.Equal(t, reflect.ValueOf(cgroupmgr.DyingMemcgReclaimWithAbsolutePath).Pointer(), reflect.ValueOf(capturedWork.Fn).Pointer())
})
}
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache"
MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems"
MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading"
MetricNameMemoryHandlerAdvisorDyingMemcgReclaim = "memory_handler_advisor_dying_memcg_reclaim"
MetricNameMemoryHandlerAdvisorMemoryNUMAHeadroom = "memory_handler_advisor_memory_numa_headroom"
MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed"
MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package plugin
import (
"context"
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -201,6 +203,9 @@ type transparentMemoryOffloading struct {
emitter metrics.MetricEmitter
containerTmoEngines map[katalystcoreconsts.PodContainerName]TmoEngine
cgpathTmoEngines map[string]TmoEngine

lastDyingCGReclaimTime time.Time
enableDyingMemcgReclaim bool
}

type TmoEngine interface {
Expand Down Expand Up @@ -470,6 +475,8 @@ func NewTransparentMemoryOffloading(conf *config.Configuration, extraConfig inte
emitter: emitter,
containerTmoEngines: make(map[consts.PodContainerName]TmoEngine),
cgpathTmoEngines: make(map[string]TmoEngine),
// enableDyingMemcgReclaim: getEnvBool(DyingMemcgReclaimEnv, true),
enableDyingMemcgReclaim: conf.QoSAwarePluginConfiguration.EnableDyingMemcgReclaim,
}
}

Expand Down Expand Up @@ -524,6 +531,7 @@ func (tmo *transparentMemoryOffloading) Reconcile(status *types.MemoryPressureSt
tmo.containerTmoEngines[podContainerName].GetConf().PolicyName)
}
}

// load SPD conf if exists
tmoIndicator := &v1alpha1.TransparentMemoryOffloadingIndicators{}
isBaseline, err := tmo.metaServer.ServiceProfilingManager.ServiceExtendedIndicator(context.Background(), pod.ObjectMeta, tmoIndicator)
Expand Down Expand Up @@ -658,5 +666,47 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula
result.ExtraEntries = append(result.ExtraEntries, entry)
}

if !tmo.enableDyingMemcgReclaim {
return result
}

cgroupPaths := make([]string, 0)
onlineBurstableCgroupPath := path.Join(common.CgroupFSMountPoint, memoryadvisor.OnlineBurstableCgroupPath)
cgroupPaths = append(cgroupPaths, onlineBurstableCgroupPath)

// add offline-besteffort-* cgroup paths
// traverse all paths
directory := path.Join(common.CgroupFSMountPoint, memoryadvisor.KubePodsCgroupPath)
entries, err := os.ReadDir(directory)
if err != nil {
general.Infof("Failed to read directory %s: %v", directory, err)
return result
}

for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), memoryadvisor.OfflineBestEffortPrefix) {
cgroupPaths = append(cgroupPaths, path.Join(directory, entry.Name()))
}
}

general.Infof("DyingMemcg paths to be processed: %v", cgroupPaths)

currentTime := time.Now()
if tmo.lastDyingCGReclaimTime.IsZero() || currentTime.Sub(tmo.lastDyingCGReclaimTime) >= memoryadvisor.MemCgReclaimDefaultIntervalSeconds*time.Second {
general.Infof("Trigger dying memcg reclaim for cgroup paths: %v after %v seconds", cgroupPaths, memoryadvisor.MemCgReclaimDefaultIntervalSeconds)

tmo.lastDyingCGReclaimTime = currentTime
// Note: trigger qrm-plugin
for _, cgroupPath := range cgroupPaths {
entry := types.ExtraMemoryAdvices{
CgroupPath: cgroupPath,
Values: map[string]string{
string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobON,
},
}
result.ExtraEntries = append(result.ExtraEntries, entry)
}
}

return result
}
Loading
Loading