Move some GKE-specific logic outside core

No change in the actual logic being executed. Added a new
NodeGroupListProcessor interface to encapsulate the existing logic.
Moved PodListProcessor and refactored how it is passed around
to make it consistent and easy to add similar interfaces.
Maciej Pytel 2018-05-28 13:40:25 +02:00
parent 5faa41e683
commit 856855987b
10 changed files with 363 additions and 168 deletions
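In practical terms, embedders no longer hand the autoscaler a bare PodListProcessor; they pass (or omit) an AutoscalingProcessors bundle. Below is a minimal sketch of what a caller could look like after this commit; the function name and wiring are illustrative assumptions, not code from this diff.

package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/core"
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// buildAutoscaler is a hypothetical caller; the real entry point is not shown in this diff.
func buildAutoscaler(opts core.AutoscalerOptions) (core.Autoscaler, errors.AutoscalerError) {
	// Leaving opts.Processors nil lets initializeDefaultOptions fall back to
	// ca_processors.DefaultProcessors(); setting it explicitly is the hook a
	// downstream build (e.g. GKE) can use to plug in its own processors.
	opts.Processors = ca_processors.DefaultProcessors()
	return core.NewAutoscaler(opts)
}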

View File

@@ -20,10 +20,10 @@ import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)
@@ -35,7 +35,7 @@ type AutoscalerOptions struct {
KubeEventRecorder kube_record.EventRecorder
PredicateChecker *simulator.PredicateChecker
ListerRegistry kube_util.ListerRegistry
PodListProcessor pods.PodListProcessor
Processors *ca_processors.AutoscalingProcessors
}
// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -48,8 +48,8 @@ type Autoscaler interface {
}
func initializeDefaultOptions(opts *AutoscalerOptions) error {
if opts.PodListProcessor == nil {
opts.PodListProcessor = pods.NewDefaultPodListProcessor()
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors()
}
return nil
}
@@ -60,6 +60,6 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
if err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor)
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.Processors)
return autoscalerBuilder.Build()
}

View File

@@ -19,10 +19,10 @@ package core
import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)
@@ -42,19 +42,19 @@ type AutoscalerBuilderImpl struct {
kubeEventRecorder kube_record.EventRecorder
predicateChecker *simulator.PredicateChecker
listerRegistry kube_util.ListerRegistry
podListProcessor pods.PodListProcessor
processors *ca_processors.AutoscalingProcessors
}
// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl {
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, processors *ca_processors.AutoscalingProcessors) *AutoscalerBuilderImpl {
return &AutoscalerBuilderImpl{
autoscalingOptions: autoscalingOptions,
kubeClient: kubeClient,
kubeEventRecorder: kubeEventRecorder,
predicateChecker: predicateChecker,
listerRegistry: listerRegistry,
podListProcessor: podListProcessor,
processors: processors,
}
}
@@ -72,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) {
c := *(b.dynamicConfig)
options.NodeGroups = c.NodeGroupSpecStrings()
}
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor)
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.processors)
}

View File

@@ -22,17 +22,15 @@ import (
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/labels"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
@@ -42,8 +40,8 @@ import (
// ScaleUp tries to scale the cluster up. Returns true if it found a way to increase the size,
// false if it didn't, and an error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -97,8 +95,12 @@ func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusters
podsPassingPredicates := make(map[string][]*apiv1.Pod)
expansionOptions := make([]expander.Option, 0)
if context.AutoscalingOptions.NodeAutoprovisioningEnabled {
nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, unschedulablePods)
if processors != nil && processors.NodeGroupListProcessor != nil {
var errProc error
nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods)
if errProc != nil {
return false, errors.ToAutoscalerError(errors.InternalError, errProc)
}
}
for _, nodeGroup := range nodeGroups {
@@ -356,77 +358,6 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
return nil
}
func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup,
map[string]*schedulercache.NodeInfo) {
autoprovisionedNodeGroupCount := 0
for _, group := range nodeGroups {
if group.Autoprovisioned() {
autoprovisionedNodeGroupCount++
}
}
if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount {
glog.V(4).Infof("Max autoprovisioned node group count reached")
return nodeGroups, nodeInfos
}
newGroupsCount := 0
newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{},
nodeInfos, unschedulablePods)
newGroupsCount += len(newNodeGroups)
nodeGroups = append(nodeGroups, newNodeGroups...)
gpuRequests := gpu.GetGpuRequests(unschedulablePods)
for _, gpuRequestInfo := range gpuRequests {
glog.V(4).Info("Adding node groups using GPU to NAP simulations")
extraResources := map[string]resource.Quantity{
gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest,
}
newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources,
nodeInfos, gpuRequestInfo.Pods)
newGroupsCount += len(newNodeGroups)
nodeGroups = append(nodeGroups, newNodeGroups...)
}
glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount)
return nodeGroups, nodeInfos
}
func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
nodeGroups := make([]cloudprovider.NodeGroup, 0)
machines, err := context.CloudProvider.GetAvailableMachineTypes()
if err != nil {
glog.Warningf("Failed to get machine types: %v", err)
return nodeGroups
}
bestLabels := labels.BestLabelSet(unschedulablePods)
taints := make([]apiv1.Taint, 0)
for _, machineType := range machines {
nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources)
if err != nil {
// We don't check if a given node group setup is allowed.
// It's fine if it isn't, just don't consider it an option.
if err != cloudprovider.ErrIllegalConfiguration {
glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err)
}
continue
}
nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err)
continue
}
nodeInfos[nodeGroup.Id()] = nodeInfo
nodeGroups = append(nodeGroups, nodeGroup)
}
return nodeGroups
}
func calculateClusterCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
var coresTotal int64
var memoryTotal int64

View File

@@ -31,6 +31,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -368,7 +370,9 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
extraPods[i] = pod
}
result, err := ScaleUp(context, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
assert.True(t, result)
@@ -470,7 +474,9 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
}
p3 := BuildTestPod("p-new", 550, 0)
result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// A node is already coming - no need for scale up.
assert.False(t, result)
@@ -532,7 +538,9 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
}
p3 := BuildTestPod("p-new", 550, 0)
result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// Two nodes needed but one node is already coming, so it should increase by one.
assert.True(t, result)
@@ -591,7 +599,9 @@ func TestScaleUpUnhealthy(t *testing.T) {
}
p3 := BuildTestPod("p-new", 550, 0)
result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// Node group is unhealthy.
assert.False(t, result)
@@ -641,7 +651,9 @@ func TestScaleUpNoHelp(t *testing.T) {
}
p3 := BuildTestPod("p-new", 500, 0)
result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
assert.False(t, result)
var event string
@@ -725,7 +737,9 @@ func TestScaleUpBalanceGroups(t *testing.T) {
pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
}
result, typedErr := ScaleUp(context, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
result, typedErr := ScaleUp(context, processors, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
assert.NoError(t, typedErr)
assert.True(t, result)
groupMap := make(map[string]cloudprovider.NodeGroup, 3)
@@ -783,71 +797,12 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
LogRecorder: fakeLogRecorder,
}
result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
processors := ca_processors.TestProcessors()
processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
assert.True(t, result)
assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups))
assert.Equal(t, "autoprovisioned-T1-1", getStringFromChan(expandedGroups))
}
func TestAddAutoprovisionedCandidatesOK(t *testing.T) {
t1 := BuildTestNode("t1", 4000, 1000000)
ti1 := schedulercache.NewNodeInfo()
ti1.SetNode(t1)
p1 := BuildTestPod("p1", 100, 100)
n1 := BuildTestNode("ng1-xxx", 4000, 1000000)
ni1 := schedulercache.NewNodeInfo()
ni1.SetNode(n1)
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
[]string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
provider.AddNodeGroup("ng1", 1, 5, 3)
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
},
CloudProvider: provider,
}
nodeGroups := provider.NodeGroups()
nodeInfos := map[string]*schedulercache.NodeInfo{
"ng1": ni1,
}
nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
assert.Equal(t, 2, len(nodeGroups))
assert.Equal(t, 2, len(nodeInfos))
}
func TestAddAutoprovisionedCandidatesToMany(t *testing.T) {
t1 := BuildTestNode("T1-abc", 4000, 1000000)
ti1 := schedulercache.NewNodeInfo()
ti1.SetNode(t1)
x1 := BuildTestNode("X1-cde", 4000, 1000000)
xi1 := schedulercache.NewNodeInfo()
xi1.SetNode(x1)
p1 := BuildTestPod("p1", 100, 100)
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
[]string{"T1", "X1"},
map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
},
CloudProvider: provider,
}
nodeGroups := provider.NodeGroups()
nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1}
nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
assert.Equal(t, 1, len(nodeGroups))
assert.Equal(t, 1, len(nodeInfos))
}

View File

@@ -23,11 +23,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
apiv1 "k8s.io/api/core/v1"
@@ -60,14 +60,14 @@ type StaticAutoscaler struct {
lastScaleDownDeleteTime time.Time
lastScaleDownFailTime time.Time
scaleDown *ScaleDown
podListProcessor pods.PodListProcessor
processors *ca_processors.AutoscalingProcessors
initialized bool
}
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry,
podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) {
processors *ca_processors.AutoscalingProcessors) (*StaticAutoscaler, errors.AutoscalerError) {
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap)
if err != nil {
glog.Error("Failed to initialize status configmap, unable to write status events")
@@ -97,7 +97,7 @@ func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simu
lastScaleDownDeleteTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: scaleDown,
podListProcessor: podListProcessor,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
}, nil
}
@@ -210,7 +210,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
allUnschedulablePods, allScheduled, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
if err != nil {
glog.Errorf("Failed to process pod list: %v", err)
return errors.ToAutoscalerError(errors.InternalError, err)
@@ -275,7 +275,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
scaledUp, typedErr := ScaleUp(autoscalingContext, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
scaledUp, typedErr := ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
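RunOnce now reaches pod filtering through a.processors.PodListProcessor rather than a dedicated field, so replacing that behaviour means supplying any implementation of pods.PodListProcessor in the bundle. A hedged sketch of a custom implementation follows; the Process signature is inferred from the call shown in this hunk, and the type name, package name, and annotation key are invented for illustration.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// filteringPodListProcessor is hypothetical and not part of this commit. It drops
// unschedulable pods that carry an opt-out annotation before scale-up is considered.
type filteringPodListProcessor struct{}

func (p *filteringPodListProcessor) Process(ctx *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
	allNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
	filtered := make([]*apiv1.Pod, 0, len(unschedulablePods))
	for _, pod := range unschedulablePods {
		// Pods carrying the (hypothetical) opt-out annotation are ignored by scale-up.
		if pod.Annotations["example.com/ignore-for-scale-up"] == "true" {
			continue
		}
		filtered = append(filtered, pod)
	}
	return filtered, allScheduledPods, nil
}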

View File

@@ -27,9 +27,10 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -200,7 +201,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor(),
processors: ca_processors.TestProcessors(),
initialized: true}
// MaxNodesTotal reached.
@@ -348,6 +349,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
processors := ca_processors.TestProcessors()
processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor()
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
@@ -380,7 +383,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor(),
processors: processors,
initialized: true}
// Scale up.
@@ -518,7 +521,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
processors: ca_processors.TestProcessors()}
// Scale up.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Times(2) // due to initialized=false
@@ -655,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
processors: ca_processors.TestProcessors()}
// Scale up
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Times(2) // due to initialized=false

View File

@@ -0,0 +1,113 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodegroups
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/labels"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
// AutoprovisioningNodeGroupListProcessor adds autoprovisioning candidates to consider in scale-up.
type AutoprovisioningNodeGroupListProcessor struct {
}
// NewAutoprovisioningNodeGroupListProcessor creates an instance of NodeGroupListProcessor.
func NewAutoprovisioningNodeGroupListProcessor() NodeGroupListProcessor {
return &AutoprovisioningNodeGroupListProcessor{}
}
// Process extends the list of node groups considered in scale-up with autoprovisioning candidates derived from the unschedulable pods.
func (p *AutoprovisioningNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo,
unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {
if !context.AutoscalingOptions.NodeAutoprovisioningEnabled {
return nodeGroups, nodeInfos, nil
}
autoprovisionedNodeGroupCount := 0
for _, group := range nodeGroups {
if group.Autoprovisioned() {
autoprovisionedNodeGroupCount++
}
}
if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount {
glog.V(4).Infof("Max autoprovisioned node group count reached")
return nodeGroups, nodeInfos, nil
}
newGroupsCount := 0
newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{},
nodeInfos, unschedulablePods)
newGroupsCount += len(newNodeGroups)
nodeGroups = append(nodeGroups, newNodeGroups...)
gpuRequests := gpu.GetGpuRequests(unschedulablePods)
for _, gpuRequestInfo := range gpuRequests {
glog.V(4).Info("Adding node groups using GPU to NAP simulations")
extraResources := map[string]resource.Quantity{
gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest,
}
newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources,
nodeInfos, gpuRequestInfo.Pods)
newGroupsCount += len(newNodeGroups)
nodeGroups = append(nodeGroups, newNodeGroups...)
}
glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount)
return nodeGroups, nodeInfos, nil
}
func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
nodeGroups := make([]cloudprovider.NodeGroup, 0)
machines, err := context.CloudProvider.GetAvailableMachineTypes()
if err != nil {
glog.Warningf("Failed to get machine types: %v", err)
return nodeGroups
}
bestLabels := labels.BestLabelSet(unschedulablePods)
taints := make([]apiv1.Taint, 0)
for _, machineType := range machines {
nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources)
if err != nil {
// We don't check if a given node group setup is allowed.
// It's fine if it isn't, just don't consider it an option.
if err != cloudprovider.ErrIllegalConfiguration {
glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err)
}
continue
}
nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err)
continue
}
nodeInfos[nodeGroup.Id()] = nodeInfo
nodeGroups = append(nodeGroups, nodeGroup)
}
return nodeGroups
}

View File

@@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodegroups
import (
"testing"
apiv1 "k8s.io/api/core/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
"github.com/stretchr/testify/assert"
)
func TestAutoprovisioningNGLProcessor(t *testing.T) {
processor := NewAutoprovisioningNodeGroupListProcessor()
t1 := BuildTestNode("t1", 4000, 1000000)
ti1 := schedulercache.NewNodeInfo()
ti1.SetNode(t1)
p1 := BuildTestPod("p1", 100, 100)
n1 := BuildTestNode("ng1-xxx", 4000, 1000000)
ni1 := schedulercache.NewNodeInfo()
ni1.SetNode(n1)
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
[]string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
provider.AddNodeGroup("ng1", 1, 5, 3)
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
NodeAutoprovisioningEnabled: true,
},
CloudProvider: provider,
}
nodeGroups := provider.NodeGroups()
nodeInfos := map[string]*schedulercache.NodeInfo{
"ng1": ni1,
}
var err error
nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
assert.NoError(t, err)
assert.Equal(t, 2, len(nodeGroups))
assert.Equal(t, 2, len(nodeInfos))
}
func TestAutoprovisioningNGLProcessorTooMany(t *testing.T) {
processor := NewAutoprovisioningNodeGroupListProcessor()
t1 := BuildTestNode("T1-abc", 4000, 1000000)
ti1 := schedulercache.NewNodeInfo()
ti1.SetNode(t1)
x1 := BuildTestNode("X1-cde", 4000, 1000000)
xi1 := schedulercache.NewNodeInfo()
xi1.SetNode(x1)
p1 := BuildTestPod("p1", 100, 100)
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
[]string{"T1", "X1"},
map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
NodeAutoprovisioningEnabled: true,
},
CloudProvider: provider,
}
nodeGroups := provider.NodeGroups()
nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1}
var err error
nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
assert.NoError(t, err)
assert.Equal(t, 1, len(nodeGroups))
assert.Equal(t, 1, len(nodeInfos))
}

View File

@@ -0,0 +1,47 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodegroups
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
// NodeGroupListProcessor processes lists of NodeGroups considered in scale-up.
type NodeGroupListProcessor interface {
Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulercache.NodeInfo,
unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error)
}
// NoOpNodeGroupListProcessor returns node group lists without processing them.
type NoOpNodeGroupListProcessor struct {
}
// NewDefaultNodeGroupListProcessor creates an instance of NodeGroupListProcessor.
func NewDefaultNodeGroupListProcessor() NodeGroupListProcessor {
// TODO(maciekpytel): Use a better default
return &AutoprovisioningNodeGroupListProcessor{}
}
// Process returns the given node groups and node infos unchanged.
func (p *NoOpNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo,
unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {
return nodeGroups, nodeInfos, nil
}
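Because this interface is the only thing core depends on, a provider-specific (e.g. GKE) implementation can live entirely outside the core package. Here is a minimal sketch of an alternative implementation of the same interface; the type, package name, and filtering rule are illustrative assumptions, not code from this commit.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// maxedOutFilteringProcessor is hypothetical and not part of this commit. It removes
// node groups that are already at their maximum size from the scale-up candidates.
type maxedOutFilteringProcessor struct{}

func (p *maxedOutFilteringProcessor) Process(ctx *context.AutoscalingContext,
	nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo,
	unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {
	result := make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
	for _, group := range nodeGroups {
		size, err := group.TargetSize()
		if err != nil {
			// If the current size cannot be read, keep the group and let ScaleUp handle it.
			result = append(result, group)
			continue
		}
		if size >= group.MaxSize() {
			// Already at max size: drop the group and its node info from consideration.
			delete(nodeInfos, group.Id())
			continue
		}
		result = append(result, group)
	}
	return result, nodeInfos, nil
}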

View File

@@ -0,0 +1,45 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package processors
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
)
// AutoscalingProcessors is a set of customizable processors that encapsulate
// various heuristics used in different parts of Cluster Autoscaler code.
type AutoscalingProcessors struct {
// PodListProcessor is used to process the list of unschedulable pods before autoscaling.
PodListProcessor pods.PodListProcessor
// NodeGroupListProcessor is used to process the list of NodeGroups that can be used in scale-up.
NodeGroupListProcessor nodegroups.NodeGroupListProcessor
}
// DefaultProcessors returns the default set of processors.
func DefaultProcessors() *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: pods.NewDefaultPodListProcessor(),
NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor()}
}
// TestProcessors returns a set of simple processors for use in tests.
func TestProcessors() *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: &pods.NoOpPodListProcessor{},
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}}
}