diff --git a/test/extended/etcd/OWNERS b/test/extended/etcd/OWNERS
index 48efb05bcf01..64ac641735fd 100644
--- a/test/extended/etcd/OWNERS
+++ b/test/extended/etcd/OWNERS
@@ -1,12 +1,11 @@
 reviewers:
   - dusk125
   - hasbro17
-  - Elbehery
+  - jubittajohn
   - tjungblu
 approvers:
   - deads2k
-  - soltysh
   - hasbro17
   - dusk125
-  - Elbehery
+  - jubittajohn
   - tjungblu
diff --git a/test/extended/etcd/helpers/helpers.go b/test/extended/etcd/helpers/helpers.go
index b8b242a9bad6..129475f09b17 100644
--- a/test/extended/etcd/helpers/helpers.go
+++ b/test/extended/etcd/helpers/helpers.go
@@ -385,6 +385,167 @@ func EnsureCPMSReplicasConverged(ctx context.Context, cpmsClient machinev1client
 	return nil
 }
 
+// UpdateCPMSStrategy updates the CPMS strategy to the specified type (OnDelete, RollingUpdate, or Recreate)
+func UpdateCPMSStrategy(ctx context.Context, t TestingT, cpmsClient machinev1client.ControlPlaneMachineSetInterface, strategy machinev1.ControlPlaneMachineSetStrategyType) error {
+	cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	cpms.Spec.Strategy.Type = strategy
+	_, err = cpmsClient.Update(ctx, cpms, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+
+	framework.Logf("Successfully updated CPMS strategy to %v", strategy)
+	return nil
+}
+
+// DeleteAllMasterMachines deletes all master machines and returns the list of deleted machine names
+func DeleteAllMasterMachines(ctx context.Context, t TestingT, machineClient machinev1beta1client.MachineInterface) ([]string, error) {
+	machineList, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
+	if err != nil {
+		return nil, fmt.Errorf("error listing master machines: '%w'", err)
+	}
+
+	var deletedMachineNames []string
+	for _, machine := range machineList.Items {
+		if err := DeleteMachine(ctx, t, machineClient, machine.Name); err != nil {
+			return deletedMachineNames, err
+		}
+		deletedMachineNames = append(deletedMachineNames, machine.Name)
+	}
+
+	return deletedMachineNames, nil
+}
+
+// EnsureUpdatedReplicasOnCPMS checks if status.updatedReplicas on the cluster CPMS equals the expected count
+// updatedReplicas represents machines with the desired spec that are ready
+func EnsureUpdatedReplicasOnCPMS(ctx context.Context, t TestingT, expectedCount int, cpmsClient machinev1client.ControlPlaneMachineSetInterface) error {
+	waitPollInterval := 15 * time.Second
+	waitPollTimeout := 90 * time.Minute
+	framework.Logf("Waiting up to %s for the CPMS to have status.updatedReplicas = %v", waitPollTimeout.String(), expectedCount)
+
+	return wait.PollUntilContextTimeout(ctx, waitPollInterval, waitPollTimeout, true, func(ctx context.Context) (done bool, err error) {
+		cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
+		if err != nil {
+			return isTransientAPIError(t, err)
+		}
+
+		if cpms.Status.UpdatedReplicas != int32(expectedCount) {
+			framework.Logf("expected %d updated replicas on CPMS, got: %v", expectedCount, cpms.Status.UpdatedReplicas)
+			return false, nil
+		}
+		framework.Logf("CPMS has reached the desired number of updated replicas: %v", cpms.Status.UpdatedReplicas)
+		return true, nil
+	})
+}
+
+// GetVotingMemberNames returns the list of current voting etcd member names
+func GetVotingMemberNames(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator) ([]string, error) {
+	etcdClient, closeFn, err := etcdClientFactory.NewEtcdClient()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get etcd client: %w", err)
+	}
+	defer closeFn()
+
+	memberCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
+	defer cancel()
+	memberList, err := etcdClient.MemberList(memberCtx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get the member list: %w", err)
+	}
+
+	var votingMemberNames []string
+	for _, member := range memberList.Members {
+		if !member.IsLearner {
+			votingMemberNames = append(votingMemberNames, member.Name)
+		}
+	}
+
+	framework.Logf("Current voting etcd members: %v", votingMemberNames)
+	return votingMemberNames, nil
+}
+
+// EnsureVotingMembersExcluding waits for the cluster to have exactly expectedCount voting members,
+// with none of the members in the excludedMemberNames list
+func EnsureVotingMembersExcluding(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, excludedMemberNames []string, expectedCount int) error {
+	waitPollInterval := 15 * time.Second
+	waitPollTimeout := 90 * time.Minute
+	excludedSet := sets.NewString(excludedMemberNames...)
+	framework.Logf("Waiting up to %s for the cluster to have %v voting members with none from the excluded list: %v", waitPollTimeout.String(), expectedCount, excludedMemberNames)
+
+	return wait.PollUntilContextTimeout(ctx, waitPollInterval, waitPollTimeout, true, func(ctx context.Context) (done bool, err error) {
+		etcdClient, closeFn, err := etcdClientFactory.NewEtcdClient()
+		if err != nil {
+			framework.Logf("failed to get etcd client, will retry, err: %v", err)
+			return false, nil
+		}
+		defer closeFn()
+
+		memberCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
+		defer cancel()
+		memberList, err := etcdClient.MemberList(memberCtx)
+		if err != nil {
+			framework.Logf("failed to get the member list, will retry, err: %v", err)
+			return false, nil
+		}
+
+		var votingMemberNames []string
+		excludedMemberIDs := sets.NewString()
+		for _, member := range memberList.Members {
+			if !member.IsLearner {
+				votingMemberNames = append(votingMemberNames, member.Name)
+				// Collect IDs of excluded members
+				if excludedSet.Has(member.Name) {
+					// Convert member ID to hexadecimal format to match etcd-endpoints ConfigMap format
+					memberID := fmt.Sprintf("%x", member.ID)
+					excludedMemberIDs.Insert(memberID)
+				}
+			}
+		}
+
+		// Check if we have the expected count
+		if len(votingMemberNames) != expectedCount {
+			framework.Logf("unexpected number of voting etcd members, expected exactly %d, got: %v, current members are: %v", expectedCount, len(votingMemberNames), votingMemberNames)
+			return false, nil
+		}
+
+		// Check if any of the current members are in the excluded list
+		for _, memberName := range votingMemberNames {
+			if excludedSet.Has(memberName) {
+				framework.Logf("found excluded member %q still in the cluster, current members are: %v", memberName, votingMemberNames)
+				return false, nil
+			}
+		}
+
+		framework.Logf("cluster has reached the expected number of %v voting members with none from excluded list, current members are: %v", expectedCount, votingMemberNames)
+
+		// Also validate etcd-endpoints ConfigMap
+		framework.Logf("ensuring that the openshift-etcd/etcd-endpoints cm has the expected number of %v voting members and excludes old members", expectedCount)
+		etcdEndpointsConfigMap, err := kubeClient.CoreV1().ConfigMaps("openshift-etcd").Get(ctx, "etcd-endpoints", metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		currentVotingMemberIPListSet := sets.NewString()
+		for memberID, votingMemberIP := range etcdEndpointsConfigMap.Data {
+			// Check if this member ID is in the excluded member IDs list
+			if excludedMemberIDs.Has(memberID) {
+				framework.Logf("found excluded member ID %q in etcd-endpoints ConfigMap, will retry", memberID)
+				return false, nil
+			}
+			currentVotingMemberIPListSet.Insert(votingMemberIP)
+		}
+		if currentVotingMemberIPListSet.Len() != expectedCount {
+			framework.Logf("unexpected number of voting members in the openshift-etcd/etcd-endpoints cm, expected exactly %d, got: %v, current members are: %v", expectedCount, currentVotingMemberIPListSet.Len(), currentVotingMemberIPListSet.List())
+			return false, nil
+		}
+
+		return true, nil
+	})
+}
+
 // EnsureVotingMembersCount counts the number of voting etcd members, it doesn't evaluate health conditions or any other attributes (i.e. name) of individual members
 // this method won't fail immediately on errors, this is useful during scaling down operation until the feature can ensure this operation to be graceful
 func EnsureVotingMembersCount(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, expectedMembersCount int) error {
diff --git a/test/extended/etcd/vertical_scaling.go b/test/extended/etcd/vertical_scaling.go
index 3ce63166faec..a3570d623ef7 100644
--- a/test/extended/etcd/vertical_scaling.go
+++ b/test/extended/etcd/vertical_scaling.go
@@ -7,8 +7,9 @@ import (
 	o "github.com/onsi/gomega"
 	"github.com/pkg/errors"
 
+	machinev1 "github.com/openshift/api/machine/v1"
 	machineclient "github.com/openshift/client-go/machine/clientset/versioned"
-	machinev1 "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
+	machinev1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
 	machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
 	testlibraryapi "github.com/openshift/library-go/test/library/apiserver"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -19,6 +20,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 )
 
 var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd", func() {
@@ -30,7 +32,7 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
 		machineClientSet *machineclient.Clientset
 		machineClient    machinev1beta1client.MachineInterface
 		nodeClient       v1.NodeInterface
-		cpmsClient       machinev1.ControlPlaneMachineSetInterface
+		cpmsClient       machinev1client.ControlPlaneMachineSetInterface
 		kubeClient       kubernetes.Interface
 		cpmsActive       bool
 		ctx              context.Context
@@ -293,4 +295,85 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
 		err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
 		o.Expect(err).ToNot(o.HaveOccurred())
 	})
+
+	// The following test validates CPMS OnDelete strategy behavior during full master replacement.
+	// OnDelete strategy differs from RollingUpdate in that CPMS does not automatically update
+	// machines when their spec changes. However, when machines are deleted, CPMS still creates
+	// replacements to maintain the desired replica count.
+	//
+	// This test verifies that CPMS correctly handles the deletion of all three master machines
+	// simultaneously while in OnDelete mode:
+	// 1) Switches CPMS to OnDelete strategy
+	// 2) Deletes all master machines at once
+	// 3) Validates CPMS creates three new replacement machines
+	// 4) Verifies all old etcd members are removed from both the cluster and the etcd-endpoints ConfigMap
+	// 5) Waits for the API server rollout to stabilize and verifies the cluster returns to 3 running machines
+	g.It("is able to delete all masters with OnDelete strategy and wait for CPMSO to replace them [Timeout:120m][apigroup:machine.openshift.io]", func() {
+		if !cpmsActive {
+			e2eskipper.Skipf("CPMS is not active on this platform, this test requires an active CPMS to validate OnDelete strategy")
+		}
+
+		// step 1: Update CPMS to OnDelete strategy
+		framework.Logf("Updating CPMS strategy to OnDelete")
+		err := scalingtestinglibrary.UpdateCPMSStrategy(ctx, g.GinkgoT(), cpmsClient, machinev1.OnDelete)
+		err = errors.Wrap(err, "failed to update CPMS strategy to OnDelete")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 2: Restore RollingUpdate strategy in cleanup
+		defer func() {
+			framework.Logf("Restoring CPMS strategy to RollingUpdate")
+			err := scalingtestinglibrary.UpdateCPMSStrategy(ctx, g.GinkgoT(), cpmsClient, machinev1.RollingUpdate)
+			err = errors.Wrap(err, "cleanup: failed to restore CPMS strategy to RollingUpdate")
+			o.Expect(err).ToNot(o.HaveOccurred())
+		}()
+
+		// step 3: Capture current etcd member names before deletion
+		framework.Logf("Capturing current voting etcd member names")
+		oldMemberNames, err := scalingtestinglibrary.GetVotingMemberNames(ctx, g.GinkgoT(), etcdClientFactory)
+		err = errors.Wrap(err, "failed to get current voting member names")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 4: Delete all master machines
+		framework.Logf("Deleting all master machines")
+		deletedMachineNames, err := scalingtestinglibrary.DeleteAllMasterMachines(ctx, g.GinkgoT(), machineClient)
+		err = errors.Wrap(err, "failed to delete all master machines")
+		o.Expect(err).ToNot(o.HaveOccurred())
+		framework.Logf("Deleted machines: %v", deletedMachineNames)
+
+		// step 5: Wait for CPMS to show 3 updated replicas
+		framework.Logf("Waiting for CPMS to show 3 updated replicas")
+		err = scalingtestinglibrary.EnsureUpdatedReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient)
+		err = errors.Wrap(err, "timed out waiting for CPMS to show 3 updated replicas")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 6: Wait for etcd membership to have 3 members with none from the old member list
+		framework.Logf("Waiting for etcd membership to stabilize with new members")
+		err = scalingtestinglibrary.EnsureVotingMembersExcluding(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, oldMemberNames, 3)
+		err = errors.Wrap(err, "timed out waiting for etcd to have 3 voting members excluding old members")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 7: Verify CPMS shows 3 ready replicas
+		framework.Logf("Waiting for 3 ready replicas on CPMS")
+		err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient, nodeClient)
+		err = errors.Wrap(err, "timed out waiting for CPMS to show 3 ready replicas")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 8: Verify only 3 Running master machines
+		framework.Logf("Waiting for 3 Running master machines")
+		err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient)
+		err = errors.Wrap(err, "timed out waiting for only 3 Running master machines")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 9: Verify CPMS replicas converged
+		framework.Logf("Waiting for CPMS replicas to converge")
+		err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
+		err = errors.Wrap(err, "CPMS replicas failed to converge")
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		// step 10: Wait for the API server to stabilize
+		framework.Logf("Waiting for API servers to stabilize on the same revision")
+		err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
+		err = errors.Wrap(err, "timed out waiting for APIServer pods to stabilize on the same revision")
+		o.Expect(err).ToNot(o.HaveOccurred())
+	})
 })