From d1e252d9751dee460484bddfc9b14f9079f93ed7 Mon Sep 17 00:00:00 2001 From: zhangrenpeng Date: Tue, 20 Jan 2026 11:35:56 +0800 Subject: [PATCH 1/5] feat(qrm, sysadvisor): Periodically (every 300s) clean up excessive (>2000) dying mem cgroups. BREAKING CHANGES: 1. Add a scheduled task in SysAdvisor to trigger dying memcg cleanup every 300s. 2. Add a new task in the QRM plugin to trigger dying memcg cleanup via memory reclamation using memory.reclaim. --- .../dynamicpolicy/memoryadvisor/types.go | 11 ++ .../memory/dynamicpolicy/policy.go | 3 + .../dynamicpolicy/policy_advisor_handler.go | 56 ++++++++++ pkg/agent/qrm-plugins/util/consts.go | 1 + .../memory/plugin/memory_offloading.go | 52 +++++++++ pkg/util/cgroup/manager/cgroup.go | 102 ++++++++++++++++++ 6 files changed, 225 insertions(+) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go index 3fb7d3bb49..c620614c4e 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/types.go @@ -26,7 +26,18 @@ const ( ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = "balance_numa_memory" ControlKnobKeySwapMax MemoryControlKnobName = "swap_max" ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading" + ControlKnowKeyDyingMemcgReclaim MemoryControlKnobName = "dying_memcg_reclaim" ControlKnobKeyMemoryNUMAHeadroom MemoryControlKnobName = "memory_numa_headroom" ) +const ( + KubePodsCgroupPath = "kubepods" + OfflineBestEffortPrefix = "offline-besteffort-" // follows some number, such as kubepods/offline-besteffort-0 + OnlineBurstableCgroupPath = "kubepods/burstable" +) + +const ( + MemCgReclaimDefaultIntervalSeconds = 300 +) + type MemoryNUMAHeadroom map[int]int64 diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index a4e0dcff75..0840cf9125 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -74,6 +74,7 @@ const ( memoryPluginAsyncWorkTopicSetExtraCGMemLimit = "qrm_memory_plugin_set_extra_mem_limit" memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page" memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload" + memoryPluginAsyncWorkTopicDyingMemcgReclaim = "qrm_memory_plugin_dying_memcg_reclaim" dropCacheTimeoutSeconds = 30 setExtraCGMemLimitTimeoutSeconds = 60 @@ -284,6 +285,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading)) memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryNUMAHeadroom, memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryNUMAHeadroom)) + memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyDyingMemcgReclaim, + memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorDyingMemcgReclaim)) if policyImplement.enableEvictingLogCache { policyImplement.logCacheEvictionManager = logcache.NewManager(conf, agentCtx.MetaServer) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 44d0db1dfa..38b41556f1 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -971,3 +971,59 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, })...) return nil } + +// handleAdvisorDyingMemcgReclaim handles dying memcg reclaim from memory-advisor +// translates from tce/tmo: https://code.byted.org/tce/tmo/commit/58506d1d168f75deedb543c442053b8fbbda3c8e +func (p *DynamicPolicy) handleAdvisorDyingMemcgReclaim(_ *config.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + entryName, subEntryName string, + calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries, +) error { + var absCGPath string + var dyingMemcgReclaimWorkName string + + if calculationInfo.CgroupPath != "" { + dyingMemcgReclaimWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicDyingMemcgReclaim) + absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath) + } else { + return fmt.Errorf("cgroup path is empty") + } + + // set swap max before trigger memory offloading + swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)] + if swapMax == consts.ControlKnobON { + err := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to set swap max, err: %v", err) + } + } else { + err := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to disable swap, err: %v", err) + } + } + + isEnableDyingMemcgReclaim := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim)] == consts.ControlKnobON + if !isEnableDyingMemcgReclaim { + return nil + } + + // start an asynchronous work to execute dying memcg reclaim + err := p.defaultAsyncLimitedWorkers.AddWork( + &asyncworker.Work{ + Name: dyingMemcgReclaimWorkName, + UID: uuid.NewUUID(), + Fn: cgroupmgr.DyingMemcgReclaimWithAbsolutePath, + Params: []interface{}{absCGPath, emitter, entryName, subEntryName}, + DeliveredAt: time.Now(), + }, asyncworker.DuplicateWorkPolicyOverride, + ) + if err != nil { + return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %v", dyingMemcgReclaimWorkName, entryName, subEntryName, absCGPath, err) + } + + return nil +} diff --git a/pkg/agent/qrm-plugins/util/consts.go b/pkg/agent/qrm-plugins/util/consts.go index a13b3bb0c9..8fcfb8de18 100644 --- a/pkg/agent/qrm-plugins/util/consts.go +++ b/pkg/agent/qrm-plugins/util/consts.go @@ -51,6 +51,7 @@ const ( MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache" MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems" MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading" + MetricNameMemoryHandlerAdvisorDyingMemcgReclaim = "memory_handler_advisor_dying_memcg_reclaim" MetricNameMemoryHandlerAdvisorMemoryNUMAHeadroom = "memory_handler_advisor_memory_numa_headroom" MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed" MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed" diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go index 5e6b2cb101..cee95e3a88 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -19,6 +19,8 @@ package plugin import ( "context" "math" + "os" + "path" "path/filepath" "strconv" "strings" @@ -201,6 +203,8 @@ type transparentMemoryOffloading struct { emitter metrics.MetricEmitter containerTmoEngines map[katalystcoreconsts.PodContainerName]TmoEngine cgpathTmoEngines map[string]TmoEngine + + lastDyingCGReclaimTime time.Time } type TmoEngine interface { @@ -524,6 +528,9 @@ func (tmo *transparentMemoryOffloading) Reconcile(status *types.MemoryPressureSt tmo.containerTmoEngines[podContainerName].GetConf().PolicyName) } } + + // PoolName Override QosLevel Config + // load SPD conf if exists tmoIndicator := &v1alpha1.TransparentMemoryOffloadingIndicators{} isBaseline, err := tmo.metaServer.ServiceProfilingManager.ServiceExtendedIndicator(context.Background(), pod.ObjectMeta, tmoIndicator) @@ -658,5 +665,50 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula result.ExtraEntries = append(result.ExtraEntries, entry) } + cgroupPaths := make([]string, 0) + onlineBurstableCgroupPath := path.Join(common.CgroupFSMountPoint, memoryadvisor.OnlineBurstableCgroupPath) + cgroupPaths = append(cgroupPaths, onlineBurstableCgroupPath) + + // add offline-besteffort-* cgroup paths + // traverse all paths + directory := path.Join(common.CgroupFSMountPoint, memoryadvisor.KubePodsCgroupPath) + entries, err := os.ReadDir(directory) + if err != nil { + general.Infof("Failed to read directory %s: %v", directory, err) + return result + } + + for _, entry := range entries { + if entry.IsDir() && strings.HasPrefix(entry.Name(), memoryadvisor.OfflineBestEffortPrefix) { + cgroupPaths = append(cgroupPaths, path.Join(directory, entry.Name())) + } + } + + general.Infof("DyingMemcg paths to be processed: %v", cgroupPaths) + + currentTime := time.Now() + if tmo.lastDyingCGReclaimTime.IsZero() || currentTime.Sub(tmo.lastDyingCGReclaimTime) >= memoryadvisor.MemCgReclaimDefaultIntervalSeconds*time.Second { + general.Infof("Trigger dying memcg reclaim for cgroup paths: %v after %v seconds", cgroupPaths, memoryadvisor.MemCgReclaimDefaultIntervalSeconds) + + tmo.lastDyingCGReclaimTime = currentTime + // Note: trigger qrm-plugin + for _, cgroupPath := range cgroupPaths { + // cgroupPath here uses absolute path + relativeCgroupPath, err := filepath.Rel(common.CgroupFSMountPoint, cgroupPath) + if err != nil { + continue + } + relativeCgroupPath = "/" + relativeCgroupPath + + entry := types.ExtraMemoryAdvices{ + CgroupPath: relativeCgroupPath, + Values: map[string]string{ + string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobON, + }, + } + result.ExtraEntries = append(result.ExtraEntries, entry) + } + } + return result } diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 3c0ce8ee72..7ad6bd6cf2 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -23,10 +23,13 @@ import ( "math" "os" "os/exec" + "path" "path/filepath" "strconv" + "strings" "time" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" @@ -549,6 +552,105 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, return err } +func invokeMemoryReclaim(reclaimFile string, memSize string) error { + // write memSize to reclaimFile + err := os.WriteFile(reclaimFile, []byte(memSize), 0644) + if err != nil { + return fmt.Errorf("write %s failed with error: %v", reclaimFile, err) + } + return nil +} + +func readNrDyingDescendants(statFile string) (int, error) { + // read nr_dying_descendants from statFile + statContent, err := os.ReadFile(statFile) + var nrDyingDescendants int + if err != nil { + general.Warningf("read cgroup.stat file failed: %v", err) + return 0, err + } + statLines := strings.Split(string(statContent), "\n") + for _, line := range statLines { + if strings.HasPrefix(line, "nr_dying_descendants") { + nrDyingDescendants, err = strconv.Atoi(strings.Split(line, " ")[1]) + if err != nil { + general.Warningf("parse nr_dying_descendants failed: %v", err) + return 0, err + } + general.Infof("nr_dying_descendants: %d", nrDyingDescendants) + } + } + return nrDyingDescendants, nil +} + +func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, emitter metrics.MetricEmitter, entryName string, subEntryName string) error { + // perform dying memcg reclaim for burstable cgroup + general.Infof("Enable dying memcg reclaim for global Cgroup: %s", absCGPath) + // absCGPath is like "/sys/fs/cgroup/kubepods/burstable/pod-1234-5678" + + startTime := time.Now() + + statFile := path.Join(absCGPath, "cgroup.stat") + // check whether file exists + if _, err := os.Stat(statFile); os.IsNotExist(err) { + general.Warningf("cgroup.stat file not exist: %s", statFile) + return nil + } + + reclaimFile := path.Join(absCGPath, "memory.reclaim") + + // check whether file exists + if _, err := os.Stat(reclaimFile); os.IsNotExist(err) { + general.Warningf("memory.reclaim file not exist: %s", reclaimFile) + return nil + } + + initialDyingDescendants, err := readNrDyingDescendants(statFile) + if err != nil { + general.Warningf("read nr_dying_descendants failed: %v", err) + return nil + } + nrDyingDescendants := initialDyingDescendants + + for i := 0; i < 10; i++ { + if nrDyingDescendants <= 2000 { + general.Infof("nr_dying_descendants: %d < 2000, no need to reclaim", nrDyingDescendants) + break + } + + // invoke memory reclaim + err = invokeMemoryReclaim(reclaimFile, "30m") + if err != nil { + general.Warningf("invoke memory reclaim failed: %v", err) + return nil + } + + // sleep 5s + time.Sleep(5 * time.Second) + + nrDyingDescendants, err = readNrDyingDescendants(statFile) + if err != nil { + general.Warningf("read nr_dying_descendants failed: %v", err) + return nil + } + } + + totalReleaseDyingMemcgCnt := initialDyingDescendants - nrDyingDescendants + + // emit metrics: the number of dying memcg released this time + _ = emitter.StoreInt64(util.MetricNameMemoryHandlerAdvisorDyingMemcgReclaim, int64(totalReleaseDyingMemcgCnt), + metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{ + "entryName": entryName, + "subEntryName": subEntryName, + "cgroupPath": absCGPath, + })...) + + delta := time.Since(startTime).Seconds() + general.Infof("[DyingMemcgReclaimWithAbsolutePath] it takes %v to do \"%s\" on cgroup: %s", delta, "memory.reclaim", absCGPath) + + return nil +} + func GetEffectiveCPUSetWithAbsolutePath(absCgroupPath string) (machine.CPUSet, machine.CPUSet, error) { if !IsCgroupPath(absCgroupPath) { return machine.CPUSet{}, machine.CPUSet{}, fmt.Errorf("path %s is not a cgroup", absCgroupPath) From 2c55d2e06d528cfc16c2b0534fc0153cfef19352 Mon Sep 17 00:00:00 2001 From: zhangrenpeng Date: Thu, 22 Jan 2026 11:44:51 +0800 Subject: [PATCH 2/5] fix(cgroup): Add logging for dying memcg reclamation amount --- pkg/util/cgroup/manager/cgroup.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 7ad6bd6cf2..9a418105f4 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -50,6 +50,8 @@ const ( const CgroupFSMountPoint = "/sys/fs/cgroup" +const DyingMemcgThreshold int = 2000 + func ApplyMemoryWithRelativePath(relCgroupPath string, data *common.MemoryData) error { if data == nil { return fmt.Errorf("ApplyMemoryWithRelativePath with nil cgroup data") @@ -554,7 +556,7 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, func invokeMemoryReclaim(reclaimFile string, memSize string) error { // write memSize to reclaimFile - err := os.WriteFile(reclaimFile, []byte(memSize), 0644) + err := os.WriteFile(reclaimFile, []byte(memSize), 0o644) if err != nil { return fmt.Errorf("write %s failed with error: %v", reclaimFile, err) } @@ -613,11 +615,13 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em nrDyingDescendants := initialDyingDescendants for i := 0; i < 10; i++ { - if nrDyingDescendants <= 2000 { - general.Infof("nr_dying_descendants: %d < 2000, no need to reclaim", nrDyingDescendants) + if nrDyingDescendants <= DyingMemcgThreshold { + general.Infof("nr_dying_descendants: %d <= %d, no need to reclaim", nrDyingDescendants, DyingMemcgThreshold) break } + general.Infof("nr_dying_descendants: %d > %d, reclaim memory", nrDyingDescendants, DyingMemcgThreshold) + // invoke memory reclaim err = invokeMemoryReclaim(reclaimFile, "30m") if err != nil { @@ -633,6 +637,8 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em general.Warningf("read nr_dying_descendants failed: %v", err) return nil } + + general.Infof("After reclaim, nr_dying_descendants: %d", nrDyingDescendants) } totalReleaseDyingMemcgCnt := initialDyingDescendants - nrDyingDescendants @@ -647,6 +653,7 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em delta := time.Since(startTime).Seconds() general.Infof("[DyingMemcgReclaimWithAbsolutePath] it takes %v to do \"%s\" on cgroup: %s", delta, "memory.reclaim", absCGPath) + general.Infof("[DyingMemcgReclaimWithAbsolutePath] After reclaim, nr_dying_descendants: %d -> %d", initialDyingDescendants, nrDyingDescendants) return nil } From 21bb4b9d5b3800d724f0c5fc6aed6811522cb53f Mon Sep 17 00:00:00 2001 From: zhangrenpeng Date: Mon, 16 Mar 2026 21:59:07 +0800 Subject: [PATCH 3/5] fix(sys-advisor, qrm): encapsulate `GetCgroupNrDyingDescendants` in common `manager.go` interface, and using `MemoryOffloadingWithAbsolutePath` instead of self-written `invokeMemoryReclaim` --- .../dynamicpolicy/policy_advisor_handler.go | 10 +++- .../memory/plugin/memory_offloading.go | 2 - pkg/util/cgroup/manager/cgroup.go | 50 ++----------------- pkg/util/cgroup/manager/fake_manager.go | 4 ++ pkg/util/cgroup/manager/manager.go | 1 + pkg/util/cgroup/manager/v1/fs_linux.go | 4 ++ pkg/util/cgroup/manager/v2/fs_linux.go | 35 +++++++++++++ 7 files changed, 57 insertions(+), 49 deletions(-) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 38b41556f1..6e5fb6208e 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -1011,13 +1011,19 @@ func (p *DynamicPolicy) handleAdvisorDyingMemcgReclaim(_ *config.Configuration, return nil } + _, mems, err := cgroupmgr.GetEffectiveCPUSetWithAbsolutePath(absCGPath) + if err != nil { + return fmt.Errorf("GetEffectiveCPUSetWithAbsolutePath failed with error: %v", err) + } + general.Infof("dyingMemcgReclaimWithAbsolutePath mems: %v", mems) + // start an asynchronous work to execute dying memcg reclaim - err := p.defaultAsyncLimitedWorkers.AddWork( + err = p.defaultAsyncLimitedWorkers.AddWork( &asyncworker.Work{ Name: dyingMemcgReclaimWorkName, UID: uuid.NewUUID(), Fn: cgroupmgr.DyingMemcgReclaimWithAbsolutePath, - Params: []interface{}{absCGPath, emitter, entryName, subEntryName}, + Params: []interface{}{absCGPath, emitter, entryName, subEntryName, mems}, DeliveredAt: time.Now(), }, asyncworker.DuplicateWorkPolicyOverride, ) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go index cee95e3a88..7171835afa 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -529,8 +529,6 @@ func (tmo *transparentMemoryOffloading) Reconcile(status *types.MemoryPressureSt } } - // PoolName Override QosLevel Config - // load SPD conf if exists tmoIndicator := &v1alpha1.TransparentMemoryOffloadingIndicators{} isBaseline, err := tmo.metaServer.ServiceProfilingManager.ServiceExtendedIndicator(context.Background(), pod.ObjectMeta, tmoIndicator) diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 9a418105f4..d54e5ec159 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -26,7 +26,6 @@ import ( "path" "path/filepath" "strconv" - "strings" "time" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" @@ -554,51 +553,12 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, return err } -func invokeMemoryReclaim(reclaimFile string, memSize string) error { - // write memSize to reclaimFile - err := os.WriteFile(reclaimFile, []byte(memSize), 0o644) - if err != nil { - return fmt.Errorf("write %s failed with error: %v", reclaimFile, err) - } - return nil -} - -func readNrDyingDescendants(statFile string) (int, error) { - // read nr_dying_descendants from statFile - statContent, err := os.ReadFile(statFile) - var nrDyingDescendants int - if err != nil { - general.Warningf("read cgroup.stat file failed: %v", err) - return 0, err - } - statLines := strings.Split(string(statContent), "\n") - for _, line := range statLines { - if strings.HasPrefix(line, "nr_dying_descendants") { - nrDyingDescendants, err = strconv.Atoi(strings.Split(line, " ")[1]) - if err != nil { - general.Warningf("parse nr_dying_descendants failed: %v", err) - return 0, err - } - general.Infof("nr_dying_descendants: %d", nrDyingDescendants) - } - } - return nrDyingDescendants, nil -} - -func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, emitter metrics.MetricEmitter, entryName string, subEntryName string) error { +func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, emitter metrics.MetricEmitter, entryName string, subEntryName string, mems machine.CPUSet) error { // perform dying memcg reclaim for burstable cgroup general.Infof("Enable dying memcg reclaim for global Cgroup: %s", absCGPath) // absCGPath is like "/sys/fs/cgroup/kubepods/burstable/pod-1234-5678" startTime := time.Now() - - statFile := path.Join(absCGPath, "cgroup.stat") - // check whether file exists - if _, err := os.Stat(statFile); os.IsNotExist(err) { - general.Warningf("cgroup.stat file not exist: %s", statFile) - return nil - } - reclaimFile := path.Join(absCGPath, "memory.reclaim") // check whether file exists @@ -607,7 +567,7 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em return nil } - initialDyingDescendants, err := readNrDyingDescendants(statFile) + initialDyingDescendants, err := GetManager().GetCgroupNrDyingDescendants(absCGPath) if err != nil { general.Warningf("read nr_dying_descendants failed: %v", err) return nil @@ -622,8 +582,8 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em general.Infof("nr_dying_descendants: %d > %d, reclaim memory", nrDyingDescendants, DyingMemcgThreshold) - // invoke memory reclaim - err = invokeMemoryReclaim(reclaimFile, "30m") + // reclaim 30m, to trigger dying memory cgroup reclaim + err = MemoryOffloadingWithAbsolutePath(ctx, absCGPath, int64(30*1024*1024), mems) if err != nil { general.Warningf("invoke memory reclaim failed: %v", err) return nil @@ -632,7 +592,7 @@ func DyingMemcgReclaimWithAbsolutePath(ctx context.Context, absCGPath string, em // sleep 5s time.Sleep(5 * time.Second) - nrDyingDescendants, err = readNrDyingDescendants(statFile) + nrDyingDescendants, err = GetManager().GetCgroupNrDyingDescendants(absCGPath) if err != nil { general.Warningf("read nr_dying_descendants failed: %v", err) return nil diff --git a/pkg/util/cgroup/manager/fake_manager.go b/pkg/util/cgroup/manager/fake_manager.go index cb646715c6..49241567ea 100644 --- a/pkg/util/cgroup/manager/fake_manager.go +++ b/pkg/util/cgroup/manager/fake_manager.go @@ -99,3 +99,7 @@ func (f *FakeCgroupManager) GetPids(absCgroupPath string) ([]string, error) { func (f *FakeCgroupManager) GetTasks(absCgroupPath string) ([]string, error) { return nil, nil } + +func (f *FakeCgroupManager) GetCgroupNrDyingDescendants(absCgroupPath string) (int, error) { + return 0, nil +} diff --git a/pkg/util/cgroup/manager/manager.go b/pkg/util/cgroup/manager/manager.go index 9adbb4b0c1..89db3dd2eb 100644 --- a/pkg/util/cgroup/manager/manager.go +++ b/pkg/util/cgroup/manager/manager.go @@ -53,6 +53,7 @@ type Manager interface { GetDeviceIOWeight(absCgroupPath string, devID string) (uint64, bool, error) GetIOStat(absCgroupPath string) (map[string]map[string]string, error) GetMetrics(relCgroupPath string, subsystems map[string]struct{}) (*common.CgroupMetrics, error) + GetCgroupNrDyingDescendants(absCgroupPath string) (int, error) GetPids(absCgroupPath string) ([]string, error) GetTasks(absCgroupPath string) ([]string, error) diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index 62913e961c..949741e910 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -437,6 +437,10 @@ func (m *manager) GetTasks(absCgroupPath string) ([]string, error) { return tasks, nil } +func (m *manager) GetCgroupNrDyingDescendants(absCgroupPath string) (int, error) { + return 0, errors.New("cgroups v1 does not support nr_dying_descendants") +} + func newHierarchy(enabled map[cgroups.Name]struct{}) cgroups.Hierarchy { return func() ([]cgroups.Subsystem, error) { ss, err := cgroups.V1() diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index 7dea750c0e..0ac25b2831 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -709,6 +709,41 @@ func (m *manager) GetTasks(absCgroupPath string) ([]string, error) { return tasks, err } +func (m *manager) GetCgroupNrDyingDescendants(absCgroupPath string) (int, error) { + statFile := path.Join(absCgroupPath, "cgroup.stat") + // check whether file exists + if _, err := os.Stat(statFile); os.IsNotExist(err) { + general.Warningf("cgroup.stat file not exist: %s", statFile) + return 0, err + } + + // read nr_dying_descendants from statFile + statContent, err := os.ReadFile(statFile) + var nrDyingDescendants int + if err != nil { + general.Warningf("read cgroup.stat file failed: %v", err) + return 0, err + } + statLines := strings.Split(string(statContent), "\n") + for _, line := range statLines { + if strings.HasPrefix(line, "nr_dying_descendants") { + fields := strings.Split(line, " ") + if len(fields) < 2 { + general.Warningf("invalid nr_dying_descendants line: %s", line) + return 0, fmt.Errorf("invalid nr_dying_descendants line: %s", line) + } + + nrDyingDescendants, err = strconv.Atoi(fields[1]) + if err != nil { + general.Warningf("parse nr_dying_descendants failed: %v", err) + return 0, err + } + general.Infof("nr_dying_descendants: %d", nrDyingDescendants) + } + } + return nrDyingDescendants, nil +} + func numToStr(value int64) (ret string) { switch { case value == 0: From ec7c420adbf21296e77997d8c1ca90d8729002ca Mon Sep 17 00:00:00 2001 From: zhangrenpeng Date: Tue, 17 Mar 2026 11:54:21 +0800 Subject: [PATCH 4/5] fix(sys-advisor, qrm-plugin): add unit tests, and remove duplicate path calc --- .../dynamicpolicy/policy_advisor_handler.go | 3 +- ...policy_advisor_handler_dying_memcg_test.go | 155 ++++++++++++++++++ .../memory/plugin/memory_offloading.go | 9 +- .../memory/plugin/memory_offloading_test.go | 107 ++++++++++++ pkg/util/cgroup/manager/cgroup_test.go | 55 +++++++ pkg/util/cgroup/manager/v2/fs_linux_test.go | 67 ++++++++ 6 files changed, 386 insertions(+), 10 deletions(-) create mode 100644 pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler_dying_memcg_test.go create mode 100644 pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 6e5fb6208e..021f017e44 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -973,7 +973,6 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, } // handleAdvisorDyingMemcgReclaim handles dying memcg reclaim from memory-advisor -// translates from tce/tmo: https://code.byted.org/tce/tmo/commit/58506d1d168f75deedb543c442053b8fbbda3c8e func (p *DynamicPolicy) handleAdvisorDyingMemcgReclaim(_ *config.Configuration, _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, @@ -987,7 +986,7 @@ func (p *DynamicPolicy) handleAdvisorDyingMemcgReclaim(_ *config.Configuration, if calculationInfo.CgroupPath != "" { dyingMemcgReclaimWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicDyingMemcgReclaim) - absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath) + absCGPath = calculationInfo.CgroupPath } else { return fmt.Errorf("cgroup path is empty") } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler_dying_memcg_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler_dying_memcg_test.go new file mode 100644 index 0000000000..76aabc31ae --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler_dying_memcg_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicpolicy + +import ( + "reflect" + "sync" + "testing" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +var dyingMemcgReclaimTestMutex sync.Mutex + +func TestDynamicPolicy_handleAdvisorDyingMemcgReclaim(t *testing.T) { + t.Parallel() + t.Run("empty cgroup path", func(t *testing.T) { + t.Parallel() + dyingMemcgReclaimTestMutex.Lock() + defer dyingMemcgReclaimTestMutex.Unlock() + p := &DynamicPolicy{} + calculationInfo := &advisorsvc.CalculationInfo{ + CalculationResult: &advisorsvc.CalculationResult{ + Values: map[string]string{}, + }, + } + + err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, metrics.DummyMetrics{}, nil, "pod", "container", calculationInfo, nil) + assert.Error(t, err) + }) + + t.Run("disabled dying memcg reclaim", func(t *testing.T) { + t.Parallel() + dyingMemcgReclaimTestMutex.Lock() + defer dyingMemcgReclaimTestMutex.Unlock() + defer mockey.UnPatchAll() + + p := &DynamicPolicy{ + defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers("test", 1, metrics.DummyMetrics{}), + } + + disableSwapCalled := false + addWorkCalled := false + getEffectiveCalled := false + + mockey.Mock(cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive).IncludeCurrentGoRoutine().To(func(_ string) error { + disableSwapCalled = true + return nil + }).Build() + mockey.Mock((*asyncworker.AsyncLimitedWorkers).AddWork).IncludeCurrentGoRoutine().To(func(_ *asyncworker.AsyncLimitedWorkers, _ *asyncworker.Work, _ asyncworker.DuplicateWorkPolicy) error { + addWorkCalled = true + return nil + }).Build() + mockey.Mock(cgroupmgr.GetEffectiveCPUSetWithAbsolutePath).IncludeCurrentGoRoutine().To(func(_ string) (machine.CPUSet, machine.CPUSet, error) { + getEffectiveCalled = true + return machine.NewCPUSet(0), machine.NewCPUSet(0), nil + }).Build() + + calculationInfo := &advisorsvc.CalculationInfo{ + CgroupPath: "/sys/fs/cgroup/kubepods/burstable", + CalculationResult: &advisorsvc.CalculationResult{ + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): consts.ControlKnobOFF, + string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobOFF, + }, + }, + } + + err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, metrics.DummyMetrics{}, nil, "pod", "container", calculationInfo, nil) + assert.NoError(t, err) + assert.True(t, disableSwapCalled) + assert.False(t, addWorkCalled) + assert.False(t, getEffectiveCalled) + }) + + t.Run("enabled dying memcg reclaim", func(t *testing.T) { + t.Parallel() + dyingMemcgReclaimTestMutex.Lock() + defer dyingMemcgReclaimTestMutex.Unlock() + defer mockey.UnPatchAll() + + p := &DynamicPolicy{ + defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers("test", 1, metrics.DummyMetrics{}), + } + + setSwapCalled := false + var capturedWork *asyncworker.Work + var capturedPolicy asyncworker.DuplicateWorkPolicy + mems := machine.NewCPUSet(0, 1) + + mockey.Mock(cgroupmgr.SetSwapMaxWithAbsolutePathRecursive).IncludeCurrentGoRoutine().To(func(_ string) error { + setSwapCalled = true + return nil + }).Build() + mockey.Mock(cgroupmgr.GetEffectiveCPUSetWithAbsolutePath).IncludeCurrentGoRoutine().To(func(_ string) (machine.CPUSet, machine.CPUSet, error) { + return machine.NewCPUSet(0), mems, nil + }).Build() + mockey.Mock((*asyncworker.AsyncLimitedWorkers).AddWork).IncludeCurrentGoRoutine().To(func(_ *asyncworker.AsyncLimitedWorkers, work *asyncworker.Work, policy asyncworker.DuplicateWorkPolicy) error { + capturedWork = work + capturedPolicy = policy + return nil + }).Build() + + calculationInfo := &advisorsvc.CalculationInfo{ + CgroupPath: "/sys/fs/cgroup/kubepods/burstable", + CalculationResult: &advisorsvc.CalculationResult{ + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): consts.ControlKnobON, + string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobON, + }, + }, + } + + emitter := metrics.DummyMetrics{} + err := p.handleAdvisorDyingMemcgReclaim(nil, nil, nil, emitter, nil, "pod", "container", calculationInfo, nil) + assert.NoError(t, err) + assert.True(t, setSwapCalled) + assert.NotNil(t, capturedWork) + + expectedAbsPath := "/sys/fs/cgroup/kubepods/burstable" + expectedWorkName := util.GetCgroupAsyncWorkName("/sys/fs/cgroup/kubepods/burstable", memoryPluginAsyncWorkTopicDyingMemcgReclaim) + assert.Equal(t, expectedWorkName, capturedWork.Name) + assert.Equal(t, asyncworker.DuplicateWorkPolicy(asyncworker.DuplicateWorkPolicyOverride), capturedPolicy) + assert.Equal(t, expectedAbsPath, capturedWork.Params[0]) + assert.Equal(t, emitter, capturedWork.Params[1]) + assert.Equal(t, "pod", capturedWork.Params[2]) + assert.Equal(t, "container", capturedWork.Params[3]) + assert.Equal(t, mems, capturedWork.Params[4]) + assert.Equal(t, reflect.ValueOf(cgroupmgr.DyingMemcgReclaimWithAbsolutePath).Pointer(), reflect.ValueOf(capturedWork.Fn).Pointer()) + }) +} diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go index 7171835afa..9cad4a0ac1 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -691,15 +691,8 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula tmo.lastDyingCGReclaimTime = currentTime // Note: trigger qrm-plugin for _, cgroupPath := range cgroupPaths { - // cgroupPath here uses absolute path - relativeCgroupPath, err := filepath.Rel(common.CgroupFSMountPoint, cgroupPath) - if err != nil { - continue - } - relativeCgroupPath = "/" + relativeCgroupPath - entry := types.ExtraMemoryAdvices{ - CgroupPath: relativeCgroupPath, + CgroupPath: cgroupPath, Values: map[string]string{ string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim): consts.ControlKnobON, }, diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go new file mode 100644 index 0000000000..7ae02598da --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "errors" + "os" + "sync" + "testing" + "time" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" + "github.com/kubewharf/katalyst-core/pkg/consts" +) + +var transparentMemoryOffloadingTestMutex sync.Mutex + +type mockDirEntry struct { + name string + isDir bool +} + +func (m mockDirEntry) Name() string { return m.name } +func (m mockDirEntry) IsDir() bool { return m.isDir } +func (m mockDirEntry) Type() os.FileMode { return 0 } +func (m mockDirEntry) Info() (os.FileInfo, error) { return nil, nil } + +func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaim(t *testing.T) { + t.Parallel() + transparentMemoryOffloadingTestMutex.Lock() + defer transparentMemoryOffloadingTestMutex.Unlock() + defer mockey.UnPatchAll() + + tmo := &transparentMemoryOffloading{} + + mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return([]os.DirEntry{ + mockDirEntry{name: "offline-besteffort-0", isDir: true}, + mockDirEntry{name: "offline-besteffort-1", isDir: true}, + mockDirEntry{name: "burstable", isDir: true}, + }, nil).Build() + + result := tmo.GetAdvices() + assert.Len(t, result.ExtraEntries, 3) + + expectedPaths := map[string]struct{}{ + "/sys/fs/cgroup/" + memoryadvisor.OnlineBurstableCgroupPath: {}, + "/sys/fs/cgroup/" + memoryadvisor.KubePodsCgroupPath + "/offline-besteffort-0": {}, + "/sys/fs/cgroup/" + memoryadvisor.KubePodsCgroupPath + "/offline-besteffort-1": {}, + } + + for _, entry := range result.ExtraEntries { + assert.Equal(t, consts.ControlKnobON, entry.Values[string(memoryadvisor.ControlKnowKeyDyingMemcgReclaim)]) + _, ok := expectedPaths[entry.CgroupPath] + assert.True(t, ok) + delete(expectedPaths, entry.CgroupPath) + } + assert.Empty(t, expectedPaths) +} + +func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaimInterval(t *testing.T) { + t.Parallel() + transparentMemoryOffloadingTestMutex.Lock() + defer transparentMemoryOffloadingTestMutex.Unlock() + defer mockey.UnPatchAll() + + tmo := &transparentMemoryOffloading{ + lastDyingCGReclaimTime: time.Now(), + } + + mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return([]os.DirEntry{ + mockDirEntry{name: "offline-besteffort-0", isDir: true}, + }, nil).Build() + + result := tmo.GetAdvices() + assert.Len(t, result.ExtraEntries, 0) +} + +func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaimReadDirError(t *testing.T) { + t.Parallel() + transparentMemoryOffloadingTestMutex.Lock() + defer transparentMemoryOffloadingTestMutex.Unlock() + defer mockey.UnPatchAll() + + tmo := &transparentMemoryOffloading{} + + mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return(nil, errors.New("read failed")).Build() + + result := tmo.GetAdvices() + assert.Len(t, result.ExtraEntries, 0) +} diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go index 7573fcf0ce..462077ffd0 100644 --- a/pkg/util/cgroup/manager/cgroup_test.go +++ b/pkg/util/cgroup/manager/cgroup_test.go @@ -34,6 +34,7 @@ import ( "github.com/opencontainers/runc/libcontainer/cgroups" "github.com/stretchr/testify/assert" + "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" v1 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v1" v2 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v2" @@ -236,6 +237,60 @@ func testMemoryOffloadingWithAbsolutePath(t *testing.T) { assert.Equal(t, fmt.Sprintf("%v\n", 100), string(s)) } +func TestDyingMemcgReclaimWithAbsolutePath(t *testing.T) { + t.Parallel() + mu.Lock() + defer mu.Unlock() + + t.Run("missing reclaim file", func(t *testing.T) { + t.Parallel() + mu.Lock() + defer mu.Unlock() + tmpDir, err := os.MkdirTemp("", "dying-memcg") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + err = DyingMemcgReclaimWithAbsolutePath(context.TODO(), tmpDir, metrics.DummyMetrics{}, "entry", "container", machine.NewCPUSet(0)) + assert.NoError(t, err) + }) + + t.Run("reclaim with threshold", func(t *testing.T) { + t.Parallel() + mu.Lock() + defer mu.Unlock() + defer mockey.UnPatchAll() + + tmpDir, err := os.MkdirTemp("", "dying-memcg") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + reclaimFile := filepath.Join(tmpDir, "memory.reclaim") + err = os.WriteFile(reclaimFile, []byte(""), 0o600) + assert.NoError(t, err) + + fakeManager := &FakeCgroupManager{} + callCount := 0 + offloadCount := 0 + + mockey.Mock(GetManager).IncludeCurrentGoRoutine().Return(fakeManager).Build() + mockey.Mock((*FakeCgroupManager).GetCgroupNrDyingDescendants).IncludeCurrentGoRoutine().To(func(_ *FakeCgroupManager, _ string) (int, error) { + callCount++ + if callCount == 1 { + return DyingMemcgThreshold + 10, nil + } + return DyingMemcgThreshold - 10, nil + }).Build() + mockey.Mock(MemoryOffloadingWithAbsolutePath).IncludeCurrentGoRoutine().To(func(_ context.Context, _ string, _ int64, _ machine.CPUSet) error { + offloadCount++ + return nil + }).Build() + err = DyingMemcgReclaimWithAbsolutePath(context.TODO(), tmpDir, metrics.DummyMetrics{}, "entry", "container", machine.NewCPUSet(0)) + assert.NoError(t, err) + assert.Equal(t, 2, callCount) + assert.Equal(t, 1, offloadCount) + }) +} + func TestGetEffectiveCPUSetWithAbsolutePathV1(t *testing.T) { cgroups.TestMode = true t.Parallel() diff --git a/pkg/util/cgroup/manager/v2/fs_linux_test.go b/pkg/util/cgroup/manager/v2/fs_linux_test.go index 0a9fad9d20..062beb4199 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux_test.go +++ b/pkg/util/cgroup/manager/v2/fs_linux_test.go @@ -20,6 +20,8 @@ limitations under the License. package v2 import ( + "os" + "path/filepath" "reflect" "testing" @@ -734,6 +736,71 @@ func Test_manager_GetMetrics(t *testing.T) { } } +func Test_manager_GetCgroupNrDyingDescendants(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + content string + want int + wantErr bool + createFile bool + }{ + { + name: "valid content", + content: "nr_dying_descendants 123\nnr_descendants 456\n", + want: 123, + createFile: true, + }, + { + name: "invalid format", + content: "nr_dying_descendants\n", + wantErr: true, + createFile: true, + }, + { + name: "invalid number", + content: "nr_dying_descendants abc\n", + wantErr: true, + createFile: true, + }, + { + name: "missing file", + wantErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tmpDir, err := os.MkdirTemp("", "cgroup-stat") + if err != nil { + t.Fatalf("MkdirTemp failed: %v", err) + } + defer os.RemoveAll(tmpDir) + + if tt.createFile { + statFile := filepath.Join(tmpDir, "cgroup.stat") + if err := os.WriteFile(statFile, []byte(tt.content), 0o600); err != nil { + t.Fatalf("WriteFile failed: %v", err) + } + } + + m := &manager{} + got, err := m.GetCgroupNrDyingDescendants(tmpDir) + if (err != nil) != tt.wantErr { + t.Errorf("manager.GetCgroupNrDyingDescendants() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && got != tt.want { + t.Errorf("manager.GetCgroupNrDyingDescendants() = %v, want %v", got, tt.want) + } + }) + } +} + func Test_manager_GetPids(t *testing.T) { t.Parallel() From 5827e06d0166058bfba84f0bc7493c4948cc67eb Mon Sep 17 00:00:00 2001 From: zhangrenpeng Date: Wed, 18 Mar 2026 14:47:23 +0800 Subject: [PATCH 5/5] feat(sys-advisor): add a control switch for enabling or disabling dying memcg reclaim --- .../sysadvisor/qosaware/qos_aware_plugin.go | 19 ++++++++++++------- .../memory/plugin/memory_offloading.go | 9 ++++++++- .../memory/plugin/memory_offloading_test.go | 11 ++++++++--- .../sysadvisor/qosaware/qos_aware_plugin.go | 3 ++- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/qos_aware_plugin.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/qos_aware_plugin.go index 9234c81cff..3dd47a417a 100644 --- a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/qos_aware_plugin.go +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/qos_aware_plugin.go @@ -30,12 +30,14 @@ import ( ) const ( - defaultQoSAwareSyncPeriod = 5 * time.Second + defaultQoSAwareSyncPeriod = 5 * time.Second + defaultEnableDyingMemcgReclaim = false ) // QoSAwarePluginOptions holds the configurations for qos aware plugin. type QoSAwarePluginOptions struct { - SyncPeriod time.Duration + SyncPeriod time.Duration + EnableDyingMemcgReclaim bool *resource.ResourceAdvisorOptions *server.QRMServerOptions @@ -46,11 +48,12 @@ type QoSAwarePluginOptions struct { // NewQoSAwarePluginOptions creates a new Options with a default config. func NewQoSAwarePluginOptions() *QoSAwarePluginOptions { return &QoSAwarePluginOptions{ - SyncPeriod: defaultQoSAwareSyncPeriod, - ResourceAdvisorOptions: resource.NewResourceAdvisorOptions(), - QRMServerOptions: server.NewQRMServerOptions(), - ReporterOptions: reporter.NewReporterOptions(), - ModelOptions: model.NewModelOptions(), + SyncPeriod: defaultQoSAwareSyncPeriod, + EnableDyingMemcgReclaim: defaultEnableDyingMemcgReclaim, + ResourceAdvisorOptions: resource.NewResourceAdvisorOptions(), + QRMServerOptions: server.NewQRMServerOptions(), + ReporterOptions: reporter.NewReporterOptions(), + ModelOptions: model.NewModelOptions(), } } @@ -59,6 +62,7 @@ func (o *QoSAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs := fss.FlagSet("qos_aware_plugin") fs.DurationVar(&o.SyncPeriod, "qos-aware-sync-period", o.SyncPeriod, "Period for QoS aware plugin to sync") + fs.BoolVar(&o.EnableDyingMemcgReclaim, "qos-aware-enable-dying-memcg-reclaim", o.EnableDyingMemcgReclaim, "Enable to reclaim dying memcg") o.ResourceAdvisorOptions.AddFlags(fs) o.QRMServerOptions.AddFlags(fs) @@ -69,6 +73,7 @@ func (o *QoSAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { // ApplyTo fills up config with options func (o *QoSAwarePluginOptions) ApplyTo(c *qosaware.QoSAwarePluginConfiguration) error { c.SyncPeriod = o.SyncPeriod + c.EnableDyingMemcgReclaim = o.EnableDyingMemcgReclaim var errList []error errList = append(errList, o.ResourceAdvisorOptions.ApplyTo(c.ResourceAdvisorConfiguration)) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go index 9cad4a0ac1..585021da50 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -204,7 +204,8 @@ type transparentMemoryOffloading struct { containerTmoEngines map[katalystcoreconsts.PodContainerName]TmoEngine cgpathTmoEngines map[string]TmoEngine - lastDyingCGReclaimTime time.Time + lastDyingCGReclaimTime time.Time + enableDyingMemcgReclaim bool } type TmoEngine interface { @@ -474,6 +475,8 @@ func NewTransparentMemoryOffloading(conf *config.Configuration, extraConfig inte emitter: emitter, containerTmoEngines: make(map[consts.PodContainerName]TmoEngine), cgpathTmoEngines: make(map[string]TmoEngine), + // enableDyingMemcgReclaim: getEnvBool(DyingMemcgReclaimEnv, true), + enableDyingMemcgReclaim: conf.QoSAwarePluginConfiguration.EnableDyingMemcgReclaim, } } @@ -663,6 +666,10 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula result.ExtraEntries = append(result.ExtraEntries, entry) } + if !tmo.enableDyingMemcgReclaim { + return result + } + cgroupPaths := make([]string, 0) onlineBurstableCgroupPath := path.Join(common.CgroupFSMountPoint, memoryadvisor.OnlineBurstableCgroupPath) cgroupPaths = append(cgroupPaths, onlineBurstableCgroupPath) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go index 7ae02598da..6c28d7bea5 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading_test.go @@ -48,7 +48,9 @@ func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaim(t *testing.T) defer transparentMemoryOffloadingTestMutex.Unlock() defer mockey.UnPatchAll() - tmo := &transparentMemoryOffloading{} + tmo := &transparentMemoryOffloading{ + enableDyingMemcgReclaim: true, + } mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return([]os.DirEntry{ mockDirEntry{name: "offline-besteffort-0", isDir: true}, @@ -81,7 +83,8 @@ func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaimInterval(t *tes defer mockey.UnPatchAll() tmo := &transparentMemoryOffloading{ - lastDyingCGReclaimTime: time.Now(), + lastDyingCGReclaimTime: time.Now(), + enableDyingMemcgReclaim: true, } mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return([]os.DirEntry{ @@ -98,7 +101,9 @@ func TestTransparentMemoryOffloading_GetAdvices_DyingMemcgReclaimReadDirError(t defer transparentMemoryOffloadingTestMutex.Unlock() defer mockey.UnPatchAll() - tmo := &transparentMemoryOffloading{} + tmo := &transparentMemoryOffloading{ + enableDyingMemcgReclaim: true, + } mockey.Mock(os.ReadDir).IncludeCurrentGoRoutine().Return(nil, errors.New("read failed")).Build() diff --git a/pkg/config/agent/sysadvisor/qosaware/qos_aware_plugin.go b/pkg/config/agent/sysadvisor/qosaware/qos_aware_plugin.go index aa6f920fe8..a9c0371b18 100644 --- a/pkg/config/agent/sysadvisor/qosaware/qos_aware_plugin.go +++ b/pkg/config/agent/sysadvisor/qosaware/qos_aware_plugin.go @@ -27,7 +27,8 @@ import ( // QoSAwarePluginConfiguration stores configurations of qos aware plugin type QoSAwarePluginConfiguration struct { - SyncPeriod time.Duration + SyncPeriod time.Duration + EnableDyingMemcgReclaim bool *resource.ResourceAdvisorConfiguration *server.QRMServerConfiguration