Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cluster-autoscaler/processors/podinjection/pod_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGro
for _, con := range controllers {
podGroups[con.uid] = makePodGroup(con.desiredReplicas)
}

for _, pod := range pods {
for _, ownerRef := range pod.OwnerReferences {
podGroups = updatePodGroups(pod, ownerRef, podGroups)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/simulator/fake"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -61,9 +62,16 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext,
}
scheduledPods := podsFromNodeInfos(nodeInfos)

groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
var podsToInject []*apiv1.Pod
allPods, err := ctx.AllPodLister().List()
if err != nil {
klog.Errorf("Failed to list all pods from all pod lister: %v", err)
return unschedulablePods, fmt.Errorf("failed to list all pods from all pod lister: %v", err)
}
schedulingGatedPods := kube_util.SchedulingGatedPods(allPods)

groupedPods := groupPods(append(append(scheduledPods, unschedulablePods...), schedulingGatedPods...), controllers)

var podsToInject []*apiv1.Pod
for _, groupedPod := range groupedPods {
var fakePodCount = groupedPod.fakePodCount()
fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), WithNodeName(node.Name))
podRep1Copy1 := buildTestPod("default", "pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
podRep1Copy2 := buildTestPod("default", "pod-rep1-2", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
podRep1Copy3Gated := buildTestPod("default", "pod-rep1-3", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
podRep1Copy3Gated = WithSchedulingGatedStatus(podRep1Copy3Gated)

job1 := createTestJob("job-1", "default", 10, 10, 0)
scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", WithControllerOwnerRef(job1.Name, "Job", job1.UID), WithNodeName(node.Name))
Expand All @@ -55,6 +57,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), WithNodeName(node.Name))
parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
parallelStatefulsetPodCopy3Gated := buildTestPod("default", "parallel-pod-statefulset1-3", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
parallelStatefulsetPodCopy3Gated = WithSchedulingGatedStatus(parallelStatefulsetPodCopy3Gated)

sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10)
scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", WithControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), WithNodeName(node.Name))
Expand All @@ -72,6 +76,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
name string
scheduledPods []*apiv1.Pod
unschedulablePods []*apiv1.Pod
otherPods []*apiv1.Pod
wantPods []*apiv1.Pod
}{
{
Expand Down Expand Up @@ -111,6 +116,20 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 7)...,
),
},
{
name: "Mix of controllers with scheduling gated pods",
scheduledPods: []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod},
unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
otherPods: []*apiv1.Pod{parallelStatefulsetPodCopy3Gated, podRep1Copy3Gated},
wantPods: append(
append(
append(
[]*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
makeFakePods(replicaSet1.UID, scheduledPodRep1Copy1, 1)...),
makeFakePods(job1.UID, scheduledPodJob1Copy1, 7)...),
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 6)...,
),
},
}

for _, tc := range testCases {
Expand All @@ -119,9 +138,10 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)}
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, &allPodsLister, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
},
ClusterSnapshot: clusterSnapshot,
}
Expand Down Expand Up @@ -433,3 +453,11 @@ func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod {
}

type podOption func(*apiv1.Pod)

type fakeAllPodsLister struct {
podsToList []*apiv1.Pod
}

func (l *fakeAllPodsLister) List() ([]*apiv1.Pod, error) {
return l.podsToList, nil
}
24 changes: 24 additions & 0 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,30 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[strin
return unprocessedPods
}

// SchedulingGatedPods is a helper method that returns all pods which has scheduling gate
// SchedulingGated pods are not scheduled nor deleted by the implementation and are not unschedulable nor unprocessed by definition
func SchedulingGatedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var schedulingGatedPods []*apiv1.Pod
for _, pod := range allPods {
if pod != nil && isSchedulingGated(pod) {
schedulingGatedPods = append(schedulingGatedPods, pod)
}
}
return schedulingGatedPods
}

// isSchedulingGated returns true in case PodScheduled is false with reason PodReasonSchedulingGated
func isSchedulingGated(pod *apiv1.Pod) bool {
if isScheduled(pod) || isDeleted(pod) {
return false
}
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonSchedulingGated {
return true
}
return false
}

// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
Expand Down
18 changes: 18 additions & 0 deletions cluster-autoscaler/utils/test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,24 @@ func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64) *apiv
return node
}

// WithSchedulingGatedStatus upserts the condition with type PodScheduled to be of status false
// and reason PodReasonSchedulingGated
func WithSchedulingGatedStatus(pod *apiv1.Pod) *apiv1.Pod {
gatedPodCondition := apiv1.PodCondition{
Type: apiv1.PodScheduled,
Status: apiv1.ConditionFalse,
Reason: apiv1.PodReasonSchedulingGated,
}
for index := range pod.Status.Conditions {
if pod.Status.Conditions[index].Type == apiv1.PodScheduled {
pod.Status.Conditions[index] = gatedPodCondition
return pod
}
}
pod.Status.Conditions = append(pod.Status.Conditions, gatedPodCondition)
return pod
}

// WithAllocatable adds specified milliCpu and memory to Allocatable of the node in-place.
func WithAllocatable(node *apiv1.Node, millicpuAllocatable, memAllocatable int64) *apiv1.Node {
node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewMilliQuantity(millicpuAllocatable, resource.DecimalSI)
Expand Down
Loading