BinpackingLimiter interface

This commit is contained in:
Kushagra 2023-06-02 12:59:42 +00:00
parent fac5ef6480
commit 49cfd18000
5 changed files with 143 additions and 0 deletions

View File

@ -119,6 +119,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}
}
useBinpackingLimiter := false
nodeGroupsWithExpansionOption := make(map[string]bool)
if o.processors.BinpackingLimiter != nil {
useBinpackingLimiter = true
o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingContext, nodeGroups)
}
resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
@ -144,7 +151,12 @@ func (o *ScaleUpOrchestrator) ScaleUp(
continue
}
nodeGroupsWithExpansionOption[nodeGroup.Id()] = true
options = append(options, option)
if useBinpackingLimiter && o.processors.BinpackingLimiter.StopBinpacking(o.autoscalingContext, options, nodeGroupsWithExpansionOption) {
break
}
}
if len(options) == 0 {

View File

@ -805,6 +805,69 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.False(t, scaleUpStatus.WasSuccessful())
}
// TestBinpackingLimiter verifies that ScaleUp stops evaluating further node
// groups as soon as the configured BinpackingLimiter asks it to, so that the
// expander only ever sees the options computed up to that point.
func TestBinpackingLimiter(t *testing.T) {
	n1 := BuildTestNode("n1", 1000, 1000)
	n2 := BuildTestNode("n2", 100000, 100000)
	now := time.Now()

	SetNodeReadyState(n1, true, now.Add(-2*time.Minute))
	SetNodeReadyState(n2, true, now.Add(-2*time.Minute))

	nodes := []*apiv1.Node{n1, n2}

	podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
	listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

	provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
		return nil
	}, nil)

	options := defaultOptions
	provider.AddNodeGroup("ng1", 1, 10, 1)
	provider.AddNode("ng1", n1)
	provider.AddNodeGroup("ng2", 1, 10, 1)
	provider.AddNode("ng2", n2)
	assert.NotNil(t, provider)

	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
	assert.NoError(t, err)

	nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
		Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
	assert.NoError(t, err)

	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

	extraPod := BuildTestPod("p-new", 500, 0)

	processors := NewTestProcessors(&context)

	// Binpacking should stop after the first expansion option is found.
	// The extra pod (requesting 500 cpu) is a good fit for n1. It could also
	// be scheduled on n2, but that is not a good fit, so a BinpackingLimiter
	// heuristic can stop binpacking early and save computation time.
	processors.BinpackingLimiter = &MockBinpackingLimiter{}
	// ScaleUp initialises the limiter itself when one is configured; the
	// explicit call here only makes the mock's starting state obvious.
	processors.BinpackingLimiter.InitBinpacking(&context, []cloudprovider.NodeGroup{})

	suOrchestrator := New()
	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})

	expander := NewMockRepotingStrategy(t, &GroupSizeChange{GroupName: "ng1", SizeChange: 1})
	context.ExpanderStrategy = expander

	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
	assert.NoError(t, err)
	assert.True(t, scaleUpStatus.WasSuccessful())

	expansionOptions := expander.LastInputOptions()
	// Only 1 expansion option should be there. Without BinpackingLimiter there will be 2.
	// assert.Len reports the actual length on failure, and assert.Equal takes
	// the expected value first so that failure output is labelled correctly.
	assert.Len(t, expansionOptions, 1)
	assert.Equal(t, []GroupSizeChange{{GroupName: "ng1", SizeChange: 1}}, expansionOptions)
}
func TestScaleUpNoHelp(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
now := time.Now()

View File

@ -37,6 +37,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
@ -167,6 +168,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
return &processors.AutoscalingProcessors{
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewDefaultBinpackingLimiter(),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
@ -329,6 +331,22 @@ func (p *MockAutoprovisioningNodeGroupListProcessor) Process(context *context.Au
func (p *MockAutoprovisioningNodeGroupListProcessor) CleanUp() {
}
// MockBinpackingLimiter is a fake BinpackingLimiter to be used in tests.
// It reports that binpacking should stop once the number of evaluated
// expansion options equals requiredExpansionOptions.
type MockBinpackingLimiter struct {
// requiredExpansionOptions is the number of evaluated options at which
// StopBinpacking returns true; InitBinpacking sets it to 1.
requiredExpansionOptions int
// T is a test handle; the methods of this mock do not read it.
T *testing.T
}
// InitBinpacking resets the mock so that a single evaluated expansion option
// is enough for StopBinpacking to report true. Both arguments are ignored.
func (p *MockBinpackingLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) {
	// The mock always stops after exactly one option.
	const stopAfter = 1
	p.requiredExpansionOptions = stopAfter
}
// StopBinpacking reports whether the number of options evaluated so far is
// exactly requiredExpansionOptions (1 after InitBinpacking). The context and
// usedNodeGroups arguments are not consulted.
func (p *MockBinpackingLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool {
	reachedTarget := len(evaluatedOptions) == p.requiredExpansionOptions
	return reachedTarget
}
// NewBackoff creates a new backoff object
func NewBackoff() backoff.Backoff {
return backoff.NewIdBasedExponentialBackoff(5*time.Minute, /*InitialNodeGroupBackoffDuration*/

View File

@ -0,0 +1,47 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package binpacking
import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/expander"
)
// BinpackingLimiter processes expansion options to stop binpacking early.
// The scale-up orchestrator consults it, when one is configured, while it
// builds the list of expansion options.
type BinpackingLimiter interface {
// InitBinpacking is called by the scale-up orchestrator before it starts
// evaluating node groups, with the autoscaling context and the node groups
// under consideration.
InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup)
// StopBinpacking is called after each expansion option is added to
// evaluatedOptions. usedNodeGroups is keyed by node group ID and marks the
// groups that have produced an option so far. Returning true makes the
// orchestrator stop evaluating the remaining node groups.
StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool
}
// NoOpBinpackingLimiter is a BinpackingLimiter that never stops binpacking
// early: its StopBinpacking returns false without processing expansion options.
type NoOpBinpackingLimiter struct {
}
// NewDefaultBinpackingLimiter returns the default BinpackingLimiter, which is
// a freshly allocated NoOpBinpackingLimiter.
func NewDefaultBinpackingLimiter() BinpackingLimiter {
	return new(NoOpBinpackingLimiter)
}
// InitBinpacking is a no-op: NoOpBinpackingLimiter has no state to initialise,
// so both arguments are ignored.
func (p *NoOpBinpackingLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) {
}
// StopBinpacking is used to make decisions on the evaluated expansion options.
// This implementation ignores its arguments and always returns false, so it
// never requests an early stop.
func (p *NoOpBinpackingLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option, usedNodeGroups map[string]bool) bool {
return false
}

View File

@ -19,6 +19,7 @@ package processors
import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
@ -38,6 +39,8 @@ type AutoscalingProcessors struct {
PodListProcessor pods.PodListProcessor
// NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
NodeGroupListProcessor nodegroups.NodeGroupListProcessor
// BinpackingLimiter processes expansion options to stop binpacking early.
BinpackingLimiter binpacking.BinpackingLimiter
// NodeGroupSetProcessor is used to divide scale-up between similar NodeGroups.
NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
// ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.