Fix multiple comments and update flags

This commit is contained in:
Mahmoud Atwa 2023-10-31 11:04:38 +00:00
parent a1ab7b9e20
commit 86ab017967
9 changed files with 142 additions and 67 deletions

View File

@ -277,12 +277,8 @@ type AutoscalingOptions struct {
// DynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server.
DynamicNodeDeleteDelayAfterTaintEnabled bool
// UnschedulablePodTimeBuffer controls when scale-ups happen so that
// the oldest unschedulable pod is older than UnschedulablePodTimeBuffer
UnschedulablePodTimeBuffer time.Duration
// UnschedulablePodWithGpuTimeBuffer specifies how old should the oldest unschedulable pod with GPU be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
UnschedulablePodWithGpuTimeBuffer time.Duration
// IgnoreSchedulerProcessing is used to signal whether CA will/won't wait
// for the scheduler to mark pods as unschedulable, and will process both marked & non-marked pods.
// It also signals whether we enable/disable waiting for the pod time buffers before triggering a scale-up.
IgnoreSchedulerProcessing bool
}
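The two time-buffer options above move out of AutoscalingOptions and become hard-coded constants in a later hunk of this commit, gated by the new IgnoreSchedulerProcessing field. A minimal sketch, not part of this commit, of how the new field might be set (everything besides the field name is illustrative):

package main

import (
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Illustrative only: all other options keep their zero values here.
	opts := config.AutoscalingOptions{
		// Don't wait for the scheduler to mark pods unschedulable, and skip
		// the pod time buffers before triggering a scale-up.
		IgnoreSchedulerProcessing: true,
	}
	fmt.Println(opts.IgnoreSchedulerProcessing)
}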

View File

@ -0,0 +1,39 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
)
type clearTpuRequests struct {
}
// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods
func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
return &clearTpuRequests{}
}
// Process removes TPU requests from the given pods.
func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
return tpu.ClearTPURequests(pods), nil
}
func (p *clearTpuRequests) CleanUp() {
}
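Since clearTpuRequests only delegates to tpu.ClearTPURequests and never touches the autoscaling context, a hypothetical in-package test (a sketch, not part of this commit) could exercise it with a nil context:

package podlistprocessor

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
)

// TestClearTPURequestsSketch is a hypothetical example: the processor only
// rewrites TPU resource requests, so passing a nil context is fine here.
func TestClearTPURequestsSketch(t *testing.T) {
	processor := NewClearTPURequestsPodListProcessor()
	pods := []*apiv1.Pod{{}, {}}
	got, err := processor.Process(nil, pods)
	if err != nil || len(got) != len(pods) {
		t.Fatalf("got %d pods, err=%v; want %d pods and no error", len(got), err, len(pods))
	}
}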

View File

@ -24,17 +24,16 @@ import (
klog "k8s.io/klog/v2"
)
type filterOutExpandable struct {
type filterOutExpendable struct {
}
// NewFilterOutExpandablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpandablePodListProcessor() *filterOutExpandable {
return &filterOutExpandable{}
// NewFilterOutExpendablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
return &filterOutExpendable{}
}
// Process filters out pods which are expendable and adds pods which are waiting for lower-priority pods to be preempted to the cluster snapshot
func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
klog.V(4).Infof("Filtering out expandable pods")
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
nodes, err := context.AllNodeLister().List()
if err != nil {
return nil, err
@ -42,25 +41,25 @@ func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
if err = p.addPreemptiblePodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
return nil, err
}
return unschedulablePods, nil
}
// addPreemptiblePodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption.
// addPreemptingPodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption.
// This is not strictly correct, as we are not simulating preemption itself, but it matches
// CA logic from before the migration to the scheduler framework, so let's keep it for now.
func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
return nil
}
func (p *filterOutExpandable) CleanUp() {
func (p *filterOutExpendable) CleanUp() {
}

View File

@ -53,51 +53,51 @@ func TestFilterOutExpendable(t *testing.T) {
{
name: "non-expendable pods with priority >= to cutoff priority",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
priorityCutoff: 2,
},
{
name: "single expednable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p", 1000, 1, priority(2)),
},
priorityCutoff: 3,
},
{
name: "single waiting-for-low-priority-preemption pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
},
{
name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p3", 1000, 1, getPrioritySetter(1)),
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p3", 1000, 1, priority(1)),
test.BuildTestPod("p4", 1000, 1),
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
priorityCutoff: 2,
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p4", 1000, 1),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
@ -107,7 +107,7 @@ func TestFilterOutExpendable(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpandablePodListProcessor()
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)
@ -141,12 +141,12 @@ func TestFilterOutExpendable(t *testing.T) {
}
}
func getPrioritySetter(priority int32) func(*apiv1.Pod) {
func priority(priority int32) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Spec.Priority = &priority
}
}
func getNominatedNodeNameSetter(nodeName string) func(*apiv1.Pod) {
func nominatedNodeName(nodeName string) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Status.NominatedNodeName = nodeName
}

View File

@ -32,7 +32,8 @@ type defaultPodListProcessor struct {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
NewFilterOutExpandablePodListProcessor(),
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutDaemonSetPodListProcessor(),

View File

@ -63,6 +63,13 @@ import (
)
const (
// How old the oldest unschedulable pod should be before starting scale up.
unschedulablePodTimeBuffer = 2 * time.Second
// How old the oldest unschedulable pod with GPU should be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
unschedulablePodWithGpuTimeBuffer = 30 * time.Second
// NodeUpcomingAnnotation is an annotation CA adds to nodes which are upcoming.
NodeUpcomingAnnotation = "cluster-autoscaler.k8s.io/upcoming-node"
@ -301,7 +308,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to list pods: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods, unknownPods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.UnknownPods(pods)
originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
schedulerUnprocessed := make([]*apiv1.Pod, 0)
if a.IgnoreSchedulerProcessing {
schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods)
}
// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
@ -443,10 +454,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods))
// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, unknownPods...)
// schedulerUnprocessed might be empty here if IgnoreSchedulerProcessing is disabled
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
if a.IgnoreSchedulerProcessing {
// Treat scheduler-unprocessed pods as unschedulable; the pod list processor will remove any that turn out to be schedulable
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
}
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
@ -530,7 +543,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime, a.AutoscalingOptions.UnschedulablePodTimeBuffer, a.AutoscalingOptions.UnschedulablePodWithGpuTimeBuffer) {
} else if !a.IgnoreSchedulerProcessing && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and that more pods
// are probably on the way. In theory we could check the newest pod's creation time, but then if pods were
// created slowly, at a pace of one every 2 seconds, no scale-up would be triggered for a long time.
@ -963,7 +976,7 @@ func (a *StaticAutoscaler) reportTaintsCount(nodes []*apiv1.Node) {
}
}
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool {
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) {
return true
}
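With the buffers now hard-coded constants, this gate only applies when IgnoreSchedulerProcessing is off. A standalone sketch of the 2-second rule (the real function additionally applies the 30-second buffer to pods requesting GPUs):

package main

import (
	"fmt"
	"time"
)

// allPodsAreNewSketch restates the non-GPU branch above: scale-up is delayed
// while the oldest unschedulable pod is younger than the 2s buffer.
func allPodsAreNewSketch(oldestCreation, now time.Time) bool {
	const unschedulablePodTimeBuffer = 2 * time.Second
	return oldestCreation.Add(unschedulablePodTimeBuffer).After(now)
}

func main() {
	now := time.Now()
	fmt.Println(allPodsAreNewSketch(now.Add(-1*time.Second), now)) // true: keep waiting
	fmt.Println(allPodsAreNewSketch(now.Add(-5*time.Second), now)) // false: old enough to trigger scale-up
}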

View File

@ -243,8 +243,7 @@ var (
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based on the latency between CA and the api-server")
unschedulablePodTimeBuffer = flag.Duration("unschedulable-pod-time-buffer", 2*time.Second, "How old the oldest unschedulable pod should be before starting scale up.")
unschedulablePodWithGpuTimeBuffer = flag.Duration("unschedulable-pod-with-gpu-time-buffer", 30*time.Second, "How old the oldest unschedulable pod with GPU should be before starting scale up.")
ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for the scheduler to mark pods as unschedulable and will process both marked and unmarked pods (schedulable pods will be filtered out before scaling up). It also disables waiting for the pod time buffers before triggering a scale-up.")
)
func isFlagPassed(name string) bool {
@ -392,8 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
UnschedulablePodTimeBuffer: *unschedulablePodTimeBuffer,
UnschedulablePodWithGpuTimeBuffer: *unschedulablePodWithGpuTimeBuffer,
IgnoreSchedulerProcessing: *ignoreSchedulerProcessing,
}
}

View File

@ -141,7 +141,7 @@ var (
Namespace: caNamespace,
Name: "unschedulable_pods_count",
Help: "Number of unschedulable pods in the cluster.",
}, []string{"count_type"},
}, []string{"type"},
)
maxNodesCount = k8smetrics.NewGauge(
@ -473,9 +473,14 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) {
}
// UpdateUnschedulablePodsCount records the number of currently unschedulable pods and of pods not yet processed by the scheduler
func UpdateUnschedulablePodsCount(uschedulablePodsCount, unknownPodsCount int) {
unschedulablePodsCount.WithLabelValues("unschedulable").Set(float64(uschedulablePodsCount))
unschedulablePodsCount.WithLabelValues("unknown").Set(float64(unknownPodsCount))
func UpdateUnschedulablePodsCount(unschedulableCount, schedulerUnprocessedCount int) {
UpdateUnschedulablePodsCountWithLabel(unschedulableCount, "unschedulable")
UpdateUnschedulablePodsCountWithLabel(schedulerUnprocessedCount, "scheduler_unprocessed")
}
// UpdateUnschedulablePodsCountWithLabel records the number of currently unschedulable pods under the given "type" label value
func UpdateUnschedulablePodsCountWithLabel(count int, label string) {
unschedulablePodsCount.WithLabelValues(label).Set(float64(count))
}
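The gauge is now keyed by a "type" label (values "unschedulable" and "scheduler_unprocessed") instead of "count_type". A hedged sketch of the resulting series, assuming the metric namespace resolves to cluster_autoscaler:

package main

import "k8s.io/autoscaler/cluster-autoscaler/metrics"

func main() {
	// With 12 unschedulable and 3 scheduler-unprocessed pods, this should
	// (assuming the "cluster_autoscaler" namespace) export roughly:
	//   cluster_autoscaler_unschedulable_pods_count{type="unschedulable"} 12
	//   cluster_autoscaler_unschedulable_pods_count{type="scheduler_unprocessed"} 3
	metrics.UpdateUnschedulablePodsCount(12, 3)
}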
// UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups

View File

@ -144,11 +144,37 @@ type PodLister interface {
List() ([]*apiv1.Pod, error)
}
func isScheduled(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
return pod.Spec.NodeName != ""
}
func isDeleted(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
return pod.GetDeletionTimestamp() != nil
}
func isUnschedulable(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
if isScheduled(pod) || isDeleted(pod) {
return false
}
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || condition.Status != apiv1.ConditionFalse || condition.Reason != apiv1.PodReasonUnschedulable {
return false
}
return true
}
// ScheduledPods is a helper method that returns all scheduled pods from the given pod list.
func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var scheduledPods []*apiv1.Pod
for _, pod := range allPods {
if pod.Spec.NodeName != "" {
if isScheduled(pod) {
scheduledPods = append(scheduledPods, pod)
continue
}
@ -156,35 +182,33 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
return scheduledPods
}
// UnknownPods is a helper method that returns all pods which are not yet processed by the scheduler
func UnknownPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unknownPods []*apiv1.Pod
// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the scheduler
func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unprocessedPods []*apiv1.Pod
for _, pod := range allPods {
// Make sure it's not scheduled or deleted
if pod.Spec.NodeName != "" || pod.GetDeletionTimestamp() != nil {
if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) {
continue
}
// Make sure it's not unschedulable
// Make sure that if the pod is not scheduled, it is either
// not yet processed by the scheduler (condition is nil),
// or its Reason is empty (i.e. not schedulerError, terminated, etc.)
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") {
unknownPods = append(unknownPods, pod)
unprocessedPods = append(unprocessedPods, pod)
}
}
return unknownPods
return unprocessedPods
}
// UnschedulablePods is a helper method that returns all unschedulable pods from the given pod list.
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
for _, pod := range allPods {
if pod.Spec.NodeName == "" {
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable {
if pod.GetDeletionTimestamp() == nil {
unschedulablePods = append(unschedulablePods, pod)
}
}
if !isUnschedulable(pod) {
continue
}
unschedulablePods = append(unschedulablePods, pod)
}
return unschedulablePods
}
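A hedged sketch of how the exported helpers classify pods after this refactor; the three pods below are illustrative, and the counts follow directly from the predicates above:

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

func main() {
	scheduled := &apiv1.Pod{Spec: apiv1.PodSpec{NodeName: "node-1"}}
	unschedulable := &apiv1.Pod{Status: apiv1.PodStatus{Conditions: []apiv1.PodCondition{{
		Type:   apiv1.PodScheduled,
		Status: apiv1.ConditionFalse,
		Reason: apiv1.PodReasonUnschedulable,
	}}}}
	unprocessed := &apiv1.Pod{} // no node assigned, no PodScheduled condition yet

	pods := []*apiv1.Pod{scheduled, unschedulable, unprocessed}
	fmt.Println(len(kube_util.ScheduledPods(pods)))            // 1
	fmt.Println(len(kube_util.UnschedulablePods(pods)))        // 1
	fmt.Println(len(kube_util.SchedulerUnprocessedPods(pods))) // 1
}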