From 88b769b324dfcc94ced98866fc726f6085677e01 Mon Sep 17 00:00:00 2001 From: Krzysztof Jastrzebski Date: Thu, 12 Apr 2018 13:41:25 +0200 Subject: [PATCH] Refactor cluster autoscaler builder and add pod list processor. --- .../{core => context}/autoscaling_context.go | 2 +- .../autoscaling_context_test.go | 2 +- cluster-autoscaler/core/autoscaler.go | 28 +++++++++--- cluster-autoscaler/core/autoscaler_builder.go | 12 +++-- cluster-autoscaler/core/autoscaler_test.go | 20 ++++++--- cluster-autoscaler/core/scale_down.go | 7 +-- cluster-autoscaler/core/scale_down_test.go | 41 ++++++++--------- cluster-autoscaler/core/scale_up.go | 9 ++-- cluster-autoscaler/core/scale_up_test.go | 37 ++++++++-------- cluster-autoscaler/core/static_autoscaler.go | 19 ++++++-- .../core/static_autoscaler_test.go | 30 ++++++++----- cluster-autoscaler/core/utils.go | 7 +-- cluster-autoscaler/core/utils_test.go | 11 ++--- cluster-autoscaler/main.go | 29 +++++++----- .../utils/pods/pod_list_processor.go | 41 +++++++++++++++++ .../utils/pods/pod_list_processor_test.go | 44 +++++++++++++++++++ 16 files changed, 241 insertions(+), 98 deletions(-) rename cluster-autoscaler/{core => context}/autoscaling_context.go (99%) rename cluster-autoscaler/{core => context}/autoscaling_context_test.go (99%) create mode 100644 cluster-autoscaler/utils/pods/pod_list_processor.go create mode 100644 cluster-autoscaler/utils/pods/pod_list_processor_test.go diff --git a/cluster-autoscaler/core/autoscaling_context.go b/cluster-autoscaler/context/autoscaling_context.go similarity index 99% rename from cluster-autoscaler/core/autoscaling_context.go rename to cluster-autoscaler/context/autoscaling_context.go index f5b1ce4ee9..04ec0a7c14 100644 --- a/cluster-autoscaler/core/autoscaling_context.go +++ b/cluster-autoscaler/context/autoscaling_context.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package context import ( "time" diff --git a/cluster-autoscaler/core/autoscaling_context_test.go b/cluster-autoscaler/context/autoscaling_context_test.go similarity index 99% rename from cluster-autoscaler/core/autoscaling_context_test.go rename to cluster-autoscaler/context/autoscaling_context_test.go index 9b94a709c4..7a7ff26029 100644 --- a/cluster-autoscaler/core/autoscaling_context_test.go +++ b/cluster-autoscaler/context/autoscaling_context_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package context import ( "testing" diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 611ef6b553..b805ae7c94 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -21,17 +21,24 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) // AutoscalerOptions is the whole set of options for configuring an autoscaler type AutoscalerOptions struct { - AutoscalingOptions + context.AutoscalingOptions dynamic.ConfigFetcherOptions + KubeClient kube_client.Interface + KubeEventRecorder kube_record.EventRecorder + PredicateChecker *simulator.PredicateChecker + ListerRegistry kube_util.ListerRegistry + PodListProcessor pods.PodListProcessor } // Autoscaler is the main component of CA which scales up/down node groups according to its configuration @@ -47,13 +54,22 @@ type Autoscaler interface { ExitCleanUp() } -// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters -func NewAutoscaler(opts AutoscalerOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface, - kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (Autoscaler, errors.AutoscalerError) { +func initializeDefaultOptions(opts *AutoscalerOptions) error { + if opts.PodListProcessor == nil { + opts.PodListProcessor = pods.NewDefaultPodListProcessor() + } + return nil +} - autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry) +// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters +func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) { + err := initializeDefaultOptions(&opts) + if err != nil { + return nil, errors.ToAutoscalerError(errors.InternalError, err) + } + autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor) if opts.ConfigMapName != "" { - configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, kubeClient, kubeEventRecorder) + configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, opts.KubeClient, opts.KubeEventRecorder) return NewDynamicAutoscaler(autoscalerBuilder, configFetcher) } return autoscalerBuilder.Build() diff --git a/cluster-autoscaler/core/autoscaler_builder.go b/cluster-autoscaler/core/autoscaler_builder.go index 5997fff93e..aa347db4d6 100644 --- a/cluster-autoscaler/core/autoscaler_builder.go +++ b/cluster-autoscaler/core/autoscaler_builder.go @@ -18,9 +18,11 @@ package core import ( "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) @@ -34,23 +36,25 @@ type AutoscalerBuilder interface { // AutoscalerBuilderImpl builds new autoscalers from its state including initial `AutoscalingOptions` given at startup and // `dynamic.Config` read on demand from the configmap type AutoscalerBuilderImpl struct { - autoscalingOptions AutoscalingOptions + autoscalingOptions context.AutoscalingOptions dynamicConfig *dynamic.Config kubeClient kube_client.Interface kubeEventRecorder kube_record.EventRecorder predicateChecker *simulator.PredicateChecker listerRegistry kube_util.ListerRegistry + podListProcessor pods.PodListProcessor } // NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters -func NewAutoscalerBuilder(autoscalingOptions AutoscalingOptions, predicateChecker *simulator.PredicateChecker, - kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *AutoscalerBuilderImpl { +func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker, + kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl { return &AutoscalerBuilderImpl{ autoscalingOptions: autoscalingOptions, kubeClient: kubeClient, kubeEventRecorder: kubeEventRecorder, predicateChecker: predicateChecker, listerRegistry: listerRegistry, + podListProcessor: podListProcessor, } } @@ -68,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) { c := *(b.dynamicConfig) options.NodeGroups = c.NodeGroupSpecStrings() } - return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry) + return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor) } diff --git a/cluster-autoscaler/core/autoscaler_test.go b/cluster-autoscaler/core/autoscaler_test.go index 1cfe074a54..a026fd8123 100644 --- a/cluster-autoscaler/core/autoscaler_test.go +++ b/cluster-autoscaler/core/autoscaler_test.go @@ -60,14 +60,18 @@ func TestNewAutoscalerStatic(t *testing.T) { return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) }) kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient) + predicateChecker := simulator.NewTestPredicateChecker() + listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil) opts := AutoscalerOptions{ ConfigFetcherOptions: dynamic.ConfigFetcherOptions{ ConfigMapName: "", }, + PredicateChecker: predicateChecker, + KubeClient: fakeClient, + KubeEventRecorder: kubeEventRecorder, + ListerRegistry: listerRegistry, } - predicateChecker := simulator.NewTestPredicateChecker() - listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil) - a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry) + a, _ := NewAutoscaler(opts) assert.IsType(t, &StaticAutoscaler{}, a) } @@ -97,13 +101,17 @@ func TestNewAutoscalerDynamic(t *testing.T) { return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) }) kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient) + predicateChecker := simulator.NewTestPredicateChecker() + listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil) opts := AutoscalerOptions{ ConfigFetcherOptions: dynamic.ConfigFetcherOptions{ ConfigMapName: "testconfigmap", }, + PredicateChecker: predicateChecker, + KubeClient: fakeClient, + KubeEventRecorder: kubeEventRecorder, + ListerRegistry: listerRegistry, } - predicateChecker := simulator.NewTestPredicateChecker() - listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil) - a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry) + a, _ := NewAutoscaler(opts) assert.IsType(t, &DynamicAutoscaler{}, a) } diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go index 20daeda08b..2ee66db971 100644 --- a/cluster-autoscaler/core/scale_down.go +++ b/cluster-autoscaler/core/scale_down.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" @@ -101,7 +102,7 @@ func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) { // ScaleDown is responsible for maintaining the state needed to perform unneeded node removals. type ScaleDown struct { - context *AutoscalingContext + context *context.AutoscalingContext unneededNodes map[string]time.Time unneededNodesList []*apiv1.Node unremovableNodes map[string]time.Time @@ -112,7 +113,7 @@ type ScaleDown struct { } // NewScaleDown builds new ScaleDown object. -func NewScaleDown(context *AutoscalingContext) *ScaleDown { +func NewScaleDown(context *context.AutoscalingContext) *ScaleDown { return &ScaleDown{ context: context, unneededNodes: make(map[string]time.Time), @@ -648,7 +649,7 @@ func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirma return finalError } -func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError { +func deleteNode(context *context.AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError { deleteSuccessful := false drainSuccessful := false diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index 1145c6245c..02300d5f74 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" @@ -123,8 +124,8 @@ func TestFindUnneededNodes(t *testing.T) { provider.AddNode("ng1", n8) provider.AddNode("ng1", n9) - context := AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.35, ExpendablePodsPriorityCutoff: 10, }, @@ -242,8 +243,8 @@ func TestPodsWithPrioritiesFindUnneededNodes(t *testing.T) { provider.AddNode("ng1", n3) provider.AddNode("ng1", n4) - context := AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.35, ExpendablePodsPriorityCutoff: 10, }, @@ -298,8 +299,8 @@ func TestFindUnneededMaxCandidates(t *testing.T) { numCandidates := 30 - context := AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.35, ScaleDownNonEmptyCandidatesCount: numCandidates, ScaleDownCandidatesPoolRatio: 1, @@ -371,8 +372,8 @@ func TestFindUnneededEmptyNodes(t *testing.T) { numCandidates := 30 - context := AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.35, ScaleDownNonEmptyCandidatesCount: numCandidates, ScaleDownCandidatesPoolRatio: 1.0, @@ -422,8 +423,8 @@ func TestFindUnneededNodePool(t *testing.T) { numCandidates := 30 - context := AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.35, ScaleDownNonEmptyCandidatesCount: numCandidates, ScaleDownCandidatesPoolRatio: 0.1, @@ -569,8 +570,8 @@ func TestDeleteNode(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) // build context - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{}, + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{}, ClientSet: fakeClient, Recorder: fakeRecorder, LogRecorder: fakeLogRecorder, @@ -755,8 +756,8 @@ func TestScaleDown(t *testing.T) { fakeRecorder := kube_util.CreateEventRecorder(fakeClient) fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, ScaleDownUnneededTime: time.Minute, MaxGracefulTerminationSec: 60, @@ -811,7 +812,7 @@ func assertSubset(t *testing.T, a []string, b []string) { } } -var defaultScaleDownOptions = AutoscalingOptions{ +var defaultScaleDownOptions = context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, ScaleDownUnneededTime: time.Minute, MaxGracefulTerminationSec: 60, @@ -947,7 +948,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { fakeRecorder := kube_util.CreateEventRecorder(fakeClient) fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) - context := &AutoscalingContext{ + context := &context.AutoscalingContext{ AutoscalingOptions: config.options, PredicateChecker: simulator.NewTestPredicateChecker(), CloudProvider: provider, @@ -1024,8 +1025,8 @@ func TestNoScaleDownUnready(t *testing.T) { fakeRecorder := kube_util.CreateEventRecorder(fakeClient) fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, ScaleDownUnneededTime: time.Minute, ScaleDownUnreadyTime: time.Hour, @@ -1132,8 +1133,8 @@ func TestScaleDownNoMove(t *testing.T) { fakeRecorder := kube_util.CreateEventRecorder(fakeClient) fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, ScaleDownUnneededTime: time.Minute, ScaleDownUnreadyTime: time.Hour, diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 21be310464..f4ae4e5c99 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" @@ -42,7 +43,7 @@ import ( // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size, // false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are // ready and in sync with instance groups. -func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, +func ScaleUp(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) { // From now on we only care about unschedulable pods that were marked after the newest // node became available for the scheduler. @@ -340,7 +341,7 @@ groupsloop: return result } -func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError { +func executeScaleUp(context *context.AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError { glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize) increase := info.NewSize - info.CurrentSize if err := info.Group.IncreaseSize(increase); err != nil { @@ -362,7 +363,7 @@ func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) return nil } -func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, +func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo) { @@ -400,7 +401,7 @@ func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []clou return nodeGroups, nodeInfos } -func addAllMachineTypesForConfig(context *AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity, +func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity, nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup { nodeGroups := make([]cloudprovider.NodeGroup, 0) diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 75c4359a47..e3bde266e3 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/simulator" @@ -67,10 +68,10 @@ type scaleTestConfig struct { expectedScaleUp string expectedScaleUpGroup string expectedScaleDowns []string - options AutoscalingOptions + options context.AutoscalingOptions } -var defaultOptions = AutoscalingOptions{ +var defaultOptions = context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, @@ -231,7 +232,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) { clusterState.UpdateNodes(nodes, time.Now()) - context := &AutoscalingContext{ + context := &context.AutoscalingContext{ AutoscalingOptions: config.options, PredicateChecker: simulator.NewTestPredicateChecker(), CloudProvider: provider, @@ -313,8 +314,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) { }) clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, @@ -380,7 +381,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) { }) clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - context := &AutoscalingContext{ + context := &context.AutoscalingContext{ AutoscalingOptions: defaultOptions, PredicateChecker: simulator.NewTestPredicateChecker(), CloudProvider: provider, @@ -436,8 +437,8 @@ func TestScaleUpUnhealthy(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, @@ -487,8 +488,8 @@ func TestScaleUpNoHelp(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, @@ -567,8 +568,8 @@ func TestScaleUpBalanceGroups(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder) clusterState.UpdateNodes(nodes, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, BalanceSimilarNodeGroups: true, MaxCoresTotal: config.DefaultMaxClusterCores, @@ -630,8 +631,8 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, MaxCoresTotal: 5000 * 64, MaxMemoryTotal: 5000 * 64 * 20, @@ -669,8 +670,8 @@ func TestAddAutoprovisionedCandidatesOK(t *testing.T) { []string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1}) provider.AddNodeGroup("ng1", 1, 5, 3) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ MaxAutoprovisionedNodeGroupCount: 1, }, CloudProvider: provider, @@ -702,8 +703,8 @@ func TestAddAutoprovisionedCandidatesToMany(t *testing.T) { map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1}) provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1") - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ MaxAutoprovisionedNodeGroupCount: 1, }, CloudProvider: provider, diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 28889cbd45..570a8ad378 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -21,11 +21,13 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/pods" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" apiv1 "k8s.io/api/core/v1" @@ -49,18 +51,20 @@ const ( // StaticAutoscaler is an autoscaler which has all the core functionality of a CA but without the reconfiguration feature type StaticAutoscaler struct { // AutoscalingContext consists of validated settings and options for this autoscaler - *AutoscalingContext + *context.AutoscalingContext kube_util.ListerRegistry startTime time.Time lastScaleUpTime time.Time lastScaleDownDeleteTime time.Time lastScaleDownFailTime time.Time scaleDown *ScaleDown + podListProcessor pods.PodListProcessor } // NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters -func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.PredicateChecker, - kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (*StaticAutoscaler, errors.AutoscalerError) { +func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker, + kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, + podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) { logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap) if err != nil { glog.Error("Failed to initialize status configmap, unable to write status events") @@ -68,7 +72,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr // TODO(maciekpytel): recover from this after successful status configmap update? logRecorder, _ = utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, false) } - autoscalingContext, errctx := NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry) + autoscalingContext, errctx := context.NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry) if errctx != nil { return nil, errctx } @@ -83,6 +87,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr lastScaleDownDeleteTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: scaleDown, + podListProcessor: podListProcessor, }, nil } @@ -236,6 +241,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return errors.ToAutoscalerError(errors.ApiCallError, err) } + allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes) + if err != nil { + glog.Errorf("Failed to process pod list: %v", err) + return errors.ToAutoscalerError(errors.InternalError, err) + } + ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduled, a.PredicateChecker) // We need to check whether pods marked as unschedulable are actually unschedulable. diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 4fc20662be..b35450dd70 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -24,10 +24,12 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/pods" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -168,8 +170,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, ScaleDownUtilizationThreshold: 0.5, @@ -197,7 +199,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { ListerRegistry: listerRegistry, lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), - scaleDown: sd} + scaleDown: sd, + podListProcessor: pods.NewDefaultPodListProcessor()} // MaxNodesTotal reached. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() @@ -344,8 +347,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, ScaleDownUtilizationThreshold: 0.5, @@ -375,7 +378,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { ListerRegistry: listerRegistry, lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), - scaleDown: sd} + scaleDown: sd, + podListProcessor: pods.NewDefaultPodListProcessor()} // Scale up. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() @@ -481,8 +485,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // broken node failed to register in time clusterState.UpdateNodes([]*apiv1.Node{n1}, later) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, ScaleDownUtilizationThreshold: 0.5, @@ -511,7 +515,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { ListerRegistry: listerRegistry, lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), - scaleDown: sd} + scaleDown: sd, + podListProcessor: pods.NewDefaultPodListProcessor()} // Scale up. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() @@ -617,8 +622,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, ScaleDownUtilizationThreshold: 0.5, @@ -647,7 +652,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { ListerRegistry: listerRegistry, lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), - scaleDown: sd} + scaleDown: sd, + podListProcessor: pods.NewDefaultPodListProcessor()} // Scale up readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once() diff --git a/cluster-autoscaler/core/utils.go b/cluster-autoscaler/core/utils.go index ce01d49e56..c51a2f3137 100644 --- a/cluster-autoscaler/core/utils.go +++ b/cluster-autoscaler/core/utils.go @@ -26,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" @@ -322,7 +323,7 @@ func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, erro } // Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred. -func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *AutoscalingContext, +func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) { removedAny := false for _, unregisteredNode := range unregisteredNodes { @@ -362,7 +363,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod // Sets the target size of node groups to the current number of nodes in them // if the difference was constant for a prolonged time. Returns true if managed // to fix something. -func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool, error) { +func fixNodeGroupSize(context *context.AutoscalingContext, currentTime time.Time) (bool, error) { fixed := false for _, nodeGroup := range context.CloudProvider.NodeGroups() { incorrectSize := context.ClusterStateRegistry.GetIncorrectNodeGroupSize(nodeGroup.Id()) @@ -389,7 +390,7 @@ func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool, // getPotentiallyUnneededNodes returns nodes that are: // - managed by the cluster autoscaler // - in groups with size > min size -func getPotentiallyUnneededNodes(context *AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node { +func getPotentiallyUnneededNodes(context *context.AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node { result := make([]*apiv1.Node, 0, len(nodes)) nodeGroupSize := getNodeGroupSizeMap(context.CloudProvider) diff --git a/cluster-autoscaler/core/utils_test.go b/cluster-autoscaler/core/utils_test.go index cc42eb1222..968fc9c77d 100644 --- a/cluster-autoscaler/core/utils_test.go +++ b/cluster-autoscaler/core/utils_test.go @@ -24,6 +24,7 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" @@ -330,8 +331,8 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) { err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour)) assert.NoError(t, err) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ MaxNodeProvisionTime: 45 * time.Minute, }, CloudProvider: provider, @@ -428,8 +429,8 @@ func TestRemoveFixNodeTargetSize(t *testing.T) { err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour)) assert.NoError(t, err) - context := &AutoscalingContext{ - AutoscalingOptions: AutoscalingOptions{ + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ MaxNodeProvisionTime: 45 * time.Minute, }, CloudProvider: provider, @@ -461,7 +462,7 @@ func TestGetPotentiallyUnneededNodes(t *testing.T) { provider.AddNode("ng1", ng1_2) provider.AddNode("ng2", ng2_1) - context := &AutoscalingContext{ + context := &context.AutoscalingContext{ CloudProvider: provider, } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 2f8d44c8ae..f2b5eeaa91 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -33,6 +33,7 @@ import ( cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" @@ -134,7 +135,7 @@ var ( regional = flag.Bool("regional", false, "Cluster is regional.") ) -func createAutoscalerOptions() core.AutoscalerOptions { +func createAutoscalingOptions() context.AutoscalingOptions { minCoresTotal, maxCoresTotal, err := parseMinMaxFlag(*coresTotal) if err != nil { glog.Fatalf("Failed to parse flags: %v", err) @@ -147,7 +148,7 @@ func createAutoscalerOptions() core.AutoscalerOptions { minMemoryTotal = minMemoryTotal * 1024 maxMemoryTotal = maxMemoryTotal * 1024 - autoscalingOpts := core.AutoscalingOptions{ + return context.AutoscalingOptions{ CloudConfig: *cloudConfig, CloudProviderName: *cloudProviderFlag, NodeGroupAutoDiscovery: nodeGroupAutoDiscoveryFlag, @@ -183,16 +184,13 @@ func createAutoscalerOptions() core.AutoscalerOptions { ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff, Regional: *regional, } +} - configFetcherOpts := dynamic.ConfigFetcherOptions{ +func createConfigFetcherOptions() dynamic.ConfigFetcherOptions { + return dynamic.ConfigFetcherOptions{ ConfigMapName: *configMapName, Namespace: *namespace, } - - return core.AutoscalerOptions{ - AutoscalingOptions: autoscalingOpts, - ConfigFetcherOptions: configFetcherOpts, - } } func createKubeClient() kube_client.Interface { @@ -241,8 +239,8 @@ func run(healthCheck *metrics.HealthCheck) { metrics.RegisterAll() kubeClient := createKubeClient() kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient) - opts := createAutoscalerOptions() - metrics.UpdateNapEnabled(opts.NodeAutoprovisioningEnabled) + autoscalingOptions := createAutoscalingOptions() + metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled) predicateCheckerStopChannel := make(chan struct{}) predicateChecker, err := simulator.NewPredicateChecker(kubeClient, predicateCheckerStopChannel) if err != nil { @@ -250,7 +248,16 @@ func run(healthCheck *metrics.HealthCheck) { } listerRegistryStopChannel := make(chan struct{}) listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel) - autoscaler, err := core.NewAutoscaler(opts, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry) + + opts := core.AutoscalerOptions{ + AutoscalingOptions: autoscalingOptions, + ConfigFetcherOptions: createConfigFetcherOptions(), + PredicateChecker: predicateChecker, + KubeClient: kubeClient, + KubeEventRecorder: kubeEventRecorder, + ListerRegistry: listerRegistry, + } + autoscaler, err := core.NewAutoscaler(opts) if err != nil { glog.Fatalf("Failed to create autoscaler: %v", err) } diff --git a/cluster-autoscaler/utils/pods/pod_list_processor.go b/cluster-autoscaler/utils/pods/pod_list_processor.go new file mode 100644 index 0000000000..f835c4bee4 --- /dev/null +++ b/cluster-autoscaler/utils/pods/pod_list_processor.go @@ -0,0 +1,41 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pods + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" +) + +// PodListProcessor processes lists of unschedulable and sheduled pods before scaling of the cluster. +type PodListProcessor interface { + Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) +} + +// NoOpPodListProcessor is returning pod lists without processing them. +type NoOpPodListProcessor struct { +} + +// NewDefaultPodListProcessor creates an instance of PodListProcessor. +func NewDefaultPodListProcessor() PodListProcessor { + return &NoOpPodListProcessor{} +} + +// Process processes lists of unschedulable and sheduled pods before scaling of the cluster. +func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) { + return unschedulablePods, allScheduled, nil +} diff --git a/cluster-autoscaler/utils/pods/pod_list_processor_test.go b/cluster-autoscaler/utils/pods/pod_list_processor_test.go new file mode 100644 index 0000000000..76b29aafd6 --- /dev/null +++ b/cluster-autoscaler/utils/pods/pod_list_processor_test.go @@ -0,0 +1,44 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pods + +import ( + "testing" + + apiv1 "k8s.io/api/core/v1" + + "k8s.io/autoscaler/cluster-autoscaler/context" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestPodListProcessor(t *testing.T) { + context := &context.AutoscalingContext{} + p1 := BuildTestPod("p1", 40, 0) + p2 := BuildTestPod("p2", 400, 0) + n1 := BuildTestNode("n1", 100, 1000) + n2 := BuildTestNode("n1", 100, 1000) + unschedulablePods := []*apiv1.Pod{p1} + allScheduled := []*apiv1.Pod{p2} + nodes := []*apiv1.Node{n1, n2} + podListProcessor := NewDefaultPodListProcessor() + gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduled, nodes) + if len(gotUnschedulablePods) != 1 || len(gotAllScheduled) != 1 || err != nil { + t.Errorf("Error podListProcessor.Process() = %v, %v, %v want %v, %v, nil ", + gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduled) + } + +}