From 4635a6dc04c4dbede06326c563d209c39b442dfc Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Wed, 8 Nov 2023 10:15:22 +0000 Subject: [PATCH] Allow users to specify which schedulers to ignore --- .../config/autoscaling_options.go | 3 +++ cluster-autoscaler/core/static_autoscaler.go | 2 +- .../core/static_autoscaler_test.go | 15 ++++++++--- cluster-autoscaler/main.go | 3 +++ .../utils/kubernetes/listers.go | 25 +++++++++++++++++-- .../utils/scheduler/scheduler.go | 9 +++++++ cluster-autoscaler/utils/test/test_utils.go | 7 ++++++ 7 files changed, 58 insertions(+), 6 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 40441206a0..66cd06113a 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -281,4 +281,7 @@ type AutoscalingOptions struct { //for scheduler to mark pods as unschedulable and will process both marked & non-marked pods //it will also signal whether we enable/disable waiting for pod time buffers before triggering a scale-up. IgnoreSchedulerProcessing bool + //IgnoredSchedulers are used to specify which schedulers to ignore their processing + //if IgnoreSchedulerProcessing is set to true + IgnoredSchedulers map[string]bool } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index d9a09d34ff..f5f7634bdd 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -311,7 +311,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) schedulerUnprocessed := make([]*apiv1.Pod, 0, 0) if a.IgnoreSchedulerProcessing { - schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods) + schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.IgnoredSchedulers) } // Update cluster resource usage metrics diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index c8e07145bb..2649f69be9 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -992,6 +992,9 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * } func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { + ignoredScheduler := "ignored-scheduler" + nonIgnoredScheduler := "non-ignored-scheduler" + readyNodeLister := kubernetes.NewTestNodeLister(nil) allNodeLister := kubernetes.NewTestNodeLister(nil) allPodListerMock := &podListerMock{} @@ -1010,7 +1013,9 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler + p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(ignoredScheduler)) // non-default scheduler & ignored, expects a scale-up + p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonIgnoredScheduler)) // non-default scheduler & not ignored, shouldn't cause a scale-up provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { @@ -1041,6 +1046,10 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { MaxCoresTotal: 10, MaxMemoryTotal: 100000, IgnoreSchedulerProcessing: true, + IgnoredSchedulers: map[string]bool{ + apiv1.DefaultSchedulerName: true, + ignoredScheduler: true, + }, } processorCallbacks := newStaticAutoscalerProcessorCallbacks() @@ -1083,10 +1092,10 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 3).Return(nil).Once() err = autoscaler.RunOnce(later.Add(time.Hour)) assert.NoError(t, err) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 8429cb06d9..abf28a5dde 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/pflag" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server/mux" @@ -244,6 +245,7 @@ var ( forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for scheduler to mark pods as unschedulable and will process both marked & non-marked pods (Schedulable pods will be filtered before scaling-up) it will also disable waiting for pod time buffers before triggering a scale-up.") + ignoredSchedulers = pflag.StringSlice("ignore-schedulers", []string{apiv1.DefaultSchedulerName}, fmt.Sprintf("Names of schedulers to be ignored if '--ignore-scheduler-processing' is set to true. default value '%s' is used", apiv1.DefaultSchedulerName)) ) func isFlagPassed(name string) bool { @@ -392,6 +394,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { }, DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, IgnoreSchedulerProcessing: *ignoreSchedulerProcessing, + IgnoredSchedulers: scheduler_util.GetIgnoredSchedulersMap(*ignoredSchedulers), } } diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index 4a96344cad..5e0eea234a 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -144,18 +144,23 @@ type PodLister interface { List() ([]*apiv1.Pod, error) } +// isScheduled checks whether a pod is scheduled on a node or not func isScheduled(pod *apiv1.Pod) bool { if pod == nil { return false } return pod.Spec.NodeName != "" } + +// isDeleted checks whether a pod is deleted not func isDeleted(pod *apiv1.Pod) bool { if pod == nil { return false } return pod.GetDeletionTimestamp() != nil } + +// isUnschedulable checks whether a pod is unschedulable or not func isUnschedulable(pod *apiv1.Pod) bool { if pod == nil { return false @@ -170,6 +175,12 @@ func isUnschedulable(pod *apiv1.Pod) bool { return true } +// getIsDefaultSchedulerIgnored checks if the default scheduler should be ignored or not +func getIsDefaultSchedulerIgnored(ignoredSchedulers map[string]bool) bool { + ignored, ok := ignoredSchedulers[apiv1.DefaultSchedulerName] + return ignored && ok +} + // ScheduledPods is a helper method that returns all scheduled pods from given pod list. func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { var scheduledPods []*apiv1.Pod @@ -182,10 +193,20 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } -// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the scheduler -func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod { +// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified ignored schedulers +func SchedulerUnprocessedPods(allPods []*apiv1.Pod, ignoredSchedulers map[string]bool) []*apiv1.Pod { var unprocessedPods []*apiv1.Pod + + isDefaultSchedulerIgnored := getIsDefaultSchedulerIgnored(ignoredSchedulers) + for _, pod := range allPods { + // Don't add a pod with a scheduler that isn't specified by the user + if !isDefaultSchedulerIgnored && pod.Spec.SchedulerName == "" { + continue + } + if isIgnored, found := ignoredSchedulers[pod.Spec.SchedulerName]; !found || !isIgnored { + continue + } // Make sure it's not scheduled or deleted if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) { continue diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 59d008db64..7b31fa77c4 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -147,3 +147,12 @@ func ConfigFromPath(path string) (*scheduler_config.KubeSchedulerConfiguration, return cfgObj, nil } + +// GetIgnoredSchedulersMap returns a map of scheduler names that should be ignored as keys, and values are set to true +func GetIgnoredSchedulersMap(ignoredSchedulers []string) map[string]bool { + ignoredSchedulersMap := make(map[string]bool, len(ignoredSchedulers)) + for _, scheduler := range ignoredSchedulers { + ignoredSchedulersMap[scheduler] = true + } + return ignoredSchedulersMap +} diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index c984c0a839..1b76b24b8b 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -82,6 +82,13 @@ func MarkUnschedulable() func(*apiv1.Pod) { } } +// AddSchedulerName adds scheduler name to a pod. +func AddSchedulerName(schedulerName string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Spec.SchedulerName = schedulerName + } +} + // BuildDSTestPod creates a DaemonSet pod with cpu and memory. func BuildDSTestPod(name string, cpu int64, mem int64) *apiv1.Pod {