diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
index af11d17d87..7ec1b1f118 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -119,6 +119,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 		}
 	}
 
+	useBinpackingLimiter := false
+	nodeGroupsWithExpansionOption := make(map[string]bool)
+	if o.processors.BinpackingLimiter != nil {
+		useBinpackingLimiter = true
+		o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingContext, nodeGroups)
+	}
+
 	resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
 	if aErr != nil {
 		return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
@@ -144,7 +151,12 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 			continue
 		}
 
+		nodeGroupsWithExpansionOption[nodeGroup.Id()] = true
 		options = append(options, option)
+
+		if useBinpackingLimiter && o.processors.BinpackingLimiter.StopBinpacking(o.autoscalingContext, options, nodeGroupsWithExpansionOption) {
+			break
+		}
 	}
 
 	if len(options) == 0 {
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
index 90d7eeff20..5172aa0167 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -805,6 +805,69 @@ func TestScaleUpUnhealthy(t *testing.T) {
 	assert.False(t, scaleUpStatus.WasSuccessful())
 }
 
+func TestBinpackingLimiter(t *testing.T) {
+	n1 := BuildTestNode("n1", 1000, 1000)
+	n2 := BuildTestNode("n2", 100000, 100000)
+	now := time.Now()
+
+	SetNodeReadyState(n1, true, now.Add(-2*time.Minute))
+	SetNodeReadyState(n2, true, now.Add(-2*time.Minute))
+
+	nodes := []*apiv1.Node{n1, n2}
+
+	podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
+	listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
+
+	provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
+		return nil
+	}, nil)
+
+	options := defaultOptions
+	provider.AddNodeGroup("ng1", 1, 10, 1)
+	provider.AddNode("ng1", n1)
+	provider.AddNodeGroup("ng2", 1, 10, 1)
+	provider.AddNode("ng2", n2)
+	assert.NotNil(t, provider)
+
+	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
+	assert.NoError(t, err)
+
+	nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
+		Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
+	assert.NoError(t, err)
+
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
+	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
+
+	extraPod := BuildTestPod("p-new", 500, 0)
+
+	processors := NewTestProcessors(&context)
+
+	// We should stop binpacking after finding an expansion option from the first
+	// node group: the extra pod (requiring 500 cpu) is a good fit on n1. It could
+	// also be scheduled on n2, but that is not a good fit. The BinpackingLimiter
+	// heuristic therefore lets us stop binpacking early and save time on
+	// computation.
+	processors.BinpackingLimiter = &MockBinpackingLimiter{}
+	processors.BinpackingLimiter.InitBinpacking(&context, []cloudprovider.NodeGroup{})
+
+	suOrchestrator := New()
+	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
+
+	expander := NewMockRepotingStrategy(t, &GroupSizeChange{GroupName: "ng1", SizeChange: 1})
+	context.ExpanderStrategy = expander
+
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
+	assert.NoError(t, err)
+	assert.True(t, scaleUpStatus.WasSuccessful())
+
+	expansionOptions := expander.LastInputOptions()
+	// Only 1 expansion option should be present; without the BinpackingLimiter there would be 2.
+	assert.True(t, len(expansionOptions) == 1)
+	assert.Equal(t, expansionOptions, []GroupSizeChange{{GroupName: "ng1", SizeChange: 1}})
+}
+
 func TestScaleUpNoHelp(t *testing.T) {
 	n1 := BuildTestNode("n1", 100, 1000)
 	now := time.Now()
diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go
index 5a84f4435b..a7b6dcfbc6 100644
--- a/cluster-autoscaler/core/test/common.go
+++ b/cluster-autoscaler/core/test/common.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
 	processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
@@ -167,6 +168,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
 	return &processors.AutoscalingProcessors{
 		PodListProcessor:       podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker),
 		NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
+		BinpackingLimiter:      binpacking.NewDefaultBinpackingLimiter(),
 		NodeGroupSetProcessor:  nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
 		ScaleDownSetProcessor:  nodes.NewPostFilteringScaleDownNodeProcessor(),
 		// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
@@ -329,6 +331,22 @@ func (p *MockAutoprovisioningNodeGroupListProcessor) Process(context *context.Au
 func (p *MockAutoprovisioningNodeGroupListProcessor) CleanUp() {
 }
 
+// MockBinpackingLimiter is a fake BinpackingLimiter to be used in tests.
+type MockBinpackingLimiter struct {
+	requiredExpansionOptions int
+	T                        *testing.T
+}
+
+// InitBinpacking initialises the MockBinpackingLimiter and sets requiredExpansionOptions to 1.
+func (p *MockBinpackingLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) {
+	p.requiredExpansionOptions = 1
+}
+
+// StopBinpacking stops the binpacking early once we already have the required number of expansion options (i.e. 1).
+func (p *MockBinpackingLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool {
+	return len(evaluatedOptions) == p.requiredExpansionOptions
+}
+
 // NewBackoff creates a new backoff object
 func NewBackoff() backoff.Backoff {
 	return backoff.NewIdBasedExponentialBackoff(5*time.Minute, /*InitialNodeGroupBackoffDuration*/
diff --git a/cluster-autoscaler/processors/binpacking/binpacking_limiter.go b/cluster-autoscaler/processors/binpacking/binpacking_limiter.go
new file mode 100644
index 0000000000..c2889e653b
--- /dev/null
+++ b/cluster-autoscaler/processors/binpacking/binpacking_limiter.go
@@ -0,0 +1,47 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package binpacking
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/expander"
+)
+
+// BinpackingLimiter processes expansion options to stop binpacking early.
+type BinpackingLimiter interface {
+	InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup)
+	StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool
+}
+
+// NoOpBinpackingLimiter returns false without processing expansion options.
+type NoOpBinpackingLimiter struct {
+}
+
+// NewDefaultBinpackingLimiter creates an instance of NoOpBinpackingLimiter.
+func NewDefaultBinpackingLimiter() BinpackingLimiter {
+	return &NoOpBinpackingLimiter{}
+}
+
+// InitBinpacking initialises the BinpackingLimiter.
+func (p *NoOpBinpackingLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) {
+}
+
+// StopBinpacking is used to make decisions on the evaluated expansion options.
+func (p *NoOpBinpackingLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool {
+	return false
+}
diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go
index 78cc56ea13..d763fd02ef 100644
--- a/cluster-autoscaler/processors/processors.go
+++ b/cluster-autoscaler/processors/processors.go
@@ -19,6 +19,7 @@ package processors
 import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
@@ -38,6 +39,8 @@ type AutoscalingProcessors struct {
 	PodListProcessor pods.PodListProcessor
 	// NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
 	NodeGroupListProcessor nodegroups.NodeGroupListProcessor
+	// BinpackingLimiter processes expansion options to stop binpacking early.
+	BinpackingLimiter binpacking.BinpackingLimiter
 	// NodeGroupSetProcessor is used to divide scale-up between similar NodeGroups.
 	NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
 	// ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.
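
As a usage sketch (not part of this diff), a custom limiter only needs to satisfy the BinpackingLimiter interface added in binpacking_limiter.go above. The name MaxOptionsBinpackingLimiter and its maxOptions field below are hypothetical illustrations; everything else follows the signatures from this patch.

package binpacking

import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/expander"
)

// MaxOptionsBinpackingLimiter (hypothetical example) stops binpacking once a
// fixed number of expansion options has been collected.
type MaxOptionsBinpackingLimiter struct {
	maxOptions int
}

// NewMaxOptionsBinpackingLimiter returns a limiter that allows at most maxOptions expansion options.
func NewMaxOptionsBinpackingLimiter(maxOptions int) BinpackingLimiter {
	return &MaxOptionsBinpackingLimiter{maxOptions: maxOptions}
}

// InitBinpacking is a no-op here; a smarter limiter could pre-rank nodeGroups.
func (l *MaxOptionsBinpackingLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) {
}

// StopBinpacking returns true once enough expansion options have been evaluated,
// which makes the ScaleUp loop in orchestrator.go break out of the node-group iteration.
func (l *MaxOptionsBinpackingLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool {
	return len(evaluatedOptions) >= l.maxOptions
}

Such a limiter would be installed the same way the test installs its mock, e.g. processors.BinpackingLimiter = binpacking.NewMaxOptionsBinpackingLimiter(3) before the orchestrator's Initialize call.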