diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 94d5b28d0..64a5fab9f 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/kubernetes" "github.com/grafana/rollout-operator/integration/k3t" + "github.com/grafana/rollout-operator/pkg/admission" ) func TestRolloutHappyCase(t *testing.T) { @@ -348,3 +349,62 @@ func getCertificateExpiration(t *testing.T, secret *corev1.Secret) time.Time { require.NoError(t, err) return c.NotAfter } + +func TestPrepareDownscale_CanDownscale(t *testing.T) { + ctx := context.Background() + + cluster := k3t.NewCluster(ctx, t, k3t.WithImages("rollout-operator:latest", "mock-service:latest")) + api := cluster.API() + { + t.Log("Create the webhook before the rollout-operator") + _, err := api.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, prepareDownscaleValidatingWebhook(corev1.NamespaceDefault, admission.PrepareDownscaleWebhookPath), metav1.CreateOptions{}) + require.NoError(t, err) + } + + { + t.Log("Create rollout operator and check it's running and ready.") + createRolloutOperator(t, ctx, api, corev1.NamespaceDefault, true) + rolloutOperatorPod := eventuallyGetFirstPod(ctx, t, api, "name=rollout-operator") + requireEventuallyPod(t, api, ctx, rolloutOperatorPod, expectPodPhase(corev1.PodRunning), expectReady()) + } + + mock := mockServiceStatefulSet("mock", "1", true) + { + t.Log("Create the service.") + requireCreateService(ctx, t, api, corev1.NamespaceDefault, "mock") + t.Log("Create the statefulset with three replicas.") + mock.Spec.Replicas = ptr[int32](3) + mock.ObjectMeta.Labels[admission.PrepareDownscaleLabelKey] = admission.PrepareDownscaleLabelValue + mock.ObjectMeta.Annotations[admission.PrepareDownscalePathAnnotationKey] = "/prepare-shutdown-pass" + mock.ObjectMeta.Annotations[admission.PrepareDownscalePortAnnotationKey] = "8080" + requireCreateStatefulSet(ctx, t, api, mock) + requireEventuallyPodCount(ctx, t, api, "name=mock", 3) + // TODO: find a good way to wait until the DNS propagation is done for the pods + time.Sleep(2 * time.Second) + } + + { + t.Log("Scale down.") + mock.Spec.Replicas = ptr[int32](1) + requireUpdateStatefulSet(ctx, t, api, mock) + requireEventuallyPodCount(ctx, t, api, "name=mock", 1) + } + + { + t.Log("Scale up and make it have two replicas.") + mock.Spec.Replicas = ptr[int32](2) + requireUpdateStatefulSet(ctx, t, api, mock) + requireEventuallyPodCount(ctx, t, api, "name=mock", 2) + } + + { + t.Log("Failure in prepare-shutdown on node.") + mock.ObjectMeta.Annotations[admission.PrepareDownscalePathAnnotationKey] = "/prepare-shutdown-fail" + requireUpdateStatefulSet(ctx, t, api, mock) + t.Log("Try to scale down again. This should fail") + mock.Spec.Replicas = ptr[int32](1) + err := updateStatefulSet(ctx, t, api, mock) + require.Error(t, err, "Updating the stateful set didn't fail as expected") + require.Contains(t, err.Error(), `admission webhook "prepare-downscale-default.grafana.com" denied the request: downscale of statefulsets/mock in default from 2 to 1 replicas is not allowed because one or more pods failed to prepare for shutdown`) + } +} diff --git a/integration/manifests_mock_service_test.go b/integration/manifests_mock_service_test.go index 7605891c5..8b0cd88a4 100644 --- a/integration/manifests_mock_service_test.go +++ b/integration/manifests_mock_service_test.go @@ -33,13 +33,27 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl } } -func mockServiceService(name string) *corev1.Service { +func mockServiceServiceWithNoClusterIP(name string) *corev1.Service { + return mockServiceServiceHelper(name, false) +} + +func mockServiceService(name string, noClusterIP ...bool) *corev1.Service { + return mockServiceServiceHelper(name, true) +} + +func mockServiceServiceHelper(name string, clusterIPRequired bool) *corev1.Service { + var clusterIP string + if clusterIPRequired == false { + clusterIP = "None" + } + return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: clusterIP, Selector: map[string]string{ "name": name, }, @@ -97,6 +111,7 @@ func mockServiceStatefulSet(name, version string, ready bool) *appsv1.StatefulSe Labels: map[string]string{ "rollout-group": "mock", }, + Annotations: map[string]string{}, }, Spec: appsv1.StatefulSetSpec{ Replicas: ptr[int32](1), @@ -105,6 +120,7 @@ func mockServiceStatefulSet(name, version string, ready bool) *appsv1.StatefulSe "name": name, }, }, + ServiceName: "mock", Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Name: name, diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go index 9f7a201d2..371b62ada 100644 --- a/integration/manifests_rollout_operator_test.go +++ b/integration/manifests_rollout_operator_test.go @@ -174,7 +174,7 @@ func rolloutOperatorRole() *rbacv1.Role { { APIGroups: []string{"apps"}, Resources: []string{"statefulsets"}, - Verbs: []string{"list", "get", "watch"}, + Verbs: []string{"list", "get", "watch", "patch"}, }, { APIGroups: []string{"apps"}, @@ -285,6 +285,56 @@ func webhookRolloutOperatorClusterRole(namespace string) *rbacv1.ClusterRole { } } +func prepareDownscaleValidatingWebhook(namespace, path string) *admissionregistrationv1.MutatingWebhookConfiguration { + return &admissionregistrationv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("prepare-downscale-%s", namespace), + Labels: map[string]string{ + "grafana.com/inject-rollout-operator-ca": "true", + "grafana.com/namespace": namespace, + }, + Annotations: nil, + OwnerReferences: nil, + Finalizers: nil, + ManagedFields: nil, + }, + Webhooks: []admissionregistrationv1.MutatingWebhook{ + { + Name: fmt.Sprintf("prepare-downscale-%s.grafana.com", namespace), + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + Service: &admissionregistrationv1.ServiceReference{ + Namespace: namespace, + Name: "rollout-operator", + Path: &path, + }, + }, + Rules: []admissionregistrationv1.RuleWithOperations{ + { + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Update}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{"apps"}, + APIVersions: []string{"v1"}, + Resources: []string{ + "statefulsets", + "statefulsets/scale", + }, + Scope: ptr(admissionregistrationv1.NamespacedScope), + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + // This is just an example of matching changes only in a specific namespace. + // https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-metadata-name + MatchLabels: map[string]string{"kubernetes.io/metadata.name": namespace}, + }, + FailurePolicy: ptr(admissionregistrationv1.Fail), + SideEffects: ptr(admissionregistrationv1.SideEffectClassNoneOnDryRun), + AdmissionReviewVersions: []string{"v1"}, + }, + }, + } +} + func noDownscaleValidatingWebhook(namespace string) *admissionregistrationv1.ValidatingWebhookConfiguration { return &admissionregistrationv1.ValidatingWebhookConfiguration{ ObjectMeta: metav1.ObjectMeta{ diff --git a/integration/mock-service/main.go b/integration/mock-service/main.go index 8e38b3277..4abb9081e 100644 --- a/integration/mock-service/main.go +++ b/integration/mock-service/main.go @@ -10,6 +10,18 @@ import ( "syscall" ) +type behaviour int + +const ( + PASS behaviour = iota + FAIL +) + +const ( + PASS_PATH = "/prepare-shutdown-pass" + FAIL_PATH = "/prepare-shutdown-fail" +) + func main() { alive := &atomic.Int64{} alive.Store(1) @@ -21,6 +33,8 @@ func main() { mux := http.NewServeMux() mux.Handle("/ready", probeHandler(ready)) mux.Handle("/alive", probeHandler(alive)) + mux.Handle(PASS_PATH, shutdownHandler(PASS)) + mux.Handle(FAIL_PATH, shutdownHandler(FAIL)) if prefix := os.Getenv("PREFIX"); prefix != "" { log.Printf("prefix=%s", prefix) @@ -47,6 +61,19 @@ func main() { <-closed } +func shutdownHandler(expected behaviour) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + switch expected { + case PASS: + w.WriteHeader(http.StatusOK) + case FAIL: + w.WriteHeader(http.StatusForbidden) + default: + w.WriteHeader(http.StatusBadRequest) + } + } +} + func probeHandler(probe *atomic.Int64) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { switch req.Method { diff --git a/integration/util_test.go b/integration/util_test.go index 0a616e279..53b0359ef 100644 --- a/integration/util_test.go +++ b/integration/util_test.go @@ -165,6 +165,12 @@ func requireUpdateStatefulSet(ctx context.Context, t *testing.T, api *kubernetes require.NoError(t, err, "Can't update StatefulSet") } +func updateStatefulSet(ctx context.Context, t *testing.T, api *kubernetes.Clientset, sts *appsv1.StatefulSet) error { + t.Helper() + _, err := api.AppsV1().StatefulSets(corev1.NamespaceDefault).Update(ctx, sts, metav1.UpdateOptions{}) + return err +} + func getAndUpdateStatefulSetScale(ctx context.Context, t *testing.T, api *kubernetes.Clientset, name string, replicas int32, dryrun bool) error { s, err := api.AppsV1().StatefulSets(corev1.NamespaceDefault).GetScale(ctx, name, metav1.GetOptions{}) require.NoError(t, err) @@ -176,3 +182,11 @@ func getAndUpdateStatefulSetScale(ctx context.Context, t *testing.T, api *kubern _, err = api.AppsV1().StatefulSets(corev1.NamespaceDefault).UpdateScale(ctx, name, s, opts) return err } + +func requireCreateService(ctx context.Context, t *testing.T, api *kubernetes.Clientset, namespace, name string) { + t.Helper() + _, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceServiceWithNoClusterIP(name), metav1.CreateOptions{}) + require.NoError(t, err, "Can't create Service") + _, err = api.NetworkingV1().Ingresses(namespace).Create(ctx, mockServiceIngress(name), metav1.CreateOptions{}) + require.NoError(t, err, "Can't create Ingress") +} diff --git a/pkg/admission/prep_downscale.go b/pkg/admission/prep_downscale.go index 87052dcc8..a87915904 100644 --- a/pkg/admission/prep_downscale.go +++ b/pkg/admission/prep_downscale.go @@ -257,7 +257,7 @@ func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kube case "statefulsets": obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{}) if err != nil { - return nil, err + return nil, err } return obj.Annotations, nil }