Merge pull request #794 from krzysztof-jastrzebski/pods
Refactor cluster autoscaler builder and add pod list processor.
This commit is contained in:
commit
054f6d8650
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package core
|
package context
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package core
|
package context
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -21,17 +21,24 @@ import (
|
||||||
|
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
|
||||||
kube_client "k8s.io/client-go/kubernetes"
|
kube_client "k8s.io/client-go/kubernetes"
|
||||||
kube_record "k8s.io/client-go/tools/record"
|
kube_record "k8s.io/client-go/tools/record"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutoscalerOptions is the whole set of options for configuring an autoscaler
|
// AutoscalerOptions is the whole set of options for configuring an autoscaler
|
||||||
type AutoscalerOptions struct {
|
type AutoscalerOptions struct {
|
||||||
AutoscalingOptions
|
context.AutoscalingOptions
|
||||||
dynamic.ConfigFetcherOptions
|
dynamic.ConfigFetcherOptions
|
||||||
|
KubeClient kube_client.Interface
|
||||||
|
KubeEventRecorder kube_record.EventRecorder
|
||||||
|
PredicateChecker *simulator.PredicateChecker
|
||||||
|
ListerRegistry kube_util.ListerRegistry
|
||||||
|
PodListProcessor pods.PodListProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
|
// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
|
||||||
|
|
@ -47,13 +54,22 @@ type Autoscaler interface {
|
||||||
ExitCleanUp()
|
ExitCleanUp()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
|
func initializeDefaultOptions(opts *AutoscalerOptions) error {
|
||||||
func NewAutoscaler(opts AutoscalerOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface,
|
if opts.PodListProcessor == nil {
|
||||||
kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (Autoscaler, errors.AutoscalerError) {
|
opts.PodListProcessor = pods.NewDefaultPodListProcessor()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
|
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
|
||||||
|
func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) {
|
||||||
|
err := initializeDefaultOptions(&opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ToAutoscalerError(errors.InternalError, err)
|
||||||
|
}
|
||||||
|
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, opts.PredicateChecker, opts.KubeClient, opts.KubeEventRecorder, opts.ListerRegistry, opts.PodListProcessor)
|
||||||
if opts.ConfigMapName != "" {
|
if opts.ConfigMapName != "" {
|
||||||
configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, kubeClient, kubeEventRecorder)
|
configFetcher := dynamic.NewConfigFetcher(opts.ConfigFetcherOptions, opts.KubeClient, opts.KubeEventRecorder)
|
||||||
return NewDynamicAutoscaler(autoscalerBuilder, configFetcher)
|
return NewDynamicAutoscaler(autoscalerBuilder, configFetcher)
|
||||||
}
|
}
|
||||||
return autoscalerBuilder.Build()
|
return autoscalerBuilder.Build()
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,11 @@ package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
|
||||||
kube_client "k8s.io/client-go/kubernetes"
|
kube_client "k8s.io/client-go/kubernetes"
|
||||||
kube_record "k8s.io/client-go/tools/record"
|
kube_record "k8s.io/client-go/tools/record"
|
||||||
)
|
)
|
||||||
|
|
@ -34,23 +36,25 @@ type AutoscalerBuilder interface {
|
||||||
// AutoscalerBuilderImpl builds new autoscalers from its state including initial `AutoscalingOptions` given at startup and
|
// AutoscalerBuilderImpl builds new autoscalers from its state including initial `AutoscalingOptions` given at startup and
|
||||||
// `dynamic.Config` read on demand from the configmap
|
// `dynamic.Config` read on demand from the configmap
|
||||||
type AutoscalerBuilderImpl struct {
|
type AutoscalerBuilderImpl struct {
|
||||||
autoscalingOptions AutoscalingOptions
|
autoscalingOptions context.AutoscalingOptions
|
||||||
dynamicConfig *dynamic.Config
|
dynamicConfig *dynamic.Config
|
||||||
kubeClient kube_client.Interface
|
kubeClient kube_client.Interface
|
||||||
kubeEventRecorder kube_record.EventRecorder
|
kubeEventRecorder kube_record.EventRecorder
|
||||||
predicateChecker *simulator.PredicateChecker
|
predicateChecker *simulator.PredicateChecker
|
||||||
listerRegistry kube_util.ListerRegistry
|
listerRegistry kube_util.ListerRegistry
|
||||||
|
podListProcessor pods.PodListProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
|
// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
|
||||||
func NewAutoscalerBuilder(autoscalingOptions AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
|
func NewAutoscalerBuilder(autoscalingOptions context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
|
||||||
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *AutoscalerBuilderImpl {
|
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry, podListProcessor pods.PodListProcessor) *AutoscalerBuilderImpl {
|
||||||
return &AutoscalerBuilderImpl{
|
return &AutoscalerBuilderImpl{
|
||||||
autoscalingOptions: autoscalingOptions,
|
autoscalingOptions: autoscalingOptions,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
kubeEventRecorder: kubeEventRecorder,
|
kubeEventRecorder: kubeEventRecorder,
|
||||||
predicateChecker: predicateChecker,
|
predicateChecker: predicateChecker,
|
||||||
listerRegistry: listerRegistry,
|
listerRegistry: listerRegistry,
|
||||||
|
podListProcessor: podListProcessor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,5 +72,5 @@ func (b *AutoscalerBuilderImpl) Build() (Autoscaler, errors.AutoscalerError) {
|
||||||
c := *(b.dynamicConfig)
|
c := *(b.dynamicConfig)
|
||||||
options.NodeGroups = c.NodeGroupSpecStrings()
|
options.NodeGroups = c.NodeGroupSpecStrings()
|
||||||
}
|
}
|
||||||
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry)
|
return NewStaticAutoscaler(options, b.predicateChecker, b.kubeClient, b.kubeEventRecorder, b.listerRegistry, b.podListProcessor)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -60,14 +60,18 @@ func TestNewAutoscalerStatic(t *testing.T) {
|
||||||
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
|
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
|
||||||
})
|
})
|
||||||
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
|
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
|
predicateChecker := simulator.NewTestPredicateChecker()
|
||||||
|
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
|
||||||
opts := AutoscalerOptions{
|
opts := AutoscalerOptions{
|
||||||
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
|
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
|
||||||
ConfigMapName: "",
|
ConfigMapName: "",
|
||||||
},
|
},
|
||||||
|
PredicateChecker: predicateChecker,
|
||||||
|
KubeClient: fakeClient,
|
||||||
|
KubeEventRecorder: kubeEventRecorder,
|
||||||
|
ListerRegistry: listerRegistry,
|
||||||
}
|
}
|
||||||
predicateChecker := simulator.NewTestPredicateChecker()
|
a, _ := NewAutoscaler(opts)
|
||||||
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
|
|
||||||
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
|
|
||||||
assert.IsType(t, &StaticAutoscaler{}, a)
|
assert.IsType(t, &StaticAutoscaler{}, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,13 +101,17 @@ func TestNewAutoscalerDynamic(t *testing.T) {
|
||||||
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
|
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
|
||||||
})
|
})
|
||||||
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
|
kubeEventRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
|
predicateChecker := simulator.NewTestPredicateChecker()
|
||||||
|
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
|
||||||
opts := AutoscalerOptions{
|
opts := AutoscalerOptions{
|
||||||
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
|
ConfigFetcherOptions: dynamic.ConfigFetcherOptions{
|
||||||
ConfigMapName: "testconfigmap",
|
ConfigMapName: "testconfigmap",
|
||||||
},
|
},
|
||||||
|
PredicateChecker: predicateChecker,
|
||||||
|
KubeClient: fakeClient,
|
||||||
|
KubeEventRecorder: kubeEventRecorder,
|
||||||
|
ListerRegistry: listerRegistry,
|
||||||
}
|
}
|
||||||
predicateChecker := simulator.NewTestPredicateChecker()
|
a, _ := NewAutoscaler(opts)
|
||||||
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
|
|
||||||
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
|
|
||||||
assert.IsType(t, &DynamicAutoscaler{}, a)
|
assert.IsType(t, &DynamicAutoscaler{}, a)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
||||||
|
|
@ -101,7 +102,7 @@ func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) {
|
||||||
|
|
||||||
// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
|
// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
|
||||||
type ScaleDown struct {
|
type ScaleDown struct {
|
||||||
context *AutoscalingContext
|
context *context.AutoscalingContext
|
||||||
unneededNodes map[string]time.Time
|
unneededNodes map[string]time.Time
|
||||||
unneededNodesList []*apiv1.Node
|
unneededNodesList []*apiv1.Node
|
||||||
unremovableNodes map[string]time.Time
|
unremovableNodes map[string]time.Time
|
||||||
|
|
@ -112,7 +113,7 @@ type ScaleDown struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScaleDown builds new ScaleDown object.
|
// NewScaleDown builds new ScaleDown object.
|
||||||
func NewScaleDown(context *AutoscalingContext) *ScaleDown {
|
func NewScaleDown(context *context.AutoscalingContext) *ScaleDown {
|
||||||
return &ScaleDown{
|
return &ScaleDown{
|
||||||
context: context,
|
context: context,
|
||||||
unneededNodes: make(map[string]time.Time),
|
unneededNodes: make(map[string]time.Time),
|
||||||
|
|
@ -648,7 +649,7 @@ func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirma
|
||||||
return finalError
|
return finalError
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError {
|
func deleteNode(context *context.AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError {
|
||||||
deleteSuccessful := false
|
deleteSuccessful := false
|
||||||
drainSuccessful := false
|
drainSuccessful := false
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
||||||
|
|
@ -123,8 +124,8 @@ func TestFindUnneededNodes(t *testing.T) {
|
||||||
provider.AddNode("ng1", n8)
|
provider.AddNode("ng1", n8)
|
||||||
provider.AddNode("ng1", n9)
|
provider.AddNode("ng1", n9)
|
||||||
|
|
||||||
context := AutoscalingContext{
|
context := context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.35,
|
ScaleDownUtilizationThreshold: 0.35,
|
||||||
ExpendablePodsPriorityCutoff: 10,
|
ExpendablePodsPriorityCutoff: 10,
|
||||||
},
|
},
|
||||||
|
|
@ -242,8 +243,8 @@ func TestPodsWithPrioritiesFindUnneededNodes(t *testing.T) {
|
||||||
provider.AddNode("ng1", n3)
|
provider.AddNode("ng1", n3)
|
||||||
provider.AddNode("ng1", n4)
|
provider.AddNode("ng1", n4)
|
||||||
|
|
||||||
context := AutoscalingContext{
|
context := context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.35,
|
ScaleDownUtilizationThreshold: 0.35,
|
||||||
ExpendablePodsPriorityCutoff: 10,
|
ExpendablePodsPriorityCutoff: 10,
|
||||||
},
|
},
|
||||||
|
|
@ -298,8 +299,8 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
|
||||||
|
|
||||||
numCandidates := 30
|
numCandidates := 30
|
||||||
|
|
||||||
context := AutoscalingContext{
|
context := context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.35,
|
ScaleDownUtilizationThreshold: 0.35,
|
||||||
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
||||||
ScaleDownCandidatesPoolRatio: 1,
|
ScaleDownCandidatesPoolRatio: 1,
|
||||||
|
|
@ -371,8 +372,8 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
|
||||||
|
|
||||||
numCandidates := 30
|
numCandidates := 30
|
||||||
|
|
||||||
context := AutoscalingContext{
|
context := context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.35,
|
ScaleDownUtilizationThreshold: 0.35,
|
||||||
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
||||||
ScaleDownCandidatesPoolRatio: 1.0,
|
ScaleDownCandidatesPoolRatio: 1.0,
|
||||||
|
|
@ -422,8 +423,8 @@ func TestFindUnneededNodePool(t *testing.T) {
|
||||||
|
|
||||||
numCandidates := 30
|
numCandidates := 30
|
||||||
|
|
||||||
context := AutoscalingContext{
|
context := context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.35,
|
ScaleDownUtilizationThreshold: 0.35,
|
||||||
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
ScaleDownNonEmptyCandidatesCount: numCandidates,
|
||||||
ScaleDownCandidatesPoolRatio: 0.1,
|
ScaleDownCandidatesPoolRatio: 0.1,
|
||||||
|
|
@ -569,8 +570,8 @@ func TestDeleteNode(t *testing.T) {
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
|
|
||||||
// build context
|
// build context
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{},
|
AutoscalingOptions: context.AutoscalingOptions{},
|
||||||
ClientSet: fakeClient,
|
ClientSet: fakeClient,
|
||||||
Recorder: fakeRecorder,
|
Recorder: fakeRecorder,
|
||||||
LogRecorder: fakeLogRecorder,
|
LogRecorder: fakeLogRecorder,
|
||||||
|
|
@ -755,8 +756,8 @@ func TestScaleDown(t *testing.T) {
|
||||||
|
|
||||||
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
ScaleDownUnneededTime: time.Minute,
|
ScaleDownUnneededTime: time.Minute,
|
||||||
MaxGracefulTerminationSec: 60,
|
MaxGracefulTerminationSec: 60,
|
||||||
|
|
@ -811,7 +812,7 @@ func assertSubset(t *testing.T, a []string, b []string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultScaleDownOptions = AutoscalingOptions{
|
var defaultScaleDownOptions = context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
ScaleDownUnneededTime: time.Minute,
|
ScaleDownUnneededTime: time.Minute,
|
||||||
MaxGracefulTerminationSec: 60,
|
MaxGracefulTerminationSec: 60,
|
||||||
|
|
@ -947,7 +948,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
|
||||||
|
|
||||||
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: config.options,
|
AutoscalingOptions: config.options,
|
||||||
PredicateChecker: simulator.NewTestPredicateChecker(),
|
PredicateChecker: simulator.NewTestPredicateChecker(),
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -1024,8 +1025,8 @@ func TestNoScaleDownUnready(t *testing.T) {
|
||||||
|
|
||||||
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
ScaleDownUnneededTime: time.Minute,
|
ScaleDownUnneededTime: time.Minute,
|
||||||
ScaleDownUnreadyTime: time.Hour,
|
ScaleDownUnreadyTime: time.Hour,
|
||||||
|
|
@ -1132,8 +1133,8 @@ func TestScaleDownNoMove(t *testing.T) {
|
||||||
|
|
||||||
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
ScaleDownUnneededTime: time.Minute,
|
ScaleDownUnneededTime: time.Minute,
|
||||||
ScaleDownUnreadyTime: time.Hour,
|
ScaleDownUnreadyTime: time.Hour,
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/expander"
|
"k8s.io/autoscaler/cluster-autoscaler/expander"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||||
|
|
@ -42,7 +43,7 @@ import (
|
||||||
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
|
// ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size,
|
||||||
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
|
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
|
||||||
// ready and in sync with instance groups.
|
// ready and in sync with instance groups.
|
||||||
func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
|
func ScaleUp(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
|
||||||
daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
|
daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
|
||||||
// From now on we only care about unschedulable pods that were marked after the newest
|
// From now on we only care about unschedulable pods that were marked after the newest
|
||||||
// node became available for the scheduler.
|
// node became available for the scheduler.
|
||||||
|
|
@ -340,7 +341,7 @@ groupsloop:
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError {
|
func executeScaleUp(context *context.AutoscalingContext, info nodegroupset.ScaleUpInfo) errors.AutoscalerError {
|
||||||
glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
|
glog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
|
||||||
increase := info.NewSize - info.CurrentSize
|
increase := info.NewSize - info.CurrentSize
|
||||||
if err := info.Group.IncreaseSize(increase); err != nil {
|
if err := info.Group.IncreaseSize(increase); err != nil {
|
||||||
|
|
@ -362,7 +363,7 @@ func executeScaleUp(context *AutoscalingContext, info nodegroupset.ScaleUpInfo)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
|
func addAutoprovisionedCandidates(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup,
|
||||||
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup,
|
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) ([]cloudprovider.NodeGroup,
|
||||||
map[string]*schedulercache.NodeInfo) {
|
map[string]*schedulercache.NodeInfo) {
|
||||||
|
|
||||||
|
|
@ -400,7 +401,7 @@ func addAutoprovisionedCandidates(context *AutoscalingContext, nodeGroups []clou
|
||||||
return nodeGroups, nodeInfos
|
return nodeGroups, nodeInfos
|
||||||
}
|
}
|
||||||
|
|
||||||
func addAllMachineTypesForConfig(context *AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
|
func addAllMachineTypesForConfig(context *context.AutoscalingContext, systemLabels map[string]string, extraResources map[string]resource.Quantity,
|
||||||
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
|
nodeInfos map[string]*schedulercache.NodeInfo, unschedulablePods []*apiv1.Pod) []cloudprovider.NodeGroup {
|
||||||
|
|
||||||
nodeGroups := make([]cloudprovider.NodeGroup, 0)
|
nodeGroups := make([]cloudprovider.NodeGroup, 0)
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
|
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
|
|
@ -67,10 +68,10 @@ type scaleTestConfig struct {
|
||||||
expectedScaleUp string
|
expectedScaleUp string
|
||||||
expectedScaleUpGroup string
|
expectedScaleUpGroup string
|
||||||
expectedScaleDowns []string
|
expectedScaleDowns []string
|
||||||
options AutoscalingOptions
|
options context.AutoscalingOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultOptions = AutoscalingOptions{
|
var defaultOptions = context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
MaxCoresTotal: config.DefaultMaxClusterCores,
|
MaxCoresTotal: config.DefaultMaxClusterCores,
|
||||||
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
||||||
|
|
@ -231,7 +232,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
|
||||||
|
|
||||||
clusterState.UpdateNodes(nodes, time.Now())
|
clusterState.UpdateNodes(nodes, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: config.options,
|
AutoscalingOptions: config.options,
|
||||||
PredicateChecker: simulator.NewTestPredicateChecker(),
|
PredicateChecker: simulator.NewTestPredicateChecker(),
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -313,8 +314,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
|
||||||
})
|
})
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
MaxCoresTotal: config.DefaultMaxClusterCores,
|
MaxCoresTotal: config.DefaultMaxClusterCores,
|
||||||
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
||||||
|
|
@ -380,7 +381,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
|
||||||
})
|
})
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: defaultOptions,
|
AutoscalingOptions: defaultOptions,
|
||||||
PredicateChecker: simulator.NewTestPredicateChecker(),
|
PredicateChecker: simulator.NewTestPredicateChecker(),
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -436,8 +437,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
MaxCoresTotal: config.DefaultMaxClusterCores,
|
MaxCoresTotal: config.DefaultMaxClusterCores,
|
||||||
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
||||||
|
|
@ -487,8 +488,8 @@ func TestScaleUpNoHelp(t *testing.T) {
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
MaxCoresTotal: config.DefaultMaxClusterCores,
|
MaxCoresTotal: config.DefaultMaxClusterCores,
|
||||||
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
MaxMemoryTotal: config.DefaultMaxClusterMemory,
|
||||||
|
|
@ -567,8 +568,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes(nodes, time.Now())
|
clusterState.UpdateNodes(nodes, time.Now())
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
BalanceSimilarNodeGroups: true,
|
BalanceSimilarNodeGroups: true,
|
||||||
MaxCoresTotal: config.DefaultMaxClusterCores,
|
MaxCoresTotal: config.DefaultMaxClusterCores,
|
||||||
|
|
@ -630,8 +631,8 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
|
||||||
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
MaxCoresTotal: 5000 * 64,
|
MaxCoresTotal: 5000 * 64,
|
||||||
MaxMemoryTotal: 5000 * 64 * 20,
|
MaxMemoryTotal: 5000 * 64 * 20,
|
||||||
|
|
@ -669,8 +670,8 @@ func TestAddAutoprovisionedCandidatesOK(t *testing.T) {
|
||||||
[]string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
|
[]string{"T1"}, map[string]*schedulercache.NodeInfo{"T1": ti1})
|
||||||
provider.AddNodeGroup("ng1", 1, 5, 3)
|
provider.AddNodeGroup("ng1", 1, 5, 3)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
MaxAutoprovisionedNodeGroupCount: 1,
|
MaxAutoprovisionedNodeGroupCount: 1,
|
||||||
},
|
},
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -702,8 +703,8 @@ func TestAddAutoprovisionedCandidatesToMany(t *testing.T) {
|
||||||
map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
|
map[string]*schedulercache.NodeInfo{"T1": ti1, "X1": xi1})
|
||||||
provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
|
provider.AddAutoprovisionedNodeGroup("autoprovisioned-X1", 0, 1000, 0, "X1")
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
MaxAutoprovisionedNodeGroupCount: 1,
|
MaxAutoprovisionedNodeGroupCount: 1,
|
||||||
},
|
},
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
|
||||||
|
|
@ -21,11 +21,13 @@ import (
|
||||||
|
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
|
||||||
|
|
||||||
apiv1 "k8s.io/api/core/v1"
|
apiv1 "k8s.io/api/core/v1"
|
||||||
|
|
@ -49,18 +51,20 @@ const (
|
||||||
// StaticAutoscaler is an autoscaler which has all the core functionality of a CA but without the reconfiguration feature
|
// StaticAutoscaler is an autoscaler which has all the core functionality of a CA but without the reconfiguration feature
|
||||||
type StaticAutoscaler struct {
|
type StaticAutoscaler struct {
|
||||||
// AutoscalingContext consists of validated settings and options for this autoscaler
|
// AutoscalingContext consists of validated settings and options for this autoscaler
|
||||||
*AutoscalingContext
|
*context.AutoscalingContext
|
||||||
kube_util.ListerRegistry
|
kube_util.ListerRegistry
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
lastScaleUpTime time.Time
|
lastScaleUpTime time.Time
|
||||||
lastScaleDownDeleteTime time.Time
|
lastScaleDownDeleteTime time.Time
|
||||||
lastScaleDownFailTime time.Time
|
lastScaleDownFailTime time.Time
|
||||||
scaleDown *ScaleDown
|
scaleDown *ScaleDown
|
||||||
|
podListProcessor pods.PodListProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
|
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
|
||||||
func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
|
func NewStaticAutoscaler(opts context.AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
|
||||||
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (*StaticAutoscaler, errors.AutoscalerError) {
|
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry,
|
||||||
|
podListProcessor pods.PodListProcessor) (*StaticAutoscaler, errors.AutoscalerError) {
|
||||||
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap)
|
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("Failed to initialize status configmap, unable to write status events")
|
glog.Error("Failed to initialize status configmap, unable to write status events")
|
||||||
|
|
@ -68,7 +72,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
|
||||||
// TODO(maciekpytel): recover from this after successful status configmap update?
|
// TODO(maciekpytel): recover from this after successful status configmap update?
|
||||||
logRecorder, _ = utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, false)
|
logRecorder, _ = utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, false)
|
||||||
}
|
}
|
||||||
autoscalingContext, errctx := NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry)
|
autoscalingContext, errctx := context.NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry)
|
||||||
if errctx != nil {
|
if errctx != nil {
|
||||||
return nil, errctx
|
return nil, errctx
|
||||||
}
|
}
|
||||||
|
|
@ -83,6 +87,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
|
||||||
lastScaleDownDeleteTime: time.Now(),
|
lastScaleDownDeleteTime: time.Now(),
|
||||||
lastScaleDownFailTime: time.Now(),
|
lastScaleDownFailTime: time.Now(),
|
||||||
scaleDown: scaleDown,
|
scaleDown: scaleDown,
|
||||||
|
podListProcessor: podListProcessor,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -236,6 +241,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
|
||||||
return errors.ToAutoscalerError(errors.ApiCallError, err)
|
return errors.ToAutoscalerError(errors.ApiCallError, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
allUnschedulablePods, allScheduled, err = a.podListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to process pod list: %v", err)
|
||||||
|
return errors.ToAutoscalerError(errors.InternalError, err)
|
||||||
|
}
|
||||||
|
|
||||||
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduled, a.PredicateChecker)
|
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduled, a.PredicateChecker)
|
||||||
|
|
||||||
// We need to check whether pods marked as unschedulable are actually unschedulable.
|
// We need to check whether pods marked as unschedulable are actually unschedulable.
|
||||||
|
|
|
||||||
|
|
@ -24,10 +24,12 @@ import (
|
||||||
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
|
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/utils/pods"
|
||||||
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
||||||
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
||||||
|
|
||||||
|
|
@ -168,8 +170,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
ScaleDownEnabled: true,
|
ScaleDownEnabled: true,
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
|
|
@ -197,7 +199,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
|
||||||
ListerRegistry: listerRegistry,
|
ListerRegistry: listerRegistry,
|
||||||
lastScaleUpTime: time.Now(),
|
lastScaleUpTime: time.Now(),
|
||||||
lastScaleDownFailTime: time.Now(),
|
lastScaleDownFailTime: time.Now(),
|
||||||
scaleDown: sd}
|
scaleDown: sd,
|
||||||
|
podListProcessor: pods.NewDefaultPodListProcessor()}
|
||||||
|
|
||||||
// MaxNodesTotal reached.
|
// MaxNodesTotal reached.
|
||||||
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
||||||
|
|
@ -344,8 +347,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
ScaleDownEnabled: true,
|
ScaleDownEnabled: true,
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
|
|
@ -375,7 +378,8 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
|
||||||
ListerRegistry: listerRegistry,
|
ListerRegistry: listerRegistry,
|
||||||
lastScaleUpTime: time.Now(),
|
lastScaleUpTime: time.Now(),
|
||||||
lastScaleDownFailTime: time.Now(),
|
lastScaleDownFailTime: time.Now(),
|
||||||
scaleDown: sd}
|
scaleDown: sd,
|
||||||
|
podListProcessor: pods.NewDefaultPodListProcessor()}
|
||||||
|
|
||||||
// Scale up.
|
// Scale up.
|
||||||
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
||||||
|
|
@ -481,8 +485,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
|
||||||
// broken node failed to register in time
|
// broken node failed to register in time
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1}, later)
|
clusterState.UpdateNodes([]*apiv1.Node{n1}, later)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
ScaleDownEnabled: true,
|
ScaleDownEnabled: true,
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
|
|
@ -511,7 +515,8 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
|
||||||
ListerRegistry: listerRegistry,
|
ListerRegistry: listerRegistry,
|
||||||
lastScaleUpTime: time.Now(),
|
lastScaleUpTime: time.Now(),
|
||||||
lastScaleDownFailTime: time.Now(),
|
lastScaleDownFailTime: time.Now(),
|
||||||
scaleDown: sd}
|
scaleDown: sd,
|
||||||
|
podListProcessor: pods.NewDefaultPodListProcessor()}
|
||||||
|
|
||||||
// Scale up.
|
// Scale up.
|
||||||
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
|
||||||
|
|
@ -617,8 +622,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
|
||||||
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
|
||||||
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
EstimatorName: estimator.BinpackingEstimatorName,
|
EstimatorName: estimator.BinpackingEstimatorName,
|
||||||
ScaleDownEnabled: true,
|
ScaleDownEnabled: true,
|
||||||
ScaleDownUtilizationThreshold: 0.5,
|
ScaleDownUtilizationThreshold: 0.5,
|
||||||
|
|
@ -647,7 +652,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
|
||||||
ListerRegistry: listerRegistry,
|
ListerRegistry: listerRegistry,
|
||||||
lastScaleUpTime: time.Now(),
|
lastScaleUpTime: time.Now(),
|
||||||
lastScaleDownFailTime: time.Now(),
|
lastScaleDownFailTime: time.Now(),
|
||||||
scaleDown: sd}
|
scaleDown: sd,
|
||||||
|
podListProcessor: pods.NewDefaultPodListProcessor()}
|
||||||
|
|
||||||
// Scale up
|
// Scale up
|
||||||
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
|
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
|
||||||
|
|
@ -322,7 +323,7 @@ func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
|
// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
|
||||||
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *AutoscalingContext,
|
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
|
||||||
currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
|
currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
|
||||||
removedAny := false
|
removedAny := false
|
||||||
for _, unregisteredNode := range unregisteredNodes {
|
for _, unregisteredNode := range unregisteredNodes {
|
||||||
|
|
@ -362,7 +363,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
|
||||||
// Sets the target size of node groups to the current number of nodes in them
|
// Sets the target size of node groups to the current number of nodes in them
|
||||||
// if the difference was constant for a prolonged time. Returns true if managed
|
// if the difference was constant for a prolonged time. Returns true if managed
|
||||||
// to fix something.
|
// to fix something.
|
||||||
func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool, error) {
|
func fixNodeGroupSize(context *context.AutoscalingContext, currentTime time.Time) (bool, error) {
|
||||||
fixed := false
|
fixed := false
|
||||||
for _, nodeGroup := range context.CloudProvider.NodeGroups() {
|
for _, nodeGroup := range context.CloudProvider.NodeGroups() {
|
||||||
incorrectSize := context.ClusterStateRegistry.GetIncorrectNodeGroupSize(nodeGroup.Id())
|
incorrectSize := context.ClusterStateRegistry.GetIncorrectNodeGroupSize(nodeGroup.Id())
|
||||||
|
|
@ -389,7 +390,7 @@ func fixNodeGroupSize(context *AutoscalingContext, currentTime time.Time) (bool,
|
||||||
// getPotentiallyUnneededNodes returns nodes that are:
|
// getPotentiallyUnneededNodes returns nodes that are:
|
||||||
// - managed by the cluster autoscaler
|
// - managed by the cluster autoscaler
|
||||||
// - in groups with size > min size
|
// - in groups with size > min size
|
||||||
func getPotentiallyUnneededNodes(context *AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node {
|
func getPotentiallyUnneededNodes(context *context.AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node {
|
||||||
result := make([]*apiv1.Node, 0, len(nodes))
|
result := make([]*apiv1.Node, 0, len(nodes))
|
||||||
|
|
||||||
nodeGroupSize := getNodeGroupSizeMap(context.CloudProvider)
|
nodeGroupSize := getNodeGroupSizeMap(context.CloudProvider)
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
||||||
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
|
||||||
|
|
@ -330,8 +331,8 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
|
||||||
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
|
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
MaxNodeProvisionTime: 45 * time.Minute,
|
MaxNodeProvisionTime: 45 * time.Minute,
|
||||||
},
|
},
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -428,8 +429,8 @@ func TestRemoveFixNodeTargetSize(t *testing.T) {
|
||||||
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
|
err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
AutoscalingOptions: AutoscalingOptions{
|
AutoscalingOptions: context.AutoscalingOptions{
|
||||||
MaxNodeProvisionTime: 45 * time.Minute,
|
MaxNodeProvisionTime: 45 * time.Minute,
|
||||||
},
|
},
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
|
|
@ -461,7 +462,7 @@ func TestGetPotentiallyUnneededNodes(t *testing.T) {
|
||||||
provider.AddNode("ng1", ng1_2)
|
provider.AddNode("ng1", ng1_2)
|
||||||
provider.AddNode("ng2", ng2_1)
|
provider.AddNode("ng2", ng2_1)
|
||||||
|
|
||||||
context := &AutoscalingContext{
|
context := &context.AutoscalingContext{
|
||||||
CloudProvider: provider,
|
CloudProvider: provider,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ import (
|
||||||
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
|
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/core"
|
"k8s.io/autoscaler/cluster-autoscaler/core"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
"k8s.io/autoscaler/cluster-autoscaler/estimator"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/expander"
|
"k8s.io/autoscaler/cluster-autoscaler/expander"
|
||||||
|
|
@ -134,7 +135,7 @@ var (
|
||||||
regional = flag.Bool("regional", false, "Cluster is regional.")
|
regional = flag.Bool("regional", false, "Cluster is regional.")
|
||||||
)
|
)
|
||||||
|
|
||||||
func createAutoscalerOptions() core.AutoscalerOptions {
|
func createAutoscalingOptions() context.AutoscalingOptions {
|
||||||
minCoresTotal, maxCoresTotal, err := parseMinMaxFlag(*coresTotal)
|
minCoresTotal, maxCoresTotal, err := parseMinMaxFlag(*coresTotal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to parse flags: %v", err)
|
glog.Fatalf("Failed to parse flags: %v", err)
|
||||||
|
|
@ -147,7 +148,7 @@ func createAutoscalerOptions() core.AutoscalerOptions {
|
||||||
minMemoryTotal = minMemoryTotal * 1024
|
minMemoryTotal = minMemoryTotal * 1024
|
||||||
maxMemoryTotal = maxMemoryTotal * 1024
|
maxMemoryTotal = maxMemoryTotal * 1024
|
||||||
|
|
||||||
autoscalingOpts := core.AutoscalingOptions{
|
return context.AutoscalingOptions{
|
||||||
CloudConfig: *cloudConfig,
|
CloudConfig: *cloudConfig,
|
||||||
CloudProviderName: *cloudProviderFlag,
|
CloudProviderName: *cloudProviderFlag,
|
||||||
NodeGroupAutoDiscovery: nodeGroupAutoDiscoveryFlag,
|
NodeGroupAutoDiscovery: nodeGroupAutoDiscoveryFlag,
|
||||||
|
|
@ -183,16 +184,13 @@ func createAutoscalerOptions() core.AutoscalerOptions {
|
||||||
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
|
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
|
||||||
Regional: *regional,
|
Regional: *regional,
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
configFetcherOpts := dynamic.ConfigFetcherOptions{
|
func createConfigFetcherOptions() dynamic.ConfigFetcherOptions {
|
||||||
|
return dynamic.ConfigFetcherOptions{
|
||||||
ConfigMapName: *configMapName,
|
ConfigMapName: *configMapName,
|
||||||
Namespace: *namespace,
|
Namespace: *namespace,
|
||||||
}
|
}
|
||||||
|
|
||||||
return core.AutoscalerOptions{
|
|
||||||
AutoscalingOptions: autoscalingOpts,
|
|
||||||
ConfigFetcherOptions: configFetcherOpts,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func createKubeClient() kube_client.Interface {
|
func createKubeClient() kube_client.Interface {
|
||||||
|
|
@ -241,8 +239,8 @@ func run(healthCheck *metrics.HealthCheck) {
|
||||||
metrics.RegisterAll()
|
metrics.RegisterAll()
|
||||||
kubeClient := createKubeClient()
|
kubeClient := createKubeClient()
|
||||||
kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient)
|
kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient)
|
||||||
opts := createAutoscalerOptions()
|
autoscalingOptions := createAutoscalingOptions()
|
||||||
metrics.UpdateNapEnabled(opts.NodeAutoprovisioningEnabled)
|
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
|
||||||
predicateCheckerStopChannel := make(chan struct{})
|
predicateCheckerStopChannel := make(chan struct{})
|
||||||
predicateChecker, err := simulator.NewPredicateChecker(kubeClient, predicateCheckerStopChannel)
|
predicateChecker, err := simulator.NewPredicateChecker(kubeClient, predicateCheckerStopChannel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -250,7 +248,16 @@ func run(healthCheck *metrics.HealthCheck) {
|
||||||
}
|
}
|
||||||
listerRegistryStopChannel := make(chan struct{})
|
listerRegistryStopChannel := make(chan struct{})
|
||||||
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel)
|
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel)
|
||||||
autoscaler, err := core.NewAutoscaler(opts, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
|
|
||||||
|
opts := core.AutoscalerOptions{
|
||||||
|
AutoscalingOptions: autoscalingOptions,
|
||||||
|
ConfigFetcherOptions: createConfigFetcherOptions(),
|
||||||
|
PredicateChecker: predicateChecker,
|
||||||
|
KubeClient: kubeClient,
|
||||||
|
KubeEventRecorder: kubeEventRecorder,
|
||||||
|
ListerRegistry: listerRegistry,
|
||||||
|
}
|
||||||
|
autoscaler, err := core.NewAutoscaler(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to create autoscaler: %v", err)
|
glog.Fatalf("Failed to create autoscaler: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package pods
|
||||||
|
|
||||||
|
import (
|
||||||
|
apiv1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PodListProcessor processes lists of unschedulable and sheduled pods before scaling of the cluster.
|
||||||
|
type PodListProcessor interface {
|
||||||
|
Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NoOpPodListProcessor is returning pod lists without processing them.
|
||||||
|
type NoOpPodListProcessor struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDefaultPodListProcessor creates an instance of PodListProcessor.
|
||||||
|
func NewDefaultPodListProcessor() PodListProcessor {
|
||||||
|
return &NoOpPodListProcessor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process processes lists of unschedulable and sheduled pods before scaling of the cluster.
|
||||||
|
func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
|
||||||
|
return unschedulablePods, allScheduled, nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package pods
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
apiv1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||||
|
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPodListProcessor(t *testing.T) {
|
||||||
|
context := &context.AutoscalingContext{}
|
||||||
|
p1 := BuildTestPod("p1", 40, 0)
|
||||||
|
p2 := BuildTestPod("p2", 400, 0)
|
||||||
|
n1 := BuildTestNode("n1", 100, 1000)
|
||||||
|
n2 := BuildTestNode("n1", 100, 1000)
|
||||||
|
unschedulablePods := []*apiv1.Pod{p1}
|
||||||
|
allScheduled := []*apiv1.Pod{p2}
|
||||||
|
nodes := []*apiv1.Node{n1, n2}
|
||||||
|
podListProcessor := NewDefaultPodListProcessor()
|
||||||
|
gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduled, nodes)
|
||||||
|
if len(gotUnschedulablePods) != 1 || len(gotAllScheduled) != 1 || err != nil {
|
||||||
|
t.Errorf("Error podListProcessor.Process() = %v, %v, %v want %v, %v, nil ",
|
||||||
|
gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduled)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue