From 541ce04e4b862b2714f2366e4e1298a94b97cfbe Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Tue, 17 Jan 2023 17:25:42 +0000 Subject: [PATCH] Add previous scale down candidate sorting --- cluster-autoscaler/main.go | 6 ++ .../nodes/previous_candidates_sorting.go | 54 ++++++++++++++ .../nodes/previous_candidates_sorting_test.go | 73 +++++++++++++++++++ ...scale_down_candidates_sorting_processor.go | 68 +++++++++++++++++ 4 files changed, 201 insertions(+) create mode 100644 cluster-autoscaler/processors/nodes/previous_candidates_sorting.go create mode 100644 cluster-autoscaler/processors/nodes/previous_candidates_sorting_test.go create mode 100644 cluster-autoscaler/processors/nodes/scale_down_candidates_sorting_processor.go diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index d1562adefb..bd5a26dc9f 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -49,6 +49,7 @@ import ( ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -403,6 +404,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter podlistprocessor.NewCurrentlyDrainedNodesPodListProcessor(), podlistprocessor.NewFilterOutSchedulablePodListProcessor(opts.PredicateChecker), ) + if autoscalingOptions.ParallelDrain { + sdProcessor := nodes.NewScaleDownCandidatesSortingProcessor() + opts.Processors.ScaleDownNodeProcessor = sdProcessor + opts.Processors.ScaleDownCandidatesNotifier.Register(sdProcessor) + } var nodeInfoComparator nodegroupset.NodeInfoComparator if len(autoscalingOptions.BalancingLabels) > 0 { diff --git a/cluster-autoscaler/processors/nodes/previous_candidates_sorting.go b/cluster-autoscaler/processors/nodes/previous_candidates_sorting.go new file mode 100644 index 0000000000..80ff6e8f4f --- /dev/null +++ b/cluster-autoscaler/processors/nodes/previous_candidates_sorting.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodes + +import ( + "time" + + apiv1 "k8s.io/api/core/v1" +) + +// PreviousCandidates is a struct that store scale down candidates from previous loop. +type PreviousCandidates struct { + candidates map[string]bool +} + +// NewPreviousCandidates return empty PreviousCandidates struct. +func NewPreviousCandidates() *PreviousCandidates { + return &PreviousCandidates{} +} + +// UpdateScaleDownCandidates updates scale down candidates. +func (p *PreviousCandidates) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) { + result := make(map[string]bool) + for _, node := range nodes { + result[node.Name] = true + } + p.candidates = result +} + +// ScaleDownEarlierThan return true if node1 is in candidate list and node2 isn't. +func (p *PreviousCandidates) ScaleDownEarlierThan(node1, node2 *apiv1.Node) bool { + if p.isPreviousCandidate(node1) && !(p.isPreviousCandidate(node2)) { + return true + } + return false +} + +func (p *PreviousCandidates) isPreviousCandidate(node *apiv1.Node) bool { + return p.candidates[node.Name] +} diff --git a/cluster-autoscaler/processors/nodes/previous_candidates_sorting_test.go b/cluster-autoscaler/processors/nodes/previous_candidates_sorting_test.go new file mode 100644 index 0000000000..67c2ef4f5a --- /dev/null +++ b/cluster-autoscaler/processors/nodes/previous_candidates_sorting_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodes + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestScaleDownEarlierThan(t *testing.T) { + candidate1 := BuildTestNode("candidate1", 100, 0) + candidate2 := BuildTestNode("candidate2", 100, 0) + nonCandidate1 := BuildTestNode("non-candidate1", 100, 0) + nonCandidate2 := BuildTestNode("non-candidate2", 100, 0) + + p := NewPreviousCandidates() + p.UpdateScaleDownCandidates([]*apiv1.Node{candidate1, candidate2}, time.Now()) + testCases := []struct { + name string + node1 *apiv1.Node + node2 *apiv1.Node + want bool + }{ + { + name: "Compare two candidates", + node1: candidate1, + node2: candidate2, + want: false, + }, + { + name: "Compare two non-candidates", + node1: nonCandidate1, + node2: nonCandidate2, + want: false, + }, + { + name: "Compare candidate and non-candidate", + node1: candidate1, + node2: nonCandidate2, + want: true, + }, + { + name: "Compare non-candidate and candidate", + node1: nonCandidate1, + node2: candidate2, + want: false, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + got := p.ScaleDownEarlierThan(test.node1, test.node2) + assert.Equal(t, got, test.want) + }) + } +} diff --git a/cluster-autoscaler/processors/nodes/scale_down_candidates_sorting_processor.go b/cluster-autoscaler/processors/nodes/scale_down_candidates_sorting_processor.go new file mode 100644 index 0000000000..e5b07b6fa7 --- /dev/null +++ b/cluster-autoscaler/processors/nodes/scale_down_candidates_sorting_processor.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodes + +import ( + "sort" + "time" + + apiv1 "k8s.io/api/core/v1" + + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +// ScaleDownCandidatesSortingProcessor is a wrapper for preFilteringProcessor that takes into account previous +// scale down candidates. This is necessary for efficient parallel scale down. +type ScaleDownCandidatesSortingProcessor struct { + preFilter *PreFilteringScaleDownNodeProcessor + previousCandidates *PreviousCandidates +} + +// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods +// that would become unscheduled after a scale down. +func (p *ScaleDownCandidatesSortingProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext, + nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + return p.preFilter.GetPodDestinationCandidates(ctx, nodes) +} + +// GetScaleDownCandidates returns filter nodes and move previous scale down candidates to the beginning of the list. +func (p *ScaleDownCandidatesSortingProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext, + nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + candidates, err := p.preFilter.GetScaleDownCandidates(ctx, nodes) + if err != nil { + return candidates, err + } + sort.Slice(candidates, func(i, j int) bool { + return p.previousCandidates.ScaleDownEarlierThan(candidates[i], candidates[j]) + }) + return candidates, nil +} + +// CleanUp is called at CA termination. +func (p *ScaleDownCandidatesSortingProcessor) CleanUp() { +} + +// NewScaleDownCandidatesSortingProcessor returns a new PreFilteringScaleDownNodeProcessor. +func NewScaleDownCandidatesSortingProcessor() *ScaleDownCandidatesSortingProcessor { + return &ScaleDownCandidatesSortingProcessor{preFilter: NewPreFilteringScaleDownNodeProcessor(), previousCandidates: NewPreviousCandidates()} +} + +// UpdateScaleDownCandidates updates scale down candidates. +func (p *ScaleDownCandidatesSortingProcessor) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) { + p.previousCandidates.UpdateScaleDownCandidates(nodes, now) +}