Skip to content

Commit 286ba74

Browse files
Fix proactive scale up injecting fake pods for scheduling gated pods
1 parent 94637a2 commit 286ba74

File tree

4 files changed

+94
-4
lines changed

4 files changed

+94
-4
lines changed

cluster-autoscaler/processors/podinjection/pod_group.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
apiv1 "k8s.io/api/core/v1"
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222
"k8s.io/apimachinery/pkg/types"
23+
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
2324
)
2425

2526
type podGroup struct {
@@ -45,6 +46,30 @@ func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGro
4546
return podGroups
4647
}
4748

49+
func filterOutSchedulingGatedPods(groups map[types.UID]podGroup, allPods []*apiv1.Pod) map[types.UID]podGroup {
50+
if groups != nil {
51+
podsWithSchedulingGates := kube_util.SchedulingGatedPods(allPods)
52+
for _, pod := range podsWithSchedulingGates {
53+
if pod != nil {
54+
groups = removeSchedulingGatedPodFromGroups(groups, pod)
55+
}
56+
}
57+
}
58+
return groups
59+
}
60+
61+
func removeSchedulingGatedPodFromGroups(groups map[types.UID]podGroup, pod *apiv1.Pod) map[types.UID]podGroup {
62+
for _, podOwnerRef := range pod.OwnerReferences {
63+
// SchedulingGated pods can't be unschedualable nor unprocessed nor scheduled so it is not expected
64+
// to have them as group sample nor in pod count, so decreasing desiredReplicas by one is enough
65+
if grp, found := groups[podOwnerRef.UID]; found {
66+
grp.desiredReplicas -= 1
67+
groups[podOwnerRef.UID] = grp
68+
}
69+
}
70+
return groups
71+
}
72+
4873
// updatePodGroups updates the pod group if ownerRef is the controller of the pod
4974
func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup) map[types.UID]podGroup {
5075
if ownerRef.Controller == nil {

cluster-autoscaler/processors/podinjection/pod_injection_processor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,16 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext,
6060
return unschedulablePods, fmt.Errorf("failed to list nodeInfos from cluster snapshot: %v", err)
6161
}
6262
scheduledPods := podsFromNodeInfos(nodeInfos)
63-
6463
groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
65-
var podsToInject []*apiv1.Pod
6664

65+
allPods, err := ctx.AllPodLister().List()
66+
if err == nil {
67+
groupedPods = filterOutSchedulingGatedPods(groupedPods, allPods)
68+
} else {
69+
klog.Warningf("Pod injection processor failed to list pods to filter scheduling gated pods with error %v", err.Error())
70+
}
71+
72+
var podsToInject []*apiv1.Pod
6773
for _, groupedPod := range groupedPods {
6874
var fakePodCount = groupedPod.fakePodCount()
6975
fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)

cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,22 @@ import (
3838
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
3939
)
4040

41+
func setStatusConditionGated(pod *apiv1.Pod) {
42+
pod.Status.Conditions = []apiv1.PodCondition{{
43+
Type: apiv1.PodScheduled,
44+
Status: apiv1.ConditionFalse,
45+
Reason: apiv1.PodReasonSchedulingGated,
46+
}}
47+
}
4148
func TestTargetCountInjectionPodListProcessor(t *testing.T) {
4249
node := BuildTestNode("node1", 100, 0)
4350

4451
replicaSet1 := createTestReplicaSet("rep-set-1", "default", 5)
4552
scheduledPodRep1Copy1 := buildTestPod("default", "-scheduled-pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID), WithNodeName(node.Name))
4653
podRep1Copy1 := buildTestPod("default", "pod-rep1-1", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
4754
podRep1Copy2 := buildTestPod("default", "pod-rep1-2", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
55+
podRep1Copy3Gated := buildTestPod("default", "pod-rep1-3", WithControllerOwnerRef(replicaSet1.Name, "ReplicaSet", replicaSet1.UID))
56+
setStatusConditionGated(podRep1Copy3Gated)
4857

4958
job1 := createTestJob("job-1", "default", 10, 10, 0)
5059
scheduledPodJob1Copy1 := buildTestPod("default", "scheduled-pod-job1-1", WithControllerOwnerRef(job1.Name, "Job", job1.UID), WithNodeName(node.Name))
@@ -55,6 +64,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
5564
scheduledParallelStatefulsetPod := buildTestPod("default", "parallel-scheduled-pod-statefulset-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID), WithNodeName(node.Name))
5665
parallelStatefulsetPodCopy1 := buildTestPod("default", "parallel-pod-statefulset1-1", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
5766
parallelStatefulsetPodCopy2 := buildTestPod("default", "parallel-pod-statefulset1-2", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
67+
parallelStatefulsetPodCopy3Gated := buildTestPod("default", "parallel-pod-statefulset1-3", WithControllerOwnerRef(parallelStatefulset.Name, "StatefulSet", parallelStatefulset.UID))
68+
setStatusConditionGated(parallelStatefulsetPodCopy3Gated)
5869

5970
sequentialStatefulset := createTestStatefulset("sequential-statefulset-1", "default", appsv1.OrderedReadyPodManagement, 10)
6071
scheduledSequentialStatefulsetPod := buildTestPod("default", "sequential-scheduled-pod-statefulset-1", WithControllerOwnerRef(sequentialStatefulset.Name, "StatefulSet", sequentialStatefulset.UID), WithNodeName(node.Name))
@@ -72,6 +83,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
7283
name string
7384
scheduledPods []*apiv1.Pod
7485
unschedulablePods []*apiv1.Pod
86+
otherPods []*apiv1.Pod
7587
wantPods []*apiv1.Pod
7688
}{
7789
{
@@ -111,6 +123,20 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
111123
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 7)...,
112124
),
113125
},
126+
{
127+
name: "Mix of controllers with scheduling gated pods",
128+
scheduledPods: []*apiv1.Pod{scheduledPodRep1Copy1, scheduledPodJob1Copy1, scheduledParallelStatefulsetPod},
129+
unschedulablePods: []*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
130+
otherPods: []*apiv1.Pod{parallelStatefulsetPodCopy3Gated, podRep1Copy3Gated},
131+
wantPods: append(
132+
append(
133+
append(
134+
[]*apiv1.Pod{podRep1Copy1, podRep1Copy2, podJob1Copy1, podJob1Copy2, parallelStatefulsetPodCopy1, parallelStatefulsetPodCopy2},
135+
makeFakePods(replicaSet1.UID, scheduledPodRep1Copy1, 1)...),
136+
makeFakePods(job1.UID, scheduledPodJob1Copy1, 7)...),
137+
makeFakePods(parallelStatefulset.UID, scheduledParallelStatefulsetPod, 6)...,
138+
),
139+
},
114140
}
115141

116142
for _, tc := range testCases {
@@ -119,15 +145,16 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
119145
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
120146
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
121147
assert.NoError(t, err)
148+
allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)}
122149
ctx := context.AutoscalingContext{
123150
AutoscalingKubeClients: context.AutoscalingKubeClients{
124-
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
151+
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, &allPodsLister, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
125152
},
126153
ClusterSnapshot: clusterSnapshot,
127154
}
128155
pods, err := p.Process(&ctx, tc.unschedulablePods)
129156
assert.NoError(t, err)
130-
assert.ElementsMatch(t, tc.wantPods, pods)
157+
assert.Equal(t, tc.wantPods, pods)
131158
})
132159
}
133160
}
@@ -433,3 +460,11 @@ func buildTestPod(namespace, name string, opts ...podOption) *apiv1.Pod {
433460
}
434461

435462
type podOption func(*apiv1.Pod)
463+
464+
type fakeAllPodsLister struct {
465+
podsToList []*apiv1.Pod
466+
}
467+
468+
func (l *fakeAllPodsLister) List() ([]*apiv1.Pod, error) {
469+
return l.podsToList, nil
470+
}

cluster-autoscaler/utils/kubernetes/listers.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,30 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[strin
204204
return unprocessedPods
205205
}
206206

207+
// SchedulingGatedPods is a helper method that returns all pods which has scheduling gate
208+
// SchedulingGated pods are not scheduled nor deleted by the implementation and are not unschedulable nor unprocessed by definition
209+
func SchedulingGatedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
210+
var schedulingGatedPods []*apiv1.Pod
211+
for _, pod := range allPods {
212+
if isSchedulingGated(pod) {
213+
schedulingGatedPods = append(schedulingGatedPods, pod)
214+
}
215+
}
216+
return schedulingGatedPods
217+
}
218+
219+
// isSchedulingGated returns true in case PodScheduled is false with reason PodReasonSchedulingGated
220+
func isSchedulingGated(pod *apiv1.Pod) bool {
221+
if isScheduled(pod) || isDeleted(pod) {
222+
return false
223+
}
224+
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
225+
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonSchedulingGated {
226+
return true
227+
}
228+
return false
229+
}
230+
207231
// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
208232
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
209233
var unschedulablePods []*apiv1.Pod

0 commit comments

Comments
 (0)