Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
v2 "k8s.io/client-go/informers/core/v1"
networkingv1 "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
kclientcmd "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -303,6 +304,10 @@ func main() {
return SyncServicePods(ctx, kdb, factory.Core().V1().Services(), factory.Core().V1().Pods())
})

g.Go(func() error {
return SyncIngressServiceUuid(ctx, kdb, factory.Networking().V1().Ingresses())
})

err = internal.SyncPrometheusConfig(ctx, db, &cfg.Prometheus, clusterInstance.Uuid)
if err != nil {
klog.Error(errors.Wrap(err, "cannot sync prometheus config"))
Expand Down Expand Up @@ -702,3 +707,105 @@ func SyncServicePods(ctx context.Context, db *kdatabase.Database, serviceList v2

return g.Wait()
}

func SyncIngressServiceUuid(ctx context.Context, db *kdatabase.Database, ingressList networkingv1.IngressInformer) error {
// TODO: Respect delete events. At the moment, service link entries will only be deleted if the corresponding ingress is deleted.
ingressBackendServices := make(chan any)
ingressRules := make(chan any)

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return db.UpsertStreamed(ctx, ingressBackendServices)
})

g.Go(func() error {
return db.UpsertStreamed(ctx, ingressRules)
})

g.Go(func() error {
ch := cachev1.Multiplexers().Services().UpsertEvents().Out()
for {
select {
case service, more := <-ch:
if !more {
return nil
}

ingresses, err := ingressList.Lister().Ingresses(service.(*schemav1.Service).Namespace).List(labels.Everything())
if err != nil {
return err
}

for _, ingress := range ingresses {
if ingress.Spec.DefaultBackend != nil {
if ingress.Spec.DefaultBackend.Service != nil {
if ingress.Spec.DefaultBackend.Service.Name == service.(*schemav1.Service).Name {
select {
case ingressBackendServices <- schemav1.IngressBackendService{
ServiceUuid: service.(*schemav1.Service).Uuid,
IngressUuid: schemav1.EnsureUUID(ingress.UID),
ServiceName: ingress.Spec.DefaultBackend.Service.Name,
ServicePortName: ingress.Spec.DefaultBackend.Service.Port.Name,
ServicePortNumber: ingress.Spec.DefaultBackend.Service.Port.Number,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
}

for _, rules := range ingress.Spec.Rules {
if rules.HTTP == nil {
continue
}

for _, ruleValue := range rules.HTTP.Paths {
if ruleValue.Backend.Service == nil {
continue
}

serviceName := ruleValue.Backend.Service.Name
if service.(*schemav1.Service).Name == serviceName {
serviceUuid := service.(*schemav1.Service).Uuid
ingressUuid := schemav1.EnsureUUID(ingress.UID)
ingressRuleUuid := schemav1.NewUUID(ingressUuid, rules.Host+ruleValue.Path+serviceName)

select {
case ingressBackendServices <- schemav1.IngressBackendService{
ServiceUuid: serviceUuid,
IngressUuid: ingressUuid,
IngressRuleUuid: ingressRuleUuid,
ServiceName: serviceName,
ServicePortName: ruleValue.Backend.Service.Port.Name,
ServicePortNumber: ruleValue.Backend.Service.Port.Number,
}:
case <-ctx.Done():
return ctx.Err()
}

select {
case ingressRules <- schemav1.IngressRule{
Uuid: ingressRuleUuid,
BackendUuid: serviceUuid,
IngressUuid: ingressUuid,
Host: schemav1.NewNullableString(rules.Host),
Path: schemav1.NewNullableString(ruleValue.Path),
PathType: string(*ruleValue.PathType),
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})

return g.Wait()
}
31 changes: 1 addition & 30 deletions pkg/schema/v1/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func (i *Ingress) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
}

if ingress.Spec.DefaultBackend != nil {
if ingress.Spec.DefaultBackend.Service != nil {
serviceUuid := NewUUID(i.Uuid, ingress.Spec.DefaultBackend.Service.Name+ingress.Spec.DefaultBackend.Service.Port.Name)
i.IngressBackendService = append(i.IngressBackendService, IngressBackendService{
ServiceUuid: serviceUuid,
IngressUuid: i.Uuid,
ServiceName: ingress.Spec.DefaultBackend.Service.Name,
ServicePortName: ingress.Spec.DefaultBackend.Service.Port.Name,
ServicePortNumber: ingress.Spec.DefaultBackend.Service.Port.Number,
})
}
if ingress.Spec.DefaultBackend.Resource != nil {
resourceUuid := NewUUID(i.Uuid, ingress.Spec.DefaultBackend.Resource.Kind+ingress.Spec.DefaultBackend.Resource.Name)
var apiGroup sql.NullString
Expand All @@ -125,26 +115,7 @@ func (i *Ingress) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
for _, ruleValue := range rules.IngressRuleValue.HTTP.Paths {
// It is safe to use the pointer directly here.
pathType := string(*ruleValue.PathType)
if ruleValue.Backend.Service != nil {
ingressRuleUuid := NewUUID(i.Uuid, rules.Host+ruleValue.Path+ruleValue.Backend.Service.Name)
serviceUuid := NewUUID(ingressRuleUuid, ruleValue.Backend.Service.Name)
i.IngressBackendService = append(i.IngressBackendService, IngressBackendService{
ServiceUuid: serviceUuid,
IngressUuid: i.Uuid,
IngressRuleUuid: ingressRuleUuid,
ServiceName: ruleValue.Backend.Service.Name,
ServicePortName: ruleValue.Backend.Service.Port.Name,
ServicePortNumber: ruleValue.Backend.Service.Port.Number,
})
i.IngressRule = append(i.IngressRule, IngressRule{
Uuid: ingressRuleUuid,
BackendUuid: serviceUuid,
IngressUuid: i.Uuid,
Host: NewNullableString(rules.Host),
Path: NewNullableString(ruleValue.Path),
PathType: pathType,
})
} else if ruleValue.Backend.Resource != nil {
if ruleValue.Backend.Resource != nil {
ingressRuleUuid := NewUUID(i.Uuid, rules.Host+ruleValue.Path+ruleValue.Backend.Resource.Name)
resourceUuid := NewUUID(ingressRuleUuid, ruleValue.Backend.Resource.Name)
var apiGroup sql.NullString
Expand Down
Loading