[Coscheduling] Small improvement: PodGroup lookup via a namespace+pod-group indexer #932

@JasonHe-WQ

Description

Background

  • Scheduler-plugins uses listers with label selectors (e.g. scheduling.x-k8s.io/pod-group) to look up the Pods that belong to a given PodGroup (roughly the selector path sketched after this list).
  • In larger clusters and namespaces, the lister path ends up scanning the whole namespace because we do not pre-index the Pods by PodGroup.
  • I have been experimenting with a custom informer index that keys Pods on namespace/pod-group to avoid the repeated label-selector scans.
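
For reference, a minimal sketch of the selector path described in the first bullet (an assumed shape for illustration; podsInGroupBySelector is a hypothetical helper, not existing code in scheduler-plugins):

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	listersv1 "k8s.io/client-go/listers/core/v1"
)

// podsInGroupBySelector lists a PodGroup's member Pods via the label selector.
func podsInGroupBySelector(podLister listersv1.PodLister, namespace, pgName string) ([]*corev1.Pod, error) {
	selector := labels.SelectorFromSet(labels.Set{"scheduling.x-k8s.io/pod-group": pgName})
	// List walks every Pod of the namespace held in the informer cache and
	// applies the selector, so the cost grows with the namespace size.
	return podLister.Pods(namespace).List(selector)
}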

Proposed change

  1. Extend the Pod informer setup to register a namespace+pod-group custom index (working branch touches indexer.go).
  2. Update the lookup helpers in lister.go to query through the new index instead of iterating through all Pods via selectors.
  3. Deduplicate the common logic that currently lives in both indexer.go and lister.go so the namespace+PodGroup key construction lives in one place (see the sketch after this list).
  4. Keep the old code path as a fallback until we are confident every caller is covered by the index (open to feedback here).
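
A rough sketch of what steps 1-3 could look like (all identifiers below, such as BuildPodGroupIndexKey and AddPodGroupIndexer, are placeholders for illustration, not names that exist in the repo today):

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

const (
	// PodGroupNamespaceIndex is the proposed custom index name (placeholder).
	PodGroupNamespaceIndex = "byNamespaceAndPodGroup"
	// PodGroupLabel is the label the coscheduling plugin sets on member Pods.
	PodGroupLabel = "scheduling.x-k8s.io/pod-group"
)

// BuildPodGroupIndexKey is the single place that builds the
// "<namespace>/<pod-group>" key shared by indexer.go and lister.go.
func BuildPodGroupIndexKey(namespace, podGroup string) string {
	return namespace + "/" + podGroup
}

// AddPodGroupIndexer registers the custom index on the Pod informer.
// It must be called before the informer is started.
func AddPodGroupIndexer(informer cache.SharedIndexInformer) error {
	return informer.AddIndexers(cache.Indexers{
		PodGroupNamespaceIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*corev1.Pod)
			if !ok || pod == nil {
				return nil, nil
			}
			if pg, ok := pod.Labels[PodGroupLabel]; ok && pg != "" {
				return []string{BuildPodGroupIndexKey(pod.Namespace, pg)}, nil
			}
			return nil, nil
		},
	})
}

The lookup helpers would then call indexer.ByIndex(PodGroupNamespaceIndex, BuildPodGroupIndexKey(namespace, pgName)) instead of scanning the namespace with a selector.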

Compatibility & risk assessment

  • No API surface changes are planned; this is an internal informer/indexer enhancement.
  • The custom index slightly increases informer memory usage (one additional key per Pod) but avoids repeated scans of large Pod sets.
  • The change should be guarded by unit tests that assert the index path returns exactly the same Pod set as the selector path (a test sketch follows this list).
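
A hypothetical unit-test sketch for that equivalence check (helper names are placeholders; the test only exercises the client-go cache, not the plugin code):

package example

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/tools/cache"
)

const podGroupLabel = "scheduling.x-k8s.io/pod-group"

// podGroupIndexFunc is a placeholder for the proposed index function.
func podGroupIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return nil, nil
	}
	if pg := pod.Labels[podGroupLabel]; pg != "" {
		return []string{pod.Namespace + "/" + pg}, nil
	}
	return nil, nil
}

func TestIndexMatchesSelector(t *testing.T) {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
		cache.Indexers{"byNamespaceAndPodGroup": podGroupIndexFunc})

	for _, p := range []*corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "ns1", Labels: map[string]string{podGroupLabel: "pg1"}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "ns1", Labels: map[string]string{podGroupLabel: "pg2"}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "ns2", Labels: map[string]string{podGroupLabel: "pg1"}}},
	} {
		if err := indexer.Add(p); err != nil {
			t.Fatal(err)
		}
	}

	// Selector path: scan everything, filter by namespace and label.
	selector := labels.SelectorFromSet(labels.Set{podGroupLabel: "pg1"})
	want := map[string]bool{}
	for _, obj := range indexer.List() {
		p := obj.(*corev1.Pod)
		if p.Namespace == "ns1" && selector.Matches(labels.Set(p.Labels)) {
			want[p.Name] = true
		}
	}

	// Index path: a single lookup by "<namespace>/<pod-group>".
	got, err := indexer.ByIndex("byNamespaceAndPodGroup", "ns1/pg1")
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != len(want) {
		t.Fatalf("index returned %d pods, selector path returned %d", len(got), len(want))
	}
	for _, obj := range got {
		if name := obj.(*corev1.Pod).Name; !want[name] {
			t.Fatalf("unexpected pod %q from index", name)
		}
	}
}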

Performance data (local experiment)

Command (go 1.22.1, amd64 Linux, kubeconfig pointing at a production-sized cluster):

go run main.go -namespace=shai-core

Each method was timed over 30 runs while searching for PodGroup exp-20250917-105355-t0; the benchmark program and its output are shown below for each.

Baseline (lister + label selector):

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// getKubeConfig tries in-cluster first, then falls back to local kubeconfig.
func getKubeConfig(kubeconfigPath string) (*rest.Config, error) {
	// Try in-cluster
	if cfg, err := rest.InClusterConfig(); err == nil {
		return cfg, nil
	}
	// Fall back to local kubeconfig
	if kubeconfigPath == "" {
		home, _ := os.UserHomeDir()
		kubeconfigPath = filepath.Join(home, ".kube", "config")
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

func main() {
	var (
		kubeconfig string
		labelKey   string
		labelValue string
		namespace  string
		repeat     int
		warmup     int
	)
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig; empty means in-cluster first, then ~/.kube/config")
	flag.StringVar(&labelKey, "label-key", "scheduling.x-k8s.io/pod-group", "Label key to query")
	flag.StringVar(&labelValue, "label-value", "exp-20250917-105355-t0", "Label value to query")
	flag.StringVar(&namespace, "namespace", "", "Target namespace; empty for all namespaces")
	flag.IntVar(&repeat, "repeat", 30, "Number of timed queries to run")
	flag.IntVar(&warmup, "warmup", 5, "Warm-up queries before timing (not measured)")
	flag.Parse()

	cfg, err := getKubeConfig(kubeconfig)
	if err != nil {
		panic(fmt.Errorf("failed to get kubeconfig: %w", err))
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(fmt.Errorf("failed to create clientset: %w", err))
	}

	// Create a SharedInformerFactory with 0 resync (event-driven)
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods()
	inf := podInformer.Informer()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start informers and wait for initial cache sync
	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
		panic("failed to sync informer cache")
	}

	// Get the Pod lister from the informer
	lister := podInformer.Lister()

	// Count all pods (cluster-wide or namespaced)
	var allPods []*corev1.Pod
	var listErr error
	if namespace == "" {
		allPods, listErr = lister.List(labels.Everything())
	} else {
		allPods, listErr = lister.Pods(namespace).List(labels.Everything())
	}
	if listErr != nil {
		panic(fmt.Errorf("failed to list all pods: %w", listErr))
	}
	fmt.Printf("Total pods in cluster (or namespace if set): %d\n", len(allPods))

	// Prepare selector for label-based scan
	selector := labels.SelectorFromSet(labels.Set{labelKey: labelValue})
	fmt.Printf("Looking for pods with label selector: %s=%s\n", labelKey, labelValue)

	// Warm-up (not timed): ensures any lazy paths and CPU caches are primed
	for i := 0; i < warmup; i++ {
		if namespace == "" {
			_, _ = lister.List(selector)
		} else {
			_, _ = lister.Pods(namespace).List(selector)
		}
	}

	// Timed queries using Lister + LabelSelector (linear scan over store)
	start := time.Now()
	var hits []*corev1.Pod
	for i := 0; i < repeat; i++ {
		if namespace == "" {
			hits, listErr = lister.List(selector)
		} else {
			hits, listErr = lister.Pods(namespace).List(selector)
		}
		if listErr != nil {
			panic(fmt.Errorf("failed labeled list: %w", listErr))
		}
	}
	elapsed := time.Since(start)

	avg := time.Duration(0)
	if repeat > 0 {
		avg = elapsed / time.Duration(repeat)
	}

	fmt.Printf("[Lister+Selector] label %q=%q -> hits=%d, repeat=%d, total=%s, avg=%s\n",
		labelKey, labelValue, len(hits), repeat, elapsed, avg)
}

results:

Total pods in cluster (or namespace if set): 25375
Looking for pods with label selector: scheduling.x-k8s.io/pod-group=exp-20250917-105355-t0
[Lister+Selector] label "scheduling.x-k8s.io/pod-group"="exp-20250917-105355-t0" -> hits=0, repeat=30, total=489.33558ms, avg=16.311186ms

Improved version (namespace+pod-group index):

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// getKubeConfig tries in-cluster first, then falls back to local kubeconfig.
func getKubeConfig(kubeconfigPath string) (*rest.Config, error) {
	// Try in-cluster
	if cfg, err := rest.InClusterConfig(); err == nil {
		return cfg, nil
	}
	// Fall back to local kubeconfig
	if kubeconfigPath == "" {
		home, _ := os.UserHomeDir()
		kubeconfigPath = filepath.Join(home, ".kube", "config")
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

func main() {
	var (
		kubeconfig string
		labelKey   string
		labelValue string
		namespace  string
		repeat     int
		warmup     int
		indexName  string
	)
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig; empty means in-cluster first, then ~/.kube/config")
	flag.StringVar(&labelKey, "label-key", "scheduling.x-k8s.io/pod-group", "Label key to index and query")
	flag.StringVar(&labelValue, "label-value", "exp-20250917-105355-t0", "Label value to query via index")
	flag.StringVar(&namespace, "namespace", "", "Target namespace (REQUIRED for index query)")
	flag.IntVar(&repeat, "repeat", 30, "Number of timed queries to run")
	flag.IntVar(&warmup, "warmup", 5, "Warm-up queries before timing (not measured)")
	flag.StringVar(&indexName, "index-name", "byNamespaceAndPodGroupLabel", "Custom index name")
	flag.Parse()

	if namespace == "" {
		fmt.Fprintln(os.Stderr, "ERROR: --namespace is required when using a namespace+label index key")
		os.Exit(2)
	}

	cfg, err := getKubeConfig(kubeconfig)
	if err != nil {
		panic(fmt.Errorf("failed to get kubeconfig: %w", err))
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(fmt.Errorf("failed to create clientset: %w", err))
	}

	// Always build a CLUSTER-WIDE SharedInformerFactory (alignment between lister and indexer).
	factory := informers.NewSharedInformerFactory(clientset, 0)

	// Typed Pod informer handle.
	podInformer := factory.Core().V1().Pods()

	// IMPORTANT: instantiate informer BEFORE Start so it is registered and can be started.
	inf := podInformer.Informer()

	// IMPORTANT: Add indexers BEFORE starting the informer.
	// Index key format: "<namespace>/<labelValue>"
	indexFunc := func(obj interface{}) ([]string, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok || pod == nil {
			return nil, nil
		}
		if pod.Labels == nil {
			return nil, nil
		}
		if val, ok := pod.Labels[labelKey]; ok && val != "" {
			return []string{pod.Namespace + "/" + val}, nil
		}
		return nil, nil
	}
	if err := inf.AddIndexers(cache.Indexers{indexName: indexFunc}); err != nil {
		panic(fmt.Errorf("failed to add indexer: %w", err))
	}

	// Start informers and wait for THIS informer's cache to sync.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
		panic("pod informer cache failed to sync")
	}

	// Reference stats: total pods in cluster (same cluster-wide informer for alignment).
	lister := podInformer.Lister()
	allPods, err := lister.List(labels.Everything())
	if err != nil {
		panic(fmt.Errorf("failed to list all pods: %w", err))
	}
	fmt.Printf("Total pods in cluster: %d\n", len(allPods))

	// Optional per-namespace count (uses the same cluster-wide informer).
	nsPods, err := lister.Pods(namespace).List(labels.Everything())
	if err != nil {
		panic(fmt.Errorf("failed to list pods in namespace %q: %w", namespace, err))
	}
	fmt.Printf("Pods in namespace %q: %d\n", namespace, len(nsPods))

	// Prepare index key and warm-up (not timed).
	indexer := inf.GetIndexer()
	key := namespace + "/" + labelValue
	for i := 0; i < warmup; i++ {
		_, _ = indexer.ByIndex(indexName, key)
	}

	// Timed queries using the namespace+label index.
	start := time.Now()
	var objs []interface{}
	for i := 0; i < repeat; i++ {
		objs, err = indexer.ByIndex(indexName, key)
		if err != nil {
			panic(fmt.Errorf("index query failed: %w", err))
		}
	}
	elapsed := time.Since(start)
	var avg time.Duration
	if repeat > 0 {
		avg = elapsed / time.Duration(repeat)
	}

	// Count the typed Pods returned by the index (defensively re-checking the namespace; the index key should already scope results).
	hits := 0
	for _, o := range objs {
		if p, ok := o.(*corev1.Pod); ok && p.Namespace == namespace {
			// No extra label check needed; the index key already enforces labelValue.
			hits++
		}
	}

	fmt.Printf("[Indexer] ns=%q, label %q=%q (index=%q) -> hits=%d, repeat=%d, total=%s, avg=%s\n",
		namespace, labelKey, labelValue, indexName, hits, repeat, elapsed, avg)
}

results:

Total pods in cluster: 137671
Pods in namespace "shai-core": 25763
[Indexer] ns="shai-core", label "scheduling.x-k8s.io/pod-group"="exp-20250917-105355-t0" (index="byNamespaceAndPodGroupLabel") -> hits=0, repeat=30, total=2.384µs, avg=79ns

While the selector path is not currently a bottleneck, the custom index makes lookups several orders of magnitude faster in my environment (roughly 16 ms per selector scan versus well under a microsecond per index lookup in the runs above).

Open questions

  • Should we keep the selector code path as a fallback (roughly the shape sketched after this list), or is it acceptable to rely solely on the index once it is available?
  • Is this small improvement worth the extra code complexity?
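
For the first question, one possible shape for the fallback (a sketch only; the names and the fallback policy are assumptions, not the plugin's current API):

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

const (
	podGroupLabel     = "scheduling.x-k8s.io/pod-group"
	podGroupIndexName = "byNamespaceAndPodGroupLabel"
)

// podsInGroup prefers the custom index and falls back to the selector scan
// if the index is not registered on this informer.
func podsInGroup(indexer cache.Indexer, podLister listersv1.PodLister, namespace, pgName string) ([]*corev1.Pod, error) {
	// Fast path: a single index lookup on "<namespace>/<pod-group>".
	if objs, err := indexer.ByIndex(podGroupIndexName, namespace+"/"+pgName); err == nil {
		pods := make([]*corev1.Pod, 0, len(objs))
		for _, obj := range objs {
			if p, ok := obj.(*corev1.Pod); ok {
				pods = append(pods, p)
			}
		}
		return pods, nil
	}
	// Fallback: ByIndex errors when the named index does not exist, so keep
	// the selector scan as the safety net.
	selector := labels.SelectorFromSet(labels.Set{podGroupLabel: pgName})
	return podLister.Pods(namespace).List(selector)
}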

Disclosure

  • I did use AI to polish the wording of this issue, since my English is not fluent and I am fairly new to open-source contribution.

Happy to submit a PR that includes the informer wiring, lister changes, and tests if the direction sounds reasonable.
