Add scale down candidates observer
This commit is contained in:
parent
0c2275d7c4
commit
97159df69b
|
|
@ -163,6 +163,7 @@ func NewStaticAutoscaler(
|
|||
}
|
||||
|
||||
clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
|
||||
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
|
||||
|
||||
deleteOptions := simulator.NodeDeleteOptions{
|
||||
SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods,
|
||||
|
|
@ -571,7 +572,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
|
|||
typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, currentTime)
|
||||
// Update clusterStateRegistry and metrics regardless of whether ScaleDown was successful or not.
|
||||
unneededNodes := a.scaleDownPlanner.UnneededNodes()
|
||||
a.clusterStateRegistry.UpdateScaleDownCandidates(unneededNodes, currentTime)
|
||||
a.processors.ScaleDownCandidatesNotifier.Update(unneededNodes, currentTime)
|
||||
metrics.UpdateUnneededNodesCount(len(unneededNodes))
|
||||
if typedErr != nil {
|
||||
scaleDownStatus.Result = scaledownstatus.ScaleDownError
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ import (
|
|||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
|
||||
|
|
@ -143,15 +144,16 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
|
|||
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
|
||||
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
|
||||
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
|
||||
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
|
||||
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
|
||||
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
|
||||
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
|
||||
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
|
||||
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
|
||||
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
|
||||
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
|
||||
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
|
||||
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
|
||||
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
|
||||
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
|
||||
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
|
||||
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
|
||||
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
|
||||
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
|
||||
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
|
||||
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
|
||||
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
|
||||
)
|
||||
|
||||
|
|
@ -61,6 +62,8 @@ type AutoscalingProcessors struct {
|
|||
CustomResourcesProcessor customresources.CustomResourcesProcessor
|
||||
// ActionableClusterProcessor is interface defining whether the cluster is in an actionable state
|
||||
ActionableClusterProcessor actionablecluster.ActionableClusterProcessor
|
||||
// ScaleDownCandidatesNotifier is used to Update and Register new scale down candidates observer.
|
||||
ScaleDownCandidatesNotifier *scaledowncandidates.ObserversList
|
||||
}
|
||||
|
||||
// DefaultProcessors returns default set of processors.
|
||||
|
|
@ -73,17 +76,18 @@ func DefaultProcessors() *AutoscalingProcessors {
|
|||
MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio,
|
||||
MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio,
|
||||
}),
|
||||
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
|
||||
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
|
||||
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
|
||||
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
|
||||
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
|
||||
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
|
||||
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
|
||||
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
|
||||
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
|
||||
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
|
||||
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
|
||||
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
|
||||
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
|
||||
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
|
||||
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
|
||||
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
|
||||
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
|
||||
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
|
||||
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
|
||||
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
|
||||
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
|
||||
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
|
||||
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scaledowncandidates
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// Observer is an observer of scale down candidates
|
||||
type Observer interface {
|
||||
// UpdateScaleDownCandidates updates scale down candidates.
|
||||
UpdateScaleDownCandidates([]*apiv1.Node, time.Time)
|
||||
}
|
||||
|
||||
// ObserversList is a slice of observers of scale down candidates
|
||||
type ObserversList struct {
|
||||
observers []Observer
|
||||
}
|
||||
|
||||
// Register adds new observer to the list.
|
||||
func (l *ObserversList) Register(o Observer) {
|
||||
l.observers = append(l.observers, o)
|
||||
}
|
||||
|
||||
// Update updates scale down candidates for each observer.
|
||||
func (l *ObserversList) Update(nodes []*apiv1.Node, now time.Time) {
|
||||
for _, observer := range l.observers {
|
||||
observer.UpdateScaleDownCandidates(nodes, now)
|
||||
}
|
||||
}
|
||||
|
||||
// NewObserversList return empty list of observers.
|
||||
func NewObserversList() *ObserversList {
|
||||
return &ObserversList{}
|
||||
}
|
||||
Loading…
Reference in New Issue