diff --git a/internal/controller/cluster/aero_info_calls.go b/internal/controller/cluster/aero_info_calls.go
index 7ba8d6ce..d2b8444a 100644
--- a/internal/controller/cluster/aero_info_calls.go
+++ b/internal/controller/cluster/aero_info_calls.go
@@ -82,6 +82,21 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
 	return common.ReconcileSuccess()
 }
 
+// waitForMigrationToComplete waits for migrations to complete on all nodes in the cluster.
+func (r *SingleClusterReconciler) waitForMigrationToComplete(policy *as.ClientPolicy,
+	ignorablePodNames sets.Set[string],
+) common.ReconcileResult {
+	// This doesn't make an actual connection; only objects holding connection info are created.
+	allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames)
+	if err != nil {
+		return common.ReconcileError(fmt.Errorf("failed to get hostConn for aerospike cluster nodes: %v", err))
+	}
+
+	r.Log.Info("Waiting for migration to complete")
+
+	return r.waitForClusterStability(policy, allHostConns)
+}
+
 func (r *SingleClusterReconciler) quiescePods(
 	policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePodNames sets.Set[string],
 ) error {
diff --git a/internal/controller/cluster/rack.go b/internal/controller/cluster/rack.go
index 33b54dc3..66029575 100644
--- a/internal/controller/cluster/rack.go
+++ b/internal/controller/cluster/rack.go
@@ -974,6 +974,18 @@ func (r *SingleClusterReconciler) scaleDownRack(
 		); !res.IsSuccess {
 			return found, res
 		}
+
+		// Wait for migrations to complete before deleting the pods.
+		if res := r.waitForMigrationToComplete(policy,
+			ignorablePodNames,
+		); !res.IsSuccess {
+			r.Log.Error(
+				res.Err, "Failed to wait for migration to complete before deleting pods",
+				"rackID", rackState.Rack.ID,
+			)
+
+			return found, res
+		}
 	}
 
 	// Update new object with new size
diff --git a/test/cluster/cluster_helper.go b/test/cluster/cluster_helper.go
index 7736c581..22101cd9 100644
--- a/test/cluster/cluster_helper.go
+++ b/test/cluster/cluster_helper.go
@@ -4,6 +4,7 @@ import (
 	goctx "context"
 	"fmt"
 	"reflect"
+	"strconv"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -616,6 +617,30 @@ func validateMigrateFillDelay(
 	return err
 }
 
+// getMigrationsInProgress returns the maximum migrate_partitions_remaining reported across the given pods.
+func getMigrationsInProgress(ctx goctx.Context, k8sClient client.Client,
+	clusterNamespacedName types.NamespacedName, podList *corev1.PodList) int {
+	maxMigrations := 0
+
+	for idx := range podList.Items {
+		pod := &podList.Items[idx]
+
+		podStats, err := requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "statistics", pod.Name)
+		if err != nil {
+			continue
+		}
+
+		if strVal, ok := podStats["migrate_partitions_remaining"]; ok {
+			val, err := strconv.Atoi(strVal)
+			if err == nil && val > maxMigrations {
+				maxMigrations = val
+			}
+		}
+	}
+
+	return maxMigrations
+}
+
 // validate readiness port
 func validateReadinessProbe(ctx goctx.Context, k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster,
 	requiredPort int) error {
diff --git a/test/cluster/cluster_test.go b/test/cluster/cluster_test.go
index c7c506c5..a81f0ff7 100644
--- a/test/cluster/cluster_test.go
+++ b/test/cluster/cluster_test.go
@@ -4,6 +4,7 @@ import (
 	goctx "context"
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	. "github.com/onsi/ginkgo/v2"
@@ -78,6 +79,11 @@ var _ = Describe(
 					ScaleDownWithMigrateFillDelay(ctx)
 				},
 			)
+			Context(
+				"ValidateScaleDownWaitForMigrations", func() {
+					ValidateScaleDownWaitForMigrations(ctx)
+				},
+			)
 			Context(
 				"PauseReconcile", func() {
 					PauseReconcileTest(ctx)
@@ -767,6 +773,95 @@ func ScaleDownWithMigrateFillDelay(ctx goctx.Context) {
 	)
 }
 
+func ValidateScaleDownWaitForMigrations(ctx goctx.Context) {
+	Context(
+		"ValidateScaleDownWaitForMigrations", func() {
+			clusterNamespacedName := test.GetNamespacedName(
+				"wait-for-migrations-cluster", namespace,
+			)
+
+			BeforeEach(
+				func() {
+					aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 4)
+					Expect(DeployCluster(k8sClient, ctx, aeroCluster)).ToNot(HaveOccurred())
+				},
+			)
+
+			AfterEach(
+				func() {
+					aeroCluster := &asdbv1.AerospikeCluster{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      clusterNamespacedName.Name,
+							Namespace: clusterNamespacedName.Namespace,
+						},
+					}
+
+					Expect(DeleteCluster(k8sClient, ctx, aeroCluster)).ToNot(HaveOccurred())
+					Expect(CleanupPVC(k8sClient, aeroCluster.Namespace, aeroCluster.Name)).ToNot(HaveOccurred())
+				},
+			)
+
+			It(
+				"Should wait for migrations to complete before pod deletion", func() {
+					aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
+					Expect(err).ToNot(HaveOccurred())
+
+					aeroCluster.Spec.Size -= 1
+					err = k8sClient.Update(ctx, aeroCluster)
+					Expect(err).ToNot(HaveOccurred())
+
+					seenQuiesced := false
+					seenMigrationsComplete := false
+
+					Eventually(func() bool {
+						podList, lErr := getPodList(aeroCluster, k8sClient)
+						Expect(lErr).ToNot(HaveOccurred())
+						info, iErr := requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "namespace/test",
+							podList.Items[0].Name)
+						Expect(iErr).ToNot(HaveOccurred())
+
+						var nodesQuiesced string
+						confs := strings.Split(info["namespace/test"], ";")
+						for _, conf := range confs {
+							if strings.Contains(conf, "nodes_quiesced") {
+								nodesQuiesced = strings.Split(conf, "=")[1]
+								break
+							}
+						}
+						migrations := getMigrationsInProgress(ctx, k8sClient, clusterNamespacedName, podList)
+
+						podCount := utils.Len32(podList.Items)
+						// Track that a node has quiesced while all 4 pods are still present
+						if podCount == 4 && nodesQuiesced == "1" {
+							seenQuiesced = true
+						}
+
+						// Track that migrations completed after the quiesce
+						if seenQuiesced && podCount == 4 && migrations == 0 {
+							seenMigrationsComplete = true
+						}
+
+						// Fail condition: scale-down happened before migrations finished
+						if podCount < 4 && migrations != 0 {
+							Fail(fmt.Sprintf("Pods scaled down while migrations still running (pods=%d, migrations=%d)",
+								podCount, migrations))
+						}
+
+						// Success condition: pods scale down only after migrations complete
+						return podCount == aeroCluster.Spec.Size && seenQuiesced && seenMigrationsComplete
+					}, 5*time.Minute, 1*time.Second).Should(BeTrue(), "Pods should only scale down after migrations complete")
+
+					err = waitForAerospikeCluster(
+						k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval,
+						getTimeout(2), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted},
+					)
+					Expect(err).ToNot(HaveOccurred())
+				},
+			)
+		},
+	)
+}
+
 func clusterWithMaxIgnorablePod(ctx goctx.Context) {
 	var (
 		err error