Balance sizes of similar nodegroups in scale-up

This commit is contained in:
Maciej Pytel 2017-06-01 18:19:06 +02:00
parent c5e9f479de
commit cd186f3ebc
4 changed files with 183 additions and 34 deletions

View File

@ -101,6 +101,8 @@ type AutoscalingOptions struct {
ScaleDownTrialInterval time.Duration
// WriteStatusConfigMap tells if the status information should be written to a ConfigMap
WriteStatusConfigMap bool
// BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them.
BalanceSimilarNodeGroups bool
}
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments

View File

@ -17,13 +17,16 @@ limitations under the License.
package core
import (
"bytes"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -67,6 +70,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
}
glog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
podsPassingPredicates := make(map[string][]*apiv1.Pod)
podsRemainUnschedulable := make(map[*apiv1.Pod]bool)
expansionOptions := make([]expander.Option, 0)
@ -111,6 +115,10 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
}
}
}
passingPods := make([]*apiv1.Pod, len(option.Pods))
copy(passingPods, option.Pods)
podsPassingPredicates[nodeGroup.Id()] = passingPods
if len(option.Pods) > 0 {
if context.EstimatorName == estimator.BinpackingEstimatorName {
binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
@ -154,48 +162,52 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
}
glog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
currentSize, err := bestOption.NodeGroup.TargetSize()
if err != nil {
return false, errors.NewAutoscalerError(
errors.CloudProviderError,
"failed to get node group size: %v", err)
}
newSize := currentSize + bestOption.NodeCount
if newSize >= bestOption.NodeGroup.MaxSize() {
glog.V(1).Infof("Capping size to MAX (%d)", bestOption.NodeGroup.MaxSize())
newSize = bestOption.NodeGroup.MaxSize()
}
if context.MaxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > context.MaxNodesTotal {
newNodes := bestOption.NodeCount
if context.MaxNodesTotal > 0 && len(nodes)+newNodes > context.MaxNodesTotal {
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newSize = context.MaxNodesTotal - len(nodes) + currentSize
if newSize < currentSize {
newNodes = context.MaxNodesTotal - len(nodes)
if newNodes < 1 {
return false, errors.NewAutoscalerError(
errors.TransientError,
"max node total count already reached")
}
}
glog.V(0).Infof("Scale-up: setting group %s size to %d", bestOption.NodeGroup.Id(), newSize)
increase := newSize - currentSize
if err := bestOption.NodeGroup.IncreaseSize(increase); err != nil {
return false, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to increase node group size: %v", err)
targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
if context.BalanceSimilarNodeGroups {
similarNodeGroups, typedErr := nodegroupset.FindSimilarNodeGroups(bestOption.NodeGroup, context.CloudProvider, nodeInfos)
if typedErr != nil {
return false, typedErr.AddPrefix("Failed to find matching node groups: ")
}
similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, podsPassingPredicates)
targetNodeGroups = append(targetNodeGroups, similarNodeGroups...)
if len(targetNodeGroups) > 1 {
var buffer bytes.Buffer
for i, ng := range targetNodeGroups {
if i > 0 {
buffer.WriteString(", ")
}
buffer.WriteString(ng.Id())
}
glog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), buffer.String())
}
}
scaleUpInfos, typedErr := nodegroupset.BalanceScaleUpBetweenGroups(
targetNodeGroups, newNodes)
if typedErr != nil {
return false, typedErr
}
glog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, info)
if typedErr != nil {
return false, typedErr
}
}
context.ClusterStateRegistry.RegisterScaleUp(
&clusterstate.ScaleUpRequest{
NodeGroupName: bestOption.NodeGroup.Id(),
Increase: increase,
Time: time.Now(),
ExpectedAddTime: time.Now().Add(context.MaxNodeProvisionTime),
})
metrics.RegisterScaleUp(increase)
context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: group %s size set to %d", bestOption.NodeGroup.Id(), newSize)
for _, pod := range bestOption.Pods {
context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
"pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.NodeGroup.Id(), currentSize, newSize)
"pod triggered scale-up: %v", scaleUpInfos)
}
return true, nil
@ -209,3 +221,48 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
return false, nil
}
func filterNodeGroupsByPods(groups []cloudprovider.NodeGroup, podsRequiredToFit []*apiv1.Pod,
fittingPodsPerNodeGroup map[string][]*apiv1.Pod) []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0)
groupsloop:
for _, group := range groups {
fittingPods, found := fittingPodsPerNodeGroup[group.Id()]
if !found {
glog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id())
continue
}
podSet := make(map[*apiv1.Pod]bool, len(fittingPods))
for _, pod := range fittingPods {
podSet[pod] = true
}
for _, pod := range podsRequiredToFit {
if _, found = podSet[pod]; !found {
glog.V(1).Infof("Group %v, can't fit pod %v/%v, removing from scale-up consideration", group.Id(), pod.Namespace, pod.Name)
continue groupsloop
}
}
result = append(result, group)
}
return result
}
func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) *errors.AutoscalerError {
glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
return errors.NewAutoscalerError(errors.CloudProviderError,
"failed to increase node group size: %v", err)
}
context.ClusterStateRegistry.RegisterScaleUp(
&clusterstate.ScaleUpRequest{
NodeGroupName: info.Group.Id(),
Increase: increase,
Time: time.Now(),
ExpectedAddTime: time.Now().Add(context.MaxNodeProvisionTime),
})
metrics.RegisterScaleUp(increase)
context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: group %s size set to %d", info.Group.Id(), info.NewSize)
return nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@ -354,3 +355,90 @@ func TestScaleUpNoHelp(t *testing.T) {
}
assert.Regexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
}
func TestScaleUpBalanceGroups(t *testing.T) {
fakeClient := &fake.Clientset{}
provider := testprovider.NewTestCloudProvider(func(string, int) error {
return nil
}, nil)
type ngInfo struct {
min, max, size int
}
testCfg := map[string]ngInfo{
"ng1": {min: 1, max: 1, size: 1},
"ng2": {min: 1, max: 2, size: 1},
"ng3": {min: 1, max: 5, size: 1},
"ng4": {min: 1, max: 5, size: 3},
}
podMap := make(map[string]*apiv1.Pod, len(testCfg))
nodes := make([]*apiv1.Node, 0)
for gid, gconf := range testCfg {
provider.AddNodeGroup(gid, gconf.min, gconf.max, gconf.size)
for i := 0; i < gconf.size; i++ {
nodeName := fmt.Sprintf("%v-node-%v", gid, i)
node := BuildTestNode(nodeName, 100, 1000)
SetNodeReadyState(node, true, time.Now())
nodes = append(nodes, node)
pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0)
pod.Spec.NodeName = nodeName
podMap[gid] = pod
provider.AddNode(gid, node)
}
}
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
list := action.(core.ListAction)
fieldstring := list.GetListRestrictions().Fields.String()
matcher, err := regexp.Compile("ng[0-9]")
if err != nil {
return false, &apiv1.PodList{Items: []apiv1.Pod{}}, err
}
matches := matcher.FindStringSubmatch(fieldstring)
if len(matches) != 1 {
return false, &apiv1.PodList{Items: []apiv1.Pod{}}, fmt.Errorf("parse error")
}
return true, &apiv1.PodList{Items: []apiv1.Pod{*(podMap[matches[0]])}}, nil
})
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{})
clusterState.UpdateNodes(nodes, time.Now())
fakeRecorder := kube_record.NewFakeRecorder(5)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, kube_record.NewFakeRecorder(5), false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
BalanceSimilarNodeGroups: true,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
Recorder: fakeRecorder,
ExpanderStrategy: random.NewStrategy(),
ClusterStateRegistry: clusterState,
LogRecorder: fakeLogRecorder,
}
pods := make([]*apiv1.Pod, 0)
for i := 0; i < 2; i++ {
pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
}
result, typedErr := ScaleUp(context, pods, nodes, []*extensionsv1.DaemonSet{})
assert.NoError(t, typedErr)
assert.True(t, result)
groupMap := make(map[string]cloudprovider.NodeGroup, 3)
for _, group := range provider.NodeGroups() {
groupMap[group.Id()] = group
}
ng2size, err := groupMap["ng2"].TargetSize()
assert.NoError(t, err)
ng3size, err := groupMap["ng3"].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, ng2size)
assert.Equal(t, 2, ng3size)
}

View File

@ -98,9 +98,10 @@ var (
expanderFlag = flag.String("expander", expander.RandomExpanderName,
"Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]")
writeStatusConfigMapFlag = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap")
maxInactivityTimeFlag = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart")
maxFailingTimeFlag = flag.Duration("max-failing-time", 15*time.Minute, "Maximum time from last recorded successful autoscaler run before automatic restart")
writeStatusConfigMapFlag = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap")
maxInactivityTimeFlag = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart")
maxFailingTimeFlag = flag.Duration("max-failing-time", 15*time.Minute, "Maximum time from last recorded successful autoscaler run before automatic restart")
balanceSimilarNodeGroupsFlag = flag.Bool("balance-similar-node-groups", false, "Detect similar node groups and balance the number of nodes between them")
)
func createAutoscalerOptions() core.AutoscalerOptions {
@ -126,6 +127,7 @@ func createAutoscalerOptions() core.AutoscalerOptions {
ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
VerifyUnschedulablePods: *verifyUnschedulablePods,
WriteStatusConfigMap: *writeStatusConfigMapFlag,
BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag,
}
configFetcherOpts := dynamic.ConfigFetcherOptions{