Skip to content

Commit e084e13

Browse files
committed
use watch instead of polling
Signed-off-by: itspooya <pooyadowlat@gmail.com>
1 parent 06bc039 commit e084e13

File tree

4 files changed

+507
-99
lines changed

4 files changed

+507
-99
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ and was created for lack of an alternative.
1010

1111
## How it works
1212

13-
The controller simply gets all Pod Disruption Budgets for each namespace and
14-
compares them to Deployments and StatefulSets. For any resource with more than
15-
1 replica and no matching Pod Disruption Budget, a default PDB will be created:
13+
The controller uses Kubernetes informers and watch functionality to detect changes in Deployments, StatefulSets and PodDisruptionBudgets.
14+
It automatically gets all Pod Disruption Budgets for each namespace and compares them to Deployments and StatefulSets.
15+
For any resource with more than 1 replica and no matching Pod Disruption Budget, a default PDB will be created:
1616

1717
```yaml
1818
apiVersion: policy/v1beta1

controller.go

Lines changed: 191 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ import (
77
"time"
88

99
log "github.com/sirupsen/logrus"
10+
appsv1 "k8s.io/api/apps/v1"
1011
v1 "k8s.io/api/core/v1"
1112
pv1 "k8s.io/api/policy/v1"
1213
"k8s.io/apimachinery/pkg/api/equality"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime"
1416
"k8s.io/apimachinery/pkg/util/intstr"
17+
"k8s.io/apimachinery/pkg/watch"
1518
"k8s.io/client-go/kubernetes"
19+
"k8s.io/client-go/tools/cache"
1620
"k8s.io/client-go/util/retry"
21+
"k8s.io/client-go/util/workqueue"
1722
)
1823

1924
const (
@@ -32,81 +37,230 @@ var (
3237
// if missing.
3338
type PDBController struct {
3439
kubernetes.Interface
35-
interval time.Duration
36-
pdbNameSuffix string
37-
nonReadyTTL time.Duration
38-
parentResourceHash bool
39-
maxUnavailable intstr.IntOrString
40+
interval time.Duration // kept for backward compatibility
41+
pdbNameSuffix string
42+
nonReadyTTL time.Duration
43+
parentResourceHash bool
44+
maxUnavailable intstr.IntOrString
45+
queue workqueue.TypedRateLimitingInterface[string]
46+
deploymentInformer cache.SharedIndexInformer
47+
statefulSetInformer cache.SharedIndexInformer
48+
pdbInformer cache.SharedIndexInformer
4049
}
4150

4251
// NewPDBController initializes a new PDBController.
4352
func NewPDBController(interval time.Duration, client kubernetes.Interface, pdbNameSuffix string, nonReadyTTL time.Duration, parentResourceHash bool, maxUnavailable intstr.IntOrString) *PDBController {
44-
return &PDBController{
53+
log.Info("Initializing PDB controller with TypedRateLimitingInterface - v20250304")
54+
controller := &PDBController{
4555
Interface: client,
4656
interval: interval,
4757
pdbNameSuffix: pdbNameSuffix,
4858
nonReadyTTL: nonReadyTTL,
4959
parentResourceHash: parentResourceHash,
5060
maxUnavailable: maxUnavailable,
61+
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
5162
}
52-
}
5363

54-
// Run runs the controller loop until it receives a stop signal over the stop
55-
// channel.
56-
func (n *PDBController) Run(ctx context.Context) {
57-
for {
58-
log.Debug("Running main control loop.")
59-
err := n.runOnce(ctx)
60-
if err != nil {
61-
log.Error(err)
62-
}
64+
// Setup Deployment informer
65+
controller.deploymentInformer = cache.NewSharedIndexInformer(
66+
&cache.ListWatch{
67+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
68+
return client.AppsV1().Deployments(v1.NamespaceAll).List(context.Background(), options)
69+
},
70+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
71+
return client.AppsV1().Deployments(v1.NamespaceAll).Watch(context.Background(), options)
72+
},
73+
},
74+
&appsv1.Deployment{},
75+
0, // resync disabled
76+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
77+
)
78+
79+
// Setup StatefulSet informer
80+
controller.statefulSetInformer = cache.NewSharedIndexInformer(
81+
&cache.ListWatch{
82+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
83+
return client.AppsV1().StatefulSets(v1.NamespaceAll).List(context.Background(), options)
84+
},
85+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
86+
return client.AppsV1().StatefulSets(v1.NamespaceAll).Watch(context.Background(), options)
87+
},
88+
},
89+
&appsv1.StatefulSet{},
90+
0, // resync disabled
91+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
92+
)
93+
94+
// Setup PodDisruptionBudget informer
95+
controller.pdbInformer = cache.NewSharedIndexInformer(
96+
&cache.ListWatch{
97+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
98+
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(context.Background(), options)
99+
},
100+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
101+
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).Watch(context.Background(), options)
102+
},
103+
},
104+
&pv1.PodDisruptionBudget{},
105+
0, // resync disabled
106+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
107+
)
108+
109+
// Setup event handlers
110+
controller.deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
111+
AddFunc: controller.enqueueResource,
112+
UpdateFunc: func(old, new interface{}) {
113+
controller.enqueueResource(new)
114+
},
115+
DeleteFunc: controller.enqueueResource,
116+
})
63117

64-
select {
65-
case <-time.After(n.interval):
66-
case <-ctx.Done():
67-
log.Info("Terminating main controller loop.")
68-
return
69-
}
70-
}
118+
controller.statefulSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
119+
AddFunc: controller.enqueueResource,
120+
UpdateFunc: func(old, new interface{}) {
121+
controller.enqueueResource(new)
122+
},
123+
DeleteFunc: controller.enqueueResource,
124+
})
125+
126+
controller.pdbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
127+
DeleteFunc: controller.enqueuePDB,
128+
})
129+
130+
return controller
71131
}
72132

73-
// runOnce runs the main reconcilation loop of the controller.
74-
func (n *PDBController) runOnce(ctx context.Context) error {
75-
allPDBs, err := n.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
133+
func (n *PDBController) enqueueResource(obj interface{}) {
134+
key, err := cache.MetaNamespaceKeyFunc(obj)
76135
if err != nil {
77-
return err
136+
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
137+
return
138+
}
139+
n.queue.Add(key)
140+
}
141+
142+
func (n *PDBController) enqueuePDB(obj interface{}) {
143+
pdb, ok := obj.(*pv1.PodDisruptionBudget)
144+
if !ok {
145+
log.Errorf("Expected PodDisruptionBudget but got %+v", obj)
146+
return
78147
}
79148

80-
managedPDBs, unmanagedPDBs := filterPDBs(allPDBs.Items)
149+
// Only enqueue for our managed PDBs
150+
if !containLabels(pdb.Labels, ownerLabels) {
151+
return
152+
}
81153

82-
deployments, err := n.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
154+
key, err := cache.MetaNamespaceKeyFunc(obj)
83155
if err != nil {
84-
return err
156+
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
157+
return
158+
}
159+
n.queue.Add(key)
160+
}
161+
162+
// Run runs the controller with the specified number of workers.
163+
func (n *PDBController) Run(ctx context.Context) {
164+
defer n.queue.ShutDown()
165+
166+
log.Info("Starting PDB controller")
167+
168+
// Start the informers
169+
go n.deploymentInformer.Run(ctx.Done())
170+
go n.statefulSetInformer.Run(ctx.Done())
171+
go n.pdbInformer.Run(ctx.Done())
172+
173+
// Wait for the caches to be synced
174+
log.Info("Waiting for informer caches to sync")
175+
if !cache.WaitForCacheSync(ctx.Done(),
176+
n.deploymentInformer.HasSynced,
177+
n.statefulSetInformer.HasSynced,
178+
n.pdbInformer.HasSynced) {
179+
log.Fatal("Failed to wait for caches to sync")
180+
return
85181
}
182+
log.Info("Informer caches synced")
86183

87-
statefulSets, err := n.AppsV1().StatefulSets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
184+
// Run the reconcile loop
185+
go n.worker(ctx)
186+
187+
<-ctx.Done()
188+
log.Info("Shutting down PDB controller")
189+
}
190+
191+
func (n *PDBController) worker(ctx context.Context) {
192+
for n.processNextItem(ctx) {
193+
}
194+
}
195+
196+
func (n *PDBController) processNextItem(ctx context.Context) bool {
197+
key, quit := n.queue.Get()
198+
if quit {
199+
return false
200+
}
201+
defer n.queue.Done(key)
202+
203+
err := n.reconcile(ctx, key)
88204
if err != nil {
89-
return err
205+
log.Errorf("Error processing item %s: %v", key, err)
206+
n.queue.AddRateLimited(key)
207+
return true
208+
}
209+
210+
n.queue.Forget(key)
211+
return true
212+
}
213+
214+
func (n *PDBController) reconcile(ctx context.Context, key string) error {
215+
log.Debugf("Processing key: %s", key)
216+
217+
// Process all resources and PDBs
218+
return n.runOnce(ctx)
219+
}
220+
221+
// runOnce runs the main reconcilation loop of the controller.
222+
func (n *PDBController) runOnce(ctx context.Context) error {
223+
// Get all PDBs from the informer
224+
var allPDBs []pv1.PodDisruptionBudget
225+
for _, obj := range n.pdbInformer.GetStore().List() {
226+
pdb, ok := obj.(*pv1.PodDisruptionBudget)
227+
if !ok {
228+
continue
229+
}
230+
allPDBs = append(allPDBs, *pdb)
90231
}
91232

92-
resources := make([]kubeResource, 0, len(deployments.Items)+len(statefulSets.Items))
233+
managedPDBs, unmanagedPDBs := filterPDBs(allPDBs)
93234

94-
for _, d := range deployments.Items {
235+
// Get all resources from the informers
236+
resources := make([]kubeResource, 0)
237+
238+
// Process Deployments
239+
for _, obj := range n.deploymentInformer.GetStore().List() {
240+
d, ok := obj.(*appsv1.Deployment)
241+
if !ok {
242+
continue
243+
}
95244
// manually set Kind and APIVersion because of a bug in
96245
// client-go
97246
// https://github.com/kubernetes/client-go/issues/308
98247
d.Kind = "Deployment"
99248
d.APIVersion = "apps/v1"
100-
resources = append(resources, deployment{d})
249+
resources = append(resources, deployment{*d})
101250
}
102251

103-
for _, s := range statefulSets.Items {
252+
// Process StatefulSets
253+
for _, obj := range n.statefulSetInformer.GetStore().List() {
254+
s, ok := obj.(*appsv1.StatefulSet)
255+
if !ok {
256+
continue
257+
}
104258
// manually set Kind and APIVersion because of a bug in
105259
// client-go
106260
// https://github.com/kubernetes/client-go/issues/308
107261
s.Kind = "StatefulSet"
108262
s.APIVersion = "apps/v1"
109-
resources = append(resources, statefulSet{s})
263+
resources = append(resources, statefulSet{*s})
110264
}
111265

112266
desiredPDBs := n.generateDesiredPDBs(resources, managedPDBs, unmanagedPDBs)

0 commit comments

Comments
 (0)