diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index afb860c5a1..08870a1d85 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -163,6 +163,7 @@ func NewStaticAutoscaler(
 	}
 	clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
+	processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
 
 	deleteOptions := simulator.NodeDeleteOptions{
 		SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods,
@@ -571,7 +572,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		typedErr := a.scaleDownPlanner.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, currentTime)
 		// Update clusterStateRegistry and metrics regardless of whether ScaleDown was successful or not.
 		unneededNodes := a.scaleDownPlanner.UnneededNodes()
-		a.clusterStateRegistry.UpdateScaleDownCandidates(unneededNodes, currentTime)
+		a.processors.ScaleDownCandidatesNotifier.Update(unneededNodes, currentTime)
 		metrics.UpdateUnneededNodesCount(len(unneededNodes))
 		if typedErr != nil {
 			scaleDownStatus.Result = scaledownstatus.ScaleDownError
diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go
index 6686945608..8a57ec508d 100644
--- a/cluster-autoscaler/core/test/common.go
+++ b/cluster-autoscaler/core/test/common.go
@@ -45,6 +45,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
@@ -143,15 +144,16 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
 		NodeGroupSetProcessor:      nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
 		ScaleDownSetProcessor:      nodes.NewPostFilteringScaleDownNodeProcessor(),
 		// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
-		ScaleUpStatusProcessor:     &status.EventingScaleUpStatusProcessor{},
-		ScaleDownStatusProcessor:   &status.NoOpScaleDownStatusProcessor{},
-		AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
-		NodeGroupManager:           nodegroups.NewDefaultNodeGroupManager(),
-		NodeInfoProcessor:          nodeinfos.NewDefaultNodeInfoProcessor(),
-		TemplateNodeInfoProvider:   nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
-		NodeGroupConfigProcessor:   nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
-		CustomResourcesProcessor:   customresources.NewDefaultCustomResourcesProcessor(),
-		ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
+		ScaleUpStatusProcessor:      &status.EventingScaleUpStatusProcessor{},
+		ScaleDownStatusProcessor:    &status.NoOpScaleDownStatusProcessor{},
+		AutoscalingStatusProcessor:  &status.NoOpAutoscalingStatusProcessor{},
+		NodeGroupManager:            nodegroups.NewDefaultNodeGroupManager(),
+		NodeInfoProcessor:           nodeinfos.NewDefaultNodeInfoProcessor(),
+		TemplateNodeInfoProvider:    nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
+		NodeGroupConfigProcessor:    nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
+		CustomResourcesProcessor:    customresources.NewDefaultCustomResourcesProcessor(),
+		ActionableClusterProcessor:  actionablecluster.NewDefaultActionableClusterProcessor(),
+		ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
 	}
 }
diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go
index 79ba3456c1..b6a396098d 100644
--- a/cluster-autoscaler/processors/processors.go
+++ b/cluster-autoscaler/processors/processors.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 )
 
@@ -61,6 +62,8 @@ type AutoscalingProcessors struct {
 	CustomResourcesProcessor customresources.CustomResourcesProcessor
 	// ActionableClusterProcessor is interface defining whether the cluster is in an actionable state
 	ActionableClusterProcessor actionablecluster.ActionableClusterProcessor
+	// ScaleDownCandidatesNotifier is used to Register and Update new scale down candidate observers.
+	ScaleDownCandidatesNotifier *scaledowncandidates.ObserversList
 }
 
 // DefaultProcessors returns default set of processors.
@@ -73,17 +76,18 @@ func DefaultProcessors() *AutoscalingProcessors {
 			MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio,
 			MaxFreeDifferenceRatio:           config.DefaultMaxFreeDifferenceRatio,
 		}),
-		ScaleUpStatusProcessor:     status.NewDefaultScaleUpStatusProcessor(),
-		ScaleDownNodeProcessor:     nodes.NewPreFilteringScaleDownNodeProcessor(),
-		ScaleDownSetProcessor:      nodes.NewPostFilteringScaleDownNodeProcessor(),
-		ScaleDownStatusProcessor:   status.NewDefaultScaleDownStatusProcessor(),
-		AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
-		NodeGroupManager:           nodegroups.NewDefaultNodeGroupManager(),
-		NodeInfoProcessor:          nodeinfos.NewDefaultNodeInfoProcessor(),
-		NodeGroupConfigProcessor:   nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
-		CustomResourcesProcessor:   customresources.NewDefaultCustomResourcesProcessor(),
-		ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
-		TemplateNodeInfoProvider:   nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
+		ScaleUpStatusProcessor:      status.NewDefaultScaleUpStatusProcessor(),
+		ScaleDownNodeProcessor:      nodes.NewPreFilteringScaleDownNodeProcessor(),
+		ScaleDownSetProcessor:       nodes.NewPostFilteringScaleDownNodeProcessor(),
+		ScaleDownStatusProcessor:    status.NewDefaultScaleDownStatusProcessor(),
+		AutoscalingStatusProcessor:  status.NewDefaultAutoscalingStatusProcessor(),
+		NodeGroupManager:            nodegroups.NewDefaultNodeGroupManager(),
+		NodeInfoProcessor:           nodeinfos.NewDefaultNodeInfoProcessor(),
+		NodeGroupConfigProcessor:    nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
+		CustomResourcesProcessor:    customresources.NewDefaultCustomResourcesProcessor(),
+		ActionableClusterProcessor:  actionablecluster.NewDefaultActionableClusterProcessor(),
+		TemplateNodeInfoProvider:    nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
+		ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
 	}
 }
diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_observer.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_observer.go
new file mode 100644
index 0000000000..34f14a8797
--- /dev/null
+++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_observer.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scaledowncandidates
+
+import (
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+)
+
+// Observer is an observer of scale down candidates.
+type Observer interface {
+	// UpdateScaleDownCandidates updates scale down candidates.
+	UpdateScaleDownCandidates([]*apiv1.Node, time.Time)
+}
+
+// ObserversList is a slice of observers of scale down candidates.
+type ObserversList struct {
+	observers []Observer
+}
+
+// Register adds a new observer to the list.
+func (l *ObserversList) Register(o Observer) {
+	l.observers = append(l.observers, o)
+}
+
+// Update updates scale down candidates for each registered observer.
+func (l *ObserversList) Update(nodes []*apiv1.Node, now time.Time) {
+	for _, observer := range l.observers {
+		observer.UpdateScaleDownCandidates(nodes, now)
+	}
+}
+
+// NewObserversList returns an empty list of observers.
+func NewObserversList() *ObserversList {
+	return &ObserversList{}
+}
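
For reviewers, a minimal usage sketch of the new package follows. Only `Observer`, `ObserversList`, `NewObserversList`, `Register`, and `Update` come from the diff above (together with the registration of the `ClusterStateRegistry` in `NewStaticAutoscaler` and the `Update` call in `RunOnce`); the `loggingObserver` type and the `klog` output are hypothetical, standing in for any component that wants to react to the scale-down candidate list.

```go
package main

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
	"k8s.io/klog/v2"
)

// loggingObserver is a hypothetical observer used only for illustration.
// It satisfies scaledowncandidates.Observer by implementing
// UpdateScaleDownCandidates.
type loggingObserver struct{}

func (o *loggingObserver) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
	for _, node := range nodes {
		klog.V(4).Infof("scale-down candidate at %v: %s", now, node.Name)
	}
}

func main() {
	// Create the notifier and register observers up front, mirroring how
	// NewStaticAutoscaler registers the clusterStateRegistry.
	notifier := scaledowncandidates.NewObserversList()
	notifier.Register(&loggingObserver{})

	// A single Update call fans the candidate list out to every registered
	// observer, which is what RunOnce now does with the unneeded nodes.
	var candidates []*apiv1.Node // e.g. the planner's unneeded nodes
	notifier.Update(candidates, time.Now())
}
```

The effect of the change is that `StaticAutoscaler` is decoupled from concrete consumers of the candidate list: `RunOnce` only talks to the notifier, and the `ClusterStateRegistry` receives updates as one observer among potentially several.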