Move backoff mechanism to utils.
This commit is contained in:
parent
d36988bec7
commit
dd1db7a0ac
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
"k8s.io/autoscaler/cluster-autoscaler/metrics"
|
||||||
|
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
|
||||||
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
|
||||||
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
|
||||||
|
|
||||||
|
|
@ -110,12 +111,6 @@ type UnregisteredNode struct {
|
||||||
UnregisteredSince time.Time
|
UnregisteredSince time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type scaleUpBackoff struct {
|
|
||||||
duration time.Duration
|
|
||||||
backoffUntil time.Time
|
|
||||||
lastFailedScaleUp time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
|
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
|
||||||
type ClusterStateRegistry struct {
|
type ClusterStateRegistry struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
@ -130,7 +125,7 @@ type ClusterStateRegistry struct {
|
||||||
incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize
|
incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize
|
||||||
unregisteredNodes map[string]UnregisteredNode
|
unregisteredNodes map[string]UnregisteredNode
|
||||||
candidatesForScaleDown map[string][]string
|
candidatesForScaleDown map[string][]string
|
||||||
nodeGroupBackoffInfo map[string]scaleUpBackoff
|
nodeGroupBackoffInfo *backoff.Backoff
|
||||||
lastStatus *api.ClusterAutoscalerStatus
|
lastStatus *api.ClusterAutoscalerStatus
|
||||||
lastScaleDownUpdateTime time.Time
|
lastScaleDownUpdateTime time.Time
|
||||||
logRecorder *utils.LogEventRecorder
|
logRecorder *utils.LogEventRecorder
|
||||||
|
|
@ -153,7 +148,7 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
|
||||||
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
|
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
|
||||||
unregisteredNodes: make(map[string]UnregisteredNode),
|
unregisteredNodes: make(map[string]UnregisteredNode),
|
||||||
candidatesForScaleDown: make(map[string][]string),
|
candidatesForScaleDown: make(map[string][]string),
|
||||||
nodeGroupBackoffInfo: make(map[string]scaleUpBackoff),
|
nodeGroupBackoffInfo: backoff.NewBackoff(InitialNodeGroupBackoffDuration, MaxNodeGroupBackoffDuration, NodeGroupBackoffResetTimeout),
|
||||||
lastStatus: emptyStatus,
|
lastStatus: emptyStatus,
|
||||||
logRecorder: logRecorder,
|
logRecorder: logRecorder,
|
||||||
}
|
}
|
||||||
|
|
@ -176,11 +171,7 @@ func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) {
|
||||||
// To be executed under a lock.
|
// To be executed under a lock.
|
||||||
func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
|
func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
|
||||||
// clean up stale backoff info
|
// clean up stale backoff info
|
||||||
for ngId, backoffInfo := range csr.nodeGroupBackoffInfo {
|
csr.nodeGroupBackoffInfo.RemoveStaleBackoffData(currentTime)
|
||||||
if backoffInfo.lastFailedScaleUp.Add(NodeGroupBackoffResetTimeout).Before(currentTime) {
|
|
||||||
delete(csr.nodeGroupBackoffInfo, ngId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
timedOutSur := make([]*ScaleUpRequest, 0)
|
timedOutSur := make([]*ScaleUpRequest, 0)
|
||||||
newSur := make([]*ScaleUpRequest, 0)
|
newSur := make([]*ScaleUpRequest, 0)
|
||||||
|
|
@ -188,7 +179,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
|
||||||
if !csr.areThereUpcomingNodesInNodeGroup(sur.NodeGroupName) {
|
if !csr.areThereUpcomingNodesInNodeGroup(sur.NodeGroupName) {
|
||||||
// scale-out finished successfully
|
// scale-out finished successfully
|
||||||
// remove it and reset node group backoff
|
// remove it and reset node group backoff
|
||||||
delete(csr.nodeGroupBackoffInfo, sur.NodeGroupName)
|
csr.nodeGroupBackoffInfo.RemoveBackoff(sur.NodeGroupName)
|
||||||
glog.V(4).Infof("Scale up in group %v finished successfully in %v",
|
glog.V(4).Infof("Scale up in group %v finished successfully in %v",
|
||||||
sur.NodeGroupName, currentTime.Sub(sur.Time))
|
sur.NodeGroupName, currentTime.Sub(sur.Time))
|
||||||
continue
|
continue
|
||||||
|
|
@ -228,24 +219,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
|
||||||
|
|
||||||
// To be executed under a lock.
|
// To be executed under a lock.
|
||||||
func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroupName string, currentTime time.Time) {
|
func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroupName string, currentTime time.Time) {
|
||||||
duration := InitialNodeGroupBackoffDuration
|
backoffUntil := csr.nodeGroupBackoffInfo.Backoff(nodeGroupName, currentTime)
|
||||||
if backoffInfo, found := csr.nodeGroupBackoffInfo[nodeGroupName]; found {
|
|
||||||
// Multiple concurrent scale-ups failing shouldn't cause backoff
|
|
||||||
// duration to increase, so we only increase it if we're not in
|
|
||||||
// backoff right now.
|
|
||||||
if backoffInfo.backoffUntil.Before(currentTime) {
|
|
||||||
duration = 2 * backoffInfo.duration
|
|
||||||
if duration > MaxNodeGroupBackoffDuration {
|
|
||||||
duration = MaxNodeGroupBackoffDuration
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
backoffUntil := currentTime.Add(duration)
|
|
||||||
csr.nodeGroupBackoffInfo[nodeGroupName] = scaleUpBackoff{
|
|
||||||
duration: duration,
|
|
||||||
backoffUntil: backoffUntil,
|
|
||||||
lastFailedScaleUp: currentTime,
|
|
||||||
}
|
|
||||||
glog.Warningf("Disabling scale-up for node group %v until %v", nodeGroupName, backoffUntil)
|
glog.Warningf("Disabling scale-up for node group %v until %v", nodeGroupName, backoffUntil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -386,8 +360,7 @@ func (csr *ClusterStateRegistry) IsNodeGroupSafeToScaleUp(nodeGroupName string,
|
||||||
if !csr.IsNodeGroupHealthy(nodeGroupName) {
|
if !csr.IsNodeGroupHealthy(nodeGroupName) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
backoffInfo, found := csr.nodeGroupBackoffInfo[nodeGroupName]
|
return !csr.nodeGroupBackoffInfo.IsBackedOff(nodeGroupName, now)
|
||||||
return !found || backoffInfo.backoffUntil.Before(now)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (csr *ClusterStateRegistry) areThereUpcomingNodesInNodeGroup(nodeGroupName string) bool {
|
func (csr *ClusterStateRegistry) areThereUpcomingNodesInNodeGroup(nodeGroupName string) bool {
|
||||||
|
|
|
||||||
|
|
@ -669,6 +669,5 @@ func TestScaleUpBackoff(t *testing.T) {
|
||||||
assert.True(t, clusterstate.IsClusterHealthy())
|
assert.True(t, clusterstate.IsClusterHealthy())
|
||||||
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
|
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
|
||||||
assert.True(t, clusterstate.IsNodeGroupSafeToScaleUp("ng1", now))
|
assert.True(t, clusterstate.IsNodeGroupSafeToScaleUp("ng1", now))
|
||||||
_, found := clusterstate.nodeGroupBackoffInfo["ng1"]
|
assert.False(t, clusterstate.nodeGroupBackoffInfo.IsBackedOff("ng1", now))
|
||||||
assert.False(t, found)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,83 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package backoff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type backoffInfo struct {
|
||||||
|
duration time.Duration
|
||||||
|
backoffUntil time.Time
|
||||||
|
lastFailedExecution time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backoff handles backing off executions.
|
||||||
|
type Backoff struct {
|
||||||
|
maxBackoffDuration time.Duration
|
||||||
|
initialBackoffDuration time.Duration
|
||||||
|
backoffResetTimeout time.Duration
|
||||||
|
backoffInfo map[string]backoffInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBackoff creates an instance of Backoff.
|
||||||
|
func NewBackoff(initialBackoffDuration time.Duration, maxBackoffDuration time.Duration, backoffResetTimeout time.Duration) *Backoff {
|
||||||
|
return &Backoff{maxBackoffDuration, initialBackoffDuration, backoffResetTimeout, make(map[string]backoffInfo)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveStaleBackoffData removes stale backoff data.
|
||||||
|
func (b *Backoff) RemoveStaleBackoffData(currentTime time.Time) {
|
||||||
|
for key, backoffInfo := range b.backoffInfo {
|
||||||
|
if backoffInfo.lastFailedExecution.Add(b.backoffResetTimeout).Before(currentTime) {
|
||||||
|
delete(b.backoffInfo, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backoff execution for the given key. Returns time till execution is backed off.
|
||||||
|
func (b *Backoff) Backoff(key string, currentTime time.Time) time.Time {
|
||||||
|
duration := b.initialBackoffDuration
|
||||||
|
if backoffInfo, found := b.backoffInfo[key]; found {
|
||||||
|
// Multiple concurrent scale-ups failing shouldn't cause backoff
|
||||||
|
// duration to increase, so we only increase it if we're not in
|
||||||
|
// backoff right now.
|
||||||
|
if backoffInfo.backoffUntil.Before(currentTime) {
|
||||||
|
duration = 2 * backoffInfo.duration
|
||||||
|
if duration > b.maxBackoffDuration {
|
||||||
|
duration = b.maxBackoffDuration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
backoffUntil := currentTime.Add(duration)
|
||||||
|
b.backoffInfo[key] = backoffInfo{
|
||||||
|
duration: duration,
|
||||||
|
backoffUntil: backoffUntil,
|
||||||
|
lastFailedExecution: currentTime,
|
||||||
|
}
|
||||||
|
return backoffUntil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveBackoff removes backoff data for the given key.
|
||||||
|
func (b *Backoff) RemoveBackoff(key string) {
|
||||||
|
delete(b.backoffInfo, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsBackedOff returns true if execution is backed off for the given key.
|
||||||
|
func (b *Backoff) IsBackedOff(key string, currentTime time.Time) bool {
|
||||||
|
backoffInfo, found := b.backoffInfo[key]
|
||||||
|
return found && backoffInfo.backoffUntil.After(currentTime)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package backoff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBackoffTwoKeys(t *testing.T) {
|
||||||
|
backoff := NewBackoff(10*time.Minute, time.Hour, 3*time.Hour)
|
||||||
|
startTime := time.Now()
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key2", startTime))
|
||||||
|
backoff.Backoff("key1", startTime.Add(time.Minute))
|
||||||
|
assert.True(t, backoff.IsBackedOff("key1", startTime.Add(2*time.Minute)))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key2", startTime))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime.Add(11*time.Minute)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMaxBackoff(t *testing.T) {
|
||||||
|
backoff := NewBackoff(1*time.Minute, 3*time.Minute, 3*time.Hour)
|
||||||
|
startTime := time.Now()
|
||||||
|
backoff.Backoff("key1", startTime)
|
||||||
|
assert.True(t, backoff.IsBackedOff("key1", startTime))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime.Add(1*time.Minute)))
|
||||||
|
backoff.Backoff("key1", startTime.Add(1*time.Minute))
|
||||||
|
assert.True(t, backoff.IsBackedOff("key1", startTime.Add(1*time.Minute)))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime.Add(3*time.Minute)))
|
||||||
|
backoff.Backoff("key1", startTime.Add(3*time.Minute))
|
||||||
|
assert.True(t, backoff.IsBackedOff("key1", startTime.Add(3*time.Minute)))
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime.Add(6*time.Minute)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRemoveBackoff(t *testing.T) {
|
||||||
|
backoff := NewBackoff(1*time.Minute, 3*time.Minute, 3*time.Hour)
|
||||||
|
startTime := time.Now()
|
||||||
|
backoff.Backoff("key1", startTime)
|
||||||
|
assert.True(t, backoff.IsBackedOff("key1", startTime))
|
||||||
|
backoff.RemoveBackoff("key1")
|
||||||
|
assert.False(t, backoff.IsBackedOff("key1", startTime))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResetStaleBackoffData(t *testing.T) {
|
||||||
|
backoff := NewBackoff(1*time.Minute, 3*time.Minute, 3*time.Hour)
|
||||||
|
startTime := time.Now()
|
||||||
|
backoff.Backoff("key1", startTime)
|
||||||
|
backoff.Backoff("key2", startTime.Add(time.Hour))
|
||||||
|
backoff.RemoveStaleBackoffData(startTime.Add(time.Hour))
|
||||||
|
assert.Equal(t, 2, len(backoff.backoffInfo))
|
||||||
|
backoff.RemoveStaleBackoffData(startTime.Add(4 * time.Hour))
|
||||||
|
assert.Equal(t, 1, len(backoff.backoffInfo))
|
||||||
|
backoff.RemoveStaleBackoffData(startTime.Add(5 * time.Hour))
|
||||||
|
assert.Equal(t, 0, len(backoff.backoffInfo))
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue