Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ and was created for lack of an alternative.

## How it works

The controller simply gets all Pod Disruption Budgets for each namespace and
compares them to Deployments and StatefulSets. For any resource with more than
1 replica and no matching Pod Disruption Budget, a default PDB will be created:
The controller uses Kubernetes informers and watch functionality to detect changes in Deployments, StatefulSets and PodDisruptionBudgets.
It automatically gets all Pod Disruption Budgets for each namespace and compares them to Deployments and StatefulSets.
For any resource with more than 1 replica and no matching Pod Disruption Budget, a default PDB will be created:

```yaml
apiVersion: policy/v1beta1
Expand Down
228 changes: 191 additions & 37 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import (
"time"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
pv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
)

const (
Expand All @@ -32,81 +37,230 @@ var (
// if missing.
type PDBController struct {
kubernetes.Interface
interval time.Duration
pdbNameSuffix string
nonReadyTTL time.Duration
parentResourceHash bool
maxUnavailable intstr.IntOrString
interval time.Duration // kept for backward compatibility
pdbNameSuffix string
nonReadyTTL time.Duration
parentResourceHash bool
maxUnavailable intstr.IntOrString
queue workqueue.TypedRateLimitingInterface[string]
deploymentInformer cache.SharedIndexInformer
statefulSetInformer cache.SharedIndexInformer
pdbInformer cache.SharedIndexInformer
}

// NewPDBController initializes a new PDBController.
func NewPDBController(interval time.Duration, client kubernetes.Interface, pdbNameSuffix string, nonReadyTTL time.Duration, parentResourceHash bool, maxUnavailable intstr.IntOrString) *PDBController {
return &PDBController{
log.Info("Initializing PDB controller with TypedRateLimitingInterface - v20250304")
controller := &PDBController{
Interface: client,
interval: interval,
pdbNameSuffix: pdbNameSuffix,
nonReadyTTL: nonReadyTTL,
parentResourceHash: parentResourceHash,
maxUnavailable: maxUnavailable,
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
}
}

// Run runs the controller loop until it receives a stop signal over the stop
// channel.
func (n *PDBController) Run(ctx context.Context) {
for {
log.Debug("Running main control loop.")
err := n.runOnce(ctx)
if err != nil {
log.Error(err)
}
// Setup Deployment informer
controller.deploymentInformer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().Deployments(v1.NamespaceAll).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.AppsV1().Deployments(v1.NamespaceAll).Watch(context.Background(), options)
},
},
&appsv1.Deployment{},
0, // resync disabled
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

// Setup StatefulSet informer
controller.statefulSetInformer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().StatefulSets(v1.NamespaceAll).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.AppsV1().StatefulSets(v1.NamespaceAll).Watch(context.Background(), options)
},
},
&appsv1.StatefulSet{},
0, // resync disabled
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

// Setup PodDisruptionBudget informer
controller.pdbInformer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).Watch(context.Background(), options)
},
},
&pv1.PodDisruptionBudget{},
0, // resync disabled
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

// Setup event handlers
controller.deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueResource,
UpdateFunc: func(old, new interface{}) {
controller.enqueueResource(new)
},
DeleteFunc: controller.enqueueResource,
})

select {
case <-time.After(n.interval):
case <-ctx.Done():
log.Info("Terminating main controller loop.")
return
}
}
controller.statefulSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueResource,
UpdateFunc: func(old, new interface{}) {
controller.enqueueResource(new)
},
DeleteFunc: controller.enqueueResource,
})

controller.pdbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: controller.enqueuePDB,
})

return controller
}

// runOnce runs the main reconcilation loop of the controller.
func (n *PDBController) runOnce(ctx context.Context) error {
allPDBs, err := n.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
func (n *PDBController) enqueueResource(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return err
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
n.queue.Add(key)
}

func (n *PDBController) enqueuePDB(obj interface{}) {
pdb, ok := obj.(*pv1.PodDisruptionBudget)
if !ok {
log.Errorf("Expected PodDisruptionBudget but got %+v", obj)
return
}

managedPDBs, unmanagedPDBs := filterPDBs(allPDBs.Items)
// Only enqueue for our managed PDBs
if !containLabels(pdb.Labels, ownerLabels) {
return
}

deployments, err := n.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return err
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
n.queue.Add(key)
}

// Run runs the controller with the specified number of workers.
func (n *PDBController) Run(ctx context.Context) {
defer n.queue.ShutDown()

log.Info("Starting PDB controller")

// Start the informers
go n.deploymentInformer.Run(ctx.Done())
go n.statefulSetInformer.Run(ctx.Done())
go n.pdbInformer.Run(ctx.Done())

// Wait for the caches to be synced
log.Info("Waiting for informer caches to sync")
if !cache.WaitForCacheSync(ctx.Done(),
n.deploymentInformer.HasSynced,
n.statefulSetInformer.HasSynced,
n.pdbInformer.HasSynced) {
log.Fatal("Failed to wait for caches to sync")
return
}
log.Info("Informer caches synced")

statefulSets, err := n.AppsV1().StatefulSets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
// Run the reconcile loop
go n.worker(ctx)

<-ctx.Done()
log.Info("Shutting down PDB controller")
}

func (n *PDBController) worker(ctx context.Context) {
for n.processNextItem(ctx) {
}
}

func (n *PDBController) processNextItem(ctx context.Context) bool {
key, quit := n.queue.Get()
if quit {
return false
}
defer n.queue.Done(key)

err := n.reconcile(ctx, key)
if err != nil {
return err
log.Errorf("Error processing item %s: %v", key, err)
n.queue.AddRateLimited(key)
return true
}

n.queue.Forget(key)
return true
}

func (n *PDBController) reconcile(ctx context.Context, key string) error {
log.Debugf("Processing key: %s", key)

// Process all resources and PDBs
return n.runOnce(ctx)
}

// runOnce runs the main reconcilation loop of the controller.
func (n *PDBController) runOnce(ctx context.Context) error {
// Get all PDBs from the informer
var allPDBs []pv1.PodDisruptionBudget
for _, obj := range n.pdbInformer.GetStore().List() {
pdb, ok := obj.(*pv1.PodDisruptionBudget)
if !ok {
continue
}
allPDBs = append(allPDBs, *pdb)
}

resources := make([]kubeResource, 0, len(deployments.Items)+len(statefulSets.Items))
managedPDBs, unmanagedPDBs := filterPDBs(allPDBs)

for _, d := range deployments.Items {
// Get all resources from the informers
resources := make([]kubeResource, 0)

// Process Deployments
for _, obj := range n.deploymentInformer.GetStore().List() {
d, ok := obj.(*appsv1.Deployment)
if !ok {
continue
}
// manually set Kind and APIVersion because of a bug in
// client-go
// https://github.com/kubernetes/client-go/issues/308
d.Kind = "Deployment"
d.APIVersion = "apps/v1"
resources = append(resources, deployment{d})
resources = append(resources, deployment{*d})
}

for _, s := range statefulSets.Items {
// Process StatefulSets
for _, obj := range n.statefulSetInformer.GetStore().List() {
s, ok := obj.(*appsv1.StatefulSet)
if !ok {
continue
}
// manually set Kind and APIVersion because of a bug in
// client-go
// https://github.com/kubernetes/client-go/issues/308
s.Kind = "StatefulSet"
s.APIVersion = "apps/v1"
resources = append(resources, statefulSet{s})
resources = append(resources, statefulSet{*s})
}

desiredPDBs := n.generateDesiredPDBs(resources, managedPDBs, unmanagedPDBs)
Expand Down
Loading
Loading