Node removal latency metrics added

Tetiana Yeremenko 2025-08-28 13:01:51 +00:00
parent 2e528f9d85
commit ef7c53771f
12 changed files with 324 additions and 29 deletions

View File

@@ -342,6 +342,8 @@ type AutoscalingOptions struct {
ProactiveScaleupEnabled bool
// PodInjectionLimit limits total number of pods while injecting fake pods.
PodInjectionLimit int
// NodeLatencyTrackingEnabled is used to enable/disable node latency tracking.
NodeLatencyTrackingEnabled bool
}
// KubeClientOptions specify options for kube client

View File

@@ -227,6 +227,7 @@ var (
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether tracking of node removal latency is enabled.")
// Deprecated flags
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -408,6 +409,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeInfoCacheExpireTime: *nodeInfoCacheExpireTime,
ProactiveScaleupEnabled: *proactiveScaleupEnabled,
PodInjectionLimit: *podInjectionLimit,
NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled,
}
}

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -58,6 +59,7 @@ const (
type Actuator struct {
ctx *context.AutoscalingContext
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeLatencyTracker *latencytracker.NodeLatencyTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions options.NodeDeleteOptions
drainabilityRules rules.Rules
@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
}
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
return &Actuator{
ctx: ctx,
nodeDeletionTracker: ndt,
nodeLatencyTracker: nlt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
@@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
}
for _, node := range nodes {
if a.nodeLatencyTracker != nil {
a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
}
nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
if err != nil {
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
@@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
}
var gotResult status.ScaleDownResult
@@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
}
for _, nodes := range deleteNodes {

View File

@@ -0,0 +1,134 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package latencytracker
import (
"sync"
"testing"
"time"
)
func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
tracker.Lock()
defer tracker.Unlock()
if _, ok := tracker.nodes["node1"]; !ok {
t.Errorf("expected node1 to be tracked, but was not")
}
}
func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))
tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) != 1 {
t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
}
}
func TestObserveDeletion_RemovesNode(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{
Name: "node1",
UnneededSince: now.Add(-10 * time.Minute),
Threshold: 5 * time.Minute,
}
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
tracker.ObserveDeletion("node1", now)
tracker.Lock()
defer tracker.Unlock()
if _, ok := tracker.nodes["node1"]; ok {
t.Errorf("expected node1 removed after ObserveDeletion")
}
}
func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
tracker.ObserveDeletion("node1", now)
tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) != 0 {
t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
}
}
func TestConcurrentUpdatesAndDeletions(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{
Name: "node1",
UnneededSince: now,
Threshold: 2 * time.Minute,
}
var wg sync.WaitGroup
stop := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
tracker.ObserveDeletion("node1", time.Now())
}
}
}()
time.Sleep(50 * time.Millisecond)
close(stop)
wg.Wait()
tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) > 1 {
t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
}
}

View File

@@ -0,0 +1,66 @@
package latencytracker
import (
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/klog/v2"
)
type NodeInfo struct {
Name string
UnneededSince time.Time
Threshold time.Duration
}
type NodeLatencyTracker struct {
sync.Mutex
nodes map[string]NodeInfo
}
// NewNodeLatencyTracker creates a new tracker.
func NewNodeLatencyTracker() *NodeLatencyTracker {
return &NodeLatencyTracker{
nodes: make(map[string]NodeInfo),
}
}
// UpdateStateWithUnneededList records the first time each listed node was reported
// as unneeded. Nodes missing from later lists are not pruned here; they remain
// tracked until ObserveDeletion is called for them.
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
t.Lock()
defer t.Unlock()
for _, info := range list {
_, exists := t.nodes[info.Name]
if !exists {
t.nodes[info.Name] = NodeInfo{
Name: info.Name,
UnneededSince: info.UnneededSince,
Threshold: info.Threshold,
}
klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
info.Name, info.UnneededSince, info.Threshold)
}
}
}
// ObserveDeletion is called by the actuator just before node deletion.
func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
t.Lock()
defer t.Unlock()
if info, exists := t.nodes[nodeName]; exists {
duration := timestamp.Sub(info.UnneededSince)
klog.V(2).Infof(
"Observing deletion for node %s, unneeded for %s (threshold was %s).",
nodeName, duration, info.Threshold,
)
// The recorded value is the time the node remained unneeded beyond its
// configured ScaleDownUnneededTime threshold.
metrics.UpdateScaleDownNodeDeletionDuration("true", duration-info.Threshold)
delete(t.nodes, nodeName)
}
}
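For orientation, a minimal usage sketch of the tracker as it is wired by this change; the node name, timings, and threshold below are illustrative rather than autoscaler defaults.

package main

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
)

func main() {
	nlt := latencytracker.NewNodeLatencyTracker()

	// Planner side: report the node as unneeded, together with its
	// per-node-group ScaleDownUnneededTime threshold.
	markedAt := time.Now().Add(-7 * time.Minute)
	nlt.UpdateStateWithUnneededList([]latencytracker.NodeInfo{
		{Name: "node1", UnneededSince: markedAt, Threshold: 5 * time.Minute},
	}, time.Now())

	// Actuator side, just before deletion: records roughly 7m-5m = 2m (120s)
	// to the node_deletion_duration_seconds histogram with label deleted="true".
	nlt.ObserveDeletion("node1", time.Now())
}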

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -76,10 +77,11 @@ type Planner struct {
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
nodeLatencyTracker *latencytracker.NodeLatencyTracker
}
// New creates a new Planner object.
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner {
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
if minUpdateInterval == 0*time.Nanosecond {
@@ -98,6 +100,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
scaleDownContext: nodes.NewDefaultScaleDownContext(),
minUpdateInterval: minUpdateInterval,
nodeLatencyTracker: nlt,
}
}
@@ -301,6 +304,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
}
}
p.unneededNodes.Update(removableList, p.latestUpdate)
if p.nodeLatencyTracker != nil {
var unneededList []latencytracker.NodeInfo
for _, n := range p.unneededNodes.AsList() {
if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
unneededList = append(unneededList, latencytracker.NodeInfo{
Name: n.Name,
UnneededSince: p.latestUpdate,
Threshold: threshold,
})
}
}
p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
}
if unremovableCount > 0 {
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
}

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -500,7 +501,7 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -696,7 +697,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -864,7 +865,7 @@ func TestNodesToDelete(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.latestUpdate = time.Now()
p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))

View File

@@ -37,10 +37,11 @@ import (
// Nodes tracks the state of cluster nodes that are not needed.
type Nodes struct {
sdtg scaleDownTimeGetter
limitsFinder *resource.LimitsFinder
cachedList []*apiv1.Node
byName map[string]*node
sdtg scaleDownTimeGetter
limitsFinder *resource.LimitsFinder
cachedList []*apiv1.Node
byName map[string]*node
unneededTimeCache map[string]time.Duration
}
type node struct {
@@ -58,8 +59,9 @@ type scaleDownTimeGetter interface {
// NewNodes returns a new initialized Nodes object.
func NewNodes(sdtg scaleDownTimeGetter, limitsFinder *resource.LimitsFinder) *Nodes {
return &Nodes{
sdtg: sdtg,
limitsFinder: limitsFinder,
sdtg: sdtg,
limitsFinder: limitsFinder,
unneededTimeCache: make(map[string]time.Duration),
}
}
@@ -141,6 +143,41 @@ func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContex
return
}
// GetUnneededTimeForNode returns the unneeded timeout for a given node if tracked.
// Returns (duration, true) if found, otherwise (0, false).
func (n *Nodes) GetUnneededTimeForNode(ctx *context.AutoscalingContext, nodeName string) (time.Duration, bool) {
v, found := n.byName[nodeName]
if !found {
klog.V(4).Infof("Skipping - node %s not found in unneded list", nodeName)
return 0, false
}
node := v.ntbr.Node
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Error while getting node group for %s: %v", nodeName, err)
return 0, false
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group", nodeName)
return 0, false
}
ngID := nodeGroup.Id()
// ScaleDownUnneededTime is configured per node group, so cache it by node group ID.
if cached, ok := n.unneededTimeCache[ngID]; ok {
return cached, true
}
unneededTime, err := n.sdtg.GetScaleDownUnneededTime(nodeGroup)
if err != nil {
klog.Errorf("Error getting ScaleDownUnneededTime for node %s: %v", nodeName, err)
return 0, false
}
n.unneededTimeCache[ngID] = unneededTime
return unneededTime, true
}
func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason {
node := v.ntbr.Node
// Check if node is marked with no scale down annotation.

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
@@ -175,11 +176,15 @@ func NewStaticAutoscaler(
// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
// during the struct creation rather than here.
scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules)
var nldt *latencytracker.NodeLatencyTracker
if autoscalingContext.AutoscalingOptions.NodeLatencyTrackingEnabled {
nldt = latencytracker.NewNodeLatencyTracker()
}
scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules, nldt)
processorCallbacks.scaleDownPlanner = scaleDownPlanner
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, nldt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
autoscalingContext.ScaleDownActuator = scaleDownActuator
if scaleUpOrchestrator == nil {

View File

@@ -48,6 +48,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
@@ -164,7 +165,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {
func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
}
type nodeGroup struct {
@@ -210,6 +211,7 @@ type commonMocks struct {
podDisruptionBudgetLister *podDisruptionBudgetListerMock
daemonSetLister *daemonSetListerMock
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeLatencyTracker *latencytracker.NodeLatencyTracker
resourceClaimLister *fakeAllObjectsLister[*resourceapi.ResourceClaim]
resourceSliceLister *fakeAllObjectsLister[*resourceapi.ResourceSlice]
@@ -320,8 +322,12 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
if nodeDeletionTracker == nil {
nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
ctx.ScaleDownActuator = actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules)
nodeLatencyTracker := config.mocks.nodeLatencyTracker
if nodeLatencyTracker == nil {
nodeLatencyTracker = latencytracker.NewNodeLatencyTracker()
}
ctx.ScaleDownActuator = actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, nodeLatencyTracker, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, nodeLatencyTracker)
processorCallbacks.scaleDownPlanner = sdPlanner
@@ -409,7 +415,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
}
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -677,7 +683,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
processors.ScaleStateNotifier.Register(clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -821,7 +827,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -974,7 +980,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
// broken node failed to register in time
clusterState.UpdateNodes(nodes, nil, later)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -1129,7 +1135,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@@ -1260,7 +1266,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@@ -1358,7 +1364,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@@ -2260,7 +2266,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuator
// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2921,7 +2927,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
}
}
func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) (scaledown.Planner, scaledown.Actuator) {
func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, nodeLatencyTracker *latencytracker.NodeLatencyTracker) (scaledown.Planner, scaledown.Actuator) {
ctx.MaxScaleDownParallelism = 10
ctx.MaxDrainParallelism = 1
ctx.NodeDeletionBatcherInterval = 0 * time.Second
@@ -2936,8 +2942,11 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
if nodeDeletionTracker == nil {
nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
planner := planner.New(ctx, p, deleteOptions, nil)
actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
if nodeLatencyTracker == nil {
nodeLatencyTracker = latencytracker.NewNodeLatencyTracker()
}
planner := planner.New(ctx, p, deleteOptions, nil, nodeLatencyTracker)
actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, nodeLatencyTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
return planner, actuator
}
@@ -3053,13 +3062,13 @@ func buildStaticAutoscaler(t *testing.T, provider cloudprovider.CloudProvider, a
processors.ScaleDownNodeProcessor = cp
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuator
deleteOptions := options.NewNodeDeleteOptions(ctx.AutoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)
sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules)
sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, latencytracker.NewNodeLatencyTracker())
autoscaler := &StaticAutoscaler{
AutoscalingContext: &ctx,

View File

@@ -427,6 +427,15 @@ var (
Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
}, []string{"instance_type", "cpu_count", "namespace_count"},
)
scaleDownNodeDeletionDuration = k8smetrics.NewHistogramVec(
&k8smetrics.HistogramOpts{
Namespace: caNamespace,
Name: "node_deletion_duration_seconds",
Help: "Latency from planning (node marked) to final outcome (deleted, aborted, rescued).",
Buckets: k8smetrics.ExponentialBuckets(10, 2, 12), // 10, 20, 40, ..., 20480 (~5.7h)
}, []string{"deleted"},
)
)
// RegisterAll registers all metrics.
@@ -463,6 +472,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
legacyregistry.MustRegister(nodeTaintsCount)
legacyregistry.MustRegister(inconsistentInstancesMigsCount)
legacyregistry.MustRegister(binpackingHeterogeneity)
legacyregistry.MustRegister(scaleDownNodeDeletionDuration)
if emitPerNodeGroupMetrics {
legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -750,3 +760,7 @@ func UpdateInconsistentInstancesMigsCount(migCount int) {
func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
}
// UpdateScaleDownNodeDeletionDuration records the observed latency for a node's removal,
// labelled by whether the node was actually deleted.
func UpdateScaleDownNodeDeletionDuration(deleted string, duration time.Duration) {
scaleDownNodeDeletionDuration.WithLabelValues(deleted).Observe(duration.Seconds())
}
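For reference, a hedged sketch of a caller of the helper above, mirroring how ObserveDeletion uses it; the durations are illustrative.

// Illustrative values: a node stayed unneeded for 7 minutes against a
// 5 minute ScaleDownUnneededTime threshold before deletion, so 120 seconds
// is observed with the label deleted="true".
unneededFor := 7 * time.Minute
threshold := 5 * time.Minute
metrics.UpdateScaleDownNodeDeletionDuration("true", unneededFor-threshold)

Assuming the file's caNamespace prefix resolves to cluster_autoscaler, the series is exposed as cluster_autoscaler_node_deletion_duration_seconds; this commit only ever records the deleted="true" case.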