Add new pod list processors for clearing TPU requests & filtering out expendable pods

Treat not-yet-processed pods as unschedulable
Mahmoud Atwa 2023-10-18 09:19:24 +00:00
parent 4d8e654503
commit a1ab7b9e20
9 changed files with 316 additions and 54 deletions

View File

@ -277,4 +277,12 @@ type AutoscalingOptions struct {
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
// UnschedulablePodTimeBuffer specifies how old the oldest unschedulable pod should be before starting scale up.
UnschedulablePodTimeBuffer time.Duration
// UnschedulablePodWithGpuTimeBuffer specifies how old the oldest unschedulable pod with GPU should be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
UnschedulablePodWithGpuTimeBuffer time.Duration
}
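The defaults for these fields correspond to the constants this commit removes from static_autoscaler.go (and to the new flag defaults in main.go). A minimal sketch, not from this commit, of setting them explicitly, e.g. in a test:

opts := config.AutoscalingOptions{
    UnschedulablePodTimeBuffer:        2 * time.Second,  // was the unschedulablePodTimeBuffer constant / flag default
    UnschedulablePodWithGpuTimeBuffer: 30 * time.Second, // was the unschedulablePodWithGpuTimeBuffer constant / flag default
}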

View File

@ -0,0 +1,66 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
)
type filterOutExpandable struct {
}
// NewFilterOutExpandablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpandablePodListProcessor() *filterOutExpandable {
return &filterOutExpandable{}
}
// Process filters out pods which are expendable and adds pods which are waiting for lower-priority pods' preemption to the cluster snapshot
func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
klog.V(4).Infof("Filtering out expendable pods")
nodes, err := context.AllNodeLister().List()
if err != nil {
return nil, err
}
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
if err = p.addPreemptiblePodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
return nil, err
}
return unschedulablePods, nil
}
// addPreemptiblePodsToSnapshot modifies the snapshot to simulate scheduling of pods waiting for preemption.
// This is not strictly correct, as we are not simulating preemption itself, but it matches
// the CA logic from before the migration to the scheduler framework, so let's keep it for now.
func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
return nil
}
func (p *filterOutExpandable) CleanUp() {
}
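A minimal invocation sketch (not from this commit) showing the processor applied to a pod list, assuming an AutoscalingContext ctx whose cluster snapshot and node lister are already populated:

processor := NewFilterOutExpandablePodListProcessor()
unschedulable, err := processor.Process(ctx, unschedulableCandidates)
if err != nil {
    // Filtering failed (e.g. node listing or snapshot update); surface the error to the main loop.
    return err
}
// unschedulable now excludes expendable pods; pods waiting for preemption were added to ctx.ClusterSnapshot.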

View File

@ -0,0 +1,179 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestFilterOutExpendable(t *testing.T) {
testCases := []struct {
name string
pods []*apiv1.Pod
wantPods []*apiv1.Pod
wantPodsInSnapshot []*apiv1.Pod
priorityCutoff int
nodes []*apiv1.Node
}{
{
name: "no pods",
},
{
name: "single non-expendable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1),
},
},
{
name: "non-expendable pods with priority >= to cutoff priority",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
},
priorityCutoff: 2,
},
{
name: "single expednable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getPrioritySetter(2)),
},
priorityCutoff: 3,
},
{
name: "single waiting-for-low-priority-preemption pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
},
},
{
name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p3", 1000, 1, getPrioritySetter(1)),
test.BuildTestPod("p4", 1000, 1),
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
},
priorityCutoff: 2,
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p4", 1000, 1),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpandablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)
pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,
AutoscalingOptions: config.AutoscalingOptions{
ExpendablePodsPriorityCutoff: tc.priorityCutoff,
},
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: newMockListerRegistry(tc.nodes),
},
}, tc.pods)
assert.NoError(t, err)
assert.ElementsMatch(t, tc.wantPods, pods)
var podsInSnapshot []*apiv1.Pod
nodeInfoLister := snapshot.NodeInfos()
// Get pods in snapshot
for _, n := range tc.nodes {
nodeInfo, err := nodeInfoLister.Get(n.Name)
assert.NoError(t, err)
assert.NotEqual(t, nodeInfo.Pods, nil)
for _, podInfo := range nodeInfo.Pods {
podsInSnapshot = append(podsInSnapshot, podInfo.Pod)
}
}
assert.ElementsMatch(t, tc.wantPodsInSnapshot, podsInSnapshot)
})
}
}
func getPrioritySetter(priority int32) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Spec.Priority = &priority
}
}
func getNominatedNodeNameSetter(nodeName string) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Status.NominatedNodeName = nodeName
}
}
type mockListerRegistry struct {
kube_util.ListerRegistry
nodes []*apiv1.Node
}
func newMockListerRegistry(nodes []*apiv1.Node) *mockListerRegistry {
return &mockListerRegistry{
nodes: nodes,
}
}
func (mlr mockListerRegistry) AllNodeLister() kube_util.NodeLister {
return &mockNodeLister{nodes: mlr.nodes}
}
type mockNodeLister struct {
nodes []*apiv1.Node
}
func (mnl *mockNodeLister) List() ([]*apiv1.Node, error) {
return mnl.nodes, nil
}
func (mnl *mockNodeLister) Get(name string) (*apiv1.Node, error) {
return nil, fmt.Errorf("Unsupported operation")
}

View File

@ -32,6 +32,7 @@ type defaultPodListProcessor struct {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
NewFilterOutExpandablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutDaemonSetPodListProcessor(),

View File

@ -57,20 +57,12 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
"k8s.io/utils/integer"
klog "k8s.io/klog/v2"
)
const (
// How old the oldest unschedulable pod should be before starting scale up.
unschedulablePodTimeBuffer = 2 * time.Second
// How old the oldest unschedulable pod with GPU should be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
unschedulablePodWithGpuTimeBuffer = 30 * time.Second
// NodeUpcomingAnnotation is an annotation CA adds to nodes which are upcoming.
NodeUpcomingAnnotation = "cluster-autoscaler.k8s.io/upcoming-node"
@ -309,7 +301,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to list pods: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
originalScheduledPods, unschedulablePods, unknownPods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.UnknownPods(pods)
// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
@ -451,24 +443,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods))
unschedulablePods = tpu.ClearTPURequests(unschedulablePods)
// todo: move split and append below to separate PodListProcessor
// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
// Such pods don't require scale up but should be considered during scale down.
unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(unschedulablePods, allNodes, a.ExpendablePodsPriorityCutoff)
// modify the snapshot simulating scheduling of pods waiting for preemption.
// this is not strictly correct as we are not simulating preemption itself but it matches
// CA logic from before migration to scheduler framework. So let's keep it for now
for _, p := range unschedulableWaitingForLowerPriorityPreemption {
if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods))
// Treat unknown pods as unschedulable; the pod list processor will filter out the ones that are actually schedulable
unschedulablePods = append(unschedulablePods, unknownPods...)
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
@ -553,7 +530,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime, a.AutoscalingOptions.UnschedulablePodTimeBuffer, a.AutoscalingOptions.UnschedulablePodWithGpuTimeBuffer) {
// The assumption here is that these pods have been created very recently and probably there
// are more pods to come. In theory we could check the newest pod time but then if pods were created
// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for a long time.
@ -986,7 +963,7 @@ func (a *StaticAutoscaler) reportTaintsCount(nodes []*apiv1.Node) {
}
}
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool {
if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) {
return true
}
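The hunk ends before the GPU branch of allPodsAreNew. For context, a sketch of the remainder (the helper core_utils.GetOldestCreateTimeWithGpu and its (bool, time.Time) return are assumptions, not shown in this diff); only the source of the buffers changes, from package constants to the new parameters:

// If the generic buffer has already elapsed, the pods are still treated as "new" only when
// there is at least one pod requesting a GPU and the oldest such pod is younger than the
// GPU-specific buffer, allowing a longer wait before a GPU scale-up.
found, oldest := core_utils.GetOldestCreateTimeWithGpu(pods)
return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}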

View File

@ -171,11 +171,14 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 1000, 1000)
SetNodeReadyState(n3, true, time.Now())
n4 := BuildTestNode("n4", 1000, 1000)
n5 := BuildTestNode("n5", 1000, 1000)
p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable())
p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler
tn := BuildTestNode("tn", 1000, 1000)
tni := schedulerframework.NewNodeInfo()
@ -248,7 +251,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
// MaxNodesTotal reached.
readyNodeLister.SetNodes([]*apiv1.Node{n1})
allNodeLister.SetNodes([]*apiv1.Node{n1})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
@ -259,10 +262,10 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
// Scale up.
readyNodeLister.SetNodes([]*apiv1.Node{n1})
allNodeLister.SetNodes([]*apiv1.Node{n1})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once()
context.MaxNodesTotal = 10
err = autoscaler.RunOnce(time.Now().Add(time.Hour))
@ -302,12 +305,12 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
// Mark unregistered nodes.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
provider.AddNodeGroup("ng2", 0, 10, 1)
provider.AddNode("ng2", n3)
provider.AddNode("ng2", n4)
err = autoscaler.RunOnce(time.Now().Add(4 * time.Hour))
assert.NoError(t, err)
@ -315,11 +318,11 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Remove unregistered nodes.
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
onScaleDownMock.On("ScaleDown", "ng2", "n3").Return(nil).Once()
onScaleDownMock.On("ScaleDown", "ng2", "n4").Return(nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
@ -329,15 +332,15 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Scale up to node group min size.
readyNodeLister.SetNodes([]*apiv1.Node{n4})
allNodeLister.SetNodes([]*apiv1.Node{n4})
readyNodeLister.SetNodes([]*apiv1.Node{n5})
allNodeLister.SetNodes([]*apiv1.Node{n5})
allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil)
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil)
onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up.
provider.AddNodeGroup("ng3", 3, 10, 1)
provider.AddNode("ng3", n4)
provider.AddNode("ng3", n5)
err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
assert.NoError(t, err)
@ -366,6 +369,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
p1 := BuildTestPod("p1", 100, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable())
p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler
tn1 := BuildTestNode("tn1", 100, 1000)
SetNodeReadyState(tn1, true, time.Now())
@ -457,11 +461,11 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
// Scale up.
readyNodeLister.SetNodes([]*apiv1.Node{n1})
allNodeLister.SetNodes([]*apiv1.Node{n1})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
onNodeGroupCreateMock.On("Create", "autoprovisioned-TN2").Return(nil).Once()
onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 1).Return(nil).Once()
onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 2).Return(nil).Once()
err = autoscaler.RunOnce(time.Now().Add(time.Hour))
assert.NoError(t, err)
@ -521,10 +525,13 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 1000, 1000)
SetNodeReadyState(n3, true, time.Now())
p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable())
p3 := BuildTestPod("p3", 600, 100)
provider := testprovider.NewTestCloudProvider(
func(id string, delta int) error {
@ -534,7 +541,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
deleteFinished <- true
return ret
})
provider.AddNodeGroup("ng1", 2, 10, 2)
provider.AddNodeGroup("ng1", 3, 10, 2)
provider.AddNode("ng1", n1)
// broken node, that will be just hanging out there during
@ -606,10 +613,10 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
// Scale up.
readyNodeLister.SetNodes([]*apiv1.Node{n1})
allNodeLister.SetNodes([]*apiv1.Node{n1})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once()
err = autoscaler.RunOnce(later.Add(time.Hour))
assert.NoError(t, err)
@ -618,11 +625,12 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
// Remove broken node after going over min size
provider.AddNode("ng1", n2)
ng1.SetTargetSize(3)
provider.AddNode("ng1", n3)
ng1.SetTargetSize(4)
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()

View File

@ -243,6 +243,8 @@ var (
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based on the latency between CA and api-server")
unschedulablePodTimeBuffer = flag.Duration("unschedulable-pod-time-buffer", 2*time.Second, "How old the oldest unschedulable pod should be before starting scale up.")
unschedulablePodWithGpuTimeBuffer = flag.Duration("unschedulable-pod-with-gpu-time-buffer", 30*time.Second, "How old the oldest unschedulable pod with GPU should be before starting scale up.")
)
func isFlagPassed(name string) bool {
@ -390,6 +392,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
UnschedulablePodTimeBuffer: *unschedulablePodTimeBuffer,
UnschedulablePodWithGpuTimeBuffer: *unschedulablePodWithGpuTimeBuffer,
}
}

View File

@ -135,12 +135,13 @@ var (
}, []string{"node_group_type"},
)
unschedulablePodsCount = k8smetrics.NewGauge(
// Unschedulable pod count covers both pods marked unschedulable by the scheduler and pods not yet processed by it (unknown)
unschedulablePodsCount = k8smetrics.NewGaugeVec(
&k8smetrics.GaugeOpts{
Namespace: caNamespace,
Name: "unschedulable_pods_count",
Help: "Number of unschedulable pods in the cluster.",
},
}, []string{"count_type"},
)
maxNodesCount = k8smetrics.NewGauge(
@ -472,8 +473,9 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) {
}
// UpdateUnschedulablePodsCount records the number of currently unschedulable pods, split into scheduler-marked-unschedulable and unknown (not-yet-processed) pods
func UpdateUnschedulablePodsCount(podsCount int) {
unschedulablePodsCount.Set(float64(podsCount))
func UpdateUnschedulablePodsCount(unschedulableCount, unknownCount int) {
unschedulablePodsCount.WithLabelValues("unschedulable").Set(float64(unschedulableCount))
unschedulablePodsCount.WithLabelValues("unknown").Set(float64(unknownCount))
}
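A usage sketch mirroring the call added in RunOnce above; with the caNamespace prefix the two counts surface as separate series of the gauge (assuming the namespace is "cluster_autoscaler", i.e. cluster_autoscaler_unschedulable_pods_count), distinguished by the count_type label:

// unschedulablePods: pods the scheduler marked unschedulable (kube_util.UnschedulablePods)
// unknownPods: pods the scheduler has not processed yet (kube_util.UnknownPods)
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods))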
// UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups

View File

@ -156,6 +156,23 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
return scheduledPods
}
// UnknownPods is a helper method that returns all pods which have not yet been processed by the scheduler
func UnknownPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unknownPods []*apiv1.Pod
for _, pod := range allPods {
// Make sure it's not scheduled or deleted
if pod.Spec.NodeName != "" || pod.GetDeletionTimestamp() != nil {
continue
}
// Make sure it's not unschedulable
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") {
unknownPods = append(unknownPods, pod)
}
}
return unknownPods
}
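A small classification sketch (not part of this commit) of how these helpers partition a pod list; it assumes the BuildTestPod and MarkUnschedulable helpers from utils/test used elsewhere in this commit:

pending := BuildTestPod("pending", 600, 100)                      // no NodeName, no PodScheduled condition
unsched := BuildTestPod("unsched", 600, 100, MarkUnschedulable()) // PodScheduled=False, Reason=Unschedulable
scheduled := BuildTestPod("scheduled", 600, 100)
scheduled.Spec.NodeName = "n1" // already bound to a node

all := []*apiv1.Pod{pending, unsched, scheduled}
_ = UnknownPods(all)       // expected: [pending]   (not bound, not marked unschedulable)
_ = UnschedulablePods(all) // expected: [unsched]   (scheduler marked it unschedulable)
_ = ScheduledPods(all)     // expected: [scheduled] (bound to a node)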
// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod