diff --git a/cmd/mysql-agent/app/mysql_agent.go b/cmd/mysql-agent/app/mysql_agent.go
index 85c1eba95..7cb76f4d4 100644
--- a/cmd/mysql-agent/app/mysql_agent.go
+++ b/cmd/mysql-agent/app/mysql_agent.go
@@ -22,6 +22,8 @@ import (
 	"strconv"
 	"sync"
 	"time"
+	"fmt"
+	"os"
 
 	"github.com/golang/glog"
 	"github.com/heptiolabs/healthcheck"
@@ -100,15 +102,17 @@ func Run(opts *agentopts.MySQLAgentOpts) error {
 	if err != nil {
 		return errors.Wrap(err, "failed to create new local MySQL InnoDB cluster manager")
 	}
-
-	// Initialise the agent metrics.
+	// Initialise the agent metrics; the Prometheus listen port is read from the environment.
+	agentPromePort := os.Getenv("AGENT_PROME_PORT")
+	promeMetricsEndpoint := fmt.Sprintf("0.0.0.0:%s", agentPromePort)
+	glog.Info("agent prometheus endpoint: ", promeMetricsEndpoint)
 	metrics.RegisterPodName(opts.Hostname)
 	metrics.RegisterClusterName(manager.Instance.ClusterName)
 	clustermgr.RegisterMetrics()
 	backupcontroller.RegisterMetrics()
 	restorecontroller.RegisterMetrics()
 	http.Handle("/metrics", prometheus.Handler())
-	go http.ListenAndServe(metricsEndpoint, nil)
+	go http.ListenAndServe(promeMetricsEndpoint, nil)
 
 	// Block until local instance successfully initialised.
 	for !manager.Sync(ctx) {
diff --git a/pkg/apis/mysql/v1alpha1/helpers.go b/pkg/apis/mysql/v1alpha1/helpers.go
index df25b1465..c5a0bc18d 100644
--- a/pkg/apis/mysql/v1alpha1/helpers.go
+++ b/pkg/apis/mysql/v1alpha1/helpers.go
@@ -22,8 +22,13 @@ import (
 const (
 	// DefaultVersion is the MySQL version to use if not specified explicitly by user
 	DefaultVersion = "8.0.12"
+	DefaultReplicationGroupPort = 33061
+	DefaultAgentHealthCheckPort = 10512
+	DefaultAgentPromePort       = 8080
+	DefaultMysqlPort            = 3306
 	defaultMembers      = 3
 	defaultBaseServerID = 1000
+	defaultAgentIntervalTime = 15
 	// maxBaseServerID is the maximum safe value for BaseServerID calculated
 	// as max MySQL server_id value - max Replication Group size.
 	maxBaseServerID uint32 = 4294967295 - 9
@@ -69,6 +74,24 @@ func (c *Cluster) EnsureDefaults() *Cluster {
 		c.Spec.Version = DefaultVersion
 	}
 
+	if c.Spec.GroupPort == 0 {
+		c.Spec.GroupPort = DefaultReplicationGroupPort
+	}
+
+	if c.Spec.AgentCheckPort == 0 {
+		c.Spec.AgentCheckPort = DefaultAgentHealthCheckPort
+	}
+
+	if c.Spec.AgentPromePort == 0 {
+		c.Spec.AgentPromePort = DefaultAgentPromePort
+	}
+
+	if c.Spec.MysqlPort == 0 {
+		c.Spec.MysqlPort = DefaultMysqlPort
+	}
+	if c.Spec.AgentIntervalTime == 0 {
+		c.Spec.AgentIntervalTime = defaultAgentIntervalTime
+	}
 	return c
 }
diff --git a/pkg/apis/mysql/v1alpha1/types.go b/pkg/apis/mysql/v1alpha1/types.go
index dcc8735cf..62f358cac 100644
--- a/pkg/apis/mysql/v1alpha1/types.go
+++ b/pkg/apis/mysql/v1alpha1/types.go
@@ -43,6 +43,18 @@ type ClusterSpec struct {
 	// all instances will be R/W. If false (the default), only a single instance
 	// will be R/W and the rest will be R/O.
 	MultiMaster bool `json:"multiMaster,omitempty"`
+	// HostNetwork controls whether the cluster pods use the host's network namespace.
+	HostNetwork bool `json:"hostNetwork,omitempty"`
+	// GroupPort is the Group Replication communication port.
+	GroupPort uint32 `json:"groupPort,omitempty"`
+	// AgentCheckPort is the mysql-agent healthcheck port.
+	AgentCheckPort uint32 `json:"agentCheckPort,omitempty"`
+	// AgentPromePort is the mysql-agent Prometheus metrics port.
+	AgentPromePort uint32 `json:"agentPromePort,omitempty"`
+	// MysqlPort is the MySQL server port.
+	MysqlPort uint32 `json:"mysqlPort,omitempty"`
+	// AgentIntervalTime is the mysql-agent sync interval in seconds.
+	AgentIntervalTime uint32 `json:"agentIntervalTime,omitempty"`
 	// NodeSelector is a selector which must be true for the pod to fit on a node.
 	// Selector which must match a node's labels for the pod to be scheduled on that node.
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/ diff --git a/pkg/cluster/innodb/innodb.go b/pkg/cluster/innodb/innodb.go index eb2103646..316658849 100644 --- a/pkg/cluster/innodb/innodb.go +++ b/pkg/cluster/innodb/innodb.go @@ -17,6 +17,7 @@ package innodb import ( "fmt" "net" + "os" ) // DefaultClusterName is the default name assigned to InnoDB clusters created by @@ -123,7 +124,7 @@ func (s *ClusterStatus) GetInstanceStatus(name string) InstanceStatus { if s.DefaultReplicaSet.Topology == nil { return InstanceStatusNotFound } - if is, ok := s.DefaultReplicaSet.Topology[fmt.Sprintf("%s:%d", name, MySQLDBPort)]; ok { + if is, ok := s.DefaultReplicaSet.Topology[fmt.Sprintf("%s:%s", name, os.Getenv("MYSQL_PORT"))]; ok { return is.Status } return InstanceStatusNotFound diff --git a/pkg/cluster/instance.go b/pkg/cluster/instance.go index 16e448db6..3ced3bcbb 100644 --- a/pkg/cluster/instance.go +++ b/pkg/cluster/instance.go @@ -24,7 +24,7 @@ import ( "github.com/pkg/errors" - "github.com/oracle/mysql-operator/pkg/cluster/innodb" + //"github.com/oracle/mysql-operator/pkg/cluster/innodb" ) // Instance represents the local MySQL instance. @@ -62,18 +62,19 @@ func NewInstance(namespace, clusterName, parentName string, ordinal, port int, m // NewLocalInstance creates a new instance of this structure, with it's name and index // populated from os.Hostname(). func NewLocalInstance() (*Instance, error) { - hostname, err := os.Hostname() - if err != nil { - return nil, err + pod_name := os.Getenv("MY_POD_NAME") + if pod_name == "" { + return nil, errors.Errorf("env MY_POD_NAME is empty!!!") } - name, ordinal := GetParentNameAndOrdinal(hostname) + name, ordinal := GetParentNameAndOrdinal(pod_name) multiMaster, _ := strconv.ParseBool(os.Getenv("MYSQL_CLUSTER_MULTI_MASTER")) + mysqlPort, _ := strconv.ParseInt(os.Getenv("MYSQL_PORT"), 10, 32) return &Instance{ Namespace: os.Getenv("POD_NAMESPACE"), ClusterName: os.Getenv("MYSQL_CLUSTER_NAME"), ParentName: name, Ordinal: ordinal, - Port: innodb.MySQLDBPort, + Port: int(mysqlPort), MultiMaster: multiMaster, IP: net.ParseIP(os.Getenv("MY_POD_IP")), }, nil @@ -90,12 +91,13 @@ func NewInstanceFromGroupSeed(seed string) (*Instance, error) { // MySQLDB port not its group replication port. parentName, ordinal := GetParentNameAndOrdinal(podName) multiMaster, _ := strconv.ParseBool(os.Getenv("MYSQL_CLUSTER_MULTI_MASTER")) + mysqlPort, _ := strconv.ParseInt(os.Getenv("MYSQL_PORT"), 10, 32) return &Instance{ ClusterName: os.Getenv("MYSQL_CLUSTER_NAME"), Namespace: os.Getenv("POD_NAMESPACE"), ParentName: parentName, Ordinal: ordinal, - Port: innodb.MySQLDBPort, + Port: int(mysqlPort), MultiMaster: multiMaster, }, nil } diff --git a/pkg/controllers/cluster/manager/cluster_manager.go b/pkg/controllers/cluster/manager/cluster_manager.go index 3f3648a0b..f953b45a0 100644 --- a/pkg/controllers/cluster/manager/cluster_manager.go +++ b/pkg/controllers/cluster/manager/cluster_manager.go @@ -17,7 +17,9 @@ package manager import ( "context" "fmt" + "os" "strings" + "strconv" "time" "github.com/golang/glog" @@ -36,7 +38,7 @@ import ( "github.com/oracle/mysql-operator/pkg/util/mysqlsh" ) -const pollingIntervalSeconds = 15 +const pollingIntervalSeconds = 60 // ClusterManager manages the local MySQL instance's membership of an InnoDB cluster. 
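The hunks above read MYSQL_PORT, AGENT_PROME_PORT and AGENT_HEALTHCHECK_PORT straight from the environment and discard the strconv parse errors, so an unset variable silently becomes port 0 (or an empty listen address in the agent). Not part of the patch, but a minimal sketch of a guarded lookup that falls back to the defaults added in helpers.go; the helper name envPortOrDefault is hypothetical:

```go
// Illustration only: guard environment-derived ports instead of ignoring parse errors.
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envPortOrDefault returns the port in the named variable, or def when the
// variable is unset or not a valid port number.
func envPortOrDefault(key string, def int) int {
	v := os.Getenv(key)
	if v == "" {
		return def
	}
	p, err := strconv.Atoi(v)
	if err != nil || p <= 0 || p > 65535 {
		return def
	}
	return p
}

func main() {
	mysqlPort := envPortOrDefault("MYSQL_PORT", 3306)
	promePort := envPortOrDefault("AGENT_PROME_PORT", 8080)
	fmt.Printf("mysql port=%d, metrics endpoint=0.0.0.0:%d\n", mysqlPort, promePort)
}
```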
type ClusterManager struct { @@ -100,6 +102,7 @@ func (m *ClusterManager) getClusterStatus(ctx context.Context) (*innodb.ClusterS if localMSHErr != nil { var err error clusterStatus, err = getClusterStatusFromGroupSeeds(ctx, m.kubeClient, m.Instance) + glog.V(2).Infof("get cluster seeds*** error: %+v", err) if err != nil { // NOTE: We return the localMSHErr rather than the error here so that we // can dispatch on it. @@ -122,6 +125,7 @@ func (m *ClusterManager) Sync(ctx context.Context) bool { clusterStatus, err := m.getClusterStatus(ctx) if err != nil { myshErr, ok := errors.Cause(err).(*mysqlsh.Error) + glog.V(2).Infof("get cluster*** error: %+v", err) if !ok { glog.Errorf("Failed to get the cluster status: %+v", err) return false @@ -289,10 +293,17 @@ func (m *ClusterManager) handleInstanceNotFound(ctx context.Context, primaryAddr glog.Errorf("Getting CIDR to whitelist for GR: %v", err) return false } + //use deault mysqlPort + 1 + //localAddress := fmt.Sprintf("%s:%s", m.Instance.Name(), os.Getenv("GROUP_PORT")) + //groupSeeds := os.Getenv("REPLICATION_GROUP_SEEDS") + + //glog.Infof("localAddress: %s, groupSeeds: %s", localAddress, groupSeeds) if err := psh.AddInstanceToCluster(ctx, m.Instance.GetShellURI(), mysqlsh.Options{ "memberSslMode": "REQUIRED", "ipWhitelist": whitelistCIDR, + // "localAddress": localAddress, + // "groupSeeds": groupSeeds, }); err != nil { glog.Errorf("Failed to add to cluster: %v", err) return false @@ -322,9 +333,18 @@ func (m *ClusterManager) createCluster(ctx context.Context) (*innodb.ClusterStat if err != nil { return nil, errors.Wrap(err, "getting CIDR to whitelist for GR") } + + // use deault mysql_port + 1 + //localAddress := fmt.Sprintf("%s:%s", m.Instance.Name(), os.Getenv("GROUP_PORT")) + //groupSeeds := os.Getenv("REPLICATION_GROUP_SEEDS") + + //glog.Infof("localAddress: %s, groupSeeds: %s", localAddress, groupSeeds) + opts := mysqlsh.Options{ "memberSslMode": "REQUIRED", "ipWhitelist": whitelistCIDR, + // "localAddress": localAddress, + // "groupSeeds": groupSeeds, } if m.Instance.MultiMaster { opts["force"] = "True" @@ -355,7 +375,10 @@ func (m *ClusterManager) rebootFromOutage(ctx context.Context) (*innodb.ClusterS // Run runs the ClusterManager controller. // NOTE: ctx is not currently used for cancellation by caller (the stopCh is). func (m *ClusterManager) Run(ctx context.Context) { - wait.Until(func() { m.Sync(ctx) }, time.Second*pollingIntervalSeconds, ctx.Done()) + interval_time, _ := strconv.ParseUint(os.Getenv("AGENT_INTERVAL"), 10, 32) + glog.Info("***agent run interval: ", interval_time) + //wait.Until(func() { m.Sync(ctx) }, time.Second*pollingIntervalSeconds, ctx.Done()) + wait.Until(func() { m.Sync(ctx) }, time.Second*time.Duration(interval_time), ctx.Done()) <-ctx.Done() diff --git a/pkg/controllers/cluster/manager/innodb_cluster.go b/pkg/controllers/cluster/manager/innodb_cluster.go index 2039cc1b3..289382a06 100644 --- a/pkg/controllers/cluster/manager/innodb_cluster.go +++ b/pkg/controllers/cluster/manager/innodb_cluster.go @@ -35,7 +35,7 @@ import ( var errNoClusterFound = errors.New("no cluster found on any of the seed nodes") -const defaultTimeout = 10 * time.Second +const defaultTimeout = 60 * time.Second // isDatabaseRunning returns true if a connection can be made to the MySQL // database running in the pod instance in which this function is called. 
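The Run change above replaces the fixed 15-second polling interval with the AGENT_INTERVAL environment variable, but the ParseUint error is discarded: if the variable is missing or malformed the interval becomes 0 and wait.Until re-runs Sync in a tight loop. A sketch of a guarded parse that falls back to the defaultAgentIntervalTime introduced in helpers.go (illustration only, not part of the patch):

```go
// Illustration only: derive the agent polling interval defensively.
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

const defaultPollingInterval = 15 * time.Second

// pollingInterval returns AGENT_INTERVAL as a duration, or the default when
// the variable is unset, unparsable, or zero.
func pollingInterval() time.Duration {
	secs, err := strconv.ParseUint(os.Getenv("AGENT_INTERVAL"), 10, 32)
	if err != nil || secs == 0 {
		return defaultPollingInterval
	}
	return time.Duration(secs) * time.Second
}

func main() {
	fmt.Println("agent run interval:", pollingInterval())
}
```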
@@ -46,6 +46,7 @@ func isDatabaseRunning(ctx context.Context) bool { "mysqladmin", "--protocol", "tcp", "-u", "root", + os.ExpandEnv("-P$MYSQL_PORT"), os.ExpandEnv("-p$MYSQL_ROOT_PASSWORD"), "status", ).Run() diff --git a/pkg/options/agent/options.go b/pkg/options/agent/options.go index 77ccd7501..8fd8ba580 100644 --- a/pkg/options/agent/options.go +++ b/pkg/options/agent/options.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "time" + "strconv" "github.com/golang/glog" "github.com/spf13/pflag" @@ -65,8 +66,10 @@ func NewMySQLAgentOpts() *MySQLAgentOpts { } namespace := os.Getenv("POD_NAMESPACE") clusterName := os.Getenv("MYSQL_CLUSTER_NAME") + healthcheckPort, _ := strconv.ParseInt(os.Getenv("AGENT_HEALTHCHECK_PORT"), 10, 32) + glog.V(2).Infof("mysql-agent healthcheckPort: %d", healthcheckPort) return &MySQLAgentOpts{ - HealthcheckPort: DefaultMySQLAgentHeathcheckPort, + HealthcheckPort: int32(healthcheckPort), Address: "0.0.0.0", Namespace: namespace, ClusterName: clusterName, diff --git a/pkg/resources/services/service.go b/pkg/resources/services/service.go index 9aa177a3b..36289d00d 100644 --- a/pkg/resources/services/service.go +++ b/pkg/resources/services/service.go @@ -25,7 +25,7 @@ import ( // NewForCluster will return a new headless Kubernetes service for a MySQL cluster func NewForCluster(cluster *v1alpha1.Cluster) *corev1.Service { - mysqlPort := corev1.ServicePort{Port: 3306} + mysqlPort := corev1.ServicePort{Port: int32(cluster.Spec.MysqlPort)} svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.ClusterLabel: cluster.Name}, diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index b3ae46f3c..0e70ae6d6 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -29,7 +29,7 @@ import ( "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" "github.com/oracle/mysql-operator/pkg/constants" - agentopts "github.com/oracle/mysql-operator/pkg/options/agent" + //agentopts "github.com/oracle/mysql-operator/pkg/options/agent" operatoropts "github.com/oracle/mysql-operator/pkg/options/operator" "github.com/oracle/mysql-operator/pkg/resources/secrets" "github.com/oracle/mysql-operator/pkg/version" @@ -127,6 +127,41 @@ func multiMasterEnvVar(enabled bool) v1.EnvVar { } } +func replicationGroupPortEnvVar(groupPort uint32) v1.EnvVar { + return v1.EnvVar{ + Name: "GROUP_PORT", + Value: strconv.FormatUint(uint64(groupPort), 10), + } +} + +func agentHealthCheckPortEnvVar(agentCheckPort uint32) v1.EnvVar { + return v1.EnvVar{ + Name: "AGENT_HEALTHCHECK_PORT", + Value: strconv.FormatUint(uint64(agentCheckPort), 10), + } +} + +func agentPromePortEnvVar(agentPromePort uint32) v1.EnvVar { + return v1.EnvVar{ + Name: "AGENT_PROME_PORT", + Value: strconv.FormatUint(uint64(agentPromePort), 10), + } +} + +func mysqlPortEnvVar(mysqlPort uint32) v1.EnvVar { + return v1.EnvVar{ + Name: "MYSQL_PORT", + Value: strconv.FormatUint(uint64(mysqlPort), 10), + } +} + +func numberEnvVar(field string, num uint32) v1.EnvVar { + return v1.EnvVar{ + Name: field, + Value: strconv.FormatUint(uint64(num), 10), + } +} + // Returns the MySQL_ROOT_PASSWORD environment variable // If a user specifies a secret in the spec we use that // else we create a secret with a random password @@ -151,10 +186,10 @@ func mysqlRootPassword(cluster *v1alpha1.Cluster) v1.EnvVar { } } -func getReplicationGroupSeeds(name string, members int) string { +func getReplicationGroupSeeds(name string, members int, groupPort 
uint32) string { seeds := []string{} for i := 0; i < members; i++ { - seeds = append(seeds, fmt.Sprintf("%[1]s-%[2]d.%[1]s:%[3]d", name, i, replicationGroupPort)) + seeds = append(seeds, fmt.Sprintf("%[1]s-%[2]d.%[1]s:%[3]d", name, i, groupPort)) } return strings.Join(seeds, ",") } @@ -188,7 +223,7 @@ func mysqlServerContainer(cluster *v1alpha1.Cluster, mysqlServerImage string, ro "--datadir=/var/lib/mysql", "--user=mysql", "--gtid_mode=ON", - "--log-bin", + "--log-bin=${MY_POD_NAME}-bin", "--binlog_checksum=NONE", "--enforce_gtid_consistency=ON", "--log-slave-updates=ON", @@ -208,9 +243,9 @@ func mysqlServerContainer(cluster *v1alpha1.Cluster, mysqlServerImage string, ro "--ssl-key=/etc/ssl/mysql/tls.key") } - if checkSupportGroupExitStateArgs(cluster.Spec.Version) { - args = append(args, "--loose-group-replication-exit-state-action=READ_ONLY") - } + //if checkSupportGroupExitStateArgs(cluster.Spec.Version) { + // args = append(args, "--loose-group-replication-exit-state-action=READ_ONLY") + //} entryPointArgs := strings.Join(args, " ") @@ -220,7 +255,7 @@ func mysqlServerContainer(cluster *v1alpha1.Cluster, mysqlServerImage string, ro # Finds the replica index from the hostname, and uses this to define # a unique server id for this instance. - index=$(cat /etc/hostname | grep -o '[^-]*$') + index=$(echo $MY_POD_NAME | grep -o '[^-]*$') /entrypoint.sh %s`, baseServerID, entryPointArgs) var resourceLimits corev1.ResourceRequirements @@ -232,14 +267,16 @@ func mysqlServerContainer(cluster *v1alpha1.Cluster, mysqlServerImage string, ro Name: MySQLServerName, // TODO(apryde): Add BaseImage to cluster CRD. Image: fmt.Sprintf("%s:%s", mysqlServerImage, cluster.Spec.Version), + ImagePullPolicy: v1.PullAlways, Ports: []v1.ContainerPort{ { - ContainerPort: 3306, + ContainerPort: int32(cluster.Spec.MysqlPort), }, }, VolumeMounts: volumeMounts(cluster), Command: []string{"/bin/bash", "-ecx", cmd}, Env: []v1.EnvVar{ + //mysqlPortEnvVar(cluster.Spec.MysqlPort), rootPassword, { Name: "MYSQL_ROOT_HOST", @@ -249,6 +286,14 @@ func mysqlServerContainer(cluster *v1alpha1.Cluster, mysqlServerImage string, ro Name: "MYSQL_LOG_CONSOLE", Value: "true", }, + { + Name: "MY_POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, }, Resources: resourceLimits, } @@ -260,7 +305,7 @@ func mysqlAgentContainer(cluster *v1alpha1.Cluster, mysqlAgentImage string, root agentVersion = version } - replicationGroupSeeds := getReplicationGroupSeeds(cluster.Name, members) + replicationGroupSeeds := getReplicationGroupSeeds(cluster.Name, members, cluster.Spec.GroupPort) var resourceLimits corev1.ResourceRequirements if cluster.Spec.Resources != nil && cluster.Spec.Resources.Agent != nil { @@ -270,13 +315,19 @@ func mysqlAgentContainer(cluster *v1alpha1.Cluster, mysqlAgentImage string, root return v1.Container{ Name: MySQLAgentName, Image: fmt.Sprintf("%s:%s", mysqlAgentImage, agentVersion), - Args: []string{"--v=4"}, + ImagePullPolicy: v1.PullAlways, + Args: []string{"--v=6"}, VolumeMounts: volumeMounts(cluster), Env: []v1.EnvVar{ clusterNameEnvVar(cluster), namespaceEnvVar(), replicationGroupSeedsEnvVar(replicationGroupSeeds), multiMasterEnvVar(cluster.Spec.MultiMaster), + replicationGroupPortEnvVar(cluster.Spec.GroupPort), + agentHealthCheckPortEnvVar(cluster.Spec.AgentCheckPort), + agentPromePortEnvVar(cluster.Spec.AgentPromePort), + mysqlPortEnvVar(cluster.Spec.MysqlPort), + numberEnvVar("AGENT_INTERVAL", cluster.Spec.AgentIntervalTime), rootPassword, { Name: 
"MY_POD_IP", @@ -286,12 +337,20 @@ func mysqlAgentContainer(cluster *v1alpha1.Cluster, mysqlAgentImage string, root }, }, }, + { + Name: "MY_POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, }, LivenessProbe: &v1.Probe{ Handler: v1.Handler{ HTTPGet: &v1.HTTPGetAction{ Path: "/live", - Port: intstr.FromInt(int(agentopts.DefaultMySQLAgentHeathcheckPort)), + Port: intstr.FromInt(int(cluster.Spec.AgentCheckPort)), }, }, }, @@ -299,7 +358,7 @@ func mysqlAgentContainer(cluster *v1alpha1.Cluster, mysqlAgentImage string, root Handler: v1.Handler{ HTTPGet: &v1.HTTPGetAction{ Path: "/ready", - Port: intstr.FromInt(int(agentopts.DefaultMySQLAgentHeathcheckPort)), + Port: intstr.FromInt(int(cluster.Spec.AgentCheckPort)), }, }, }, @@ -406,7 +465,7 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic Labels: podLabels, Annotations: map[string]string{ "prometheus.io/scrape": "true", - "prometheus.io/port": "8080", + "prometheus.io/port": strconv.FormatUint(uint64(cluster.Spec.AgentPromePort), 10), }, }, Spec: v1.PodSpec{ @@ -416,6 +475,7 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic ServiceAccountName: "mysql-agent", NodeSelector: cluster.Spec.NodeSelector, Affinity: cluster.Spec.Affinity, + HostNetwork: cluster.Spec.HostNetwork, Containers: containers, Volumes: podVolumes, }, diff --git a/pkg/util/mysqlsh/mysqlsh.go b/pkg/util/mysqlsh/mysqlsh.go index 283ac2d3a..7ec2e9743 100644 --- a/pkg/util/mysqlsh/mysqlsh.go +++ b/pkg/util/mysqlsh/mysqlsh.go @@ -87,6 +87,7 @@ func (r *runner) IsClustered(ctx context.Context) bool { func (r *runner) CreateCluster(ctx context.Context, opts Options) (*innodb.ClusterStatus, error) { python := fmt.Sprintf("print dba.create_cluster('%s', %s).status()", innodb.DefaultClusterName, opts) + glog.V(2).Infof("mysqlsh CreateCluster: %q", python) output, err := r.run(ctx, python) if err != nil { return nil, err @@ -118,9 +119,22 @@ func (r *runner) GetClusterStatus(ctx context.Context) (*innodb.ClusterStatus, e if err != nil { return nil, err } - + + //repair json decode problem + rawJson := string(output) + firstBraceIndex := strings.Index(rawJson, "{") + + var jsonData string + + if firstBraceIndex == -1 { + return nil, errors.Errorf("no json found in output: %q", rawJson) + } + + jsonData = rawJson[firstBraceIndex:] + glog.V(2).Infof("mysqlsh clusterstatus: %q", jsonData) + status := &innodb.ClusterStatus{} - err = json.Unmarshal(sanitizeJSON(output), status) + err = json.Unmarshal(sanitizeJSON([]byte(jsonData)), status) if err != nil { return nil, errors.Wrapf(err, "decoding cluster status output: %q", output) } diff --git a/src/github.com/oracle/mysql-operator/pkg/controllers/cluster/controller.go b/src/github.com/oracle/mysql-operator/pkg/controllers/cluster/controller.go new file mode 100644 index 000000000..b7159e9aa --- /dev/null +++ b/src/github.com/oracle/mysql-operator/pkg/controllers/cluster/controller.go @@ -0,0 +1,622 @@ +// Copyright 2018 Oracle and/or its affiliates. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "fmt" + "strings" + "time" + + apps "k8s.io/api/apps/v1beta1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + wait "k8s.io/apimachinery/pkg/util/wait" + appsinformers "k8s.io/client-go/informers/apps/v1beta1" + coreinformers "k8s.io/client-go/informers/core/v1" + kubernetes "k8s.io/client-go/kubernetes" + scheme "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + appslisters "k8s.io/client-go/listers/apps/v1beta1" + corelisters "k8s.io/client-go/listers/core/v1" + cache "k8s.io/client-go/tools/cache" + record "k8s.io/client-go/tools/record" + workqueue "k8s.io/client-go/util/workqueue" + + "github.com/coreos/go-semver/semver" + "github.com/golang/glog" + "github.com/pkg/errors" + + clusterutil "github.com/oracle/mysql-operator/pkg/api/cluster" + v1alpha1 "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" + constants "github.com/oracle/mysql-operator/pkg/constants" + controllerutils "github.com/oracle/mysql-operator/pkg/controllers/util" + clientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned" + opscheme "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned/scheme" + informersv1alpha1 "github.com/oracle/mysql-operator/pkg/generated/informers/externalversions/mysql/v1alpha1" + listersv1alpha1 "github.com/oracle/mysql-operator/pkg/generated/listers/mysql/v1alpha1" + + operatoropts "github.com/oracle/mysql-operator/pkg/options/operator" + secrets "github.com/oracle/mysql-operator/pkg/resources/secrets" + services "github.com/oracle/mysql-operator/pkg/resources/services" + statefulsets "github.com/oracle/mysql-operator/pkg/resources/statefulsets" + metrics "github.com/oracle/mysql-operator/pkg/util/metrics" + buildversion "github.com/oracle/mysql-operator/pkg/version" +) + +const controllerAgentName = "mysql-operator" + +const ( + // SuccessSynced is used as part of the Event 'reason' when a MySQSL is + // synced. + SuccessSynced = "Synced" + // ErrResourceExists is used as part of the Event 'reason' when a + // Cluster fails to sync due to a resource of the same name already + // existing. + ErrResourceExists = "ErrResourceExists" + + // MessageResourceExists is the message used for Events when a resource + // fails to sync due to a resource already existing. + MessageResourceExists = "%s %s/%s already exists and is not managed by Cluster" + // MessageResourceSynced is the message used for an Event fired when a + // Cluster is synced successfully + MessageResourceSynced = "Cluster synced successfully" +) + +// The MySQLController watches the Kubernetes API for changes to MySQL resources +type MySQLController struct { + // Global MySQLOperator configuration options. 
+ opConfig operatoropts.MySQLOperatorOpts + + kubeClient kubernetes.Interface + opClient clientset.Interface + + shutdown bool + queue workqueue.RateLimitingInterface + + // clusterLister is able to list/get Clusters from a shared informer's + // store. + clusterLister listersv1alpha1.ClusterLister + // clusterListerSynced returns true if the Cluster shared informer has + // synced at least once. + clusterListerSynced cache.InformerSynced + // clusterUpdater implements control logic for updating Cluster + // statuses. Implemented as an interface to enable testing. + clusterUpdater clusterUpdaterInterface + + // statefulSetLister is able to list/get StatefulSets from a shared + // informer's store. + statefulSetLister appslisters.StatefulSetLister + // statefulSetListerSynced returns true if the StatefulSet shared informer + // has synced at least once. + statefulSetListerSynced cache.InformerSynced + // statefulSetControl enables control of StatefulSets associated with + // Clusters. + statefulSetControl StatefulSetControlInterface + + // podLister is able to list/get Pods from a shared + // informer's store. + podLister corelisters.PodLister + // podListerSynced returns true if the Pod shared informer + // has synced at least once. + podListerSynced cache.InformerSynced + // podControl enables control of Pods associated with + // Clusters. + podControl PodControlInterface + + // serviceLister is able to list/get Services from a shared informer's + // store. + serviceLister corelisters.ServiceLister + // serviceListerSynced returns true if the Service shared informer + // has synced at least once. + serviceListerSynced cache.InformerSynced + + // serviceControl enables control of Services associated with Clusters. + serviceControl ServiceControlInterface + + // secretControl enables control of Services associated with Clusters. + secretControl SecretControlInterface + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController creates a new MySQLController. +func NewController( + opConfig operatoropts.MySQLOperatorOpts, + opClient clientset.Interface, + kubeClient kubernetes.Interface, + clusterInformer informersv1alpha1.ClusterInformer, + statefulSetInformer appsinformers.StatefulSetInformer, + podInformer coreinformers.PodInformer, + serviceInformer coreinformers.ServiceInformer, + resyncPeriod time.Duration, + namespace string, +) *MySQLController { + opscheme.AddToScheme(scheme.Scheme) // TODO: This shouldn't be done here I don't think. + + // Create event broadcaster. 
+ glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + m := MySQLController{ + opConfig: opConfig, + + opClient: opClient, + kubeClient: kubeClient, + + clusterLister: clusterInformer.Lister(), + clusterListerSynced: clusterInformer.Informer().HasSynced, + clusterUpdater: newClusterUpdater(opClient, clusterInformer.Lister()), + + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + serviceControl: NewRealServiceControl(kubeClient, serviceInformer.Lister()), + + statefulSetLister: statefulSetInformer.Lister(), + statefulSetListerSynced: statefulSetInformer.Informer().HasSynced, + statefulSetControl: NewRealStatefulSetControl(kubeClient, statefulSetInformer.Lister()), + + podLister: podInformer.Lister(), + podListerSynced: podInformer.Informer().HasSynced, + podControl: NewRealPodControl(kubeClient, podInformer.Lister()), + + secretControl: NewRealSecretControl(kubeClient), + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mysqlcluster"), + recorder: recorder, + } + + clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.enqueueCluster, + UpdateFunc: func(old, new interface{}) { + m.enqueueCluster(new) + }, + DeleteFunc: func(obj interface{}) { + cluster, ok := obj.(*v1alpha1.Cluster) + if ok { + m.onClusterDeleted(cluster.Name) + } + }, + }) + + statefulSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.handleObject, + UpdateFunc: func(old, new interface{}) { + newStatefulSet := new.(*apps.StatefulSet) + oldStatefulSet := old.(*apps.StatefulSet) + if newStatefulSet.ResourceVersion == oldStatefulSet.ResourceVersion { + return + } + + // If cluster is ready ... + if newStatefulSet.Status.ReadyReplicas == newStatefulSet.Status.Replicas { + clusterName, ok := newStatefulSet.Labels[constants.ClusterLabel] + if ok { + m.onClusterReady(clusterName) + } + } + m.handleObject(new) + }, + DeleteFunc: m.handleObject, + }) + + return &m +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. 
+func (m *MySQLController) Run(ctx context.Context, threadiness int) { + defer utilruntime.HandleCrash() + defer m.queue.ShutDown() + + glog.Info("Starting Cluster controller") + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for Cluster controller informer caches to sync") + if !controllerutils.WaitForCacheSync("mysql cluster", ctx.Done(), + m.clusterListerSynced, + m.statefulSetListerSynced, + m.podListerSynced, + m.serviceListerSynced) { + return + } + + glog.Info("Starting Cluster controller workers") + // Launch two workers to process Foo resources + for i := 0; i < threadiness; i++ { + go wait.Until(m.runWorker, time.Second, ctx.Done()) + } + + glog.Info("Started Cluster controller workers") + defer glog.Info("Shutting down Cluster controller workers") + <-ctx.Done() +} + +// worker runs a worker goroutine that invokes processNextWorkItem until the +// controller's queue is closed. +func (m *MySQLController) runWorker() { + for m.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (m *MySQLController) processNextWorkItem() bool { + obj, shutdown := m.queue.Get() + + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer m.queue.Done(obj) + key, ok := obj.(string) + if !ok { + m.queue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in queue but got %#v", obj)) + return nil + } + if err := m.syncHandler(key); err != nil { + return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + } + m.queue.Forget(obj) + glog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Cluster +// resource with the current status of the resource. +func (m *MySQLController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name. + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + nsName := types.NamespacedName{Namespace: namespace, Name: name} + + // Get the Cluster resource with this namespace/name. + cluster, err := m.clusterLister.Clusters(namespace).Get(name) + if err != nil { + // The Cluster resource may no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("mysqlcluster '%s' in work queue no longer exists", key)) + return nil + } + return err + } + + cluster.EnsureDefaults() + if err = cluster.Validate(); err != nil { + return errors.Wrap(err, "validating Cluster") + } + + if cluster.Spec.Repository == "" { + cluster.Spec.Repository = m.opConfig.Images.DefaultMySQLServerImage + } + + operatorVersion := buildversion.GetBuildVersion() + // Ensure that the required labels are set on the cluster. 
+ sel := combineSelectors(SelectorForCluster(cluster), SelectorForClusterOperatorVersion(operatorVersion)) + if !sel.Matches(labels.Set(cluster.Labels)) { + glog.V(2).Infof("Setting labels on cluster %s", SelectorForCluster(cluster).String()) + if cluster.Labels == nil { + cluster.Labels = make(map[string]string) + } + cluster.Labels[constants.ClusterLabel] = cluster.Name + cluster.Labels[constants.MySQLOperatorVersionLabel] = buildversion.GetBuildVersion() + return m.clusterUpdater.UpdateClusterLabels(cluster.DeepCopy(), labels.Set(cluster.Labels)) + } + + // Create a MySQL root password secret for the cluster if required. + if cluster.RequiresSecret() { + err = m.secretControl.CreateSecret(secrets.NewMysqlRootPassword(cluster)) + if err != nil && !apierrors.IsAlreadyExists(err) { + return errors.Wrap(err, "creating root password Secret") + } + } + + svc, err := m.serviceLister.Services(cluster.Namespace).Get(cluster.Name) + // If the resource doesn't exist, we'll create it + if apierrors.IsNotFound(err) { + glog.V(2).Infof("Creating a new Service for cluster %q", nsName) + svc = services.NewForCluster(cluster) + err = m.serviceControl.CreateService(svc) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return err + } + + // If the Service is not controlled by this Cluster resource, we should + // log a warning to the event recorder and return. + if !metav1.IsControlledBy(svc, cluster) { + msg := fmt.Sprintf(MessageResourceExists, "Service", svc.Namespace, svc.Name) + m.recorder.Event(cluster, corev1.EventTypeWarning, ErrResourceExists, msg) + return errors.New(msg) + } + + ss, err := m.statefulSetLister.StatefulSets(cluster.Namespace).Get(cluster.Name) + // If the resource doesn't exist, we'll create it + if apierrors.IsNotFound(err) { + glog.V(2).Infof("Creating a new StatefulSet for cluster %q", nsName) + ss = statefulsets.NewForCluster(cluster, m.opConfig.Images, svc.Name) + err = m.statefulSetControl.CreateStatefulSet(ss) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return err + } + + // If the StatefulSet is not controlled by this Cluster resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(ss, cluster) { + msg := fmt.Sprintf(MessageResourceExists, "StatefulSet", ss.Namespace, ss.Name) + m.recorder.Event(cluster, corev1.EventTypeWarning, ErrResourceExists, msg) + return fmt.Errorf(msg) + } + + // Upgrade the required component resources the current MySQLOperator version. + if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil { + return errors.Wrap(err, "ensuring MySQL Operator version") + } + + // Upgrade the MySQL server version if required. + if err := m.ensureMySQLVersion(cluster, ss); err != nil { + return errors.Wrap(err, "ensuring MySQL version") + } + + // If this number of the members on the Cluster does not equal the + // current desired replicas on the StatefulSet, we should update the + // StatefulSet resource. 
+ if cluster.Spec.Members != *ss.Spec.Replicas { + glog.V(4).Infof("Updating %q: clusterMembers=%d statefulSetReplicas=%d", + nsName, cluster.Spec.Members, ss.Spec.Replicas) + old := ss.DeepCopy() + ss = statefulsets.NewForCluster(cluster, m.opConfig.Images, svc.Name) + if err := m.statefulSetControl.Patch(old, ss); err != nil { + // Requeue the item so we can attempt processing again later. + // This could have been caused by a temporary network failure etc. + return err + } + } + + // Finally, we update the status block of the Cluster resource to + // reflect the current state of the world. + err = m.updateClusterStatus(cluster, ss) + if err != nil { + return err + } + + m.recorder.Event(cluster, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) + return nil +} + +func getMySQLContainerIndex(containers []corev1.Container) (int, error) { + for i, c := range containers { + if c.Name == statefulsets.MySQLServerName { + return i, nil + } + } + + return 0, errors.Errorf("no %q container found", statefulsets.MySQLServerName) +} + +// splitImage splits an image into its name and version. +func splitImage(image string) (string, string, error) { + parts := strings.Split(image, ":") + if len(parts) < 2 { + return "", "", errors.Errorf("invalid image %q", image) + } + return strings.Join(parts[:len(parts)-1], ":"), parts[len(parts)-1], nil +} + +func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error { + index, err := getMySQLContainerIndex(ss.Spec.Template.Spec.Containers) + if err != nil { + return errors.Wrapf(err, "getting MySQL container for StatefulSet %q", ss.Name) + } + imageName, actualVersion, err := splitImage(ss.Spec.Template.Spec.Containers[index].Image) + if err != nil { + return errors.Wrapf(err, "getting MySQL version for StatefulSet %q", ss.Name) + } + + actual, err := semver.NewVersion(actualVersion) + if err != nil { + return errors.Wrap(err, "parsing StatuefulSet MySQL version") + } + expected, err := semver.NewVersion(c.Spec.Version) + if err != nil { + return errors.Wrap(err, "parsing Cluster MySQL version") + } + + switch expected.Compare(*actual) { + case -1: + return errors.Errorf("attempted unsupported downgrade from %q to %q", actual, expected) + case 0: + return nil + } + + updated := ss.DeepCopy() + updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf("%s:%s", imageName, c.Spec.Version) + // NOTE: We do this as previously we defaulted to the OnDelete strategy + // so clusters created with previous versions would not support upgrades. + updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + } + + err = m.statefulSetControl.Patch(ss, updated) + if err != nil { + return errors.Wrap(err, "patching StatefulSet") + } + + return nil +} + +// ensureMySQLOperatorVersion updates the MySQLOperator resource types that +//require it to make it consistent with the specified operator version. +func (m *MySQLController) ensureMySQLOperatorVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet, operatorVersion string) error { + // Ensure the Pods belonging to the Cluster are updated to the correct 'mysql-agent' image for the current MySQLOperator version. 
+ container := statefulsets.MySQLAgentName + pods, err := m.podLister.List(SelectorForCluster(c)) + if err != nil { + return errors.Wrapf(err, "listing pods matching %q", SelectorForCluster(c).String()) + } + for _, pod := range pods { + if requiresMySQLAgentPodUpgrade(pod, container, operatorVersion) && canUpgradeMySQLAgent(pod) { + glog.Infof("Upgrading cluster pod '%s/%s' to latest operator version: %s", pod.Namespace, pod.Name, operatorVersion) + updated := updatePodToOperatorVersion(pod.DeepCopy(), m.opConfig.Images.MySQLAgentImage, operatorVersion) + err = m.podControl.PatchPod(pod, updated) + if err != nil { + return errors.Wrap(err, "upgrade operator version: PatchPod failed") + } + } + } + + // Ensure the StatefulSet is updated with the correct template 'mysql-agent' image for the current MySQLOperator version. + if requiresMySQLAgentStatefulSetUpgrade(ss, container, operatorVersion) { + glog.Infof("Upgrading cluster statefulset '%s/%s' to latest operator version: %s", ss.Namespace, ss.Name, operatorVersion) + updated := updateStatefulSetToOperatorVersion(ss.DeepCopy(), m.opConfig.Images.MySQLAgentImage, operatorVersion) + err = m.statefulSetControl.Patch(ss, updated) + if err != nil { + return errors.Wrap(err, "upgrade operator version: PatchStatefulSet failed") + } + } + + // Ensure the Cluster is updated with the correct MySQLOperator version. + if !SelectorForClusterOperatorVersion(operatorVersion).Matches(labels.Set(c.Labels)) { + glog.Infof("Upgrading cluster statefulset '%s/%s' to latest operator version: %s", c.Namespace, c.Name, operatorVersion) + copy := c.DeepCopy() + copy.Labels[constants.MySQLOperatorVersionLabel] = operatorVersion + err := m.clusterUpdater.UpdateClusterLabels(copy, labels.Set(copy.Labels)) + if err != nil { + return errors.Wrap(err, "upgrade operator version: ClusterUpdate failed") + } + } + return nil +} + +// updateClusterStatusForSS updates Cluster statuses based on changes to their associated StatefulSets. +func (m *MySQLController) updateClusterStatus(cluster *v1alpha1.Cluster, ss *apps.StatefulSet) error { + glog.V(4).Infof("%s/%s: ss.Spec.Replicas=%d, ss.Status.ReadyReplicas=%d, ss.Status.Replicas=%d", + cluster.Namespace, cluster.Name, *ss.Spec.Replicas, ss.Status.ReadyReplicas, ss.Status.Replicas) + + status := cluster.Status.DeepCopy() + _, condition := clusterutil.GetClusterCondition(&cluster.Status, v1alpha1.ClusterReady) + if condition == nil { + condition = &v1alpha1.ClusterCondition{Type: v1alpha1.ClusterReady} + } + if ss.Status.ReadyReplicas == ss.Status.Replicas && ss.Status.ReadyReplicas == cluster.Spec.Members { + condition.Status = corev1.ConditionTrue + } else { + condition.Status = corev1.ConditionFalse + } + + if updated := clusterutil.UpdateClusterCondition(status, condition); updated { + return m.clusterUpdater.UpdateClusterStatus(cluster.DeepCopy(), status) + } + return nil +} + +// enqueueCluster takes a Cluster resource and converts it into a +// namespace/name string which is then put onto the work queue. This method +// should *not* be passed resources of any type other than Cluster. +func (m *MySQLController) enqueueCluster(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + utilruntime.HandleError(err) + return + } + m.queue.AddRateLimited(key) +} + +// handleObject will take any resource implementing metav1.Object and attempt +// to find the MySQLResource that 'owns' it. It does this by looking at the +// objects metadata.ownerReferences field for an appropriate OwnerReference. 
+// It then enqueues that Cluster resource to be processed. If the object does not
+// have an appropriate OwnerReference, it will simply be skipped.
+func (m *MySQLController) handleObject(obj interface{}) {
+	object, ok := obj.(metav1.Object)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
+			return
+		}
+		object, ok = tombstone.Obj.(metav1.Object)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
+			return
+		}
+		glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
+	}
+
+	glog.V(4).Infof("Processing object: %s", object.GetName())
+	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
+		// If this object is not owned by a Cluster, we should not do
+		// anything more with it.
+		if ownerRef.Kind != v1alpha1.ClusterCRDResourceKind {
+			return
+		}
+
+		cluster, err := m.clusterLister.Clusters(object.GetNamespace()).Get(ownerRef.Name)
+		if err != nil {
+			glog.V(4).Infof("ignoring orphaned object '%s' of Cluster '%s'", object.GetSelfLink(), ownerRef.Name)
+			return
+		}
+
+		m.enqueueCluster(cluster)
+		return
+	}
+}
+
+func (m *MySQLController) onClusterReady(clusterName string) {
+	glog.V(2).Infof("Cluster %s ready", clusterName)
+	metrics.IncEventCounter(clustersCreatedCount)
+	metrics.IncEventGauge(clustersTotalCount)
+}
+
+func (m *MySQLController) onClusterDeleted(clusterName string) {
+	glog.V(2).Infof("Cluster %s deleted", clusterName)
+	metrics.IncEventCounter(clustersDeletedCount)
+	metrics.DecEventGauge(clustersTotalCount)
+}
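getReplicationGroupSeeds in pkg/resources/statefulsets/statefulset.go now takes the configurable group port instead of a package constant. For a cluster named mycluster with three members and the default port 33061 it produces the seed list shown below (the function body is copied from the patch; the surrounding program is illustration only):

```go
// Illustration only: the replication group seed string generated for a three-member cluster.
package main

import (
	"fmt"
	"strings"
)

func getReplicationGroupSeeds(name string, members int, groupPort uint32) string {
	seeds := []string{}
	for i := 0; i < members; i++ {
		seeds = append(seeds, fmt.Sprintf("%[1]s-%[2]d.%[1]s:%[3]d", name, i, groupPort))
	}
	return strings.Join(seeds, ",")
}

func main() {
	fmt.Println(getReplicationGroupSeeds("mycluster", 3, 33061))
	// mycluster-0.mycluster:33061,mycluster-1.mycluster:33061,mycluster-2.mycluster:33061
}
```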
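The GetClusterStatus change in pkg/util/mysqlsh/mysqlsh.go drops everything before the first '{' so that mysqlsh banners or warnings printed ahead of the status JSON no longer break json.Unmarshal. The patch does not trim trailing output after the object; a sketch of a helper that bounds the slice on both sides (illustration only, the name extractJSON is hypothetical):

```go
// Illustration only: pull the embedded JSON object out of noisy mysqlsh output.
package main

import (
	"fmt"
	"strings"
)

func extractJSON(raw string) (string, error) {
	start := strings.Index(raw, "{")
	end := strings.LastIndex(raw, "}")
	if start == -1 || end < start {
		return "", fmt.Errorf("no JSON object found in output: %q", raw)
	}
	return raw[start : end+1], nil
}

func main() {
	out := "WARNING: Using a password on the command line interface can be insecure.\n" +
		"{\"clusterName\": \"Cluster\", \"defaultReplicaSet\": {}}\n"
	js, err := extractJSON(out)
	fmt.Println(js, err)
}
```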
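A note on processNextWorkItem in the new controller.go: when syncHandler fails, the error is passed to utilruntime.HandleError but the key is never re-queued, so the "temporary network failure" cases the comments mention are not retried until the next informer event or resync. If retry is the intended behaviour, the usual pattern is a rate-limited requeue, sketched below with the same workqueue API the controller already uses (illustration only):

```go
// Illustration only: retry a failed sync with back-off instead of dropping the key.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mysqlcluster")
	queue.Add("default/mycluster")

	syncHandler := func(key string) error { return fmt.Errorf("transient failure syncing %q", key) }

	obj, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(obj)

	key := obj.(string)
	if err := syncHandler(key); err != nil {
		// Put the key back with rate limiting so the sync is retried later.
		queue.AddRateLimited(key)
		fmt.Println("requeued:", err)
		return
	}
	// Success: clear the rate-limiter history for this key.
	queue.Forget(obj)
}
```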
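splitImage in controller.go splits on every colon and rejoins all but the last element, so a registry host with a port keeps its colon and only the trailing tag is treated as the version. A short worked example (illustration only); note that an image pinned by digest rather than a tag would yield a non-semver "version" here and fail the subsequent semver parse:

```go
// Illustration only: behaviour of splitImage-style parsing for common image references.
package main

import (
	"fmt"
	"strings"
)

func splitImage(image string) (string, string, error) {
	parts := strings.Split(image, ":")
	if len(parts) < 2 {
		return "", "", fmt.Errorf("invalid image %q", image)
	}
	return strings.Join(parts[:len(parts)-1], ":"), parts[len(parts)-1], nil
}

func main() {
	for _, img := range []string{
		"mysql/mysql-server:8.0.12",
		"registry.example.com:5000/mysql/mysql-server:8.0.12",
	} {
		name, version, err := splitImage(img)
		fmt.Printf("%-52s -> name=%q version=%q err=%v\n", img, name, version, err)
	}
}
```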