Move node group balancing to processor

The goal is to allow customization of this logic for different use cases and cloud providers.
Maciej Pytel 2018-10-23 14:38:56 +02:00
parent 8a79970c47
commit 6f5e6aab6f
11 changed files with 194 additions and 158 deletions
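
For context, the sketch below illustrates the kind of customization this refactor enables: a fork or cloud provider integration can implement the NodeGroupSetProcessor interface introduced in this commit and wire it into AutoscalingProcessors in place of the default BalancingNodeGroupSetProcessor. The customNodeGroupSetProcessor type and its never-balance policy are hypothetical illustrations, not part of this commit; the interface, ScaleUpInfo struct, and DefaultProcessors() are the ones shown in the diffs below.

package main

import (
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// customNodeGroupSetProcessor is a hypothetical NodeGroupSetProcessor that never
// balances: it reports no similar node groups and puts the whole scale-up into
// the single group it is given, capped at that group's MaxSize.
type customNodeGroupSetProcessor struct{}

func (p *customNodeGroupSetProcessor) FindSimilarNodeGroups(ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup,
    nodeInfosForGroups map[string]*schedulercache.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) {
    // Never report similar groups, so scale-up is never split.
    return []cloudprovider.NodeGroup{}, nil
}

func (p *customNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(ctx *context.AutoscalingContext,
    groups []cloudprovider.NodeGroup, newNodes int) ([]nodegroupset.ScaleUpInfo, errors.AutoscalerError) {
    if len(groups) == 0 {
        return nil, errors.NewAutoscalerError(errors.InternalError, "no node groups to scale up")
    }
    group := groups[0]
    currentSize, err := group.TargetSize()
    if err != nil {
        return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to get size of %s: %v", group.Id(), err)
    }
    // Put all new nodes into the first group, respecting its MaxSize.
    newSize := currentSize + newNodes
    if newSize > group.MaxSize() {
        newSize = group.MaxSize()
    }
    return []nodegroupset.ScaleUpInfo{{
        Group:       group,
        CurrentSize: currentSize,
        NewSize:     newSize,
        MaxSize:     group.MaxSize(),
    }}, nil
}

func (p *customNodeGroupSetProcessor) CleanUp() {}

// newCustomProcessors starts from the defaults and swaps in the custom balancer.
func newCustomProcessors() *ca_processors.AutoscalingProcessors {
    p := ca_processors.DefaultProcessors()
    p.NodeGroupSetProcessor = &customNodeGroupSetProcessor{}
    return p
}

The AutoscalingProcessors returned this way would be passed to ScaleUp exactly like the default set, as the scale_up.go diff below shows.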

View File

@@ -32,11 +32,11 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/expander"
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
     "k8s.io/autoscaler/cluster-autoscaler/processors/status"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     "k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
     "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
-    "k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
     schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

     "github.com/golang/glog"
@@ -510,7 +510,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
     targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
     if context.BalanceSimilarNodeGroups {
-        similarNodeGroups, typedErr := nodegroupset.FindSimilarNodeGroups(bestOption.NodeGroup, context.CloudProvider, nodeInfos)
+        similarNodeGroups, typedErr := processors.NodeGroupSetProcessor.FindSimilarNodeGroups(context, bestOption.NodeGroup, nodeInfos)
         if typedErr != nil {
             return &status.ScaleUpStatus{Result: status.ScaleUpError}, typedErr.AddPrefix("Failed to find matching node groups: ")
         }
@@ -536,8 +536,8 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         glog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), buffer.String())
         }
     }
-    scaleUpInfos, typedErr := nodegroupset.BalanceScaleUpBetweenGroups(
-        targetNodeGroups, newNodes)
+    scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
+        context, targetNodeGroups, newNodes)
     if typedErr != nil {
         return &status.ScaleUpStatus{Result: status.ScaleUpError}, typedErr
     }

View File

@@ -1,5 +1,5 @@
 /*
-Copyright 2017 The Kubernetes Authors.
+Copyright 2018 The Kubernetes Authors.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -17,30 +17,49 @@ limitations under the License.
 package nodegroupset

 import (
-    "fmt"
     "sort"

     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+    "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

     "github.com/golang/glog"
 )

-// ScaleUpInfo contains information about planned scale-up of a single NodeGroup
-type ScaleUpInfo struct {
-    // Group is the group to be scaled-up
-    Group cloudprovider.NodeGroup
-    // CurrentSize is the current size of the Group
-    CurrentSize int
-    // NewSize is the size the Group will be scaled-up to
-    NewSize int
-    // MaxSize is the maximum allowed size of the Group
-    MaxSize int
+// BalancingNodeGroupSetProcessor tries to keep similar node groups balanced on scale-up.
+type BalancingNodeGroupSetProcessor struct {
 }

-// String is used for printing ScaleUpInfo for logging, etc
-func (s ScaleUpInfo) String() string {
-    return fmt.Sprintf("{%v %v->%v (max: %v)}", s.Group.Id(), s.CurrentSize, s.NewSize, s.MaxSize)
+// FindSimilarNodeGroups returns a list of NodeGroups similar to the given one.
+// Two groups are similar if the NodeInfos for them compare equal using IsNodeInfoSimilar.
+func (b *BalancingNodeGroupSetProcessor) FindSimilarNodeGroups(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup,
+    nodeInfosForGroups map[string]*schedulercache.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) {
+    result := []cloudprovider.NodeGroup{}
+    nodeGroupId := nodeGroup.Id()
+    nodeInfo, found := nodeInfosForGroups[nodeGroupId]
+    if !found {
+        return []cloudprovider.NodeGroup{}, errors.NewAutoscalerError(
+            errors.InternalError,
+            "failed to find template node for node group %s",
+            nodeGroupId)
+    }
+    for _, ng := range context.CloudProvider.NodeGroups() {
+        ngId := ng.Id()
+        if ngId == nodeGroupId {
+            continue
+        }
+        ngNodeInfo, found := nodeInfosForGroups[ngId]
+        if !found {
+            glog.Warningf("Failed to find nodeInfo for group %v", ngId)
+            continue
+        }
+        if IsNodeInfoSimilar(nodeInfo, ngNodeInfo) {
+            result = append(result, ng)
+        }
+    }
+    return result, nil
 }

 // BalanceScaleUpBetweenGroups distributes a given number of nodes between
@@ -52,7 +71,7 @@ func (s ScaleUpInfo) String() string {
 // MaxSize of each group will be respected. If newNodes > total free capacity
 // of all NodeGroups it will be capped to total capacity. In particular if all
 // group already have MaxSize, empty list will be returned.
-func BalanceScaleUpBetweenGroups(groups []cloudprovider.NodeGroup, newNodes int) ([]ScaleUpInfo, errors.AutoscalerError) {
+func (b *BalancingNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(context *context.AutoscalingContext, groups []cloudprovider.NodeGroup, newNodes int) ([]ScaleUpInfo, errors.AutoscalerError) {
     if len(groups) == 0 {
         return []ScaleUpInfo{}, errors.NewAutoscalerError(
             errors.InternalError, "Can't balance scale up between 0 groups")
@@ -152,3 +171,6 @@ func BalanceScaleUpBetweenGroups(groups []cloudprovider.NodeGroup, newNodes int)
     return result, nil
 }
+
+// CleanUp performs final clean up of processor state.
+func (b *BalancingNodeGroupSetProcessor) CleanUp() {}

View File

@@ -21,28 +21,81 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
     testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+    "k8s.io/autoscaler/cluster-autoscaler/context"
+    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

     "github.com/stretchr/testify/assert"
 )

+func TestFindSimilarNodeGroups(t *testing.T) {
+    processor := &BalancingNodeGroupSetProcessor{}
+    context := &context.AutoscalingContext{}
+    n1 := BuildTestNode("n1", 1000, 1000)
+    n2 := BuildTestNode("n2", 1000, 1000)
+    n3 := BuildTestNode("n3", 2000, 2000)
+    provider := testprovider.NewTestCloudProvider(nil, nil)
+    provider.AddNodeGroup("ng1", 1, 10, 1)
+    provider.AddNodeGroup("ng2", 1, 10, 1)
+    provider.AddNodeGroup("ng3", 1, 10, 1)
+    provider.AddNode("ng1", n1)
+    provider.AddNode("ng2", n2)
+    provider.AddNode("ng3", n3)
+    ni1 := schedulercache.NewNodeInfo()
+    ni1.SetNode(n1)
+    ni2 := schedulercache.NewNodeInfo()
+    ni2.SetNode(n2)
+    ni3 := schedulercache.NewNodeInfo()
+    ni3.SetNode(n3)
+    nodeInfosForGroups := map[string]*schedulercache.NodeInfo{
+        "ng1": ni1, "ng2": ni2, "ng3": ni3,
+    }
+    ng1, _ := provider.NodeGroupForNode(n1)
+    ng2, _ := provider.NodeGroupForNode(n2)
+    ng3, _ := provider.NodeGroupForNode(n3)
+    context.CloudProvider = provider
+    similar, err := processor.FindSimilarNodeGroups(context, ng1, nodeInfosForGroups)
+    assert.NoError(t, err)
+    assert.Equal(t, similar, []cloudprovider.NodeGroup{ng2})
+    similar, err = processor.FindSimilarNodeGroups(context, ng2, nodeInfosForGroups)
+    assert.NoError(t, err)
+    assert.Equal(t, similar, []cloudprovider.NodeGroup{ng1})
+    similar, err = processor.FindSimilarNodeGroups(context, ng3, nodeInfosForGroups)
+    assert.NoError(t, err)
+    assert.Equal(t, similar, []cloudprovider.NodeGroup{})
+}
+
 func TestBalanceSingleGroup(t *testing.T) {
+    processor := &BalancingNodeGroupSetProcessor{}
+    context := &context.AutoscalingContext{}
     provider := testprovider.NewTestCloudProvider(nil, nil)
     provider.AddNodeGroup("ng1", 1, 10, 1)
     // just one node
-    scaleUpInfo, err := BalanceScaleUpBetweenGroups(provider.NodeGroups(), 1)
+    scaleUpInfo, err := processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 1)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, 2, scaleUpInfo[0].NewSize)
     // multiple nodes
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(provider.NodeGroups(), 4)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 4)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, 5, scaleUpInfo[0].NewSize)
 }

 func TestBalanceUnderMaxSize(t *testing.T) {
+    processor := &BalancingNodeGroupSetProcessor{}
+    context := &context.AutoscalingContext{}
     provider := testprovider.NewTestCloudProvider(nil, nil)
     provider.AddNodeGroup("ng1", 1, 10, 1)
     provider.AddNodeGroup("ng2", 1, 10, 3)
@@ -50,19 +103,19 @@ func TestBalanceUnderMaxSize(t *testing.T) {
     provider.AddNodeGroup("ng4", 1, 10, 5)

     // add a single node
-    scaleUpInfo, err := BalanceScaleUpBetweenGroups(provider.NodeGroups(), 1)
+    scaleUpInfo, err := processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 1)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, 2, scaleUpInfo[0].NewSize)

     // add multiple nodes to single group
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(provider.NodeGroups(), 2)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 2)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, 3, scaleUpInfo[0].NewSize)

     // add nodes to groups of different sizes, divisible
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(provider.NodeGroups(), 4)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 4)
     assert.NoError(t, err)
     assert.Equal(t, 2, len(scaleUpInfo))
     assert.Equal(t, 4, scaleUpInfo[0].NewSize)
@@ -72,7 +125,7 @@ func TestBalanceUnderMaxSize(t *testing.T) {
     // add nodes to groups of different sizes, non-divisible
     // we expect new sizes to be 4 and 5, doesn't matter which group gets how many
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(provider.NodeGroups(), 5)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 5)
     assert.NoError(t, err)
     assert.Equal(t, 2, len(scaleUpInfo))
     assert.Equal(t, 9, scaleUpInfo[0].NewSize+scaleUpInfo[1].NewSize)
@@ -81,7 +134,7 @@ func TestBalanceUnderMaxSize(t *testing.T) {
     assert.True(t, scaleUpInfo[0].Group.Id() == "ng2" || scaleUpInfo[1].Group.Id() == "ng2")

     // add nodes to all groups, divisible
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(provider.NodeGroups(), 10)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, provider.NodeGroups(), 10)
     assert.NoError(t, err)
     assert.Equal(t, 4, len(scaleUpInfo))
     for _, info := range scaleUpInfo {
@@ -90,6 +143,9 @@ func TestBalanceUnderMaxSize(t *testing.T) {
 }

 func TestBalanceHittingMaxSize(t *testing.T) {
+    processor := &BalancingNodeGroupSetProcessor{}
+    context := &context.AutoscalingContext{}
     provider := testprovider.NewTestCloudProvider(nil, nil)
     provider.AddNodeGroup("ng1", 1, 1, 1)
     provider.AddNodeGroup("ng2", 1, 3, 1)
@@ -117,33 +173,33 @@ func TestBalanceHittingMaxSize(t *testing.T) {
     }

     // Just one maxed out group
-    scaleUpInfo, err := BalanceScaleUpBetweenGroups(getGroups("ng1"), 1)
+    scaleUpInfo, err := processor.BalanceScaleUpBetweenGroups(context, getGroups("ng1"), 1)
     assert.NoError(t, err)
     assert.Equal(t, 0, len(scaleUpInfo))

     // Smallest group already maxed out, add one node
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(getGroups("ng1", "ng2"), 1)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, getGroups("ng1", "ng2"), 1)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, "ng2", scaleUpInfo[0].Group.Id())
     assert.Equal(t, 2, scaleUpInfo[0].NewSize)

     // Smallest group already maxed out, too many nodes (should cap to max capacity)
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(getGroups("ng1", "ng2"), 5)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, getGroups("ng1", "ng2"), 5)
     assert.NoError(t, err)
     assert.Equal(t, 1, len(scaleUpInfo))
     assert.Equal(t, "ng2", scaleUpInfo[0].Group.Id())
     assert.Equal(t, 3, scaleUpInfo[0].NewSize)

     // First group maxes out before proceeding to next one
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(getGroups("ng2", "ng3"), 4)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, getGroups("ng2", "ng3"), 4)
     assert.Equal(t, 2, len(scaleUpInfo))
     scaleUpMap := toMap(scaleUpInfo)
     assert.Equal(t, 3, scaleUpMap["ng2"].NewSize)
     assert.Equal(t, 5, scaleUpMap["ng3"].NewSize)

     // Last group maxes out before previous one
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(getGroups("ng2", "ng3", "ng4"), 9)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, getGroups("ng2", "ng3", "ng4"), 9)
     assert.Equal(t, 3, len(scaleUpInfo))
     scaleUpMap = toMap(scaleUpInfo)
     assert.Equal(t, 3, scaleUpMap["ng2"].NewSize)
@@ -151,7 +207,7 @@ func TestBalanceHittingMaxSize(t *testing.T) {
     assert.Equal(t, 7, scaleUpMap["ng4"].NewSize)

     // Use all capacity, cap to max
-    scaleUpInfo, err = BalanceScaleUpBetweenGroups(getGroups("ng2", "ng3", "ng4"), 900)
+    scaleUpInfo, err = processor.BalanceScaleUpBetweenGroups(context, getGroups("ng2", "ng3", "ng4"), 900)
     assert.Equal(t, 3, len(scaleUpInfo))
     scaleUpMap = toMap(scaleUpInfo)
     assert.Equal(t, 3, scaleUpMap["ng2"].NewSize)

View File

@@ -0,0 +1,75 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodegroupset
+
+import (
+    "fmt"
+
+    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+    "k8s.io/autoscaler/cluster-autoscaler/context"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+
+    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
+)
+
+// ScaleUpInfo contains information about planned scale-up of a single NodeGroup
+type ScaleUpInfo struct {
+    // Group is the group to be scaled-up
+    Group cloudprovider.NodeGroup
+    // CurrentSize is the current size of the Group
+    CurrentSize int
+    // NewSize is the size the Group will be scaled-up to
+    NewSize int
+    // MaxSize is the maximum allowed size of the Group
+    MaxSize int
+}
+
+// String is used for printing ScaleUpInfo for logging, etc
+func (s ScaleUpInfo) String() string {
+    return fmt.Sprintf("{%v %v->%v (max: %v)}", s.Group.Id(), s.CurrentSize, s.NewSize, s.MaxSize)
+}
+
+// NodeGroupSetProcessor finds nodegroups that are similar and allows balancing scale-up between them.
+type NodeGroupSetProcessor interface {
+    FindSimilarNodeGroups(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup,
+        nodeInfosForGroups map[string]*schedulercache.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError)
+
+    BalanceScaleUpBetweenGroups(context *context.AutoscalingContext, groups []cloudprovider.NodeGroup, newNodes int) ([]ScaleUpInfo, errors.AutoscalerError)
+    CleanUp()
+}
+
+// NoOpNodeGroupSetProcessor returns no similar node groups and doesn't do any balancing.
+type NoOpNodeGroupSetProcessor struct {
+}
+
+// FindSimilarNodeGroups returns a list of NodeGroups similar to the one provided in parameter.
+func (n *NoOpNodeGroupSetProcessor) FindSimilarNodeGroups(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup,
+    nodeInfosForGroups map[string]*schedulercache.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) {
+    return []cloudprovider.NodeGroup{}, nil
+}
+
+// BalanceScaleUpBetweenGroups splits a scale-up between provided NodeGroups.
+func (n *NoOpNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(context *context.AutoscalingContext, groups []cloudprovider.NodeGroup, newNodes int) ([]ScaleUpInfo, errors.AutoscalerError) {
+    return []ScaleUpInfo{}, nil
+}
+
+// CleanUp performs final clean up of processor state.
+func (n *NoOpNodeGroupSetProcessor) CleanUp() {}
+
+// NewDefaultNodeGroupSetProcessor creates an instance of NodeGroupSetProcessor.
+func NewDefaultNodeGroupSetProcessor() NodeGroupSetProcessor {
+    return &BalancingNodeGroupSetProcessor{}
+}

View File

@@ -18,6 +18,7 @@ package processors
 import (
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
     "k8s.io/autoscaler/cluster-autoscaler/processors/pods"
     "k8s.io/autoscaler/cluster-autoscaler/processors/status"
 )
@@ -29,6 +30,8 @@ type AutoscalingProcessors struct {
     PodListProcessor pods.PodListProcessor
     // NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
     NodeGroupListProcessor nodegroups.NodeGroupListProcessor
+    // NodeGroupSetProcessor is used to divide scale-up between similar NodeGroups.
+    NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
     // ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.
     ScaleUpStatusProcessor status.ScaleUpStatusProcessor
     // ScaleDownStatusProcessor is used to process the state of the cluster after a scale-down.
@@ -44,6 +47,7 @@ func DefaultProcessors() *AutoscalingProcessors {
     return &AutoscalingProcessors{
         PodListProcessor:       pods.NewDefaultPodListProcessor(),
         NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
+        NodeGroupSetProcessor:  nodegroupset.NewDefaultNodeGroupSetProcessor(),
         ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
         ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
         AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
@@ -56,6 +60,7 @@ func TestProcessors() *AutoscalingProcessors {
     return &AutoscalingProcessors{
         PodListProcessor:       &pods.NoOpPodListProcessor{},
         NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
+        NodeGroupSetProcessor:  &nodegroupset.BalancingNodeGroupSetProcessor{},
         // TODO(bskiba): change scale up test so that this can be a NoOpProcessor
         ScaleUpStatusProcessor:   &status.EventingScaleUpStatusProcessor{},
         ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
@@ -68,6 +73,7 @@ func TestProcessors() *AutoscalingProcessors {
 func (ap *AutoscalingProcessors) CleanUp() {
     ap.PodListProcessor.CleanUp()
     ap.NodeGroupListProcessor.CleanUp()
+    ap.NodeGroupSetProcessor.CleanUp()
     ap.ScaleUpStatusProcessor.CleanUp()
     ap.ScaleDownStatusProcessor.CleanUp()
     ap.AutoscalingStatusProcessor.CleanUp()

View File

@@ -22,7 +22,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/context"
-    "k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
     kube_record "k8s.io/client-go/tools/record"

View File

@@ -19,7 +19,7 @@ package status
 import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/context"
-    "k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 )

 // ScaleUpStatus is the status of a scale-up attempt. This includes information

View File

@@ -1,55 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package nodegroupset
-
-import (
-    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
-    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
-
-    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
-
-    "github.com/golang/glog"
-)
-
-// FindSimilarNodeGroups returns a list of NodeGroups similar to the given one.
-// Two groups are similar if the NodeInfos for them compare equal using IsNodeInfoSimilar.
-func FindSimilarNodeGroups(nodeGroup cloudprovider.NodeGroup, cloudProvider cloudprovider.CloudProvider,
-    nodeInfosForGroups map[string]*schedulercache.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) {
-    result := []cloudprovider.NodeGroup{}
-    nodeGroupId := nodeGroup.Id()
-    nodeInfo, found := nodeInfosForGroups[nodeGroupId]
-    if !found {
-        return []cloudprovider.NodeGroup{}, errors.NewAutoscalerError(
-            errors.InternalError,
-            "failed to find template node for node group %s",
-            nodeGroupId)
-    }
-    for _, ng := range cloudProvider.NodeGroups() {
-        ngId := ng.Id()
-        if ngId == nodeGroupId {
-            continue
-        }
-        ngNodeInfo, found := nodeInfosForGroups[ngId]
-        if !found {
-            glog.Warningf("Failed to find nodeInfo for group %v", ngId)
-            continue
-        }
-        if IsNodeInfoSimilar(nodeInfo, ngNodeInfo) {
-            result = append(result, ng)
-        }
-    }
-    return result, nil
-}

View File

@@ -1,68 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package nodegroupset
-
-import (
-    "testing"
-
-    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
-    testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
-    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
-
-    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
-
-    "github.com/stretchr/testify/assert"
-)
-
-func TestFindSimilarNodeGroups(t *testing.T) {
-    n1 := BuildTestNode("n1", 1000, 1000)
-    n2 := BuildTestNode("n2", 1000, 1000)
-    n3 := BuildTestNode("n3", 2000, 2000)
-
-    provider := testprovider.NewTestCloudProvider(nil, nil)
-    provider.AddNodeGroup("ng1", 1, 10, 1)
-    provider.AddNodeGroup("ng2", 1, 10, 1)
-    provider.AddNodeGroup("ng3", 1, 10, 1)
-    provider.AddNode("ng1", n1)
-    provider.AddNode("ng2", n2)
-    provider.AddNode("ng3", n3)
-
-    ni1 := schedulercache.NewNodeInfo()
-    ni1.SetNode(n1)
-    ni2 := schedulercache.NewNodeInfo()
-    ni2.SetNode(n2)
-    ni3 := schedulercache.NewNodeInfo()
-    ni3.SetNode(n3)
-    nodeInfosForGroups := map[string]*schedulercache.NodeInfo{
-        "ng1": ni1, "ng2": ni2, "ng3": ni3,
-    }
-
-    ng1, _ := provider.NodeGroupForNode(n1)
-    ng2, _ := provider.NodeGroupForNode(n2)
-    ng3, _ := provider.NodeGroupForNode(n3)
-
-    similar, err := FindSimilarNodeGroups(ng1, provider, nodeInfosForGroups)
-    assert.NoError(t, err)
-    assert.Equal(t, similar, []cloudprovider.NodeGroup{ng2})
-
-    similar, err = FindSimilarNodeGroups(ng2, provider, nodeInfosForGroups)
-    assert.NoError(t, err)
-    assert.Equal(t, similar, []cloudprovider.NodeGroup{ng1})
-
-    similar, err = FindSimilarNodeGroups(ng3, provider, nodeInfosForGroups)
-    assert.NoError(t, err)
-    assert.Equal(t, similar, []cloudprovider.NodeGroup{})
-}