Enable pricing in expander factory

This commit is contained in:
Marcin Wielgus 2017-06-09 11:02:55 -07:00
parent 5e778cea2c
commit e2e171b7b7
15 changed files with 107 additions and 54 deletions

View File

@ -46,7 +46,9 @@ type Autoscaler interface {
}
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
func NewAutoscaler(opts AutoscalerOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) Autoscaler {
func NewAutoscaler(opts AutoscalerOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface,
kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (Autoscaler, error) {
autoscalerBuilder := NewAutoscalerBuilder(opts.AutoscalingOptions, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
if opts.ConfigMapName != "" {
if opts.NodeGroupAutoDiscovery != "" {

View File

@ -27,7 +27,7 @@ import (
// AutoscalerBuilder builds an instance of Autoscaler which is the core of CA
type AutoscalerBuilder interface {
SetDynamicConfig(config dynamic.Config) AutoscalerBuilder
Build() Autoscaler
Build() (Autoscaler, error)
}
// AutoscalerBuilderImpl builds new autoscalers from its state including initial `AutoscalingOptions` given at startup and
@ -42,7 +42,8 @@ type AutoscalerBuilderImpl struct {
}
// NewAutoscalerBuilder builds an AutoscalerBuilder from required parameters
func NewAutoscalerBuilder(autoscalingOptions AutoscalingOptions, predicateChecker *simulator.PredicateChecker, kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *AutoscalerBuilderImpl {
func NewAutoscalerBuilder(autoscalingOptions AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *AutoscalerBuilderImpl {
return &AutoscalerBuilderImpl{
autoscalingOptions: autoscalingOptions,
kubeClient: kubeClient,
@ -60,7 +61,7 @@ func (b *AutoscalerBuilderImpl) SetDynamicConfig(config dynamic.Config) Autoscal
}
// Build an autoscaler according to the builder's state
func (b *AutoscalerBuilderImpl) Build() Autoscaler {
func (b *AutoscalerBuilderImpl) Build() (Autoscaler, error) {
options := b.autoscalingOptions
if b.dynamicConfig != nil {
c := *(b.dynamicConfig)

View File

@ -67,7 +67,7 @@ func TestNewAutoscalerStatic(t *testing.T) {
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
a := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
assert.IsType(t, &StaticAutoscaler{}, a)
}
@ -104,6 +104,6 @@ func TestNewAutoscalerDynamic(t *testing.T) {
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil)
a := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
a, _ := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
assert.IsType(t, &DynamicAutoscaler{}, a)
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
@ -107,13 +108,20 @@ type AutoscalingOptions struct {
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
func NewAutoscalingContext(options AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, logEventRecorder *utils.LogEventRecorder) *AutoscalingContext {
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder,
logEventRecorder *utils.LogEventRecorder, listerRegistry kube_util.ListerRegistry) (*AutoscalingContext, error) {
cloudProviderBuilder := builder.NewCloudProviderBuilder(options.CloudProviderName, options.CloudConfig)
cloudProvider := cloudProviderBuilder.Build(cloudprovider.NodeGroupDiscoveryOptions{
NodeGroupSpecs: options.NodeGroups,
NodeGroupAutoDiscoverySpec: options.NodeGroupAutoDiscovery,
})
expanderStrategy := factory.ExpanderStrategyFromString(options.ExpanderName)
expanderStrategy, err := factory.ExpanderStrategyFromString(options.ExpanderName,
cloudProvider, listerRegistry.AllNodeLister())
if err != nil {
return nil, err
}
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: options.MaxTotalUnreadyPercentage,
OkTotalUnreadyCount: options.OkTotalUnreadyCount,
@ -132,5 +140,5 @@ func NewAutoscalingContext(options AutoscalingOptions, predicateChecker *simulat
LogRecorder: logEventRecorder,
}
return &autoscalingContext
return &autoscalingContext, nil
}

View File

@ -34,12 +34,16 @@ type DynamicAutoscaler struct {
}
// NewDynamicAutoscaler builds a DynamicAutoscaler from required parameters
func NewDynamicAutoscaler(autoscalerBuilder AutoscalerBuilder, configFetcher dynamic.ConfigFetcher) *DynamicAutoscaler {
func NewDynamicAutoscaler(autoscalerBuilder AutoscalerBuilder, configFetcher dynamic.ConfigFetcher) (*DynamicAutoscaler, error) {
autoscaler, err := autoscalerBuilder.Build()
if err != nil {
return nil, err
}
return &DynamicAutoscaler{
autoscaler: autoscalerBuilder.Build(),
autoscaler: autoscaler,
autoscalerBuilder: autoscalerBuilder,
configFetcher: configFetcher,
}
}, err
}
// CleanUp does the work required before all the iterations of a dynamic autoscaler run
@ -75,7 +79,10 @@ func (a *DynamicAutoscaler) Reconfigure() error {
if updatedConfig != nil {
// For safety, any config change should stop and recreate all the stuff running in CA hence recreating all the Autoscaler instance here
// See https://github.com/kubernetes/contrib/pull/2226#discussion_r94126064
a.autoscaler = a.autoscalerBuilder.SetDynamicConfig(*updatedConfig).Build()
a.autoscaler, err = a.autoscalerBuilder.SetDynamicConfig(*updatedConfig).Build()
if err != nil {
return err
}
glog.V(4).Infof("Dynamic reconfiguration finished: updatedConfig=%v", updatedConfig)
}

View File

@ -59,9 +59,9 @@ func (m *AutoscalerBuilderMock) SetDynamicConfig(config dynamic.Config) Autoscal
return args.Get(0).(AutoscalerBuilder)
}
func (m *AutoscalerBuilderMock) Build() Autoscaler {
func (m *AutoscalerBuilderMock) Build() (Autoscaler, error) {
args := m.Called()
return args.Get(0).(Autoscaler)
return args.Get(0).(Autoscaler), nil
}
func TestRunOnceWhenNoUpdate(t *testing.T) {
@ -76,7 +76,7 @@ func TestRunOnceWhenNoUpdate(t *testing.T) {
builder := &AutoscalerBuilderMock{}
builder.On("Build").Return(autoscaler).Once()
a := NewDynamicAutoscaler(builder, configFetcher)
a, _ := NewDynamicAutoscaler(builder, configFetcher)
a.RunOnce(currentTime)
autoscaler.AssertExpectations(t)
@ -102,7 +102,7 @@ func TestRunOnceWhenUpdated(t *testing.T) {
builder.On("SetDynamicConfig", newConfig).Return(builder).Once()
builder.On("Build").Return(newAutoscaler).Once()
a := NewDynamicAutoscaler(builder, configFetcher)
a, _ := NewDynamicAutoscaler(builder, configFetcher)
a.RunOnce(currentTime)
initialAutoscaler.AssertNotCalled(t, "RunOnce", mock.AnythingOfType("time.Time"))

View File

@ -31,11 +31,16 @@ type PollingAutoscaler struct {
}
// NewPollingAutoscaler builds a PollingAutoscaler from required parameters
func NewPollingAutoscaler(autoscalerBuilder AutoscalerBuilder) *PollingAutoscaler {
return &PollingAutoscaler{
autoscaler: autoscalerBuilder.Build(),
autoscalerBuilder: autoscalerBuilder,
func NewPollingAutoscaler(autoscalerBuilder AutoscalerBuilder) (*PollingAutoscaler, error) {
autoscaler, err := autoscalerBuilder.Build()
if err != nil {
return nil, err
}
return &PollingAutoscaler{
autoscaler: autoscaler,
autoscalerBuilder: autoscalerBuilder,
}, err
}
// CleanUp does the work required before all the iterations of a polling autoscaler run
@ -63,8 +68,11 @@ func (a *PollingAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerErr
func (a *PollingAutoscaler) Poll() error {
// For safety, any config change should stop and recreate all the stuff running in CA hence recreating all the Autoscaler instance here
// See https://github.com/kubernetes/contrib/pull/2226#discussion_r94126064
a.autoscaler = a.autoscalerBuilder.Build()
autoscaler, err := a.autoscalerBuilder.Build()
if err != nil {
return err
}
a.autoscaler = autoscaler
glog.V(4).Infof("Poll finished")
return nil
}

View File

@ -35,7 +35,7 @@ func TestRunOnce(t *testing.T) {
builder.On("Build").Return(initialAutoscaler).Once()
builder.On("Build").Return(newAutoscaler).Once()
a := NewPollingAutoscaler(builder)
a, _ := NewPollingAutoscaler(builder)
a.RunOnce(currentTime)
initialAutoscaler.AssertNotCalled(t, "RunOnce", mock.AnythingOfType("time.Time"))

View File

@ -43,7 +43,7 @@ type StaticAutoscaler struct {
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.PredicateChecker,
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) *StaticAutoscaler {
kubeClient kube_client.Interface, kubeEventRecorder kube_record.EventRecorder, listerRegistry kube_util.ListerRegistry) (*StaticAutoscaler, error) {
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, kubeEventRecorder, opts.WriteStatusConfigMap)
if err != nil {
glog.Error("Failed to initialize status configmap, unable to write status events")
@ -51,7 +51,10 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
// TODO(maciekpytel): recover from this after successfull status configmap update?
logRecorder, _ = utils.NewStatusMapRecorder(kubeClient, kubeEventRecorder, false)
}
autoscalingContext := NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder)
autoscalingContext, err := NewAutoscalingContext(opts, predicateChecker, kubeClient, kubeEventRecorder, logRecorder, listerRegistry)
if err != nil {
return nil, err
}
scaleDown := NewScaleDown(autoscalingContext)
@ -61,7 +64,7 @@ func NewStaticAutoscaler(opts AutoscalingOptions, predicateChecker *simulator.Pr
lastScaleUpTime: time.Now(),
lastScaleDownFailedTrial: time.Now(),
scaleDown: scaleDown,
}
}, nil
}
// CleanUp cleans up ToBeDeleted taints added by the previously run and then failed CA

View File

@ -24,13 +24,16 @@ import (
var (
// AvailableExpanders is a list of available expander options
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName}
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName}
// RandomExpanderName selects a node group at random
RandomExpanderName = "random"
// MostPodsExpanderName selects a node group that fits the most pods
MostPodsExpanderName = "most-pods"
// LeastWasteExpanderName selects a node group that leaves the least fraction of CPU and Memory
LeastWasteExpanderName = "least-waste"
// PriceBasedExpanderName selects a node group that is the most cost-effective and consistent with
// the preferred node size for the cluster
PriceBasedExpanderName = "price"
)
// Option describes an option to expand the cluster.

View File

@ -17,24 +17,35 @@ limitations under the License.
package factory
import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/mostpods"
"k8s.io/autoscaler/cluster-autoscaler/expander/price"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/expander/waste"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)
// ExpanderStrategyFromString creates an expander.Strategy according to its name
func ExpanderStrategyFromString(expanderFlag string) expander.Strategy {
var expanderStrategy expander.Strategy
{
switch expanderFlag {
case expander.RandomExpanderName:
expanderStrategy = random.NewStrategy()
case expander.MostPodsExpanderName:
expanderStrategy = mostpods.NewStrategy()
case expander.LeastWasteExpanderName:
expanderStrategy = waste.NewStrategy()
func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider.CloudProvider,
nodeLister kube_util.NodeLister) (expander.Strategy, error) {
switch expanderFlag {
case expander.RandomExpanderName:
return random.NewStrategy(), nil
case expander.MostPodsExpanderName:
return mostpods.NewStrategy(), nil
case expander.LeastWasteExpanderName:
return waste.NewStrategy(), nil
case expander.PriceBasedExpanderName:
pricing, err := cloudProvider.Pricing()
if err != nil {
return nil, err
}
return price.NewStrategy(pricing,
price.NewSimplePreferredNodeProvider(nodeLister),
price.SimpleNodeUnfitness), nil
}
return expanderStrategy
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}

View File

@ -26,11 +26,18 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
)
type simplePreferredNodeProvider struct {
// SimplePreferredNodeProvider returns preferred node based on the cluster size.
type SimplePreferredNodeProvider struct {
nodeLister kube_util.NodeLister
}
func (spnp *simplePreferredNodeProvider) Node() (*apiv1.Node, error) {
func NewSimplePreferredNodeProvider(nodeLister kube_util.NodeLister) *SimplePreferredNodeProvider {
return &SimplePreferredNodeProvider{
nodeLister: nodeLister,
}
}
func (spnp *SimplePreferredNodeProvider) Node() (*apiv1.Node, error) {
nodes, err := spnp.nodeLister.List()
if err != nil {
return nil, err
@ -74,7 +81,8 @@ func buildNode(millicpu int64, mem int64) *apiv1.Node {
return node
}
func simpleNodeUnfitness(preferredNode, evaluatedNode *apiv1.Node) float64 {
// SimpleNodeUnfitness returns unfitness based on cpu only.
func SimpleNodeUnfitness(preferredNode, evaluatedNode *apiv1.Node) float64 {
preferredCpu := preferredNode.Status.Capacity[apiv1.ResourceCPU]
evaluatedCpu := evaluatedNode.Status.Capacity[apiv1.ResourceCPU]
return math.Max(float64(preferredCpu.MilliValue())/float64(evaluatedCpu.MilliValue()),

View File

@ -38,7 +38,7 @@ func TestPreferred(t *testing.T) {
n2 := BuildTestNode("n2", 2000, 2000)
n3 := BuildTestNode("n2", 2000, 2000)
provider := simplePreferredNodeProvider{
provider := SimplePreferredNodeProvider{
nodeLister: &testNodeLister{
list: []*apiv1.Node{n1, n2, n3},
},
@ -47,7 +47,7 @@ func TestPreferred(t *testing.T) {
assert.NoError(t, err)
cpu := node.Status.Capacity[apiv1.ResourceCPU]
assert.Equal(t, int64(2), cpu.Value())
assert.Equal(t, 2.0, simpleNodeUnfitness(n1, n2))
assert.Equal(t, 2.0, simpleNodeUnfitness(n2, n1))
assert.Equal(t, 1.0, simpleNodeUnfitness(n1, n1))
assert.Equal(t, 2.0, SimpleNodeUnfitness(n1, n2))
assert.Equal(t, 2.0, SimpleNodeUnfitness(n2, n1))
assert.Equal(t, 1.0, SimpleNodeUnfitness(n1, n1))
}

View File

@ -113,7 +113,7 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(2000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options, nodeInfosForGroups).Debug, "ng1")
// First node group is cheapter however the second is preferred.
@ -132,7 +132,7 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(4000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options, nodeInfosForGroups).Debug, "ng2")
// All node groups accept the same set of pods. Lots of nodes.
@ -167,7 +167,7 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(4000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options1b, nodeInfosForGroups).Debug, "ng1")
// Second node group is cheapter
@ -186,7 +186,7 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(2000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options, nodeInfosForGroups).Debug, "ng2")
// First group accept 1 pod and second accepts 2.
@ -222,7 +222,7 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(2000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options2, nodeInfosForGroups).Debug, "ng2")
// Errors are expected
@ -234,6 +234,6 @@ func TestPriceExpander(t *testing.T) {
&testPreferredNodeProvider{
preferred: buildNode(2000, 1024*1024*1024),
},
simpleNodeUnfitness,
SimpleNodeUnfitness,
).BestOption(options2, nodeInfosForGroups))
}

View File

@ -181,8 +181,10 @@ func run(healthCheck *metrics.HealthCheck) {
}
listerRegistryStopChannel := make(chan struct{})
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel)
autoscaler := core.NewAutoscaler(opts, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
autoscaler, err := core.NewAutoscaler(opts, predicateChecker, kubeClient, kubeEventRecorder, listerRegistry)
if err != nil {
glog.Fatal("Failed to create autoscaler: %v", err)
}
autoscaler.CleanUp()
registerSignalHandlers(autoscaler)
healthCheck.StartMonitoring()