Skip to content
60 changes: 60 additions & 0 deletions integration/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/grafana/rollout-operator/integration/k3t"
"github.com/grafana/rollout-operator/pkg/admission"
)

func TestRolloutHappyCase(t *testing.T) {
Expand Down Expand Up @@ -348,3 +349,62 @@ func getCertificateExpiration(t *testing.T, secret *corev1.Secret) time.Time {
require.NoError(t, err)
return c.NotAfter
}

func TestPrepareDownscale_CanDownscale(t *testing.T) {
ctx := context.Background()

cluster := k3t.NewCluster(ctx, t, k3t.WithImages("rollout-operator:latest", "mock-service:latest"))
api := cluster.API()
{
t.Log("Create the webhook before the rollout-operator")
_, err := api.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, prepareDownscaleValidatingWebhook(corev1.NamespaceDefault, admission.PrepareDownscaleWebhookPath), metav1.CreateOptions{})
require.NoError(t, err)
}

{
t.Log("Create rollout operator and check it's running and ready.")
createRolloutOperator(t, ctx, api, corev1.NamespaceDefault, true)
rolloutOperatorPod := eventuallyGetFirstPod(ctx, t, api, "name=rollout-operator")
requireEventuallyPod(t, api, ctx, rolloutOperatorPod, expectPodPhase(corev1.PodRunning), expectReady())
}

mock := mockServiceStatefulSet("mock", "1", true)
{
t.Log("Create the service.")
requireCreateService(ctx, t, api, corev1.NamespaceDefault, "mock")
t.Log("Create the statefulset with three replicas.")
mock.Spec.Replicas = ptr[int32](3)
mock.ObjectMeta.Labels[admission.PrepareDownscaleLabelKey] = admission.PrepareDownscaleLabelValue
mock.ObjectMeta.Annotations[admission.PrepareDownscalePathAnnotationKey] = "/prepare-shutdown-pass"
mock.ObjectMeta.Annotations[admission.PrepareDownscalePortAnnotationKey] = "8080"
requireCreateStatefulSet(ctx, t, api, mock)
requireEventuallyPodCount(ctx, t, api, "name=mock", 3)
// TODO: find a good way to wait until the DNS propagation is done for the pods
Copy link
Contributor Author

@MichelHollands MichelHollands Apr 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When running this locally the DNS entry for the third pod (mock-2) doesn't always exist until a bit later.
I tried a few things here (wait for the EndpointSlices to be ready etc) but nothing really works apart from a 2 second wait.
Any suggestions are welcome.

time.Sleep(2 * time.Second)
}

{
t.Log("Scale down.")
mock.Spec.Replicas = ptr[int32](1)
requireUpdateStatefulSet(ctx, t, api, mock)
requireEventuallyPodCount(ctx, t, api, "name=mock", 1)
}

{
t.Log("Scale up and make it have two replicas.")
mock.Spec.Replicas = ptr[int32](2)
requireUpdateStatefulSet(ctx, t, api, mock)
requireEventuallyPodCount(ctx, t, api, "name=mock", 2)
}

{
t.Log("Failure in prepare-shutdown on node.")
mock.ObjectMeta.Annotations[admission.PrepareDownscalePathAnnotationKey] = "/prepare-shutdown-fail"
requireUpdateStatefulSet(ctx, t, api, mock)
t.Log("Try to scale down again. This should fail")
mock.Spec.Replicas = ptr[int32](1)
err := updateStatefulSet(ctx, t, api, mock)
require.Error(t, err, "Updating the stateful set didn't fail as expected")
require.Contains(t, err.Error(), `admission webhook "prepare-downscale-default.grafana.com" denied the request: downscale of statefulsets/mock in default from 2 to 1 replicas is not allowed because one or more pods failed to prepare for shutdown`)
}
}
20 changes: 18 additions & 2 deletions integration/manifests_mock_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,27 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl
}
}

func mockServiceService(name string) *corev1.Service {
func mockServiceServiceWithNoClusterIP(name string) *corev1.Service {
return mockServiceServiceHelper(name, false)
}

func mockServiceService(name string, noClusterIP ...bool) *corev1.Service {
return mockServiceServiceHelper(name, true)
}

func mockServiceServiceHelper(name string, clusterIPRequired bool) *corev1.Service {
var clusterIP string
if clusterIPRequired == false {
clusterIP = "None"
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Type: corev1.ServiceTypeClusterIP,
ClusterIP: clusterIP,
Selector: map[string]string{
"name": name,
},
Expand Down Expand Up @@ -97,6 +111,7 @@ func mockServiceStatefulSet(name, version string, ready bool) *appsv1.StatefulSe
Labels: map[string]string{
"rollout-group": "mock",
},
Annotations: map[string]string{},
},
Spec: appsv1.StatefulSetSpec{
Replicas: ptr[int32](1),
Expand All @@ -105,6 +120,7 @@ func mockServiceStatefulSet(name, version string, ready bool) *appsv1.StatefulSe
"name": name,
},
},
ServiceName: "mock",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand Down
52 changes: 51 additions & 1 deletion integration/manifests_rollout_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func rolloutOperatorRole() *rbacv1.Role {
{
APIGroups: []string{"apps"},
Resources: []string{"statefulsets"},
Verbs: []string{"list", "get", "watch"},
Verbs: []string{"list", "get", "watch", "patch"},
},
{
APIGroups: []string{"apps"},
Expand Down Expand Up @@ -285,6 +285,56 @@ func webhookRolloutOperatorClusterRole(namespace string) *rbacv1.ClusterRole {
}
}

func prepareDownscaleValidatingWebhook(namespace, path string) *admissionregistrationv1.MutatingWebhookConfiguration {
return &admissionregistrationv1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("prepare-downscale-%s", namespace),
Labels: map[string]string{
"grafana.com/inject-rollout-operator-ca": "true",
"grafana.com/namespace": namespace,
},
Annotations: nil,
OwnerReferences: nil,
Finalizers: nil,
ManagedFields: nil,
},
Webhooks: []admissionregistrationv1.MutatingWebhook{
{
Name: fmt.Sprintf("prepare-downscale-%s.grafana.com", namespace),
ClientConfig: admissionregistrationv1.WebhookClientConfig{
Service: &admissionregistrationv1.ServiceReference{
Namespace: namespace,
Name: "rollout-operator",
Path: &path,
},
},
Rules: []admissionregistrationv1.RuleWithOperations{
{
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Update},
Rule: admissionregistrationv1.Rule{
APIGroups: []string{"apps"},
APIVersions: []string{"v1"},
Resources: []string{
"statefulsets",
"statefulsets/scale",
},
Scope: ptr(admissionregistrationv1.NamespacedScope),
},
},
},
NamespaceSelector: &metav1.LabelSelector{
// This is just an example of matching changes only in a specific namespace.
// https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-metadata-name
MatchLabels: map[string]string{"kubernetes.io/metadata.name": namespace},
},
FailurePolicy: ptr(admissionregistrationv1.Fail),
SideEffects: ptr(admissionregistrationv1.SideEffectClassNoneOnDryRun),
AdmissionReviewVersions: []string{"v1"},
},
},
}
}

func noDownscaleValidatingWebhook(namespace string) *admissionregistrationv1.ValidatingWebhookConfiguration {
return &admissionregistrationv1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Expand Down
27 changes: 27 additions & 0 deletions integration/mock-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (
"syscall"
)

type behaviour int

const (
PASS behaviour = iota
FAIL
)

const (
PASS_PATH = "/prepare-shutdown-pass"
FAIL_PATH = "/prepare-shutdown-fail"
)

func main() {
alive := &atomic.Int64{}
alive.Store(1)
Expand All @@ -21,6 +33,8 @@ func main() {
mux := http.NewServeMux()
mux.Handle("/ready", probeHandler(ready))
mux.Handle("/alive", probeHandler(alive))
mux.Handle(PASS_PATH, shutdownHandler(PASS))
mux.Handle(FAIL_PATH, shutdownHandler(FAIL))

if prefix := os.Getenv("PREFIX"); prefix != "" {
log.Printf("prefix=%s", prefix)
Expand All @@ -47,6 +61,19 @@ func main() {
<-closed
}

func shutdownHandler(expected behaviour) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
switch expected {
case PASS:
w.WriteHeader(http.StatusOK)
case FAIL:
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusBadRequest)
}
}
}

func probeHandler(probe *atomic.Int64) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
switch req.Method {
Expand Down
14 changes: 14 additions & 0 deletions integration/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func requireUpdateStatefulSet(ctx context.Context, t *testing.T, api *kubernetes
require.NoError(t, err, "Can't update StatefulSet")
}

func updateStatefulSet(ctx context.Context, t *testing.T, api *kubernetes.Clientset, sts *appsv1.StatefulSet) error {
t.Helper()
_, err := api.AppsV1().StatefulSets(corev1.NamespaceDefault).Update(ctx, sts, metav1.UpdateOptions{})
return err
}

func getAndUpdateStatefulSetScale(ctx context.Context, t *testing.T, api *kubernetes.Clientset, name string, replicas int32, dryrun bool) error {
s, err := api.AppsV1().StatefulSets(corev1.NamespaceDefault).GetScale(ctx, name, metav1.GetOptions{})
require.NoError(t, err)
Expand All @@ -176,3 +182,11 @@ func getAndUpdateStatefulSetScale(ctx context.Context, t *testing.T, api *kubern
_, err = api.AppsV1().StatefulSets(corev1.NamespaceDefault).UpdateScale(ctx, name, s, opts)
return err
}

func requireCreateService(ctx context.Context, t *testing.T, api *kubernetes.Clientset, namespace, name string) {
t.Helper()
_, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceServiceWithNoClusterIP(name), metav1.CreateOptions{})
require.NoError(t, err, "Can't create Service")
_, err = api.NetworkingV1().Ingresses(namespace).Create(ctx, mockServiceIngress(name), metav1.CreateOptions{})
require.NoError(t, err, "Can't create Ingress")
}
2 changes: 1 addition & 1 deletion pkg/admission/prep_downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kube
case "statefulsets":
obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{})
if err != nil {
return nil, err
return nil, err
}
return obj.Annotations, nil
}
Expand Down