Add empty nodes sorting for scale down candidates

This commit is contained in:
Yaroslava Serdiuk 2023-03-08 11:03:45 +00:00
parent af27896bb2
commit cea9d1a73b
8 changed files with 394 additions and 23 deletions

View File

@ -49,7 +49,9 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -402,11 +404,17 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
opts.Processors.PodListProcessor = podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
if autoscalingOptions.ParallelDrain {
sdProcessor := nodes.NewScaleDownCandidatesSortingProcessor()
opts.Processors.ScaleDownNodeProcessor = sdProcessor
opts.Processors.ScaleDownCandidatesNotifier.Register(sdProcessor)
sdCandidatesSorting := previouscandidates.NewPreviousCandidates()
scaleDownCandidatesComparers = []scaledowncandidates.CandidatesComparer{
emptycandidates.NewEmptySortingProcessor(&autoscalingOptions, emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot)),
sdCandidatesSorting,
}
opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting)
}
sdProcessor := scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)
opts.Processors.ScaleDownNodeProcessor = sdProcessor
var nodeInfoComparator nodegroupset.NodeInfoComparator
if len(autoscalingOptions.BalancingLabels) > 0 {

View File

@ -0,0 +1,80 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package emptycandidates
import (
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
type nodeInfoGetter interface {
GetNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error)
}
type nodeInfoGetterImpl struct {
c clustersnapshot.ClusterSnapshot
}
func (n *nodeInfoGetterImpl) GetNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error) {
return n.c.NodeInfos().Get(nodeName)
}
// NewNodeInfoGetter limits ClusterSnapshot interface to NodeInfoGet() method.
func NewNodeInfoGetter(c clustersnapshot.ClusterSnapshot) *nodeInfoGetterImpl {
return &nodeInfoGetterImpl{c}
}
// EmptySorting is sorting scale down candidates so that empty nodes appear first.
type EmptySorting struct {
nodeInfoGetter
deleteOptions simulator.NodeDeleteOptions
}
// NewEmptySortingProcessor return EmptySorting struct.
func NewEmptySortingProcessor(opts *config.AutoscalingOptions, n nodeInfoGetter) *EmptySorting {
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods,
SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
MinReplicaCount: opts.MinReplicaCount,
}
return &EmptySorting{n, deleteOptions}
}
// ScaleDownEarlierThan return true if node1 is empty and node2 isn't.
func (p *EmptySorting) ScaleDownEarlierThan(node1, node2 *apiv1.Node) bool {
if p.isNodeEmpty(node1) && !p.isNodeEmpty(node2) {
return true
}
return false
}
func (p *EmptySorting) isNodeEmpty(node *apiv1.Node) bool {
nodeInfo, err := p.nodeInfoGetter.GetNodeInfo(node.Name)
if err != nil {
return false
}
podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, p.deleteOptions, nil, nil, time.Now())
if err == nil && len(podsToRemove) == 0 {
return true
}
return false
}

View File

@ -0,0 +1,121 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package emptycandidates
import (
"fmt"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
var err = fmt.Errorf("error")
type testNodeInfoGetter struct {
m map[string]*schedulerframework.NodeInfo
}
func (t *testNodeInfoGetter) GetNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error) {
if nodeInfo, ok := t.m[nodeName]; ok {
return nodeInfo, nil
}
return nil, err
}
func TestScaleDownEarlierThan(t *testing.T) {
niEmpty := schedulerframework.NewNodeInfo()
nodeEmptyName := "nodeEmpty"
nodeEmpty := BuildTestNode(nodeEmptyName, 0, 100)
niEmpty.SetNode(nodeEmpty)
niEmpty2 := schedulerframework.NewNodeInfo()
nodeEmptyName2 := "nodeEmpty2"
nodeEmpty2 := BuildTestNode(nodeEmptyName2, 0, 100)
niEmpty.SetNode(nodeEmpty2)
niNonEmpty := schedulerframework.NewNodeInfo()
nodeNonEmptyName := "nodeNonEmpty"
nodeNonEmpty := BuildTestNode(nodeNonEmptyName, 0, 100)
niNonEmpty.SetNode(nodeNonEmpty)
pod := BuildTestPod("p1", 0, 100)
pi, _ := schedulerframework.NewPodInfo(pod)
niNonEmpty.AddPodInfo(pi)
noNodeInfoNode := BuildTestNode("n1", 0, 100)
niGetter := testNodeInfoGetter{map[string]*schedulerframework.NodeInfo{nodeEmptyName: niEmpty, nodeNonEmptyName: niNonEmpty, nodeEmptyName2: niEmpty2}}
deleteOptions := simulator.NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
}
p := EmptySorting{&niGetter, deleteOptions}
tests := []struct {
name string
node1 *v1.Node
node2 *v1.Node
wantEarlier bool
}{
{
name: "Empty node earlier that non-empty node",
node1: nodeEmpty,
node2: nodeNonEmpty,
wantEarlier: true,
},
{
name: "Non-empty node is not earlier that empty node",
node1: nodeEmpty,
node2: nodeNonEmpty,
wantEarlier: true,
},
{
name: "Empty node earlier that node without nodeInfo",
node1: nodeEmpty,
node2: noNodeInfoNode,
wantEarlier: true,
},
{
name: "Non-empty node is not earlier that node without nodeInfo",
node1: nodeNonEmpty,
node2: noNodeInfoNode,
wantEarlier: false,
},
{
name: "Node without nodeInfo is not earlier that non-empty node",
node1: noNodeInfoNode,
node2: nodeNonEmpty,
wantEarlier: false,
},
{
name: "Empty node is not earlier that another empty node",
node1: nodeEmpty,
node2: nodeEmpty2,
wantEarlier: false,
},
}
for _, test := range tests {
gotEarlier := p.ScaleDownEarlierThan(test.node1, test.node2)
if gotEarlier != test.wantEarlier {
t.Errorf("%s: want %v, got %v", test.name, test.wantEarlier, gotEarlier)
}
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
package previouscandidates
import (
"time"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
package previouscandidates
import (
"testing"

View File

@ -14,23 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
package scaledowncandidates
import (
"sort"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
// ScaleDownCandidatesSortingProcessor is a wrapper for preFilteringProcessor that takes into account previous
// scale down candidates. This is necessary for efficient parallel scale down.
type ScaleDownCandidatesSortingProcessor struct {
preFilter *PreFilteringScaleDownNodeProcessor
previousCandidates *PreviousCandidates
preFilter *nodes.PreFilteringScaleDownNodeProcessor
sorting []CandidatesComparer
}
// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
@ -47,10 +45,8 @@ func (p *ScaleDownCandidatesSortingProcessor) GetScaleDownCandidates(ctx *contex
if err != nil {
return candidates, err
}
sort.Slice(candidates, func(i, j int) bool {
return p.previousCandidates.ScaleDownEarlierThan(candidates[i], candidates[j])
})
return candidates, nil
n := NodeSorter{nodes: candidates, processors: p.sorting}
return n.Sort(), err
}
// CleanUp is called at CA termination.
@ -58,11 +54,6 @@ func (p *ScaleDownCandidatesSortingProcessor) CleanUp() {
}
// NewScaleDownCandidatesSortingProcessor returns a new PreFilteringScaleDownNodeProcessor.
func NewScaleDownCandidatesSortingProcessor() *ScaleDownCandidatesSortingProcessor {
return &ScaleDownCandidatesSortingProcessor{preFilter: NewPreFilteringScaleDownNodeProcessor(), previousCandidates: NewPreviousCandidates()}
}
// UpdateScaleDownCandidates updates scale down candidates.
func (p *ScaleDownCandidatesSortingProcessor) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
p.previousCandidates.UpdateScaleDownCandidates(nodes, now)
func NewScaleDownCandidatesSortingProcessor(sorting []CandidatesComparer) *ScaleDownCandidatesSortingProcessor {
return &ScaleDownCandidatesSortingProcessor{preFilter: nodes.NewPreFilteringScaleDownNodeProcessor(), sorting: sorting}
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledowncandidates
import (
"sort"
apiv1 "k8s.io/api/core/v1"
)
// CandidatesComparer is an used for sorting scale down candidates.
type CandidatesComparer interface {
// ScaleDownEarlierThan return true if node1 should be scaled down earlier than node2.
ScaleDownEarlierThan(node1, node2 *apiv1.Node) bool
}
// NodeSorter struct contain the list of nodes and the list of processors that should be applied for sorting.
type NodeSorter struct {
nodes []*apiv1.Node
processors []CandidatesComparer
}
// Sort return list of nodes in descending order.
func (n *NodeSorter) Sort() []*apiv1.Node {
if len(n.processors) == 0 {
return n.nodes
}
sort.Sort(n)
return n.nodes
}
// Less return true if node with index i is less than node with index j.
func (n *NodeSorter) Less(i, j int) bool {
node1, node2 := n.nodes[i], n.nodes[j]
for _, processor := range n.processors {
if val := processor.ScaleDownEarlierThan(node1, node2); val || processor.ScaleDownEarlierThan(node2, node1) {
return val
}
}
return false
}
// Swap is swapping the nodes in the list.
func (n *NodeSorter) Swap(i, j int) {
n.nodes[i], n.nodes[j] = n.nodes[j], n.nodes[i]
}
// Len return the length of node's list.
func (n *NodeSorter) Len() int {
return len(n.nodes)
}

View File

@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scaledowncandidates
import (
"fmt"
"strconv"
"testing"
"github.com/google/go-cmp/cmp"
apiv1 "k8s.io/api/core/v1"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
type scoreProcessor struct {
scores []int
}
func (p *scoreProcessor) ScaleDownEarlierThan(node1, node2 *apiv1.Node) bool {
idx1, _ := strconv.Atoi(node1.Name[5:])
idx2, _ := strconv.Atoi(node2.Name[5:])
return p.scores[idx1] > p.scores[idx2]
}
func TestSort(t *testing.T) {
testCases := []struct {
name string
numNodes int
numProcessors int
nodeScores [][]int //2d array that represent the score of node for each processor
sortedOrder []int
}{
{
name: "No score, order the same",
numNodes: 5,
sortedOrder: []int{0, 1, 2, 3, 4},
},
{
name: "One processor, the order has changed",
numNodes: 5,
numProcessors: 1,
nodeScores: [][]int{{3}, {4}, {1}, {2}, {0}},
sortedOrder: []int{1, 0, 3, 2, 4},
},
{
name: "Two processors, second processor did not affect the order",
numNodes: 5,
numProcessors: 2,
nodeScores: [][]int{{3, 5}, {4, 0}, {1, 2}, {2, 4}, {0, 5}},
sortedOrder: []int{1, 0, 3, 2, 4},
},
{
name: "Two processors, the first processor has equal scores",
numNodes: 5,
numProcessors: 2,
nodeScores: [][]int{{4, 5}, {4, 0}, {1, 2}, {2, 4}, {0, 5}},
sortedOrder: []int{0, 1, 3, 2, 4},
},
{
name: "Three processors, all three processors affected the order",
numNodes: 5,
numProcessors: 3,
nodeScores: [][]int{{1, 1, 1}, {1, 1, 2}, {1, 1, 3}, {1, 2, 1}, {2, 1, 1}},
sortedOrder: []int{4, 3, 2, 1, 0},
},
}
for _, test := range testCases {
nodes := []*apiv1.Node{}
for i := 0; i < test.numNodes; i++ {
node := BuildTestNode(fmt.Sprintf("node-%d", i), 10, 100)
nodes = append(nodes, node)
}
processors := []CandidatesComparer{}
for i := 0; i < test.numProcessors; i++ {
scores := []int{}
for _, nodeScore := range test.nodeScores {
scores = append(scores, nodeScore[i])
}
processors = append(processors, &scoreProcessor{scores: scores})
}
nd := NodeSorter{nodes: nodes, processors: processors}
sorted := nd.Sort()
got := []int{}
for _, node := range sorted {
idx, _ := strconv.Atoi(node.Name[5:])
got = append(got, idx)
}
if diff := cmp.Diff(test.sortedOrder, got); diff != "" {
t.Errorf("%s: NodeSorter.Sort() diff (-want +got):\n%s", test.name, diff)
}
}
}