Add previous scale down candidate sorting

This commit is contained in:
Yaroslava Serdiuk 2023-01-17 17:25:42 +00:00
parent 97159df69b
commit 541ce04e4b
4 changed files with 201 additions and 0 deletions

View File

@ -49,6 +49,7 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -403,6 +404,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
podlistprocessor.NewCurrentlyDrainedNodesPodListProcessor(),
podlistprocessor.NewFilterOutSchedulablePodListProcessor(opts.PredicateChecker),
)
if autoscalingOptions.ParallelDrain {
sdProcessor := nodes.NewScaleDownCandidatesSortingProcessor()
opts.Processors.ScaleDownNodeProcessor = sdProcessor
opts.Processors.ScaleDownCandidatesNotifier.Register(sdProcessor)
}
var nodeInfoComparator nodegroupset.NodeInfoComparator
if len(autoscalingOptions.BalancingLabels) > 0 {

View File

@ -0,0 +1,54 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"time"
apiv1 "k8s.io/api/core/v1"
)
// PreviousCandidates is a struct that store scale down candidates from previous loop.
type PreviousCandidates struct {
candidates map[string]bool
}
// NewPreviousCandidates return empty PreviousCandidates struct.
func NewPreviousCandidates() *PreviousCandidates {
return &PreviousCandidates{}
}
// UpdateScaleDownCandidates updates scale down candidates.
func (p *PreviousCandidates) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
result := make(map[string]bool)
for _, node := range nodes {
result[node.Name] = true
}
p.candidates = result
}
// ScaleDownEarlierThan return true if node1 is in candidate list and node2 isn't.
func (p *PreviousCandidates) ScaleDownEarlierThan(node1, node2 *apiv1.Node) bool {
if p.isPreviousCandidate(node1) && !(p.isPreviousCandidate(node2)) {
return true
}
return false
}
func (p *PreviousCandidates) isPreviousCandidate(node *apiv1.Node) bool {
return p.candidates[node.Name]
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestScaleDownEarlierThan(t *testing.T) {
candidate1 := BuildTestNode("candidate1", 100, 0)
candidate2 := BuildTestNode("candidate2", 100, 0)
nonCandidate1 := BuildTestNode("non-candidate1", 100, 0)
nonCandidate2 := BuildTestNode("non-candidate2", 100, 0)
p := NewPreviousCandidates()
p.UpdateScaleDownCandidates([]*apiv1.Node{candidate1, candidate2}, time.Now())
testCases := []struct {
name string
node1 *apiv1.Node
node2 *apiv1.Node
want bool
}{
{
name: "Compare two candidates",
node1: candidate1,
node2: candidate2,
want: false,
},
{
name: "Compare two non-candidates",
node1: nonCandidate1,
node2: nonCandidate2,
want: false,
},
{
name: "Compare candidate and non-candidate",
node1: candidate1,
node2: nonCandidate2,
want: true,
},
{
name: "Compare non-candidate and candidate",
node1: nonCandidate1,
node2: candidate2,
want: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
got := p.ScaleDownEarlierThan(test.node1, test.node2)
assert.Equal(t, got, test.want)
})
}
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"sort"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
// ScaleDownCandidatesSortingProcessor is a wrapper for preFilteringProcessor that takes into account previous
// scale down candidates. This is necessary for efficient parallel scale down.
type ScaleDownCandidatesSortingProcessor struct {
preFilter *PreFilteringScaleDownNodeProcessor
previousCandidates *PreviousCandidates
}
// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
// that would become unscheduled after a scale down.
func (p *ScaleDownCandidatesSortingProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
return p.preFilter.GetPodDestinationCandidates(ctx, nodes)
}
// GetScaleDownCandidates returns filter nodes and move previous scale down candidates to the beginning of the list.
func (p *ScaleDownCandidatesSortingProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
candidates, err := p.preFilter.GetScaleDownCandidates(ctx, nodes)
if err != nil {
return candidates, err
}
sort.Slice(candidates, func(i, j int) bool {
return p.previousCandidates.ScaleDownEarlierThan(candidates[i], candidates[j])
})
return candidates, nil
}
// CleanUp is called at CA termination.
func (p *ScaleDownCandidatesSortingProcessor) CleanUp() {
}
// NewScaleDownCandidatesSortingProcessor returns a new PreFilteringScaleDownNodeProcessor.
func NewScaleDownCandidatesSortingProcessor() *ScaleDownCandidatesSortingProcessor {
return &ScaleDownCandidatesSortingProcessor{preFilter: NewPreFilteringScaleDownNodeProcessor(), previousCandidates: NewPreviousCandidates()}
}
// UpdateScaleDownCandidates updates scale down candidates.
func (p *ScaleDownCandidatesSortingProcessor) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
p.previousCandidates.UpdateScaleDownCandidates(nodes, now)
}