From cd186f3ebc4ac394d1cc5d2b81eb8f0cb736aabb Mon Sep 17 00:00:00 2001
From: Maciej Pytel
Date: Thu, 1 Jun 2017 18:19:06 +0200
Subject: [PATCH] Balance sizes of similar nodegroups in scale-up

---
 .../core/autoscaling_context.go          |   2 +
 cluster-autoscaler/core/scale_up.go      | 119 +++++++++++++-----
 cluster-autoscaler/core/scale_up_test.go |  88 +++++++++++++
 cluster-autoscaler/main.go               |   8 +-
 4 files changed, 183 insertions(+), 34 deletions(-)

diff --git a/cluster-autoscaler/core/autoscaling_context.go b/cluster-autoscaler/core/autoscaling_context.go
index 349eea953a..6a3f13027a 100644
--- a/cluster-autoscaler/core/autoscaling_context.go
+++ b/cluster-autoscaler/core/autoscaling_context.go
@@ -101,6 +101,8 @@ type AutoscalingOptions struct {
 	ScaleDownTrialInterval time.Duration
 	// WriteStatusConfigMap tells if the status information should be written to a ConfigMap
 	WriteStatusConfigMap bool
+	// BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them.
+	BalanceSimilarNodeGroups bool
 }
 
 // NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go
index 7a6282347b..f28c9d8be5 100644
--- a/cluster-autoscaler/core/scale_up.go
+++ b/cluster-autoscaler/core/scale_up.go
@@ -17,13 +17,16 @@ limitations under the License.
 package core
 
 import (
+	"bytes"
 	"time"
 
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
 	extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -67,6 +70,7 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
 	}
 	glog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
 
+	podsPassingPredicates := make(map[string][]*apiv1.Pod)
 	podsRemainUnschedulable := make(map[*apiv1.Pod]bool)
 	expansionOptions := make([]expander.Option, 0)
 
@@ -111,6 +115,10 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
 				}
 			}
 		}
+		passingPods := make([]*apiv1.Pod, len(option.Pods))
+		copy(passingPods, option.Pods)
+		podsPassingPredicates[nodeGroup.Id()] = passingPods
+
 		if len(option.Pods) > 0 {
 			if context.EstimatorName == estimator.BinpackingEstimatorName {
 				binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
@@ -154,48 +162,52 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
 	}
 	glog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
 
-	currentSize, err := bestOption.NodeGroup.TargetSize()
-	if err != nil {
-		return false, errors.NewAutoscalerError(
-			errors.CloudProviderError,
-			"failed to get node group size: %v", err)
-	}
-	newSize := currentSize + bestOption.NodeCount
-	if newSize >= bestOption.NodeGroup.MaxSize() {
-		glog.V(1).Infof("Capping size to MAX (%d)", bestOption.NodeGroup.MaxSize())
-		newSize = bestOption.NodeGroup.MaxSize()
-	}
-
-	if context.MaxNodesTotal > 0 && len(nodes)+(newSize-currentSize) > context.MaxNodesTotal {
+	newNodes := bestOption.NodeCount
+	if context.MaxNodesTotal > 0 && len(nodes)+newNodes > context.MaxNodesTotal {
 		glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
-		newSize = context.MaxNodesTotal - len(nodes) + currentSize
-		if newSize < currentSize {
+		newNodes = context.MaxNodesTotal - len(nodes)
+		if newNodes < 1 {
 			return false, errors.NewAutoscalerError(
 				errors.TransientError,
 				"max node total count already reached")
 		}
 	}
 
-	glog.V(0).Infof("Scale-up: setting group %s size to %d", bestOption.NodeGroup.Id(), newSize)
-	increase := newSize - currentSize
-	if err := bestOption.NodeGroup.IncreaseSize(increase); err != nil {
-		return false, errors.NewAutoscalerError(
-			errors.CloudProviderError, "failed to increase node group size: %v", err)
+	targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
+	if context.BalanceSimilarNodeGroups {
+		similarNodeGroups, typedErr := nodegroupset.FindSimilarNodeGroups(bestOption.NodeGroup, context.CloudProvider, nodeInfos)
+		if typedErr != nil {
+			return false, typedErr.AddPrefix("Failed to find matching node groups: ")
+		}
+		similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, podsPassingPredicates)
+		targetNodeGroups = append(targetNodeGroups, similarNodeGroups...)
+		if len(targetNodeGroups) > 1 {
+			var buffer bytes.Buffer
+			for i, ng := range targetNodeGroups {
+				if i > 0 {
+					buffer.WriteString(", ")
+				}
+				buffer.WriteString(ng.Id())
+			}
+			glog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), buffer.String())
+		}
+	}
+	scaleUpInfos, typedErr := nodegroupset.BalanceScaleUpBetweenGroups(
+		targetNodeGroups, newNodes)
+	if typedErr != nil {
+		return false, typedErr
+	}
+	glog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
+	for _, info := range scaleUpInfos {
+		typedErr := executeScaleUp(context, info)
+		if typedErr != nil {
+			return false, typedErr
+		}
 	}
 
-	context.ClusterStateRegistry.RegisterScaleUp(
-		&clusterstate.ScaleUpRequest{
-			NodeGroupName:   bestOption.NodeGroup.Id(),
-			Increase:        increase,
-			Time:            time.Now(),
-			ExpectedAddTime: time.Now().Add(context.MaxNodeProvisionTime),
-		})
-	metrics.RegisterScaleUp(increase)
-	context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
-		"Scale-up: group %s size set to %d", bestOption.NodeGroup.Id(), newSize)
 	for _, pod := range bestOption.Pods {
 		context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
-			"pod triggered scale-up, group: %s, sizes (current/new): %d/%d", bestOption.NodeGroup.Id(), currentSize, newSize)
+			"pod triggered scale-up: %v", scaleUpInfos)
 	}
 
 	return true, nil
@@ -209,3 +221,48 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
 	}
 	return false, nil
 }
+
+func filterNodeGroupsByPods(groups []cloudprovider.NodeGroup, podsRequiredToFit []*apiv1.Pod,
+	fittingPodsPerNodeGroup map[string][]*apiv1.Pod) []cloudprovider.NodeGroup {
+	result := make([]cloudprovider.NodeGroup, 0)
+groupsloop:
+	for _, group := range groups {
+		fittingPods, found := fittingPodsPerNodeGroup[group.Id()]
+		if !found {
+			glog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id())
+			continue
+		}
+		podSet := make(map[*apiv1.Pod]bool, len(fittingPods))
+		for _, pod := range fittingPods {
+			podSet[pod] = true
+		}
+		for _, pod := range podsRequiredToFit {
+			if _, found = podSet[pod]; !found {
+				glog.V(1).Infof("Group %v, can't fit pod %v/%v, removing from scale-up consideration", group.Id(), pod.Namespace, pod.Name)
+				continue groupsloop
+			}
+		}
+		result = append(result, group)
+	}
+	return result
+}
+
+func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) *errors.AutoscalerError {
+	glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
+	increase := info.NewSize - info.CurrentSize
+	if err := info.Group.IncreaseSize(increase); err != nil {
+		return errors.NewAutoscalerError(errors.CloudProviderError,
+			"failed to increase node group size: %v", err)
+	}
+	context.ClusterStateRegistry.RegisterScaleUp(
+		&clusterstate.ScaleUpRequest{
+			NodeGroupName:   info.Group.Id(),
+			Increase:        increase,
+			Time:            time.Now(),
+			ExpectedAddTime: time.Now().Add(context.MaxNodeProvisionTime),
+		})
+	metrics.RegisterScaleUp(increase)
+	context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
+		"Scale-up: group %s size set to %d", info.Group.Id(), info.NewSize)
+	return nil
+}
diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go
index c35575d724..f5de407106 100644
--- a/cluster-autoscaler/core/scale_up_test.go
+++ b/cluster-autoscaler/core/scale_up_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@@ -354,3 +355,90 @@ func TestScaleUpNoHelp(t *testing.T) {
 	}
 	assert.Regexp(t, regexp.MustCompile("NotTriggerScaleUp"), event)
 }
+
+func TestScaleUpBalanceGroups(t *testing.T) {
+	fakeClient := &fake.Clientset{}
+	provider := testprovider.NewTestCloudProvider(func(string, int) error {
+		return nil
+	}, nil)
+
+	type ngInfo struct {
+		min, max, size int
+	}
+	testCfg := map[string]ngInfo{
+		"ng1": {min: 1, max: 1, size: 1},
+		"ng2": {min: 1, max: 2, size: 1},
+		"ng3": {min: 1, max: 5, size: 1},
+		"ng4": {min: 1, max: 5, size: 3},
+	}
+	podMap := make(map[string]*apiv1.Pod, len(testCfg))
+	nodes := make([]*apiv1.Node, 0)
+
+	for gid, gconf := range testCfg {
+		provider.AddNodeGroup(gid, gconf.min, gconf.max, gconf.size)
+		for i := 0; i < gconf.size; i++ {
+			nodeName := fmt.Sprintf("%v-node-%v", gid, i)
+			node := BuildTestNode(nodeName, 100, 1000)
+			SetNodeReadyState(node, true, time.Now())
+			nodes = append(nodes, node)
+
+			pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0)
+			pod.Spec.NodeName = nodeName
+			podMap[gid] = pod
+
+			provider.AddNode(gid, node)
+		}
+	}
+
+	fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
+		list := action.(core.ListAction)
+		fieldstring := list.GetListRestrictions().Fields.String()
+		matcher, err := regexp.Compile("ng[0-9]")
+		if err != nil {
+			return false, &apiv1.PodList{Items: []apiv1.Pod{}}, err
+		}
+		matches := matcher.FindStringSubmatch(fieldstring)
+		if len(matches) != 1 {
+			return false, &apiv1.PodList{Items: []apiv1.Pod{}}, fmt.Errorf("parse error")
+		}
+		return true, &apiv1.PodList{Items: []apiv1.Pod{*(podMap[matches[0]])}}, nil
+	})
+
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{})
+	clusterState.UpdateNodes(nodes, time.Now())
+	fakeRecorder := kube_record.NewFakeRecorder(5)
+	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, kube_record.NewFakeRecorder(5), false)
+	context := &AutoscalingContext{
+		AutoscalingOptions: AutoscalingOptions{
+			EstimatorName:            estimator.BinpackingEstimatorName,
+			BalanceSimilarNodeGroups: true,
+		},
+		PredicateChecker:     simulator.NewTestPredicateChecker(),
+		CloudProvider:        provider,
+		ClientSet:            fakeClient,
+		Recorder:             fakeRecorder,
+		ExpanderStrategy:     random.NewStrategy(),
+		ClusterStateRegistry: clusterState,
+		LogRecorder:          fakeLogRecorder,
+	}
+
+	pods := make([]*apiv1.Pod, 0)
+	for i := 0; i < 2; i++ {
+		pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
+	}
+
+	result, typedErr := ScaleUp(context, pods, nodes, []*extensionsv1.DaemonSet{})
+	assert.NoError(t, typedErr)
+	assert.True(t, result)
+	groupMap := make(map[string]cloudprovider.NodeGroup, 3)
+	for _, group := range provider.NodeGroups() {
+		groupMap[group.Id()] = group
+	}
+
+	ng2size, err := groupMap["ng2"].TargetSize()
+	assert.NoError(t, err)
+	ng3size, err := groupMap["ng3"].TargetSize()
+	assert.NoError(t, err)
+	assert.Equal(t, 2, ng2size)
+	assert.Equal(t, 2, ng3size)
+}
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 4018a4fd60..08adb0f446 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -98,9 +98,10 @@
 	expanderFlag = flag.String("expander", expander.RandomExpanderName,
 		"Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]")
 
-	writeStatusConfigMapFlag = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap")
-	maxInactivityTimeFlag    = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart")
-	maxFailingTimeFlag       = flag.Duration("max-failing-time", 15*time.Minute, "Maximum time from last recorded successful autoscaler run before automatic restart")
+	writeStatusConfigMapFlag     = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap")
+	maxInactivityTimeFlag        = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart")
+	maxFailingTimeFlag           = flag.Duration("max-failing-time", 15*time.Minute, "Maximum time from last recorded successful autoscaler run before automatic restart")
+	balanceSimilarNodeGroupsFlag = flag.Bool("balance-similar-node-groups", false, "Detect similar node groups and balance the number of nodes between them")
 )
 
 func createAutoscalerOptions() core.AutoscalerOptions {
@@ -126,6 +127,7 @@
 		ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
 		VerifyUnschedulablePods:       *verifyUnschedulablePods,
 		WriteStatusConfigMap:          *writeStatusConfigMapFlag,
+		BalanceSimilarNodeGroups:      *balanceSimilarNodeGroupsFlag,
 	}
 
 	configFetcherOpts := dynamic.ConfigFetcherOptions{
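Note on the behaviour exercised by TestScaleUpBalanceGroups: FindSimilarNodeGroups and BalanceScaleUpBetweenGroups come from k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset and are not shown in this patch, so the Go sketch below is only an illustration of the split the test asserts, not the actual implementation. It assumes a hypothetical groupState type and a greedy "grow the smallest group that still has headroom" rule, which is enough to reproduce the expected outcome (ng2 and ng3 both end at 2 nodes while ng4 stays at 3).

package main

import "fmt"

// groupState is a hypothetical stand-in for nodegroupset.ScaleUpInfo,
// holding only the fields this illustration needs.
type groupState struct {
	id          string
	currentSize int
	maxSize     int
}

// balance assigns newNodes one at a time to the group with the smallest
// current size that still has headroom. This reproduces the split asserted
// by TestScaleUpBalanceGroups; the real BalanceScaleUpBetweenGroups may
// differ in detail.
func balance(groups []groupState, newNodes int) []groupState {
	for n := 0; n < newNodes; n++ {
		best := -1
		for i := range groups {
			if groups[i].currentSize >= groups[i].maxSize {
				continue // group already at its max size
			}
			if best == -1 || groups[i].currentSize < groups[best].currentSize {
				best = i
			}
		}
		if best == -1 {
			break // no group has headroom left
		}
		groups[best].currentSize++
	}
	return groups
}

func main() {
	// ng1 (already at its max of 1) cannot grow and is omitted here.
	groups := []groupState{
		{id: "ng2", currentSize: 1, maxSize: 2},
		{id: "ng3", currentSize: 1, maxSize: 5},
		{id: "ng4", currentSize: 3, maxSize: 5},
	}
	for _, g := range balance(groups, 2) {
		fmt.Printf("%s -> %d\n", g.id, g.currentSize)
	}
	// Prints: ng2 -> 2, ng3 -> 2, ng4 -> 3, matching the test's assertions.
}

The balancing logic only runs when the new --balance-similar-node-groups flag is set; it defaults to false and is copied into AutoscalingOptions.BalanceSimilarNodeGroups by createAutoscalerOptions.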