15 changes: 15 additions & 0 deletions internal/controller/cluster/aero_info_calls.go
@@ -82,6 +82,21 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
return common.ReconcileSuccess()
}

// waitForMigrationToComplete waits for migrations to complete on all nodes in the cluster.
func (r *SingleClusterReconciler) waitForMigrationToComplete(policy *as.ClientPolicy,
ignorablePodNames sets.Set[string],
) common.ReconcileResult {
// This doesn't make an actual connection; only objects holding connection info are created
allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames)
if err != nil {
return common.ReconcileError(fmt.Errorf("failed to get hostConn for aerospike cluster nodes: %v", err))
}

r.Log.Info("Waiting for migration to complete")

return r.waitForClusterStability(policy, allHostConns)
}

func (r *SingleClusterReconciler) quiescePods(
policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePodNames sets.Set[string],
) error {
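The waitForClusterStability helper that waitForMigrationToComplete delegates to is not part of this diff. As a rough, self-contained sketch of the polling pattern it presumably follows (the readRemaining callback, timeout, and interval are illustrative assumptions, not the operator's actual API):

package sketch

import (
	"fmt"
	"time"
)

// pollMigrations is an illustrative stand-in for the stability check:
// it re-reads a per-node migrate_partitions_remaining count until every
// node reports zero or the deadline passes.
func pollMigrations(readRemaining func() (map[string]int, error),
	timeout, interval time.Duration,
) error {
	deadline := time.Now().Add(timeout)

	for {
		remaining, err := readRemaining()
		if err != nil {
			return err
		}

		pending := 0
		for _, n := range remaining {
			pending += n
		}

		// All nodes report zero partitions left to migrate.
		if pending == 0 {
			return nil
		}

		if time.Now().After(deadline) {
			return fmt.Errorf("migrations still pending after %v", timeout)
		}

		time.Sleep(interval)
	}
}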
12 changes: 12 additions & 0 deletions internal/controller/cluster/rack.go
@@ -974,6 +974,18 @@ func (r *SingleClusterReconciler) scaleDownRack(
); !res.IsSuccess {
return found, res
}

// Wait for migration to complete before deleting the pods.
if res := r.waitForMigrationToComplete(policy,
ignorablePodNames,
); !res.IsSuccess {
r.Log.Error(
res.Err, "Failed to wait for migration to complete before deleting pods",
"rackID", rackState.Rack.ID,
)

return found, res
}
}

// Update new object with new size
25 changes: 25 additions & 0 deletions test/cluster/cluster_helper.go
@@ -4,6 +4,7 @@ import (
goctx "context"
"fmt"
"reflect"
"strconv"
"time"

"github.com/go-logr/logr"
@@ -616,6 +617,30 @@ func validateMigrateFillDelay(
return err
}

// getMigrationsInProgress returns the maximum migrate_partitions_remaining value reported
// across the given pods; nodes whose info request fails are skipped.
func getMigrationsInProgress(ctx goctx.Context, k8sClient client.Client,
clusterNamespacedName types.NamespacedName, podList *corev1.PodList) int {
maxMigrations := 0

for idx := range podList.Items {
pod := &podList.Items[idx]

podStats, err := requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "statistics", pod.Name)
if err != nil {
continue
}

if strVal, ok := podStats["migrate_partitions_remaining"]; ok {
val, err := strconv.Atoi(strVal)
if err == nil && val > maxMigrations {
maxMigrations = val
}
}
}

return maxMigrations
}
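For context (not part of this diff): an Aerospike info "statistics" response is a single semicolon-separated string of key=value pairs, which the test helpers are assumed to parse into the map indexed above. A minimal sketch of that parsing step:

package sketch

import "strings"

// parseInfoStats splits a raw info response of the form "k1=v1;k2=v2;..."
// into a map, the shape in which migrate_partitions_remaining is read above.
func parseInfoStats(raw string) map[string]string {
	stats := make(map[string]string, 16)

	for _, pair := range strings.Split(raw, ";") {
		if key, value, found := strings.Cut(pair, "="); found {
			stats[key] = value
		}
	}

	return stats
}

// Example: parseInfoStats("cluster_size=4;migrate_partitions_remaining=0")
// yields {"cluster_size": "4", "migrate_partitions_remaining": "0"}.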

// validate readiness port
func validateReadinessProbe(ctx goctx.Context, k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster,
requiredPort int) error {
95 changes: 95 additions & 0 deletions test/cluster/cluster_test.go
@@ -4,6 +4,7 @@ import (
goctx "context"
"fmt"
"strconv"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
@@ -78,6 +79,11 @@ var _ = Describe(
ScaleDownWithMigrateFillDelay(ctx)
},
)
Context(
"ValidateScaleDownWaitForMigrations", func() {
ValidateScaleDownWaitForMigrations(ctx)
},
)
Context(
"PauseReconcile", func() {
PauseReconcileTest(ctx)
@@ -767,6 +773,95 @@ func ScaleDownWithMigrateFillDelay(ctx goctx.Context) {
)
}

func ValidateScaleDownWaitForMigrations(ctx goctx.Context) {
Context(
"ValidateScaleDownWaitForMigrations", func() {
clusterNamespacedName := test.GetNamespacedName(
"wait-for-migrations-cluster", namespace,
)

BeforeEach(
func() {
aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 4)
Expect(DeployCluster(k8sClient, ctx, aeroCluster)).ToNot(HaveOccurred())
},
)

AfterEach(
func() {
aeroCluster := &asdbv1.AerospikeCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterNamespacedName.Name,
Namespace: clusterNamespacedName.Namespace,
},
}

Expect(DeleteCluster(k8sClient, ctx, aeroCluster)).ToNot(HaveOccurred())
Expect(CleanupPVC(k8sClient, aeroCluster.Namespace, aeroCluster.Name)).ToNot(HaveOccurred())
},
)

It(
"Should wait for migrations to complete before pod deletion", func() {
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

aeroCluster.Spec.Size--
err = k8sClient.Update(ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())

seenQuiesced := false
seenMigrationsComplete := false

Eventually(func() bool {
podList, lErr := getPodList(aeroCluster, k8sClient)
Expect(lErr).ToNot(HaveOccurred())
info, iErr := requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "namespace/test",
podList.Items[0].Name)
Expect(iErr).ToNot(HaveOccurred())

var nodesQuiesced string
confs := strings.Split(info["namespace/test"], ";")
for _, conf := range confs {
if strings.Contains(conf, "nodes_quiesced") {
nodesQuiesced = strings.Split(conf, "=")[1]
break
}
}
migrations := getMigrationsInProgress(ctx, k8sClient, clusterNamespacedName, podList)

podCount := utils.Len32(podList.Items)
// Track quiesced
if podCount == 4 && nodesQuiesced == "1" {
seenQuiesced = true
}

// Track migrations complete after quiesce
if seenQuiesced && podCount == 4 && migrations == 0 {
seenMigrationsComplete = true
}

// Fail condition: if scale down happens before migrations finish
if podCount < 4 && migrations != 0 {
Fail(fmt.Sprintf("Pods scaledown while migrations still running (pods=%d, migrations=%d)",
podCount, migrations))
}

// Success condition: pod scale down only after migrations complete
return podCount == aeroCluster.Spec.Size && seenQuiesced && seenMigrationsComplete
}, 5*time.Minute, 1*time.Second).Should(BeTrue(), "Pods should only scale down after migrations complete")

err = waitForAerospikeCluster(
k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval,
getTimeout(2), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted},
)
Expect(err).ToNot(HaveOccurred())
},
)
},
)
}

func clusterWithMaxIgnorablePod(ctx goctx.Context) {
var (
err error