Migrate filter out schedulable to PodListProcessor

Łukasz Osipiuk 2019-04-10 21:53:36 +02:00
parent 5c09c50774
commit db4c6f1133
10 changed files with 399 additions and 303 deletions

View File

@ -0,0 +1,158 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"sort"
"time"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
schedulerutil "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/util"
)
type filterOutSchedulablePodListProcessor struct{}
// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor() pods.PodListProcessor {
return &filterOutSchedulablePodListProcessor{}
}
// Process filters out pods which are schedulable from the list of unschedulable pods.
func (filterOutSchedulablePodListProcessor) Process(
context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
// We need to check whether pods marked as unschedulable are actually unschedulable.
// It's likely we added a new node and the scheduler just hasn't managed to put the
// pod on it yet. In this situation we don't want to trigger another scale-up.
//
// It's also important to prevent uncontrollable cluster growth if CA's simulated
// scheduler differs in opinion with the real scheduler. Example of such a situation:
// - CA and Scheduler have slightly different configurations
// - Scheduler can't schedule a pod and marks it as unschedulable
// - CA added a node which should help the pod
// - Scheduler doesn't schedule the pod on the new node
// because according to its logic it doesn't fit there
// - CA sees the pod is still unschedulable, so it adds another node to help it
//
// With the check enabled the last point won't happen because CA will ignore a pod
// which is supposed to be scheduled on an existing node.
// Some unschedulable pods can be waiting for lower priority pod preemption, so they have a nominated node to run on.
// Such pods don't require a scale-up but should be considered during scale-down.
unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, context.ExpendablePodsPriorityCutoff)
klog.V(4).Infof("Filtering out schedulables")
filterOutSchedulableStart := time.Now()
var unschedulablePodsToHelp []*apiv1.Pod
if context.FilterOutSchedulablePodsUsesPacking {
unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
} else {
unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
}
metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
if len(unschedulablePodsToHelp) != len(unschedulablePods) {
klog.V(2).Info("Schedulable pods present")
context.ProcessorCallbacks.DisableScaleDownForLoop()
} else {
klog.V(4).Info("No schedulable pods")
}
return unschedulablePodsToHelp, allScheduledPods, nil
}
func (filterOutSchedulablePodListProcessor) CleanUp() {
}
// filterOutSchedulableByPacking checks whether pods from <unschedulableCandidates> marked as unschedulable
// can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
// pods first. It takes into account pods that are bound to a node and will be scheduled after lower priority pod preemption.
func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
loggingQuota := glogx.PodsLoggingQuota()
sort.Slice(unschedulableCandidates, func(i, j int) bool {
return util.GetPodPriority(unschedulableCandidates[i]) > util.GetPodPriority(unschedulableCandidates[j])
})
for _, pod := range unschedulableCandidates {
nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo)
if err != nil {
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
nodeNameToNodeInfo[nodeName] = schedulerutil.NodeWithPod(nodeNameToNodeInfo[nodeName], pod)
}
}
glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left())
return unschedulablePods
}
// filterOutSchedulableSimple checks whether pods from <unschedulableCandidates> marked as unschedulable
// by the Scheduler actually can't be scheduled on any node and filters out the ones that can.
// It takes into account pods that are bound to a node and will be scheduled after lower priority pod preemption.
func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
podSchedulable := make(podSchedulableMap)
loggingQuota := glogx.PodsLoggingQuota()
for _, pod := range unschedulableCandidates {
cachedError, found := podSchedulable.get(pod)
// Try to get result from cache.
if found {
if cachedError != nil {
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled (based on simulation run for other pod owned by the same controller). Ignoring in scale up.", pod.Name)
}
continue
}
// Not found in cache, have to run the predicates.
nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo)
// err returned from FitsAny isn't a PredicateError.
// Hello, ugly hack. I wish you weren't here.
var predicateError *simulator.PredicateError
if err != nil {
predicateError = simulator.NewPredicateError("FitsAny", err, nil, nil)
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
}
podSchedulable.set(pod, predicateError)
}
glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left())
return unschedulablePods
}
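Editor's note: the pods.PodListProcessor interface that this new type satisfies is not part of the diff. Judging from the Process signature above, it presumably looks along the following lines (a sketch only; the real definition in k8s.io/autoscaler/cluster-autoscaler/processors/pods may differ in comments or details):

// Sketch of the PodListProcessor contract, inferred from the implementation above.
type PodListProcessor interface {
// Process receives the unschedulable and scheduled pod lists together with the
// node lists and returns the (possibly filtered) unschedulable and scheduled pods.
Process(context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error)
// CleanUp frees any resources held by the processor between autoscaler loops.
CleanUp()
}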

View File

@ -0,0 +1,179 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"testing"
"time"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/testapi"
"github.com/stretchr/testify/assert"
)
func TestFilterOutSchedulableByPacking(t *testing.T) {
rc1 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc1",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-123456789012",
},
}
rc2 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc2",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-12345678901a",
},
}
p1 := BuildTestPod("p1", 1500, 200000)
p2_1 := BuildTestPod("p2_2", 3000, 200000)
p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p2_2 := BuildTestPod("p2_2", 3000, 200000)
p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p3_1 := BuildTestPod("p3", 300, 200000)
p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
p3_2 := BuildTestPod("p3", 300, 200000)
p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
scheduledPod1 := BuildTestPod("s1", 100, 200000)
scheduledPod2 := BuildTestPod("s2", 1500, 200000)
scheduledPod3 := BuildTestPod("s3", 4000, 200000)
var priority1 int32 = 1
scheduledPod3.Spec.Priority = &priority1
scheduledPod1.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
var priority100 int32 = 100
podWaitingForPreemption.Spec.Priority = &priority100
podWaitingForPreemption.Status.NominatedNodeName = "node1"
p4 := BuildTestPod("p4", 1800, 200000)
p4.Spec.Priority = &priority100
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})
predicateChecker := simulator.NewTestPredicateChecker()
res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 3, len(res))
assert.Equal(t, p2_1, res[0])
assert.Equal(t, p2_2, res[1])
assert.Equal(t, p3_2, res[2])
res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 4, len(res2))
assert.Equal(t, p1, res2[0])
assert.Equal(t, p2_1, res2[1])
assert.Equal(t, p2_2, res2[2])
assert.Equal(t, p3_2, res2[3])
res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
assert.Equal(t, 4, len(res3))
assert.Equal(t, p1, res3[0])
assert.Equal(t, p2_1, res3[1])
assert.Equal(t, p2_2, res3[2])
assert.Equal(t, p3_2, res3[3])
res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 5, len(res4))
assert.Equal(t, p1, res4[0])
assert.Equal(t, p2_1, res4[1])
assert.Equal(t, p2_2, res4[2])
assert.Equal(t, p3_1, res4[3])
assert.Equal(t, p3_2, res4[4])
}
func TestFilterOutSchedulableSimple(t *testing.T) {
rc1 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc1",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-123456789012",
},
}
rc2 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc2",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-12345678901a",
},
}
p1 := BuildTestPod("p1", 1500, 200000)
p2_1 := BuildTestPod("p2_2", 3000, 200000)
p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p2_2 := BuildTestPod("p2_2", 3000, 200000)
p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p3_1 := BuildTestPod("p3", 100, 200000)
p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
p3_2 := BuildTestPod("p3", 100, 200000)
p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
scheduledPod1 := BuildTestPod("s1", 100, 200000)
scheduledPod2 := BuildTestPod("s2", 1500, 200000)
scheduledPod3 := BuildTestPod("s3", 4000, 200000)
var priority1 int32 = 1
scheduledPod3.Spec.Priority = &priority1
scheduledPod1.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
var priority100 int32 = 100
podWaitingForPreemption.Spec.Priority = &priority100
podWaitingForPreemption.Status.NominatedNodeName = "node1"
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})
predicateChecker := simulator.NewTestPredicateChecker()
res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 2, len(res))
assert.Equal(t, p2_1, res[0])
assert.Equal(t, p2_2, res[1])
res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 3, len(res2))
assert.Equal(t, p1, res2[0])
assert.Equal(t, p2_1, res2[1])
assert.Equal(t, p2_2, res2[2])
res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
assert.Equal(t, 3, len(res3))
assert.Equal(t, p1, res3[0])
assert.Equal(t, p2_1, res3[1])
assert.Equal(t, p2_2, res3[2])
}
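Editorial aside, not part of the diff: a quick capacity walk-through of the first assertion in each of these tests. node1 offers 2000 millicores of CPU, scheduledPod1 consumes 100, and scheduledPod3 (priority 1, below the expendable cutoff of 10) is ignored as expendable, leaving roughly 1900m free.

// Packing variant (simulated placement consumes node capacity):
// p1 (1500m) fits -> ~400m left
// p2_1 and p2_2 (3000m) never fit -> stay unschedulable
// p3_1 (300m) fits -> ~100m left
// p3_2 (300m) no longer fits -> stays unschedulable, so res == {p2_1, p2_2, p3_2}
//
// Simple variant (FitsAny per pod, result cached per controller, capacity not consumed):
// p1 (1500m) fits, p2_1 and p2_2 (3000m) do not, and p3_2 (100m in this test)
// reuses the cached "fits" answer computed for p3_1, so res == {p2_1, p2_2}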

View File

@ -28,7 +28,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
@ -423,7 +422,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
extraPods[i] = pod
}
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
@ -511,7 +510,7 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
@ -560,7 +559,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)
p4 := BuildTestPod("p-new", 550, 0)
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
@ -605,7 +604,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
@ -644,7 +643,7 @@ func TestScaleUpNoHelp(t *testing.T) {
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
@ -712,7 +711,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
}
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, typedErr)
@ -763,7 +762,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
processors.NodeGroupListProcessor = &mockAutoprovisioningNodeGroupListProcessor{t}
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t}

View File

@ -282,56 +282,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
allUnschedulablePods, allScheduledPods, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduledPods, allNodes, readyNodes)
if err != nil {
klog.Errorf("Failed to process pod list: %v", err)
return errors.ToAutoscalerError(errors.InternalError, err)
}
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduledPods, a.PredicateChecker)
// We need to check whether pods marked as unschedulable are actually unschedulable.
// It's likely we added a new node and the scheduler just haven't managed to put the
// pod on in yet. In this situation we don't want to trigger another scale-up.
//
// It's also important to prevent uncontrollable cluster growth if CA's simulated
// scheduler differs in opinion with real scheduler. Example of such situation:
// - CA and Scheduler has slightly different configuration
// - Scheduler can't schedule a pod and marks it as unschedulable
// - CA added a node which should help the pod
// - Scheduler doesn't schedule the pod on the new node
// because according to it logic it doesn't fit there
// - CA see the pod is still unschedulable, so it adds another node to help it
//
// With the check enabled the last point won't happen because CA will ignore a pod
// which is supposed to schedule on an existing node.
scaleDownForbidden := false
unschedulablePodsWithoutTPUs := tpu.ClearTPURequests(allUnschedulablePods)
// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
// Such pods don't require scale up but should be considered during scale down.
unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff)
klog.V(4).Infof("Filtering out schedulables")
filterOutSchedulableStart := time.Now()
var unschedulablePodsToHelp []*apiv1.Pod
if a.FilterOutSchedulablePodsUsesPacking {
unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff)
} else {
unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff)
}
metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
if len(unschedulablePodsToHelp) != len(unschedulablePods) {
klog.V(2).Info("Schedulable pods present")
scaleDownForbidden = true
} else {
klog.V(4).Info("No schedulable pods")
}
// TODO: this is also computed in filterOutSchedulablePodListProcessor; avoid that.
_, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff)
unschedulablePodsToHelp, allScheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePodsWithoutTPUs, allScheduledPods, allNodes, readyNodes)
// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
@ -347,7 +304,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
// is more pods to come. In theory we could check the newest pod time but then if pod were created
// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
// We also want to skip a real scale down (just like if the pods were handled).
scaleDownForbidden = true
a.processorCallbacks.DisableScaleDownForLoop()
scaleUpStatus.Result = status.ScaleUpInCooldown
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
@ -405,8 +362,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}
}
scaleDownForbidden = scaleDownForbidden || a.processorCallbacks.disableScaleDownForLoop
scaleDownInCooldown := scaleDownForbidden ||
scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
@ -416,7 +372,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v",
calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
scaleDownForbidden, scaleDown.nodeDeleteStatus.IsDeleteInProgress())
a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeleteStatus.IsDeleteInProgress())
if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown
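Editor's note: the practical effect of this refactor is that schedulability filtering now sits behind the PodListProcessor hook instead of being hard-coded in RunOnce, so an alternative policy can be swapped in without touching the autoscaler loop. A minimal sketch of such a custom processor follows; the type name and label key are hypothetical, and it assumes the same imports as the new processor file shown above.

// ignoreOptedOutPodsProcessor is a hypothetical PodListProcessor that drops
// unschedulable pods carrying an opt-out label, so they never trigger a scale-up.
type ignoreOptedOutPodsProcessor struct{}

func (ignoreOptedOutPodsProcessor) Process(
ctx *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
var kept []*apiv1.Pod
for _, pod := range unschedulablePods {
if pod.Labels["autoscaling.example.com/ignore"] == "true" {
continue // never request a new node on behalf of opted-out pods
}
kept = append(kept, pod)
}
return kept, allScheduledPods, nil
}

func (ignoreOptedOutPodsProcessor) CleanUp() {}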

View File

@ -29,7 +29,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -208,7 +207,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
processors: ca_processors.TestProcessors(),
processors: NewTestProcessors(),
processorCallbacks: processorCallbacks,
initialized: true,
}
@ -355,7 +354,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
provider.AddNode("ng1,", n1)
assert.NotNil(t, provider)
processors := ca_processors.TestProcessors()
processors := NewTestProcessors()
processors.NodeGroupManager = nodeGroupManager
processors.NodeGroupListProcessor = nodeGroupListProcessor
@ -533,7 +532,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
processors: ca_processors.TestProcessors(),
processors: NewTestProcessors(),
processorCallbacks: processorCallbacks,
}
@ -659,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithFilterOutSchedulablePodsUsesPackingFalse
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
processors: ca_processors.TestProcessors(),
processors: NewTestProcessors(),
processorCallbacks: processorCallbacks,
}
@ -776,7 +775,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
processors: ca_processors.TestProcessors(),
processors: NewTestProcessors(),
processorCallbacks: processorCallbacks,
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors() *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: NewFilterOutSchedulablePodListProcessor(),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
NodeGroupSetProcessor: &nodegroupset.BalancingNodeGroupSetProcessor{},
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
}
}
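Usage in a test mirrors the scale-up tests earlier in this commit: build the defaults and overwrite whichever processor the test cares about. As a sketch (the surrounding scaffolding such as context, clusterState, extraPods, nodes and nodeInfos is assumed, and the processors/pods package is imported), a test that wants no schedulability filtering can fall back to the NoOpPodListProcessor that the removed TestProcessors helper used:

processors := NewTestProcessors()
// Opt back out of schedulability filtering for this particular test.
processors.PodListProcessor = &pods.NoOpPodListProcessor{}
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)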

View File

@ -20,11 +20,11 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"time"
"k8s.io/kubernetes/pkg/scheduler/util"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@ -38,11 +38,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/klog"
@ -108,75 +103,6 @@ func (podMap podSchedulableMap) set(pod *apiv1.Pod, err *simulator.PredicateErro
})
}
// filterOutSchedulableByPacking checks whether pods from <unschedulableCandidates> marked as unschedulable
// can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
// pods first. It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
loggingQuota := glogx.PodsLoggingQuota()
sort.Slice(unschedulableCandidates, func(i, j int) bool {
return util.GetPodPriority(unschedulableCandidates[i]) > util.GetPodPriority(unschedulableCandidates[j])
})
for _, pod := range unschedulableCandidates {
nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo)
if err != nil {
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
nodeNameToNodeInfo[nodeName] = scheduler_util.NodeWithPod(nodeNameToNodeInfo[nodeName], pod)
}
}
glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left())
return unschedulablePods
}
// filterOutSchedulableSimple checks whether pods from <unschedulableCandidates> marked as unschedulable
// by Scheduler actually can't be scheduled on any node and filter out the ones that can.
// It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
podSchedulable := make(podSchedulableMap)
loggingQuota := glogx.PodsLoggingQuota()
for _, pod := range unschedulableCandidates {
cachedError, found := podSchedulable.get(pod)
// Try to get result from cache.
if found {
if cachedError != nil {
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled (based on simulation run for other pod owned by the same controller). Ignoring in scale up.", pod.Name)
}
continue
}
// Not found in cache, have to run the predicates.
nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo)
// err returned from FitsAny isn't a PredicateError.
// Hello, ugly hack. I wish you weren't here.
var predicateError *simulator.PredicateError
if err != nil {
predicateError = simulator.NewPredicateError("FitsAny", err, nil, nil)
unschedulablePods = append(unschedulablePods, pod)
} else {
glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
}
podSchedulable.set(pod, predicateError)
}
glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left())
return unschedulablePods
}
// filterOutExpendableAndSplit filters out expendable pods and splits into:
// - waiting for lower priority pods preemption
// - other pods.

View File

@ -121,154 +121,6 @@ func TestPodSchedulableMap(t *testing.T) {
assert.Nil(t, err)
}
func TestFilterOutSchedulableByPacking(t *testing.T) {
rc1 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc1",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-123456789012",
},
}
rc2 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc2",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-12345678901a",
},
}
p1 := BuildTestPod("p1", 1500, 200000)
p2_1 := BuildTestPod("p2_2", 3000, 200000)
p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p2_2 := BuildTestPod("p2_2", 3000, 200000)
p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p3_1 := BuildTestPod("p3", 300, 200000)
p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
p3_2 := BuildTestPod("p3", 300, 200000)
p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
scheduledPod1 := BuildTestPod("s1", 100, 200000)
scheduledPod2 := BuildTestPod("s2", 1500, 200000)
scheduledPod3 := BuildTestPod("s3", 4000, 200000)
var priority1 int32 = 1
scheduledPod3.Spec.Priority = &priority1
scheduledPod1.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
var priority100 int32 = 100
podWaitingForPreemption.Spec.Priority = &priority100
podWaitingForPreemption.Status.NominatedNodeName = "node1"
p4 := BuildTestPod("p4", 1800, 200000)
p4.Spec.Priority = &priority100
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})
predicateChecker := simulator.NewTestPredicateChecker()
res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 3, len(res))
assert.Equal(t, p2_1, res[0])
assert.Equal(t, p2_2, res[1])
assert.Equal(t, p3_2, res[2])
res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 4, len(res2))
assert.Equal(t, p1, res2[0])
assert.Equal(t, p2_1, res2[1])
assert.Equal(t, p2_2, res2[2])
assert.Equal(t, p3_2, res2[3])
res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
assert.Equal(t, 4, len(res3))
assert.Equal(t, p1, res3[0])
assert.Equal(t, p2_1, res3[1])
assert.Equal(t, p2_2, res3[2])
assert.Equal(t, p3_2, res3[3])
res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 5, len(res4))
assert.Equal(t, p1, res4[0])
assert.Equal(t, p2_1, res4[1])
assert.Equal(t, p2_2, res4[2])
assert.Equal(t, p3_1, res4[3])
assert.Equal(t, p3_2, res4[4])
}
func TestFilterOutSchedulableSimple(t *testing.T) {
rc1 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc1",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-123456789012",
},
}
rc2 := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc2",
Namespace: "default",
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
UID: "12345678-1234-1234-1234-12345678901a",
},
}
p1 := BuildTestPod("p1", 1500, 200000)
p2_1 := BuildTestPod("p2_2", 3000, 200000)
p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p2_2 := BuildTestPod("p2_2", 3000, 200000)
p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
p3_1 := BuildTestPod("p3", 100, 200000)
p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
p3_2 := BuildTestPod("p3", 100, 200000)
p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
scheduledPod1 := BuildTestPod("s1", 100, 200000)
scheduledPod2 := BuildTestPod("s2", 1500, 200000)
scheduledPod3 := BuildTestPod("s3", 4000, 200000)
var priority1 int32 = 1
scheduledPod3.Spec.Priority = &priority1
scheduledPod1.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
var priority100 int32 = 100
podWaitingForPreemption.Spec.Priority = &priority100
podWaitingForPreemption.Status.NominatedNodeName = "node1"
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})
predicateChecker := simulator.NewTestPredicateChecker()
res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 2, len(res))
assert.Equal(t, p2_1, res[0])
assert.Equal(t, p2_2, res[1])
res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 3, len(res2))
assert.Equal(t, p1, res2[0])
assert.Equal(t, p2_1, res2[1])
assert.Equal(t, p2_2, res2[2])
res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
assert.Equal(t, 3, len(res3))
assert.Equal(t, p1, res3[0])
assert.Equal(t, p2_1, res3[1])
assert.Equal(t, p2_2, res3[2])
}
func TestFilterOutExpendableAndSplit(t *testing.T) {
var priority1 int32 = 1
var priority100 int32 = 100

View File

@ -278,7 +278,10 @@ func buildAutoscaler() (core.Autoscaler, error) {
autoscalingOptions := createAutoscalingOptions()
kubeClient := createKubeClient(getKubeConfig())
eventsKubeClient := createKubeClient(getKubeConfig())
processors := ca_processors.DefaultProcessors()
processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()
opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
KubeClient: kubeClient,

View File

@ -55,20 +55,6 @@ func DefaultProcessors() *AutoscalingProcessors {
}
}
// TestProcessors returns a set of simple processors for use in tests.
func TestProcessors() *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: &pods.NoOpPodListProcessor{},
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
NodeGroupSetProcessor: &nodegroupset.BalancingNodeGroupSetProcessor{},
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
}
}
// CleanUp cleans up the processors' internal structures.
func (ap *AutoscalingProcessors) CleanUp() {
ap.PodListProcessor.CleanUp()