diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 36b47a5bfe..a1f9c087ad 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -342,6 +342,8 @@ type AutoscalingOptions struct { ProactiveScaleupEnabled bool // PodInjectionLimit limits total number of pods while injecting fake pods. PodInjectionLimit int + // NodeLatencyTrackingEnabled is used to enable/disable node latency tracking. + NodeLatencyTrackingEnabled bool } // KubeClientOptions specify options for kube client diff --git a/cluster-autoscaler/config/flags/flags.go b/cluster-autoscaler/config/flags/flags.go index 1ad17c4044..3c481b5714 100644 --- a/cluster-autoscaler/config/flags/flags.go +++ b/cluster-autoscaler/config/flags/flags.go @@ -227,6 +227,7 @@ var ( enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.") clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.") checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. 
Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.") + nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether logic for monitoring of node latency is enabled.") // Deprecated flags ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)") @@ -408,6 +409,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { NodeInfoCacheExpireTime: *nodeInfoCacheExpireTime, ProactiveScaleupEnabled: *proactiveScaleupEnabled, PodInjectionLimit: *podInjectionLimit, + NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled, } } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index 55ef2e5a8f..462db0823d 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -27,6 +27,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/utils" @@ -58,6 +59,7 @@ const ( type Actuator struct { ctx *context.AutoscalingContext nodeDeletionTracker *deletiontracker.NodeDeletionTracker + nodeLatencyTracker *latencytracker.NodeLatencyTracker nodeDeletionScheduler *GroupDeletionScheduler deleteOptions options.NodeDeleteOptions drainabilityRules rules.Rules @@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface { } // NewActuator returns a new instance of Actuator. 
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator { +func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator { ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval) legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec) var evictor Evictor @@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch return &Actuator{ ctx: ctx, nodeDeletionTracker: ndt, + nodeLatencyTracker: nlt, nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), deleteOptions: deleteOptions, @@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider } for _, node := range nodes { + if a.nodeLatencyTracker != nil { + a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now()) + } nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name) if err != nil { nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)} diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index c2b6788f62..b0e48898a5 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -39,6 +39,7 @@ import ( 
"k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" @@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) { nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults), + nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(), } var gotResult status.ScaleDownResult @@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) { ctx: &ctx, nodeDeletionTracker: ndt, nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), + nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(), } for _, nodes := range deleteNodes { diff --git a/cluster-autoscaler/core/scaledown/latencytracker/latencytracker_test.go b/cluster-autoscaler/core/scaledown/latencytracker/latencytracker_test.go new file mode 100644 index 0000000000..d010038b81 --- /dev/null +++ b/cluster-autoscaler/core/scaledown/latencytracker/latencytracker_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package latencytracker + +import ( + "sync" + "testing" + "time" +) + +func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) { + tracker := NewNodeLatencyTracker() + now := time.Now() + node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute} + + tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now) + + tracker.Lock() + defer tracker.Unlock() + if _, ok := tracker.nodes["node1"]; !ok { + t.Errorf("expected node1 to be tracked, but was not") + } +} + +func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) { + tracker := NewNodeLatencyTracker() + now := time.Now() + node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute} + + tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now) + tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute)) + + tracker.Lock() + defer tracker.Unlock() + if len(tracker.nodes) != 1 { + t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes)) + } +} + +func TestObserveDeletion_RemovesNode(t *testing.T) { + tracker := NewNodeLatencyTracker() + now := time.Now() + node := NodeInfo{ + Name: "node1", + UnneededSince: now.Add(-10 * time.Minute), + Threshold: 5 * time.Minute, + } + tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now) + + tracker.ObserveDeletion("node1", now) + + tracker.Lock() + defer tracker.Unlock() + if _, ok := tracker.nodes["node1"]; ok { + t.Errorf("expected node1 removed after ObserveDeletion") + } +} + +func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) { + tracker := NewNodeLatencyTracker() + now := time.Now() + + tracker.ObserveDeletion("node1", now) + + tracker.Lock() + defer tracker.Unlock() + if len(tracker.nodes) != 0 { + t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes)) + } +} + +func TestConcurrentUpdatesAndDeletions(t *testing.T) { + tracker := NewNodeLatencyTracker() + now 
:= time.Now() + + node := NodeInfo{ + Name: "node1", + UnneededSince: now, + Threshold: 2 * time.Minute, + } + + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now()) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + tracker.ObserveDeletion("node1", time.Now()) + } + } + }() + + time.Sleep(50 * time.Millisecond) + close(stop) + wg.Wait() + + tracker.Lock() + defer tracker.Unlock() + if len(tracker.nodes) > 1 { + t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes)) + } +}
diff --git a/cluster-autoscaler/core/scaledown/latencytracker/nodelatencytracker.go b/cluster-autoscaler/core/scaledown/latencytracker/nodelatencytracker.go
new file mode 100644
index 0000000000..0735c146e6
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/latencytracker/nodelatencytracker.go
@@ -0,0 +1,118 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package latencytracker tracks unneeded nodes and reports how long they
+// waited beyond their unneeded threshold before deletion was started.
+package latencytracker
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+
+	"k8s.io/klog/v2"
+)
+
+// NodeInfo describes a single unneeded node tracked by NodeLatencyTracker.
+type NodeInfo struct {
+	// Name is the node name; tracked nodes are keyed by it.
+	Name string
+	// UnneededSince is the time from which the node counts as unneeded.
+	UnneededSince time.Time
+	// Threshold is the node's ScaleDownUnneededTime; it is subtracted from the
+	// observed unneeded duration before the metric is reported.
+	Threshold time.Duration
+}
+
+// NodeLatencyTracker remembers when nodes were first reported as unneeded so
+// that the delay until their deletion can be exported as a metric. The
+// embedded mutex guards nodes.
+type NodeLatencyTracker struct {
+	sync.Mutex
+	nodes map[string]NodeInfo
+}
+
+// NewNodeLatencyTracker creates a new tracker.
+func NewNodeLatencyTracker() *NodeLatencyTracker {
+	return &NodeLatencyTracker{
+		nodes: make(map[string]NodeInfo),
+	}
+}
+
+// UpdateStateWithUnneededList synchronizes the tracker with the current list of
+// unneeded nodes. Nodes seen for the first time start being tracked with the
+// UnneededSince and Threshold given in list; nodes that are already tracked
+// keep their original values. Tracked nodes that are missing from list are no
+// longer unneeded and are dropped, so the map cannot grow without bound and a
+// node that becomes unneeded again later starts from a fresh timestamp.
+// timestamp is the time of this update and is used for logging only.
+func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
+	t.Lock()
+	defer t.Unlock()
+
+	currentSet := make(map[string]struct{}, len(list))
+	for _, info := range list {
+		currentSet[info.Name] = struct{}{}
+		if _, exists := t.nodes[info.Name]; exists {
+			continue
+		}
+		t.nodes[info.Name] = NodeInfo{
+			Name:          info.Name,
+			UnneededSince: info.UnneededSince,
+			Threshold:     info.Threshold,
+		}
+		klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
+			info.Name, info.UnneededSince, info.Threshold)
+	}
+
+	// Prune nodes that left the unneeded list (deleting while ranging over a map is safe).
+	for name := range t.nodes {
+		if _, stillUnneeded := currentSet[name]; !stillUnneeded {
+			delete(t.nodes, name)
+			klog.V(2).Infof("Stopped tracking node %s at %v: no longer unneeded", name, timestamp)
+		}
+	}
+}
+
+// ObserveDeletion is called by the actuator just before node deletion. For a
+// tracked node it reports how long the node stayed unneeded beyond its
+// threshold (timestamp - UnneededSince - Threshold) to the node deletion
+// duration metric and stops tracking it. Untracked nodes are ignored.
+func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
+	t.Lock()
+	defer t.Unlock()
+
+	info, exists := t.nodes[nodeName]
+	if !exists {
+		return
+	}
+	duration := timestamp.Sub(info.UnneededSince)
+
+	klog.V(2).Infof(
+		"Observing deletion for node %s, unneeded for %s (threshold was %s).",
+		nodeName, duration, info.Threshold,
+	)
+
+	// The difference is negative when deletion happens before
+	// UnneededSince+Threshold; clamp it so the histogram never sees a negative value.
+	latency := duration - info.Threshold
+	if latency < 0 {
+		latency = 0
+	}
+	metrics.UpdateScaleDownNodeDeletionDuration("true", latency)
+	delete(t.nodes, nodeName)
+}
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go index 32be506ca7..7b794571db 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner.go +++ b/cluster-autoscaler/core/scaledown/planner/planner.go @@ -26,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded" @@ -76,10 +77,11 @@ type Planner
struct { cc controllerReplicasCalculator scaleDownSetProcessor nodes.ScaleDownSetProcessor scaleDownContext *nodes.ScaleDownContext + nodeLatencyTracker *latencytracker.NodeLatencyTracker } // New creates a new Planner object. -func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner { +func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner { resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor) minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime if minUpdateInterval == 0*time.Nanosecond { @@ -98,6 +100,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling scaleDownSetProcessor: processors.ScaleDownSetProcessor, scaleDownContext: nodes.NewDefaultScaleDownContext(), minUpdateInterval: minUpdateInterval, + nodeLatencyTracker: nlt, } } @@ -301,6 +304,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand } } p.unneededNodes.Update(removableList, p.latestUpdate) + if p.nodeLatencyTracker != nil { + var unneededList []latencytracker.NodeInfo + for _, n := range p.unneededNodes.AsList() { + if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok { + unneededList = append(unneededList, latencytracker.NodeInfo{ + Name: n.Name, + UnneededSince: p.latestUpdate, + Threshold: threshold, + }) + } + } + p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate) + } if unremovableCount > 0 { klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout) } diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go 
b/cluster-autoscaler/core/scaledown/planner/planner_test.go index 9fe08513f7..56793080ae 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner_test.go +++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" @@ -500,7 +501,7 @@ func TestUpdateClusterState(t *testing.T) { assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker()) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)} if tc.isSimulationTimeout { context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second @@ -696,7 +697,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) { assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker()) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))} p.minUpdateInterval = tc.updateInterval p.unneededNodes.Update(previouslyUnneeded, time.Now()) @@ -864,7 +865,7 @@ func TestNodesToDelete(t *testing.T) { assert.NoError(t, err) 
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker()) p.latestUpdate = time.Now() p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second) p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes.go b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
index 9c29787608..afcf4aad50 100644
--- a/cluster-autoscaler/core/scaledown/unneeded/nodes.go
+++ b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
@@ -141,6 +141,38 @@ func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContex
 	return
 }
 
+// GetUnneededTimeForNode returns the ScaleDownUnneededTime configured for the
+// node group of the given node, provided the node is currently tracked as
+// unneeded. It returns (0, false) when the node is not tracked, has no node
+// group, or either lookup fails. The value is read from the scale-down time
+// getter on every call rather than cached, so a per-node-group value that
+// changes at runtime is never served stale.
+func (n *Nodes) GetUnneededTimeForNode(ctx *context.AutoscalingContext, nodeName string) (time.Duration, bool) {
+	v, found := n.byName[nodeName]
+	if !found {
+		klog.V(4).Infof("Skipping - node %s not found in unneeded list", nodeName)
+		return 0, false
+	}
+
+	nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(v.ntbr.Node)
+	if err != nil {
+		klog.Errorf("Error while getting node group for %s: %v", nodeName, err)
+		return 0, false
+	}
+	// Also treat an interface value holding a nil pointer as "no node group".
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		klog.V(4).Infof("Skipping %s - no node group", nodeName)
+		return 0, false
+	}
+
+	unneededTime, err := n.sdtg.GetScaleDownUnneededTime(nodeGroup)
+	if err != nil {
+		klog.Errorf("Error getting ScaleDownUnneededTime for node %s: %v", nodeName, err)
+		return 0, false
+	}
+	return unneededTime, true
+}
+
 func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason {
 	node := v.ntbr.Node
 	// Check if node is marked with no scale down annotation.
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index d84e70e715..581382c396 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -30,6 +30,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner" scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" @@ -175,11 +176,15 @@ func NewStaticAutoscaler( // TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext // during the struct creation rather than here. - scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules) + var nldt *latencytracker.NodeLatencyTracker + if autoscalingContext.AutoscalingOptions.NodeLatencyTrackingEnabled { + nldt = latencytracker.NewNodeLatencyTracker() + } + scaleDownPlanner := planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules, nldt) processorCallbacks.scaleDownPlanner = scaleDownPlanner ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) + scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, nldt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) autoscalingContext.ScaleDownActuator = scaleDownActuator if scaleUpOrchestrator == nil { diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index b16f755721..cc138ca702 100644 --- 
a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" @@ -164,7 +165,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error { func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) { deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) - ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor) + ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor) } type nodeGroup struct { @@ -210,6 +211,7 @@ type commonMocks struct { podDisruptionBudgetLister *podDisruptionBudgetListerMock daemonSetLister *daemonSetListerMock nodeDeletionTracker *deletiontracker.NodeDeletionTracker + nodeLatencyTracker *latencytracker.NodeLatencyTracker resourceClaimLister *fakeAllObjectsLister[*resourceapi.ResourceClaim] resourceSliceLister *fakeAllObjectsLister[*resourceapi.ResourceSlice] @@ -320,8 +322,12 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { if nodeDeletionTracker == nil { nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second) } - ctx.ScaleDownActuator = 
actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) - sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules) + nodeLatencyTracker := config.mocks.nodeLatencyTracker + if nodeLatencyTracker == nil { + nodeLatencyTracker = latencytracker.NewNodeLatencyTracker() + } + ctx.ScaleDownActuator = actuation.NewActuator(&ctx, clusterState, nodeDeletionTracker, nodeLatencyTracker, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) + sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, nodeLatencyTracker) processorCallbacks.scaleDownPlanner = sdPlanner @@ -409,7 +415,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -677,7 +683,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) processors.ScaleStateNotifier.Register(clusterState) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) 
suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -821,7 +827,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -974,7 +980,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // broken node failed to register in time clusterState.UpdateNodes(nodes, nil, later) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -1129,7 +1135,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) suOrchestrator := orchestrator.New() 
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -1260,7 +1266,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1358,7 +1364,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil, nil) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -2260,7 +2266,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker) // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test. 
- actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor) + actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor) ctx.ScaleDownActuator = actuator // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState. @@ -2921,7 +2927,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { } } -func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) (scaledown.Planner, scaledown.Actuator) { +func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, nodeLatencyTracker *latencytracker.NodeLatencyTracker) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second @@ -2936,8 +2942,11 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce if nodeDeletionTracker == nil { nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second) } - planner := planner.New(ctx, p, deleteOptions, nil) - actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor) + if nodeLatencyTracker == nil { + nodeLatencyTracker = latencytracker.NewNodeLatencyTracker() + } + planner := planner.New(ctx, p, deleteOptions, nil, nodeLatencyTracker) + actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, nodeLatencyTracker, deleteOptions, nil, 
p.NodeGroupConfigProcessor) return planner, actuator } @@ -3053,13 +3062,13 @@ func buildStaticAutoscaler(t *testing.T, provider cloudprovider.CloudProvider, a processors.ScaleDownNodeProcessor = cp csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker) - actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor) + actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), latencytracker.NewNodeLatencyTracker(), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor) ctx.ScaleDownActuator = actuator deleteOptions := options.NewNodeDeleteOptions(ctx.AutoscalingOptions) drainabilityRules := rules.Default(deleteOptions) - sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules) + sdPlanner := planner.New(&ctx, processors, deleteOptions, drainabilityRules, latencytracker.NewNodeLatencyTracker()) autoscaler := &StaticAutoscaler{ AutoscalingContext: &ctx, diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index ebc5541c5e..fd9168c4e1 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -427,6 +427,15 @@ var ( Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32 }, []string{"instance_type", "cpu_count", "namespace_count"}, ) + + scaleDownNodeDeletionDuration = k8smetrics.NewHistogramVec( + &k8smetrics.HistogramOpts{ + Namespace: caNamespace, + Name: "node_deletion_duration_seconds", + Help: "Time in seconds a node stayed unneeded beyond its scale-down unneeded threshold before its final outcome (deleted, aborted, rescued).", + Buckets: k8smetrics.ExponentialBuckets(10, 2, 12), // 10, 20, 40, ..., 20480 + },
[]string{"deleted"}, + ) ) // RegisterAll registers all metrics. @@ -463,6 +472,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) { legacyregistry.MustRegister(nodeTaintsCount) legacyregistry.MustRegister(inconsistentInstancesMigsCount) legacyregistry.MustRegister(binpackingHeterogeneity) + legacyregistry.MustRegister(scaleDownNodeDeletionDuration) if emitPerNodeGroupMetrics { legacyregistry.MustRegister(nodesGroupMinNodes) @@ -750,3 +760,9 @@ func UpdateInconsistentInstancesMigsCount(migCount int) { func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) { binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount)) } + +// UpdateScaleDownNodeDeletionDuration observes duration, in seconds, in the node +// deletion duration histogram under the given value of the "deleted" label. +func UpdateScaleDownNodeDeletionDuration(deleted string, duration time.Duration) { + scaleDownNodeDeletionDuration.WithLabelValues(deleted).Observe(duration.Seconds()) +}