From 5faa41e6836cbaca06cf78f0d54897ef6a7ce2ce Mon Sep 17 00:00:00 2001 From: Maciej Pytel Date: Fri, 25 May 2018 13:27:28 +0200 Subject: [PATCH 1/2] Move PodListProcessor to new directory It's not really a util and with more processors coming it makes more sense to keep them in dedicated place. --- cluster-autoscaler/core/autoscaler.go | 2 +- cluster-autoscaler/core/autoscaler_builder.go | 2 +- cluster-autoscaler/core/static_autoscaler.go | 2 +- cluster-autoscaler/core/static_autoscaler_test.go | 2 +- .../{utils => processors}/pods/pod_list_processor.go | 0 .../{utils => processors}/pods/pod_list_processor_test.go | 0 6 files changed, 4 insertions(+), 4 deletions(-) rename cluster-autoscaler/{utils => processors}/pods/pod_list_processor.go (100%) rename cluster-autoscaler/{utils => processors}/pods/pod_list_processor_test.go (100%) diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index db052deced..b4140fd39e 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -23,7 +23,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/utils/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) diff --git a/cluster-autoscaler/core/autoscaler_builder.go b/cluster-autoscaler/core/autoscaler_builder.go index aa347db4d6..989210286d 100644 --- a/cluster-autoscaler/core/autoscaler_builder.go +++ b/cluster-autoscaler/core/autoscaler_builder.go @@ -22,7 +22,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/utils/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index a551e57be2..7fbfc75df8 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -27,7 +27,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/utils/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" apiv1 "k8s.io/api/core/v1" diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 015e6b7869..8b72c9dcde 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/utils/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" diff --git a/cluster-autoscaler/utils/pods/pod_list_processor.go b/cluster-autoscaler/processors/pods/pod_list_processor.go similarity index 100% rename from cluster-autoscaler/utils/pods/pod_list_processor.go rename to cluster-autoscaler/processors/pods/pod_list_processor.go diff --git a/cluster-autoscaler/utils/pods/pod_list_processor_test.go b/cluster-autoscaler/processors/pods/pod_list_processor_test.go similarity index 100% rename from cluster-autoscaler/utils/pods/pod_list_processor_test.go rename to cluster-autoscaler/processors/pods/pod_list_processor_test.go From 856855987b7a18fd382cf7950c652cb30ecfb26d Mon Sep 17 00:00:00 2001 From: Maciej Pytel Date: Mon, 28 May 2018 13:40:25 +0200 Subject: [PATCH 2/2] Move some GKE-specific logic outside core No change in actual logic being executed. Added a new NodeGroupListProcessor interface to encapsulate the existing logic. Moved PodListProcessor and refactor how it's passed around to make it consistent and easy to add similar interfaces. --- cluster-autoscaler/core/autoscaler.go | 10 +- cluster-autoscaler/core/autoscaler_builder.go | 10 +- cluster-autoscaler/core/scale_up.go | 87 ++------------ cluster-autoscaler/core/scale_up_test.go | 93 ++++---------- cluster-autoscaler/core/static_autoscaler.go | 12 +- .../core/static_autoscaler_test.go | 13 +- .../nodegroups/autoprovisioning_processor.go | 113 ++++++++++++++++++ .../autoprovisioning_processor_test.go | 101 ++++++++++++++++ .../nodegroups/nodegroup_list_processor.go | 47 ++++++++ cluster-autoscaler/processors/processors.go | 45 +++++++ 10 files changed, 363 insertions(+), 168 deletions(-) create mode 100644 cluster-autoscaler/processors/nodegroups/autoprovisioning_processor.go create mode 100644 cluster-autoscaler/processors/nodegroups/autoprovisioning_processor_test.go create mode 100644 cluster-autoscaler/processors/nodegroups/nodegroup_list_processor.go create mode 100644 cluster-autoscaler/processors/processors.go diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index b4140fd39e..0832f88c92 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -20,10 +20,10 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/context" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/processors/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) @@ -35,7 +35,7 @@ type AutoscalerOptions struct { KubeEventRecorder kube_record.EventRecorder PredicateChecker *simulator.PredicateChecker ListerRegistry kube_util.ListerRegistry - PodListProcessor pods.PodListProcessor + Processors *ca_processors.AutoscalingProcessors } // Autoscaler is the main component of CA which scales up/down node groups according to its configuration @@ -48,8 +48,8 @@ type Autoscaler interface { } func initializeDefaultOptions(opts *AutoscalerOptions) error { - if opts.PodListProcessor == nil { - opts.PodListProcessor = pods.NewDefaultPodListProcessor() + if opts.Processors == nil { + opts.Processors = ca_processors.DefaultProcessors() } return nil } @@ -60,6 +60,6 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) if err != nil { return nil, errors.ToAutoscalerError(errors.InternalError, err) } - 
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor) + autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.Processors) return autoscalerBuilder.Build() } diff --git a/cluster-autoscaler/core/autoscaler_builder.go b/cluster-autoscaler/core/autoscaler_builder.go index 989210286d..fa6b66e2d4 100644 --- a/cluster-autoscaler/core/autoscaler_builder.go +++ b/cluster-autoscaler/core/autoscaler_builder.go @@ -19,10 +19,10 @@ package core import ( "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/context" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/processors/pods" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" ) @@ -42,19 +42,19 @@ type AutoscalerBuilderImpl struct { kubeEventRecorder kube_record.EventRecorder predicateChecker *simulator.PredicateChecker listerRegistry kube_util.ListerRegistry - podListProcessor pods.PodListProcessor + processors *ca_processors.AutoscalingProcessors } // NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker, - kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl { + kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, processors *ca_processors.AutoscalingProcessors) *AutoscalerBuilderImpl { return &AutoscalerBuilderImpl{ autoscalingOptions: autoscalingOptions, kubeClient: kubeClient, kubeEventRecorder: kubeEventRecorder, predicateChecker: predicateChecker, listerRegistry: listerRegistry, - podListProcessor: podListProcessor, + processors: processors, } } @@ -72,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) { c := *(b.dynamicConfig) options.NodeGroups = c.NodeGroupSpecStrings() } - return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor) + return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.processors) } diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 17ec0276d8..cc3e0df49d 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -22,17 +22,15 @@ import ( apiv1 "k8s.io/api/core/v1" extensionsv1 "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/glogx" - 
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - "k8s.io/autoscaler/cluster-autoscaler/utils/labels" "k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset" "k8s.io/kubernetes/pkg/scheduler/schedulercache" @@ -42,8 +40,8 @@ import ( // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size, // false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are // ready and in sync with instance groups. -func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, - daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) { +func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) { // From now on we only care about unschedulable pods that were marked after the newest // node became available for the scheduler. if len(unschedulablePods) == 0 { @@ -97,8 +95,12 @@ func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusters podsPassingPredicates := make(map[string][]*apiv1.Pod) expansionOptions := make([]expander.Option, 0) - if context.AutoscalingOptions.NodeAutoprovisioningEnabled { - nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, unschedulablePods) + if processors != nil && processors.NodeGroupListProcessor != nil { + var errProc error + nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods) + if errProc != nil { + return false, errors.ToAutoscalerError(errors.InternalError, errProc) + } } for _, nodeGroup := range nodeGroups { @@ -356,77 +358,6 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c return nil } -func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, - nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, - map[string]*schedulercache.NodeInfo) { - - autoprovisionedNodeGroupCount := 0 - for _, group := range nodeGroups { - if group.Autoprovisioned() { - autoprovisionedNodeGroupCount++ - } - } - if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount { - glog.V(4).Infof("Max autoprovisioned node group count reached") - return nodeGroups, nodeInfos - } - - newGroupsCount := 0 - - newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{}, - nodeInfos, unschedulablePods) - newGroupsCount += len(newNodeGroups) - nodeGroups = append(nodeGroups, newNodeGroups...) - - gpuRequests := gpu.GetGpuRequests(unschedulablePods) - for _, gpuRequestInfo := range gpuRequests { - glog.V(4).Info("Adding node groups using GPU to NAP simulations") - extraResources := map[string]resource.Quantity{ - gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest, - } - newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources, - nodeInfos, gpuRequestInfo.Pods) - newGroupsCount += len(newNodeGroups) - nodeGroups = append(nodeGroups, newNodeGroups...) 
- } - glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount) - - return nodeGroups, nodeInfos -} - -func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity, - nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup { - - nodeGroups := make([]cloudprovider.NodeGroup, 0) - machines, err := context.CloudProvider.GetAvailableMachineTypes() - if err != nil { - glog.Warningf("Failed to get machine types: %v", err) - return nodeGroups - } - - bestLabels := labels.BestLabelSet(unschedulablePods) - taints := make([]apiv1.Taint, 0) - for _, machineType := range machines { - nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources) - if err != nil { - // We don't check if a given node group setup is allowed. - // It's fine if it isn't, just don't consider it an option. - if err != cloudprovider.ErrIllegalConfiguration { - glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err) - } - continue - } - nodeInfo, err := nodeGroup.TemplateNodeInfo() - if err != nil { - glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err) - continue - } - nodeInfos[nodeGroup.Id()] = nodeInfo - nodeGroups = append(nodeGroups, nodeGroup) - } - return nodeGroups -} - func calculateClusterCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) { var coresTotal int64 var memoryTotal int64 diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index d7e49eca8d..884a95de83 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -31,6 +31,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander/random" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -368,7 +370,9 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) { extraPods[i] = pod } - result, err := ScaleUp(context, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + + result, err := ScaleUp(context, processors, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{}) assert.NoError(t, err) assert.True(t, result) @@ -470,7 +474,9 @@ func TestScaleUpNodeComingNoScale(t *testing.T) { } p3 := BuildTestPod("p-new", 550, 0) - result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + + result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) assert.NoError(t, err) // A node is already coming - no need for scale up. 
assert.False(t, result) @@ -532,7 +538,9 @@ func TestScaleUpNodeComingHasScale(t *testing.T) { } p3 := BuildTestPod("p-new", 550, 0) - result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) + assert.NoError(t, err) // Two nodes needed but one node is already coming, so it should increase by one. assert.True(t, result) @@ -591,7 +599,9 @@ func TestScaleUpUnhealthy(t *testing.T) { } p3 := BuildTestPod("p-new", 550, 0) - result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{}) + assert.NoError(t, err) // Node group is unhealthy. assert.False(t, result) @@ -641,7 +651,9 @@ func TestScaleUpNoHelp(t *testing.T) { } p3 := BuildTestPod("p-new", 500, 0) - result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{}) + assert.NoError(t, err) assert.False(t, result) var event string @@ -725,7 +737,9 @@ func TestScaleUpBalanceGroups(t *testing.T) { pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0)) } - result, typedErr := ScaleUp(context, clusterState, pods, nodes, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + result, typedErr := ScaleUp(context, processors, clusterState, pods, nodes, []*extensionsv1.DaemonSet{}) + assert.NoError(t, typedErr) assert.True(t, result) groupMap := make(map[string]cloudprovider.NodeGroup, 3) @@ -783,71 +797,12 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { LogRecorder: fakeLogRecorder, } - result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{}) + processors := ca_processors.TestProcessors() + processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor() + + result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{}) assert.NoError(t, err) assert.True(t, result) assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups)) assert.Equal(t, "autoprovisioned-T1-1", getStringFromChan(expandedGroups)) } - -func TestAddAutoprovisionedCandidatesOK(t *testing.T) { - t1 := BuildTestNode("t1", 4000, 1000000) - ti1 := schedulercache.NewNodeInfo() - ti1.SetNode(t1) - p1 := BuildTestPod("p1", 100, 100) - - n1 := BuildTestNode("ng1-xxx", 4000, 1000000) - ni1 := schedulercache.NewNodeInfo() - ni1.SetNode(n1) - - provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, - nil, nil, - []string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1}) - provider.AddNodeGroup("ng1", 1, 5, 3) - - context := &context.AutoscalingContext{ - AutoscalingOptions: context.AutoscalingOptions{ - MaxAutoprovisionedNodeGroupCount: 1, - }, - CloudProvider: provider, - } - nodeGroups := provider.NodeGroups() - nodeInfos := map[string]*schedulercache.NodeInfo{ - "ng1": ni1, - } - nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1}) - - 
assert.Equal(t, 2, len(nodeGroups)) - assert.Equal(t, 2, len(nodeInfos)) -} - -func TestAddAutoprovisionedCandidatesToMany(t *testing.T) { - t1 := BuildTestNode("T1-abc", 4000, 1000000) - ti1 := schedulercache.NewNodeInfo() - ti1.SetNode(t1) - - x1 := BuildTestNode("X1-cde", 4000, 1000000) - xi1 := schedulercache.NewNodeInfo() - xi1.SetNode(x1) - - p1 := BuildTestPod("p1", 100, 100) - - provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, - nil, nil, - []string{"T1", "X1"}, - map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1}) - provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1") - - context := &context.AutoscalingContext{ - AutoscalingOptions: context.AutoscalingOptions{ - MaxAutoprovisionedNodeGroupCount: 1, - }, - CloudProvider: provider, - } - nodeGroups := provider.NodeGroups() - nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1} - nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1}) - - assert.Equal(t, 1, len(nodeGroups)) - assert.Equal(t, 1, len(nodeInfos)) -} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 7fbfc75df8..1bcdf3930b 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -23,11 +23,11 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/metrics" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/processors/pods" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" apiv1 "k8s.io/api/core/v1" @@ -60,14 +60,14 @@ type StaticAutoscaler struct { lastScaleDownDeleteTime time.Time lastScaleDownFailTime time.Time scaleDown *ScaleDown - podListProcessor pods.PodListProcessor + processors *ca_processors.AutoscalingProcessors initialized bool } // NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, - podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) { + processors *ca_processors.AutoscalingProcessors) (*StaticAutoscaler, errors.AutoscalerError) { logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap) if err != nil { glog.Error("Failed to initialize status configmap, unable to write status events") @@ -97,7 +97,7 @@ func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simu lastScaleDownDeleteTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: scaleDown, - podListProcessor: podListProcessor, + processors: processors, clusterStateRegistry: clusterStateRegistry, }, nil } @@ -210,7 +210,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return errors.ToAutoscalerError(errors.ApiCallError, err) } - allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes) + allUnschedulablePods, allScheduled, err 
= a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes) if err != nil { glog.Errorf("Failed to process pod list: %v", err) return errors.ToAutoscalerError(errors.InternalError, err) @@ -275,7 +275,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError scaleUpStart := time.Now() metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart) - scaledUp, typedErr := ScaleUp(autoscalingContext, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets) + scaledUp, typedErr := ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets) metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 8b72c9dcde..b4487ecf50 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -27,9 +27,10 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander/random" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/autoscaler/cluster-autoscaler/processors/pods" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -200,7 +201,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - podListProcessor: pods.NewDefaultPodListProcessor(), + processors: ca_processors.TestProcessors(), initialized: true} // MaxNodesTotal reached. @@ -348,6 +349,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder) clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now()) + processors := ca_processors.TestProcessors() + processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor() context := &context.AutoscalingContext{ AutoscalingOptions: context.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, @@ -380,7 +383,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - podListProcessor: pods.NewDefaultPodListProcessor(), + processors: processors, initialized: true} // Scale up. @@ -518,7 +521,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - podListProcessor: pods.NewDefaultPodListProcessor()} + processors: ca_processors.TestProcessors()} // Scale up. 
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Times(2) // due to initialized=false @@ -655,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - podListProcessor: pods.NewDefaultPodListProcessor()} + processors: ca_processors.TestProcessors()} // Scale up readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Times(2) // due to initialized=false diff --git a/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor.go b/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor.go new file mode 100644 index 0000000000..68fd9d596f --- /dev/null +++ b/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor.go @@ -0,0 +1,113 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodegroups + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + "k8s.io/autoscaler/cluster-autoscaler/utils/labels" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" + + "github.com/golang/glog" +) + +// AutoprovisioningNodeGroupListProcessor adds autoprovisioning candidates to consider in scale-up. +type AutoprovisioningNodeGroupListProcessor struct { +} + +// NewAutoprovisioningNodeGroupListProcessor creates an instance of NodeGroupListProcessor. +func NewAutoprovisioningNodeGroupListProcessor() NodeGroupListProcessor { + return &AutoprovisioningNodeGroupListProcessor{} +} + +// Process adds autoprovisioned node group candidates to the list of node groups considered in scale-up. +func (p *AutoprovisioningNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo, + unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) { + + if !context.AutoscalingOptions.NodeAutoprovisioningEnabled { + return nodeGroups, nodeInfos, nil + } + + autoprovisionedNodeGroupCount := 0 + for _, group := range nodeGroups { + if group.Autoprovisioned() { + autoprovisionedNodeGroupCount++ + } + } + if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount { + glog.V(4).Infof("Max autoprovisioned node group count reached") + return nodeGroups, nodeInfos, nil + } + + newGroupsCount := 0 + + newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{}, + nodeInfos, unschedulablePods) + newGroupsCount += len(newNodeGroups) + nodeGroups = append(nodeGroups, newNodeGroups...) 
+ + gpuRequests := gpu.GetGpuRequests(unschedulablePods) + for _, gpuRequestInfo := range gpuRequests { + glog.V(4).Info("Adding node groups using GPU to NAP simulations") + extraResources := map[string]resource.Quantity{ + gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest, + } + newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources, + nodeInfos, gpuRequestInfo.Pods) + newGroupsCount += len(newNodeGroups) + nodeGroups = append(nodeGroups, newNodeGroups...) + } + glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount) + + return nodeGroups, nodeInfos, nil +} + +func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity, + nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup { + + nodeGroups := make([]cloudprovider.NodeGroup, 0) + machines, err := context.CloudProvider.GetAvailableMachineTypes() + if err != nil { + glog.Warningf("Failed to get machine types: %v", err) + return nodeGroups + } + + bestLabels := labels.BestLabelSet(unschedulablePods) + taints := make([]apiv1.Taint, 0) + for _, machineType := range machines { + nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources) + if err != nil { + // We don't check if a given node group setup is allowed. + // It's fine if it isn't, just don't consider it an option. + if err != cloudprovider.ErrIllegalConfiguration { + glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err) + } + continue + } + nodeInfo, err := nodeGroup.TemplateNodeInfo() + if err != nil { + glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err) + continue + } + nodeInfos[nodeGroup.Id()] = nodeInfo + nodeGroups = append(nodeGroups, nodeGroup) + } + return nodeGroups +} diff --git a/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor_test.go b/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor_test.go new file mode 100644 index 0000000000..81cadfdb2d --- /dev/null +++ b/cluster-autoscaler/processors/nodegroups/autoprovisioning_processor_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodegroups + +import ( + "testing" + + apiv1 "k8s.io/api/core/v1" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/context" + . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" + + "github.com/stretchr/testify/assert" +) + +func TestAutoprovisioningNGLProcessor(t *testing.T) { + processor := NewAutoprovisioningNodeGroupListProcessor() + + t1 := BuildTestNode("t1", 4000, 1000000) + ti1 := schedulercache.NewNodeInfo() + ti1.SetNode(t1) + p1 := BuildTestPod("p1", 100, 100) + + n1 := BuildTestNode("ng1-xxx", 4000, 1000000) + ni1 := schedulercache.NewNodeInfo() + ni1.SetNode(n1) + + provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, + nil, nil, + []string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1}) + provider.AddNodeGroup("ng1", 1, 5, 3) + + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ + MaxAutoprovisionedNodeGroupCount: 1, + NodeAutoprovisioningEnabled: true, + }, + CloudProvider: provider, + } + nodeGroups := provider.NodeGroups() + nodeInfos := map[string]*schedulercache.NodeInfo{ + "ng1": ni1, + } + var err error + nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1}) + + assert.NoError(t, err) + assert.Equal(t, 2, len(nodeGroups)) + assert.Equal(t, 2, len(nodeInfos)) +} + +func TestAutoprovisioningNGLProcessorTooMany(t *testing.T) { + processor := NewAutoprovisioningNodeGroupListProcessor() + + t1 := BuildTestNode("T1-abc", 4000, 1000000) + ti1 := schedulercache.NewNodeInfo() + ti1.SetNode(t1) + + x1 := BuildTestNode("X1-cde", 4000, 1000000) + xi1 := schedulercache.NewNodeInfo() + xi1.SetNode(x1) + + p1 := BuildTestPod("p1", 100, 100) + + provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, + nil, nil, + []string{"T1", "X1"}, + map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1}) + provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1") + + context := &context.AutoscalingContext{ + AutoscalingOptions: context.AutoscalingOptions{ + MaxAutoprovisionedNodeGroupCount: 1, + NodeAutoprovisioningEnabled: true, + }, + CloudProvider: provider, + } + nodeGroups := provider.NodeGroups() + nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1} + var err error + nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1}) + + assert.NoError(t, err) + assert.Equal(t, 1, len(nodeGroups)) + assert.Equal(t, 1, len(nodeInfos)) +} diff --git a/cluster-autoscaler/processors/nodegroups/nodegroup_list_processor.go b/cluster-autoscaler/processors/nodegroups/nodegroup_list_processor.go new file mode 100644 index 0000000000..9671f8a126 --- /dev/null +++ b/cluster-autoscaler/processors/nodegroups/nodegroup_list_processor.go @@ -0,0 +1,47 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodegroups + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +// NodeGroupListProcessor processes lists of NodeGroups considered in scale-up. +type NodeGroupListProcessor interface { + Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, + nodeInfos map[string]*schedulercache.NodeInfo, + unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) +} + +// NoOpNodeGroupListProcessor returns the node group list without processing it. +type NoOpNodeGroupListProcessor struct { +} + +// NewDefaultNodeGroupListProcessor creates an instance of NodeGroupListProcessor. +func NewDefaultNodeGroupListProcessor() NodeGroupListProcessor { + // TODO(maciekpytel): Use a better default + return &AutoprovisioningNodeGroupListProcessor{} +} + +// Process returns the node groups and node infos unchanged. +func (p *NoOpNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo, + unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) { + return nodeGroups, nodeInfos, nil +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go new file mode 100644 index 0000000000..c948ab30b9 --- /dev/null +++ b/cluster-autoscaler/processors/processors.go @@ -0,0 +1,45 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package processors + +import ( + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" +) + +// AutoscalingProcessors are a set of customizable processors used for encapsulating +// various heuristics used in different parts of Cluster Autoscaler code. +type AutoscalingProcessors struct { + // PodListProcessor is used to process the list of unschedulable pods before autoscaling. + PodListProcessor pods.PodListProcessor + // NodeGroupListProcessor is used to process the list of NodeGroups that can be used in scale-up. + NodeGroupListProcessor nodegroups.NodeGroupListProcessor +} + +// DefaultProcessors returns the default set of processors. +func DefaultProcessors() *AutoscalingProcessors { + return &AutoscalingProcessors{ + PodListProcessor: pods.NewDefaultPodListProcessor(), + NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor()} +} + +// TestProcessors returns a set of simple processors for use in tests. +func TestProcessors() *AutoscalingProcessors { + return &AutoscalingProcessors{ + PodListProcessor: &pods.NoOpPodListProcessor{}, + NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}} +}
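
The sketch below illustrates how the AutoscalingProcessors extension point introduced by these patches could be used to plug a custom processor into the autoscaler. It is an illustration only, not part of the patches: FilteringPodListProcessor and the example.com/ignore-for-scale-up annotation are hypothetical, and the PodListProcessor method signature is inferred from the call site in static_autoscaler.go.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
)

// FilteringPodListProcessor drops unschedulable pods that carry an opt-out
// annotation so they never trigger a scale-up. The annotation name is made up
// for this example; the Process signature mirrors the call in static_autoscaler.go.
type FilteringPodListProcessor struct{}

func (p *FilteringPodListProcessor) Process(ctx *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
	allNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
	filtered := make([]*apiv1.Pod, 0, len(unschedulablePods))
	for _, pod := range unschedulablePods {
		if pod.Annotations["example.com/ignore-for-scale-up"] == "true" {
			continue // hypothetical opt-out annotation
		}
		filtered = append(filtered, pod)
	}
	return filtered, allScheduledPods, nil
}

// buildProcessors assembles the processor set the same way DefaultProcessors()
// does, but swaps in the custom pod list processor. The result would be passed
// to the autoscaler via AutoscalerOptions.Processors.
func buildProcessors() *ca_processors.AutoscalingProcessors {
	return &ca_processors.AutoscalingProcessors{
		PodListProcessor:       &FilteringPodListProcessor{},
		NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
	}
}

With a custom set wired in this way, the rest of the flow in scale_up.go and static_autoscaler.go stays untouched, since those files only consume the AutoscalingProcessors struct rather than concrete processor types.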