From 6e358b52233b001df624275ac2f20046825008c2 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Thu, 30 Nov 2023 14:56:20 +0100 Subject: [PATCH] Add filter on namespaces --- cmd/icinga-kubernetes/main.go | 32 +++++++++++++++++++++++--------- pkg/schema/scoped_resource.go | 26 ++++++++++++++++++++++++++ pkg/sync/sync.go | 19 ++++++++++++++----- 3 files changed, 63 insertions(+), 14 deletions(-) create mode 100644 pkg/schema/scoped_resource.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 90924229..5836b122 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -11,15 +11,27 @@ import ( "github.com/icinga/icinga-kubernetes/pkg/sync" "github.com/okzk/sdnotify" "github.com/pkg/errors" + "github.com/spf13/pflag" "golang.org/x/sync/errgroup" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kclientcmd "k8s.io/client-go/tools/clientcmd" ) func main() { - kconfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig( - kclientcmd.NewDefaultClientConfigLoadingRules(), &kclientcmd.ConfigOverrides{}).ClientConfig() + var configPath string + pflag.StringVarP(&configPath, "config", "c", "./config.yml", "path to config file") + + kconfigOverrides := &kclientcmd.ConfigOverrides{} + kclientcmd.BindOverrideFlags(kconfigOverrides, pflag.CommandLine, kclientcmd.RecommendedConfigOverrideFlags("")) + + kclientconfig := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig( + kclientcmd.NewDefaultClientConfigLoadingRules(), kconfigOverrides) + + pflag.Parse() + + kconfig, err := kclientconfig.ClientConfig() if err != nil { logging.Fatal(errors.Wrap(err, "can't configure Kubernetes client")) } @@ -29,12 +41,14 @@ func main() { logging.Fatal(errors.Wrap(err, "can't create Kubernetes client")) } - flags, err := config.ParseFlags[internal.Flags]() + namespace, overridden, err := kclientconfig.Namespace() if err != nil { - logging.Fatal(errors.Wrap(err, "can't parse flags")) + logging.Fatal(errors.Wrap(err, "can't get namespace from CLI")) + } else if !overridden { + namespace = kmetav1.NamespaceAll } - cfg, err := config.FromYAMLFile[internal.Config](flags.Config) + cfg, err := config.FromYAMLFile[internal.Config](configPath) if err != nil { logging.Fatal(errors.Wrap(err, "can't create configuration")) } @@ -66,26 +80,26 @@ func main() { } } - informers := kinformers.NewSharedInformerFactory(k, 0) + informers := kinformers.NewSharedInformerFactoryWithOptions(k, 0, kinformers.WithNamespace(namespace)) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { return sync.NewSync( db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"), - ).Run(ctx) + ).Run(ctx, namespace) }) g.Go(func() error { return sync.NewSync( db, schema.NewNamespace, informers.Core().V1().Namespaces().Informer(), logs.GetChildLogger("Namespaces"), - ).Run(ctx) + ).Run(ctx, namespace) }) g.Go(func() error { return sync.NewSync( db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), - ).Run(ctx) + ).Run(ctx, namespace) }) if err := g.Wait(); err != nil { diff --git a/pkg/schema/scoped_resource.go b/pkg/schema/scoped_resource.go new file mode 100644 index 00000000..38f08b05 --- /dev/null +++ b/pkg/schema/scoped_resource.go @@ -0,0 +1,26 @@ +package schema + +import ( + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-kubernetes/pkg/contracts" +) + +type ScopedResource struct { + contracts.Resource + scope interface{} +} + +func (r *ScopedResource) Scope() interface{} { + return r.scope +} + +func (r *ScopedResource) TableName() string { + return database.TableName(r.Resource) +} + +func NewScopedResource(resource contracts.Resource, scope interface{}) *ScopedResource { + return &ScopedResource{ + Resource: resource, + scope: scope, + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index d0d788db..b395bc8a 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -7,15 +7,18 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/schema" "github.com/icinga/icinga-kubernetes/pkg/sink" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kcache "k8s.io/client-go/tools/cache" + "reflect" ) type Sync interface { - Run(context.Context) error + Run(context.Context, string) error } type sync struct { @@ -39,12 +42,12 @@ func NewSync( } } -func (s *sync) Run(ctx context.Context) error { +func (s *sync) Run(ctx context.Context, namespace string) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") - err := s.Warmup(ctx) + err := s.Warmup(ctx, namespace) if err != nil { return errors.Wrap(err, "warmup failed") } @@ -139,13 +142,19 @@ func (s *sync) Run(ctx context.Context) error { return g.Wait() } -func (s *sync) Warmup(ctx context.Context) error { +func (s *sync) Warmup(ctx context.Context, namespace string) error { g, ctx := errgroup.WithContext(ctx) resource := s.factory() + resourceType := reflect.TypeOf(resource).Elem() + + if _, found := resourceType.FieldByName("kmetaWithNamespace"); found && namespace != kmetav1.NamespaceAll { + resource = schema.NewScopedResource(resource, &struct{ Namespace string }{}) + } + entities, err := s.db.YieldAll(ctx, func() database.Entity { return s.factory() - }, s.db.BuildSelectStmt(resource, resource.Fingerprint()), struct{}{}) + }, s.db.BuildSelectStmt(resource, resource.Fingerprint()), struct{ Namespace string }{Namespace: namespace}) com.ErrgroupReceive(ctx, g, err) g.Go(func() error {