Merge pull request #900 from MaciekPytel/nodegroup_processor

Move some GKE-specific logic outside core
Beata Skiba 2018-05-29 13:58:41 +02:00 committed by GitHub
commit 7546dff1a4
12 changed files with 363 additions and 168 deletions

View File

@@ -20,10 +20,10 @@ import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/context"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/pods"
    kube_client "k8s.io/client-go/kubernetes"
    kube_record "k8s.io/client-go/tools/record"
)
@@ -35,7 +35,7 @@ type AutoscalerOptions struct {
    KubeEventRecorder kube_record.EventRecorder
    PredicateChecker *simulator.PredicateChecker
    ListerRegistry kube_util.ListerRegistry
-   PodListProcessor pods.PodListProcessor
+   Processors *ca_processors.AutoscalingProcessors
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -48,8 +48,8 @@ type Autoscaler interface {
}

func initializeDefaultOptions(opts *AutoscalerOptions) error {
-   if opts.PodListProcessor == nil {
-       opts.PodListProcessor = pods.NewDefaultPodListProcessor()
+   if opts.Processors == nil {
+       opts.Processors = ca_processors.DefaultProcessors()
    }
    return nil
}
@@ -60,6 +60,6 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
    if err != nil {
        return nil, errors.ToAutoscalerError(errors.InternalError, err)
    }
-   autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor)
+   autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.Processors)
    return autoscalerBuilder.Build()
}
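Note that the defaulting in initializeDefaultOptions above is all-or-nothing: leaving Processors nil yields the complete default set, while a caller that wants to swap only one processor should start from DefaultProcessors() rather than a zero-value struct. A minimal sketch of that second case, in a hypothetical package and helper that are not part of this change:

package wiring // hypothetical package, for illustration only

import (
    "k8s.io/autoscaler/cluster-autoscaler/core"
    ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
)

// withCustomNodeGroupListProcessor keeps the default PodListProcessor but
// replaces the node group list processor. NoOpNodeGroupListProcessor returns
// its inputs unchanged, so no autoprovisioning candidates are added in scale-up.
func withCustomNodeGroupListProcessor(opts core.AutoscalerOptions) core.AutoscalerOptions {
    opts.Processors = ca_processors.DefaultProcessors()
    opts.Processors.NodeGroupListProcessor = &nodegroups.NoOpNodeGroupListProcessor{}
    return opts
}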

View File

@@ -19,10 +19,10 @@ package core

import (
    "k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
    "k8s.io/autoscaler/cluster-autoscaler/context"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/pods"
    kube_client "k8s.io/client-go/kubernetes"
    kube_record "k8s.io/client-go/tools/record"
)
@@ -42,19 +42,19 @@ type AutoscalerBuilderImpl struct {
    kubeEventRecorder kube_record.EventRecorder
    predicateChecker *simulator.PredicateChecker
    listerRegistry kube_util.ListerRegistry
-   podListProcessor pods.PodListProcessor
+   processors *ca_processors.AutoscalingProcessors
}

// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
-   kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl {
+   kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, processors *ca_processors.AutoscalingProcessors) *AutoscalerBuilderImpl {
    return &AutoscalerBuilderImpl{
        autoscalingOptions: autoscalingOptions,
        kubeClient: kubeClient,
        kubeEventRecorder: kubeEventRecorder,
        predicateChecker: predicateChecker,
        listerRegistry: listerRegistry,
-       podListProcessor: podListProcessor,
+       processors: processors,
    }
}
@@ -72,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) {
        c := *(b.dynamicConfig)
        options.NodeGroups = c.NodeGroupSpecStrings()
    }
-   return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor)
+   return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.processors)
}

View File

@@ -22,17 +22,15 @@ import (
    apiv1 "k8s.io/api/core/v1"
    extensionsv1 "k8s.io/api/extensions/v1beta1"
-   "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/estimator"
    "k8s.io/autoscaler/cluster-autoscaler/expander"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    "k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/labels"
    "k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"
@@ -42,8 +40,8 @@ import (
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
-func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
-   daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
+func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
+   nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
    // From now on we only care about unschedulable pods that were marked after the newest
    // node became available for the scheduler.
    if len(unschedulablePods) == 0 {
@@ -97,8 +95,12 @@ func ScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusters
    podsPassingPredicates := make(map[string][]*apiv1.Pod)
    expansionOptions := make([]expander.Option, 0)

-   if context.AutoscalingOptions.NodeAutoprovisioningEnabled {
-       nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, unschedulablePods)
+   if processors != nil && processors.NodeGroupListProcessor != nil {
+       var errProc error
+       nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods)
+       if errProc != nil {
+           return false, errors.ToAutoscalerError(errors.InternalError, errProc)
+       }
    }

    for _, nodeGroup := range nodeGroups {
@@ -356,77 +358,6 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
    return nil
}

-func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
-   nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup,
-   map[string]*schedulercache.NodeInfo) {
-   autoprovisionedNodeGroupCount := 0
-   for _, group := range nodeGroups {
-       if group.Autoprovisioned() {
-           autoprovisionedNodeGroupCount++
-       }
-   }
-   if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount {
-       glog.V(4).Infof("Max autoprovisioned node group count reached")
-       return nodeGroups, nodeInfos
-   }
-   newGroupsCount := 0
-   newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{},
-       nodeInfos, unschedulablePods)
-   newGroupsCount += len(newNodeGroups)
-   nodeGroups = append(nodeGroups, newNodeGroups...)
-   gpuRequests := gpu.GetGpuRequests(unschedulablePods)
-   for _, gpuRequestInfo := range gpuRequests {
-       glog.V(4).Info("Adding node groups using GPU to NAP simulations")
-       extraResources := map[string]resource.Quantity{
-           gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest,
-       }
-       newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources,
-           nodeInfos, gpuRequestInfo.Pods)
-       newGroupsCount += len(newNodeGroups)
-       nodeGroups = append(nodeGroups, newNodeGroups...)
-   }
-   glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount)
-   return nodeGroups, nodeInfos
-}
-
-func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
-   nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
-   nodeGroups := make([]cloudprovider.NodeGroup, 0)
-   machines, err := context.CloudProvider.GetAvailableMachineTypes()
-   if err != nil {
-       glog.Warningf("Failed to get machine types: %v", err)
-       return nodeGroups
-   }
-   bestLabels := labels.BestLabelSet(unschedulablePods)
-   taints := make([]apiv1.Taint, 0)
-   for _, machineType := range machines {
-       nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources)
-       if err != nil {
-           // We don't check if a given node group setup is allowed.
-           // It's fine if it isn't, just don't consider it an option.
-           if err != cloudprovider.ErrIllegalConfiguration {
-               glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err)
-           }
-           continue
-       }
-       nodeInfo, err := nodeGroup.TemplateNodeInfo()
-       if err != nil {
-           glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err)
-           continue
-       }
-       nodeInfos[nodeGroup.Id()] = nodeInfo
-       nodeGroups = append(nodeGroups, nodeGroup)
-   }
-   return nodeGroups
-}
-
func calculateClusterCoresMemoryTotal(nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo) (int64, int64) {
    var coresTotal int64
    var memoryTotal int64

View File

@@ -31,6 +31,8 @@ import (
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/estimator"
    "k8s.io/autoscaler/cluster-autoscaler/expander/random"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+   "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -368,7 +370,9 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
        extraPods[i] = pod
    }

-   result, err := ScaleUp(context, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, err := ScaleUp(context, processors, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    assert.True(t, result)
@@ -470,7 +474,9 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
    }
    p3 := BuildTestPod("p-new", 550, 0)

-   result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    // A node is already coming - no need for scale up.
    assert.False(t, result)
@@ -532,7 +538,9 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
    }
    p3 := BuildTestPod("p-new", 550, 0)

-   result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    // Two nodes needed but one node is already coming, so it should increase by one.
    assert.True(t, result)
@@ -591,7 +599,9 @@ func TestScaleUpUnhealthy(t *testing.T) {
    }
    p3 := BuildTestPod("p-new", 550, 0)

-   result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    // Node group is unhealthy.
    assert.False(t, result)
@@ -641,7 +651,9 @@ func TestScaleUpNoHelp(t *testing.T) {
    }
    p3 := BuildTestPod("p-new", 500, 0)

-   result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    assert.False(t, result)
    var event string
@@ -725,7 +737,9 @@ func TestScaleUpBalanceGroups(t *testing.T) {
        pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
    }

-   result, typedErr := ScaleUp(context, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+
+   result, typedErr := ScaleUp(context, processors, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
    assert.NoError(t, typedErr)
    assert.True(t, result)
    groupMap := make(map[string]cloudprovider.NodeGroup, 3)
@@ -783,71 +797,12 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
        LogRecorder: fakeLogRecorder,
    }

-   result, err := ScaleUp(context, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
+   processors := ca_processors.TestProcessors()
+   processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor()
+
+   result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
    assert.NoError(t, err)
    assert.True(t, result)
    assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups))
    assert.Equal(t, "autoprovisioned-T1-1", getStringFromChan(expandedGroups))
}

-func TestAddAutoprovisionedCandidatesOK(t *testing.T) {
-   t1 := BuildTestNode("t1", 4000, 1000000)
-   ti1 := schedulercache.NewNodeInfo()
-   ti1.SetNode(t1)
-   p1 := BuildTestPod("p1", 100, 100)
-   n1 := BuildTestNode("ng1-xxx", 4000, 1000000)
-   ni1 := schedulercache.NewNodeInfo()
-   ni1.SetNode(n1)
-   provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
-       nil, nil,
-       []string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
-   provider.AddNodeGroup("ng1", 1, 5, 3)
-   context := &context.AutoscalingContext{
-       AutoscalingOptions: context.AutoscalingOptions{
-           MaxAutoprovisionedNodeGroupCount: 1,
-       },
-       CloudProvider: provider,
-   }
-   nodeGroups := provider.NodeGroups()
-   nodeInfos := map[string]*schedulercache.NodeInfo{
-       "ng1": ni1,
-   }
-   nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
-   assert.Equal(t, 2, len(nodeGroups))
-   assert.Equal(t, 2, len(nodeInfos))
-}
-
-func TestAddAutoprovisionedCandidatesToMany(t *testing.T) {
-   t1 := BuildTestNode("T1-abc", 4000, 1000000)
-   ti1 := schedulercache.NewNodeInfo()
-   ti1.SetNode(t1)
-   x1 := BuildTestNode("X1-cde", 4000, 1000000)
-   xi1 := schedulercache.NewNodeInfo()
-   xi1.SetNode(x1)
-   p1 := BuildTestPod("p1", 100, 100)
-   provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
-       nil, nil,
-       []string{"T1", "X1"},
-       map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
-   provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
-   context := &context.AutoscalingContext{
-       AutoscalingOptions: context.AutoscalingOptions{
-           MaxAutoprovisionedNodeGroupCount: 1,
-       },
-       CloudProvider: provider,
-   }
-   nodeGroups := provider.NodeGroups()
-   nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1}
-   nodeGroups, nodeInfos = addAutoprovisionedCandidates(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
-   assert.Equal(t, 1, len(nodeGroups))
-   assert.Equal(t, 1, len(nodeInfos))
-}

View File

@@ -23,11 +23,11 @@ import (
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/pods"
    "k8s.io/autoscaler/cluster-autoscaler/utils/tpu"

    apiv1 "k8s.io/api/core/v1"
@@ -60,14 +60,14 @@ type StaticAutoscaler struct {
    lastScaleDownDeleteTime time.Time
    lastScaleDownFailTime time.Time
    scaleDown *ScaleDown
-   podListProcessor pods.PodListProcessor
+   processors *ca_processors.AutoscalingProcessors
    initialized bool
}

// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
    kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry,
-   podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) {
+   processors *ca_processors.AutoscalingProcessors) (*StaticAutoscaler, errors.AutoscalerError) {
    logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap)
    if err != nil {
        glog.Error("Failed to initialize status configmap, unable to write status events")
@@ -97,7 +97,7 @@ func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simu
        lastScaleDownDeleteTime: time.Now(),
        lastScaleDownFailTime: time.Now(),
        scaleDown: scaleDown,
-       podListProcessor: podListProcessor,
+       processors: processors,
        clusterStateRegistry: clusterStateRegistry,
    }, nil
}
@@ -210,7 +210,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
        return errors.ToAutoscalerError(errors.ApiCallError, err)
    }

-   allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
+   allUnschedulablePods, allScheduled, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
    if err != nil {
        glog.Errorf("Failed to process pod list: %v", err)
        return errors.ToAutoscalerError(errors.InternalError, err)
@@ -275,7 +275,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
        scaleUpStart := time.Now()
        metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)

-       scaledUp, typedErr := ScaleUp(autoscalingContext, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
+       scaledUp, typedErr := ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)

        metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

View File

@@ -27,9 +27,10 @@ import (
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/estimator"
    "k8s.io/autoscaler/cluster-autoscaler/expander/random"
+   ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+   "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
-   "k8s.io/autoscaler/cluster-autoscaler/utils/pods"
    scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -200,7 +201,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
        lastScaleUpTime: time.Now(),
        lastScaleDownFailTime: time.Now(),
        scaleDown: sd,
-       podListProcessor: pods.NewDefaultPodListProcessor(),
+       processors: ca_processors.TestProcessors(),
        initialized: true}

    // MaxNodesTotal reached.
@@ -348,6 +349,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
    clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
    clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())

+   processors := ca_processors.TestProcessors()
+   processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor()
    context := &context.AutoscalingContext{
        AutoscalingOptions: context.AutoscalingOptions{
            EstimatorName: estimator.BinpackingEstimatorName,
@@ -380,7 +383,7 @@
        lastScaleUpTime: time.Now(),
        lastScaleDownFailTime: time.Now(),
        scaleDown: sd,
-       podListProcessor: pods.NewDefaultPodListProcessor(),
+       processors: processors,
        initialized: true}

    // Scale up.
@@ -518,7 +521,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
        lastScaleUpTime: time.Now(),
        lastScaleDownFailTime: time.Now(),
        scaleDown: sd,
-       podListProcessor: pods.NewDefaultPodListProcessor()}
+       processors: ca_processors.TestProcessors()}

    // Scale up.
    readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Times(2) // due to initialized=false
@@ -655,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
        lastScaleUpTime: time.Now(),
        lastScaleDownFailTime: time.Now(),
        scaleDown: sd,
-       podListProcessor: pods.NewDefaultPodListProcessor()}
+       processors: ca_processors.TestProcessors()}

    // Scale up
    readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Times(2) // due to initialized=false

View File

@@ -0,0 +1,113 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodegroups

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
    "k8s.io/autoscaler/cluster-autoscaler/utils/labels"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"

    "github.com/golang/glog"
)

// AutoprovisioningNodeGroupListProcessor adds autoprovisioning candidates to consider in scale-up.
type AutoprovisioningNodeGroupListProcessor struct {
}

// NewAutoprovisioningNodeGroupListProcessor creates an instance of NodeGroupListProcessor.
func NewAutoprovisioningNodeGroupListProcessor() NodeGroupListProcessor {
    return &AutoprovisioningNodeGroupListProcessor{}
}

// Process adds autoprovisioned node group candidates to the list of node groups considered in scale-up.
func (p *AutoprovisioningNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo,
    unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {

    if !context.AutoscalingOptions.NodeAutoprovisioningEnabled {
        return nodeGroups, nodeInfos, nil
    }

    autoprovisionedNodeGroupCount := 0
    for _, group := range nodeGroups {
        if group.Autoprovisioned() {
            autoprovisionedNodeGroupCount++
        }
    }
    if autoprovisionedNodeGroupCount >= context.MaxAutoprovisionedNodeGroupCount {
        glog.V(4).Infof("Max autoprovisioned node group count reached")
        return nodeGroups, nodeInfos, nil
    }

    newGroupsCount := 0

    newNodeGroups := addAllMachineTypesForConfig(context, map[string]string{}, map[string]resource.Quantity{},
        nodeInfos, unschedulablePods)
    newGroupsCount += len(newNodeGroups)
    nodeGroups = append(nodeGroups, newNodeGroups...)

    gpuRequests := gpu.GetGpuRequests(unschedulablePods)
    for _, gpuRequestInfo := range gpuRequests {
        glog.V(4).Info("Adding node groups using GPU to NAP simulations")
        extraResources := map[string]resource.Quantity{
            gpu.ResourceNvidiaGPU: gpuRequestInfo.MaxRequest,
        }
        newNodeGroups := addAllMachineTypesForConfig(context, gpuRequestInfo.SystemLabels, extraResources,
            nodeInfos, gpuRequestInfo.Pods)
        newGroupsCount += len(newNodeGroups)
        nodeGroups = append(nodeGroups, newNodeGroups...)
    }

    glog.V(4).Infof("Considering %v potential node groups in NAP simulations", newGroupsCount)
    return nodeGroups, nodeInfos, nil
}

func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
    nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {

    nodeGroups := make([]cloudprovider.NodeGroup, 0)
    machines, err := context.CloudProvider.GetAvailableMachineTypes()
    if err != nil {
        glog.Warningf("Failed to get machine types: %v", err)
        return nodeGroups
    }

    bestLabels := labels.BestLabelSet(unschedulablePods)
    taints := make([]apiv1.Taint, 0)
    for _, machineType := range machines {
        nodeGroup, err := context.CloudProvider.NewNodeGroup(machineType, bestLabels, systemLabels, taints, extraResources)
        if err != nil {
            // We don't check if a given node group setup is allowed.
            // It's fine if it isn't, just don't consider it an option.
            if err != cloudprovider.ErrIllegalConfiguration {
                glog.Warningf("Unable to build temporary node group for %s: %v", machineType, err)
            }
            continue
        }
        nodeInfo, err := nodeGroup.TemplateNodeInfo()
        if err != nil {
            glog.Warningf("Unable to build template for node group for %s: %v", nodeGroup.Id(), err)
            continue
        }
        nodeInfos[nodeGroup.Id()] = nodeInfo
        nodeGroups = append(nodeGroups, nodeGroup)
    }
    return nodeGroups
}

View File

@@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodegroups

import (
    "testing"

    apiv1 "k8s.io/api/core/v1"
    testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"

    "github.com/stretchr/testify/assert"
)

func TestAutoprovisioningNGLProcessor(t *testing.T) {
    processor := NewAutoprovisioningNodeGroupListProcessor()

    t1 := BuildTestNode("t1", 4000, 1000000)
    ti1 := schedulercache.NewNodeInfo()
    ti1.SetNode(t1)

    p1 := BuildTestPod("p1", 100, 100)

    n1 := BuildTestNode("ng1-xxx", 4000, 1000000)
    ni1 := schedulercache.NewNodeInfo()
    ni1.SetNode(n1)

    provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
        nil, nil,
        []string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
    provider.AddNodeGroup("ng1", 1, 5, 3)

    context := &context.AutoscalingContext{
        AutoscalingOptions: context.AutoscalingOptions{
            MaxAutoprovisionedNodeGroupCount: 1,
            NodeAutoprovisioningEnabled:      true,
        },
        CloudProvider: provider,
    }
    nodeGroups := provider.NodeGroups()
    nodeInfos := map[string]*schedulercache.NodeInfo{
        "ng1": ni1,
    }

    var err error
    nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
    assert.NoError(t, err)
    assert.Equal(t, 2, len(nodeGroups))
    assert.Equal(t, 2, len(nodeInfos))
}

func TestAutoprovisioningNGLProcessorTooMany(t *testing.T) {
    processor := NewAutoprovisioningNodeGroupListProcessor()

    t1 := BuildTestNode("T1-abc", 4000, 1000000)
    ti1 := schedulercache.NewNodeInfo()
    ti1.SetNode(t1)

    x1 := BuildTestNode("X1-cde", 4000, 1000000)
    xi1 := schedulercache.NewNodeInfo()
    xi1.SetNode(x1)

    p1 := BuildTestPod("p1", 100, 100)

    provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
        nil, nil,
        []string{"T1", "X1"},
        map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
    provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")

    context := &context.AutoscalingContext{
        AutoscalingOptions: context.AutoscalingOptions{
            MaxAutoprovisionedNodeGroupCount: 1,
            NodeAutoprovisioningEnabled:      true,
        },
        CloudProvider: provider,
    }
    nodeGroups := provider.NodeGroups()
    nodeInfos := map[string]*schedulercache.NodeInfo{"X1": xi1}

    var err error
    nodeGroups, nodeInfos, err = processor.Process(context, nodeGroups, nodeInfos, []*apiv1.Pod{p1})
    assert.NoError(t, err)
    assert.Equal(t, 1, len(nodeGroups))
    assert.Equal(t, 1, len(nodeInfos))
}

View File

@@ -0,0 +1,47 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodegroups

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// NodeGroupListProcessor processes lists of NodeGroups considered in scale-up.
type NodeGroupListProcessor interface {
    Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
        nodeInfos map[string]*schedulercache.NodeInfo,
        unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error)
}

// NoOpNodeGroupListProcessor returns the node group list without processing it.
type NoOpNodeGroupListProcessor struct {
}

// NewDefaultNodeGroupListProcessor creates an instance of NodeGroupListProcessor.
func NewDefaultNodeGroupListProcessor() NodeGroupListProcessor {
    // TODO(maciekpytel): Use a better default
    return &AutoprovisioningNodeGroupListProcessor{}
}

// Process returns the node groups and node infos it was given, unchanged.
func (p *NoOpNodeGroupListProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup, nodeInfos map[string]*schedulercache.NodeInfo,
    unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {
    return nodeGroups, nodeInfos, nil
}
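This interface is the new extension point for scale-up: any implementation can be plugged into AutoscalingProcessors without touching core. A hypothetical sketch of a custom processor follows; the type name and filtering policy are illustrative only, while Autoprovisioned() is part of the cloudprovider.NodeGroup interface used by the autoprovisioning processor above.

package customprocessors // hypothetical package, not part of this PR

import (
    apiv1 "k8s.io/api/core/v1"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// skipAutoprovisionedProcessor drops autoprovisioned node groups from
// scale-up consideration; it satisfies nodegroups.NodeGroupListProcessor.
type skipAutoprovisionedProcessor struct{}

// Process filters the incoming node group list and leaves node infos untouched.
func (p *skipAutoprovisionedProcessor) Process(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
    nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup, map[string]*schedulercache.NodeInfo, error) {
    kept := make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
    for _, group := range nodeGroups {
        if group.Autoprovisioned() {
            continue
        }
        kept = append(kept, group)
    }
    return kept, nodeInfos, nil
}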

View File

@@ -0,0 +1,45 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package processors

import (
    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
    "k8s.io/autoscaler/cluster-autoscaler/processors/pods"
)

// AutoscalingProcessors are a set of customizable processors used for encapsulating
// various heuristics used in different parts of Cluster Autoscaler code.
type AutoscalingProcessors struct {
    // PodListProcessor is used to process list of unschedulable pods before autoscaling.
    PodListProcessor pods.PodListProcessor
    // NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
    NodeGroupListProcessor nodegroups.NodeGroupListProcessor
}

// DefaultProcessors returns default set of processors.
func DefaultProcessors() *AutoscalingProcessors {
    return &AutoscalingProcessors{
        PodListProcessor:       pods.NewDefaultPodListProcessor(),
        NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor()}
}

// TestProcessors returns a set of simple processors for use in tests.
func TestProcessors() *AutoscalingProcessors {
    return &AutoscalingProcessors{
        PodListProcessor:       &pods.NoOpPodListProcessor{},
        NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}}
}