Refactor cluster autoscaler builder and add pod list processor.

This commit is contained in:
Krzysztof Jastrzebski 2018-04-12 13:41:25 +02:00
parent d0cc9bbc47
commit 88b769b324
16 changed files with 241 additions and 98 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package core
package context
import (
"time"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package core
package context
import (
"testing"

View File

@ -21,17 +21,24 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)
// AutoscalerOptions is the whole set of options for configuring an autoscaler
type AutoscalerOptions struct {
AutoscalingOptions
context.AutoscalingOptions
dynamic.ConfigFetcherOptions
KubeClient kube_client.Interface
KubeEventRecorder kube_record.EventRecorder
PredicateChecker *simulator.PredicateChecker
ListerRegistry kube_util.ListerRegistry
PodListProcessor pods.PodListProcessor
}
// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@ -47,13 +54,22 @@ type Autoscaler interface {
ExitCleanUp()
}
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
func NewAutoscaler(opts AutoscalerOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface,
kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (Autoscaler, errors.AutoscalerError) {
func initializeDefaultOptions(opts *AutoscalerOptions) error {
if opts.PodListProcessor == nil {
opts.PodListProcessor = pods.NewDefaultPodListProcessor()
}
return nil
}
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) {
err := initializeDefaultOptions(&opts)
if err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor)
if opts.ConfigMapName != "" {
configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, kubeClient, kubeEventRecorder)
configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, opts.KubeClient, opts.KubeEventRecorder)
return NewDynamicAutoscaler(autoscalerBuilder, configFetcher)
}
return autoscalerBuilder.Build()

View File

@ -18,9 +18,11 @@ package core
import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)
@ -34,23 +36,25 @@ type AutoscalerBuilder interface {
// AutoscalerBuilderImpl builds new autoscalers from its state including initial `AutoscalingOptions` given at startup and
// `dynamic.Config` read on demand from the configmap
type AutoscalerBuilderImpl struct {
autoscalingOptions AutoscalingOptions
autoscalingOptions context.AutoscalingOptions
dynamicConfig *dynamic.Config
kubeClient kube_client.Interface
kubeEventRecorder kube_record.EventRecorder
predicateChecker *simulator.PredicateChecker
listerRegistry kube_util.ListerRegistry
podListProcessor pods.PodListProcessor
}
// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
func NewAutoscalerBuilder(autoscalingOptions AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *AutoscalerBuilderImpl {
func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl {
return &AutoscalerBuilderImpl{
autoscalingOptions: autoscalingOptions,
kubeClient: kubeClient,
kubeEventRecorder: kubeEventRecorder,
predicateChecker: predicateChecker,
listerRegistry: listerRegistry,
podListProcessor: podListProcessor,
}
}
@ -68,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) {
c := *(b.dynamicConfig)
options.NodeGroups = c.NodeGroupSpecStrings()
}
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry)
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor)
}

View File

@ -60,14 +60,18 @@ func TestNewAutoscalerStatic(t *testing.T) {
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
opts := AutoscalerOptions{
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
ConfigMapName: "",
},
PredicateChecker: predicateChecker,
KubeClient: fakeClient,
KubeEventRecorder: kubeEventRecorder,
ListerRegistry: listerRegistry,
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
a, _ := NewAutoscaler(opts)
assert.IsType(t, &StaticAutoscaler{}, a)
}
@ -97,13 +101,17 @@ func TestNewAutoscalerDynamic(t *testing.T) {
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
opts := AutoscalerOptions{
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
ConfigMapName: "testconfigmap",
},
PredicateChecker: predicateChecker,
KubeClient: fakeClient,
KubeEventRecorder: kubeEventRecorder,
ListerRegistry: listerRegistry,
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
a, _ := NewAutoscaler(opts)
assert.IsType(t, &DynamicAutoscaler{}, a)
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
@ -101,7 +102,7 @@ func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) {
// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
type ScaleDown struct {
context *AutoscalingContext
context *context.AutoscalingContext
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes map[string]time.Time
@ -112,7 +113,7 @@ type ScaleDown struct {
}
// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *AutoscalingContext) *ScaleDown {
func NewScaleDown(context *context.AutoscalingContext) *ScaleDown {
return &ScaleDown{
context: context,
unneededNodes: make(map[string]time.Time),
@ -648,7 +649,7 @@ func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirma
return finalError
}
func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError {
func deleteNode(context *context.AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError {
deleteSuccessful := false
drainSuccessful := false

View File

@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
@ -123,8 +124,8 @@ func TestFindUnneededNodes(t *testing.T) {
provider.AddNode("ng1", n8)
provider.AddNode("ng1", n9)
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ExpendablePodsPriorityCutoff: 10,
},
@ -242,8 +243,8 @@ func TestPodsWithPrioritiesFindUnneededNodes(t *testing.T) {
provider.AddNode("ng1", n3)
provider.AddNode("ng1", n4)
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ExpendablePodsPriorityCutoff: 10,
},
@ -298,8 +299,8 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
numCandidates := 30
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ScaleDownNonEmptyCandidatesCount: numCandidates,
ScaleDownCandidatesPoolRatio: 1,
@ -371,8 +372,8 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
numCandidates := 30
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ScaleDownNonEmptyCandidatesCount: numCandidates,
ScaleDownCandidatesPoolRatio: 1.0,
@ -422,8 +423,8 @@ func TestFindUnneededNodePool(t *testing.T) {
numCandidates := 30
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ScaleDownNonEmptyCandidatesCount: numCandidates,
ScaleDownCandidatesPoolRatio: 0.1,
@ -569,8 +570,8 @@ func TestDeleteNode(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
// build context
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{},
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{},
ClientSet: fakeClient,
Recorder: fakeRecorder,
LogRecorder: fakeLogRecorder,
@ -755,8 +756,8 @@ func TestScaleDown(t *testing.T) {
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
@ -811,7 +812,7 @@ func assertSubset(t *testing.T, a []string, b []string) {
}
}
var defaultScaleDownOptions = AutoscalingOptions{
var defaultScaleDownOptions = context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
@ -947,7 +948,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
context := &context.AutoscalingContext{
AutoscalingOptions: config.options,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -1024,8 +1025,8 @@ func TestNoScaleDownUnready(t *testing.T) {
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Hour,
@ -1132,8 +1133,8 @@ func TestScaleDownNoMove(t *testing.T) {
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Hour,

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
@ -42,7 +43,7 @@ import (
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
func ScaleUp(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
@ -340,7 +341,7 @@ groupsloop:
return result
}
func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError {
func executeScaleUp(context *context.AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError {
glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
@ -362,7 +363,7 @@ func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo)
return nil
}
func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup,
map[string]*schedulercache.NodeInfo) {
@ -400,7 +401,7 @@ func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []clou
return nodeGroups, nodeInfos
}
func addAllMachineTypesForConfig(context *AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
nodeGroups := make([]cloudprovider.NodeGroup, 0)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@ -67,10 +68,10 @@ type scaleTestConfig struct {
expectedScaleUp string
expectedScaleUpGroup string
expectedScaleDowns []string
options AutoscalingOptions
options context.AutoscalingOptions
}
var defaultOptions = AutoscalingOptions{
var defaultOptions = context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
@ -231,7 +232,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
clusterState.UpdateNodes(nodes, time.Now())
context := &AutoscalingContext{
context := &context.AutoscalingContext{
AutoscalingOptions: config.options,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -313,8 +314,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
})
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
@ -380,7 +381,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
})
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
context := &context.AutoscalingContext{
AutoscalingOptions: defaultOptions,
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -436,8 +437,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
@ -487,8 +488,8 @@ func TestScaleUpNoHelp(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory,
@ -567,8 +568,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
clusterState.UpdateNodes(nodes, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
BalanceSimilarNodeGroups: true,
MaxCoresTotal: config.DefaultMaxClusterCores,
@ -630,8 +631,8 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: 5000 * 64,
MaxMemoryTotal: 5000 * 64 * 20,
@ -669,8 +670,8 @@ func TestAddAutoprovisionedCandidatesOK(t *testing.T) {
[]string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
provider.AddNodeGroup("ng1", 1, 5, 3)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
},
CloudProvider: provider,
@ -702,8 +703,8 @@ func TestAddAutoprovisionedCandidatesToMany(t *testing.T) {
map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxAutoprovisionedNodeGroupCount: 1,
},
CloudProvider: provider,

View File

@ -21,11 +21,13 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
apiv1 "k8s.io/api/core/v1"
@ -49,18 +51,20 @@ const (
// StaticAutoscaler is an autoscaler which has all the core functionality of a CA but without the reconfiguration feature
type StaticAutoscaler struct {
// AutoscalingContext consists of validated settings and options for this autoscaler
*AutoscalingContext
*context.AutoscalingContext
kube_util.ListerRegistry
startTime time.Time
lastScaleUpTime time.Time
lastScaleDownDeleteTime time.Time
lastScaleDownFailTime time.Time
scaleDown *ScaleDown
podListProcessor pods.PodListProcessor
}
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (*StaticAutoscaler, errors.AutoscalerError) {
func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry,
podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) {
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap)
if err != nil {
glog.Error("Failed to initialize status configmap, unable to write status events")
@ -68,7 +72,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
// TODO(maciekpytel): recover from this after successful status configmap update?
logRecorder, _ = utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, false)
}
autoscalingContext, errctx := NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry)
autoscalingContext, errctx := context.NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry)
if errctx != nil {
return nil, errctx
}
@ -83,6 +87,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
lastScaleDownDeleteTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: scaleDown,
podListProcessor: podListProcessor,
}, nil
}
@ -236,6 +241,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
if err != nil {
glog.Errorf("Failed to process pod list: %v", err)
return errors.ToAutoscalerError(errors.InternalError, err)
}
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduled, a.PredicateChecker)
// We need to check whether pods marked as unschedulable are actually unschedulable.

View File

@ -24,10 +24,12 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -168,8 +170,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
@ -197,7 +199,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
ListerRegistry: listerRegistry,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd}
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
// MaxNodesTotal reached.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
@ -344,8 +347,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
@ -375,7 +378,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
ListerRegistry: listerRegistry,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd}
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
// Scale up.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
@ -481,8 +485,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
// broken node failed to register in time
clusterState.UpdateNodes([]*apiv1.Node{n1}, later)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
@ -511,7 +515,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
ListerRegistry: listerRegistry,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd}
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
// Scale up.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
@ -617,8 +622,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
@ -647,7 +652,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
ListerRegistry: listerRegistry,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd}
scaleDown: sd,
podListProcessor: pods.NewDefaultPodListProcessor()}
// Scale up
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()

View File

@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
@ -322,7 +323,7 @@ func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, erro
}
// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *AutoscalingContext,
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
removedAny := false
for _, unregisteredNode := range unregisteredNodes {
@ -362,7 +363,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
// Sets the target size of node groups to the current number of nodes in them
// if the difference was constant for a prolonged time. Returns true if managed
// to fix something.
func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool, error) {
func fixNodeGroupSize(context *context.AutoscalingContext, currentTime time.Time) (bool, error) {
fixed := false
for _, nodeGroup := range context.CloudProvider.NodeGroups() {
incorrectSize := context.ClusterStateRegistry.GetIncorrectNodeGroupSize(nodeGroup.Id())
@ -389,7 +390,7 @@ func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool,
// getPotentiallyUnneededNodes returns nodes that are:
// - managed by the cluster autoscaler
// - in groups with size > min size
func getPotentiallyUnneededNodes(context *AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node {
func getPotentiallyUnneededNodes(context *context.AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node {
result := make([]*apiv1.Node, 0, len(nodes))
nodeGroupSize := getNodeGroupSizeMap(context.CloudProvider)

View File

@ -24,6 +24,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
@ -330,8 +331,8 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
assert.NoError(t, err)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxNodeProvisionTime: 45 * time.Minute,
},
CloudProvider: provider,
@ -428,8 +429,8 @@ func TestRemoveFixNodeTargetSize(t *testing.T) {
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
assert.NoError(t, err)
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
context := &context.AutoscalingContext{
AutoscalingOptions: context.AutoscalingOptions{
MaxNodeProvisionTime: 45 * time.Minute,
},
CloudProvider: provider,
@ -461,7 +462,7 @@ func TestGetPotentiallyUnneededNodes(t *testing.T) {
provider.AddNode("ng1", ng1_2)
provider.AddNode("ng2", ng2_1)
context := &AutoscalingContext{
context := &context.AutoscalingContext{
CloudProvider: provider,
}

View File

@ -33,6 +33,7 @@ import (
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
@ -134,7 +135,7 @@ var (
regional = flag.Bool("regional", false, "Cluster is regional.")
)
func createAutoscalerOptions() core.AutoscalerOptions {
func createAutoscalingOptions() context.AutoscalingOptions {
minCoresTotal, maxCoresTotal, err := parseMinMaxFlag(*coresTotal)
if err != nil {
glog.Fatalf("Failed to parse flags: %v", err)
@ -147,7 +148,7 @@ func createAutoscalerOptions() core.AutoscalerOptions {
minMemoryTotal = minMemoryTotal * 1024
maxMemoryTotal = maxMemoryTotal * 1024
autoscalingOpts := core.AutoscalingOptions{
return context.AutoscalingOptions{
CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag,
NodeGroupAutoDiscovery: nodeGroupAutoDiscoveryFlag,
@ -183,16 +184,13 @@ func createAutoscalerOptions() core.AutoscalerOptions {
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
Regional: *regional,
}
}
configFetcherOpts := dynamic.ConfigFetcherOptions{
func createConfigFetcherOptions() dynamic.ConfigFetcherOptions {
return dynamic.ConfigFetcherOptions{
ConfigMapName: *configMapName,
Namespace: *namespace,
}
return core.AutoscalerOptions{
AutoscalingOptions: autoscalingOpts,
ConfigFetcherOptions: configFetcherOpts,
}
}
func createKubeClient() kube_client.Interface {
@ -241,8 +239,8 @@ func run(healthCheck *metrics.HealthCheck) {
metrics.RegisterAll()
kubeClient := createKubeClient()
kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient)
opts := createAutoscalerOptions()
metrics.UpdateNapEnabled(opts.NodeAutoprovisioningEnabled)
autoscalingOptions := createAutoscalingOptions()
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
predicateCheckerStopChannel := make(chan struct{})
predicateChecker, err := simulator.NewPredicateChecker(kubeClient, predicateCheckerStopChannel)
if err != nil {
@ -250,7 +248,16 @@ func run(healthCheck *metrics.HealthCheck) {
}
listerRegistryStopChannel := make(chan struct{})
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel)
autoscaler, err := core.NewAutoscaler(opts, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ConfigFetcherOptions: createConfigFetcherOptions(),
PredicateChecker: predicateChecker,
KubeClient: kubeClient,
KubeEventRecorder: kubeEventRecorder,
ListerRegistry: listerRegistry,
}
autoscaler, err := core.NewAutoscaler(opts)
if err != nil {
glog.Fatalf("Failed to create autoscaler: %v", err)
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)
// PodListProcessor processes lists of unschedulable and sheduled pods before scaling of the cluster.
type PodListProcessor interface {
Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error)
}
// NoOpPodListProcessor is returning pod lists without processing them.
type NoOpPodListProcessor struct {
}
// NewDefaultPodListProcessor creates an instance of PodListProcessor.
func NewDefaultPodListProcessor() PodListProcessor {
return &NoOpPodListProcessor{}
}
// Process processes lists of unschedulable and sheduled pods before scaling of the cluster.
func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
return unschedulablePods, allScheduled, nil
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
import (
"testing"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestPodListProcessor(t *testing.T) {
context := &context.AutoscalingContext{}
p1 := BuildTestPod("p1", 40, 0)
p2 := BuildTestPod("p2", 400, 0)
n1 := BuildTestNode("n1", 100, 1000)
n2 := BuildTestNode("n1", 100, 1000)
unschedulablePods := []*apiv1.Pod{p1}
allScheduled := []*apiv1.Pod{p2}
nodes := []*apiv1.Node{n1, n2}
podListProcessor := NewDefaultPodListProcessor()
gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduled, nodes)
if len(gotUnschedulablePods) != 1 || len(gotAllScheduled) != 1 || err != nil {
t.Errorf("Error podListProcessor.Process() = %v, %v, %v want %v, %v, nil ",
gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduled)
}
}