Limit binpacking based on #new_nodes or time

The binpacking algorithm is O(#pending_pods * #new_nodes) and
calculating a very large scale-up can get stuck for minutes or even
hours, leading to CA failing it's healthcheck and going down.
The new limiting prevents this scenario by stopping binpacking after
reaching specified threshold. Any pods that remain pending as a result
of shorter binpacking will be processed next autoscaler loop.

The thresholds used can be controlled with newly introduced flags:
--max-nodes-per-scaleup and --max-nodegroup-binpacking-duration. The
limiting can be disabled by setting both flags to 0 (not recommended,
especially for --max-nodegroup-binpacking-duration).
This commit is contained in:
Maciek Pytel 2022-06-14 11:01:46 +02:00
parent f599494f48
commit ab891418f6
10 changed files with 211 additions and 49 deletions

View File

@ -185,4 +185,11 @@ type AutoscalingOptions struct {
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.
// Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit
// scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop.
MaxNodesPerScaleUp int
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
}

View File

@ -112,7 +112,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.ExpanderStrategy = expanderStrategy
}
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewFakeEstimationLimiter(0))
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
if err != nil {
return err
}

View File

@ -165,7 +165,7 @@ func NewScaleTestAutoscalingContext(
}
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewFakeEstimationLimiter(0))
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0))
predicateChecker, err := simulator.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err

View File

@ -105,7 +105,7 @@ func (e *BinpackingNodeEstimator) Estimate(
if !found {
// Stop binpacking if we reach the limit of nodes we can add.
// We return the result of the binpacking that we already performed.
if !e.limiter.PermissionToAddNodes(1) {
if !e.limiter.PermissionToAddNode() {
break
}

View File

@ -105,7 +105,7 @@ func TestBinpackingEstimate(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
limiter := NewFakeEstimationLimiter(tc.maxNodes)
limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0))
estimator := newBinPackingEstimator(t, limiter)
node := &apiv1.Node{
Status: apiv1.NodeStatus{

View File

@ -64,11 +64,10 @@ type EstimationLimiter interface {
StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup)
// EndEstimation is called at the end of estimation.
EndEstimation()
// PermissionToAddNodes is called by an estimator when it wants to add additional
// PermissionToAddNode is called by an estimator when it wants to add additional
// nodes to simulation. If permission is not granted the Estimator is expected
// not to add any more nodes in this simulation (though it may request a
// permission to add less nodes).
// not to add any more nodes in this simulation.
// There is no requirement for the Estimator to stop calculations, it's
// just not expected to add any more nodes.
PermissionToAddNodes(int) bool
PermissionToAddNode() bool
}

View File

@ -1,41 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package estimator
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
type fakeEstimationLimiter struct {
nodes int
maxNodes int
}
func (_ *fakeEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {}
func (_ *fakeEstimationLimiter) EndEstimation() {}
func (f *fakeEstimationLimiter) PermissionToAddNodes(nodes int) bool {
f.nodes += nodes
return f.maxNodes == 0 || f.nodes <= f.maxNodes
}
func NewFakeEstimationLimiter(maxNodes int) EstimationLimiter {
return &fakeEstimationLimiter{
nodes: 0,
maxNodes: maxNodes,
}
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package estimator
import (
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
klog "k8s.io/klog/v2"
)
type thresholdBasedEstimationLimiter struct {
maxDuration time.Duration
maxNodes int
nodes int
start time.Time
}
func (tbel *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {
tbel.start = time.Now()
tbel.nodes = 0
}
func (*thresholdBasedEstimationLimiter) EndEstimation() {}
func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool {
if tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes {
klog.V(4).Infof("Capping binpacking after exceeding threshold of %i nodes", tbel.maxNodes)
return false
}
timeDefined := tbel.maxDuration > 0 && tbel.start != time.Time{}
if timeDefined && time.Now().After(tbel.start.Add(tbel.maxDuration)) {
klog.V(4).Infof("Capping binpacking after exceeding max duration of %v", tbel.maxDuration)
return false
}
tbel.nodes++
return true
}
// NewThresholdBasedEstimationLimiter returns an EstimationLimiter that will prevent estimation
// after either a node count- of time-based threshold is reached. This is meant to prevent cases
// where binpacking of hundreds or thousands of nodes takes extremely long time rendering CA
// incredibly slow or even completely crashing it.
func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter {
return &thresholdBasedEstimationLimiter{
maxNodes: maxNodes,
maxDuration: maxDuration,
}
}

View File

@ -0,0 +1,129 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package estimator
import (
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
"github.com/stretchr/testify/assert"
)
type limiterOperation func(*testing.T, EstimationLimiter)
func expectDeny(t *testing.T, l EstimationLimiter) {
assert.Equal(t, false, l.PermissionToAddNode())
}
func expectAllow(t *testing.T, l EstimationLimiter) {
assert.Equal(t, true, l.PermissionToAddNode())
}
func resetLimiter(t *testing.T, l EstimationLimiter) {
l.EndEstimation()
l.StartEstimation([]*apiv1.Pod{}, nil)
}
func TestThresholdBasedLimiter(t *testing.T) {
testCases := []struct {
name string
maxNodes int
maxDuration time.Duration
startDelta time.Duration
operations []limiterOperation
expectNodeCount int
}{
{
name: "no limiting happens",
maxNodes: 20,
operations: []limiterOperation{
expectAllow,
expectAllow,
expectAllow,
},
expectNodeCount: 3,
},
{
name: "time based trigger fires",
maxNodes: 20,
maxDuration: 5 * time.Second,
startDelta: -10 * time.Second,
operations: []limiterOperation{
expectDeny,
expectDeny,
},
expectNodeCount: 0,
},
{
name: "sequence of additions works until the threshold is hit",
maxNodes: 3,
operations: []limiterOperation{
expectAllow,
expectAllow,
expectAllow,
expectDeny,
},
expectNodeCount: 3,
},
{
name: "node counter is reset",
maxNodes: 2,
operations: []limiterOperation{
expectAllow,
expectAllow,
expectDeny,
resetLimiter,
expectAllow,
},
expectNodeCount: 1,
},
{
name: "timer is reset",
maxNodes: 20,
maxDuration: 5 * time.Second,
startDelta: -10 * time.Second,
operations: []limiterOperation{
expectDeny,
resetLimiter,
expectAllow,
expectAllow,
},
expectNodeCount: 2,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
limiter := &thresholdBasedEstimationLimiter{
maxNodes: tc.maxNodes,
maxDuration: tc.maxDuration,
}
limiter.StartEstimation([]*apiv1.Pod{}, nil)
if tc.startDelta != time.Duration(0) {
limiter.start = limiter.start.Add(tc.startDelta)
}
for _, op := range tc.operations {
op(t, limiter)
}
assert.Equal(t, tc.expectNodeCount, limiter.nodes)
limiter.EndEstimation()
})
}
}

View File

@ -202,6 +202,8 @@ var (
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
gceExpanderEphemeralStorageSupport = flag.Bool("gce-expander-ephemeral-storage-support", false, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
)
func createAutoscalingOptions() config.AutoscalingOptions {
@ -289,6 +291,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxDrainParallelism: *maxDrainParallelismFlag,
GceExpanderEphemeralStorageSupport: *gceExpanderEphemeralStorageSupport,
RecordDuplicatedEvents: *recordDuplicatedEvents,
MaxNodesPerScaleUp: *maxNodesPerScaleUp,
MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration,
}
}