Retain information about scale-up failures in CSR

This will provide the AutoscalingStatusProcessor with information
about failed scale-ups.
Jakub Tużnik 2019-05-31 14:02:13 +02:00
parent 6a31047c1c
commit bb382f47f9
3 changed files with 87 additions and 2 deletions
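For context on the consumer side mentioned in the commit message, here is a small, self-contained Go sketch of what a status processor could do with the per-node-group failures this change retains. It uses only the standard library with simplified stand-in types and a hypothetical summarize helper, not the real cluster-autoscaler or AutoscalingStatusProcessor APIs.

package main

import (
    "fmt"
    "time"
)

// scaleUpFailure mirrors the shape of the ScaleUpFailure record added below
// (node group, reason, time). The types here are simplified stand-ins, not
// the real cluster-autoscaler ones.
type scaleUpFailure struct {
    NodeGroupID string
    Reason      string
    Time        time.Time
}

// summarize is a hypothetical helper: given the per-node-group failures a
// registry retained during one loop, count how many failures of each reason
// were recorded for every node group.
func summarize(failures map[string][]scaleUpFailure) map[string]map[string]int {
    out := map[string]map[string]int{}
    for ngID, fs := range failures {
        counts := map[string]int{}
        for _, f := range fs {
            counts[f.Reason]++
        }
        out[ngID] = counts
    }
    return out
}

func main() {
    now := time.Now()
    failures := map[string][]scaleUpFailure{
        "ng1": {
            {NodeGroupID: "ng1", Reason: "Timeout", Time: now},
            {NodeGroupID: "ng1", Reason: "APIError", Time: now.Add(time.Minute)},
        },
    }
    fmt.Println(summarize(failures)) // map[ng1:map[APIError:1 Timeout:1]]
}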

cluster-autoscaler/clusterstate/clusterstate.go

@@ -113,6 +113,13 @@ type UnregisteredNode struct {
UnregisteredSince time.Time
}
// ScaleUpFailure contains information about a failure of a scale-up.
type ScaleUpFailure struct {
NodeGroup cloudprovider.NodeGroup
Reason metrics.FailedScaleUpReason
Time time.Time
}
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
type ClusterStateRegistry struct {
sync.Mutex
@@ -136,6 +143,10 @@ type ClusterStateRegistry struct {
previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
cloudProviderNodeInstancesCache *utils.CloudProviderNodeInstancesCache
interrupt chan struct{}
// scaleUpFailures contains information about scale-up failures for each node group. It should be
// cleared periodically to avoid unnecessary accumulation.
scaleUpFailures map[string][]ScaleUpFailure
}
// NewClusterStateRegistry creates new ClusterStateRegistry.
@@ -161,6 +172,7 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
logRecorder: logRecorder,
cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
interrupt: make(chan struct{}),
scaleUpFailures: make(map[string][]ScaleUpFailure),
}
}
@@ -248,8 +260,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
csr.logRecorder.Eventf(apiv1.EventTypeWarning, "ScaleUpTimedOut",
"Nodes added to group %s failed to register within %v",
scaleUpRequest.NodeGroup.Id(), currentTime.Sub(scaleUpRequest.Time))
- metrics.RegisterFailedScaleUp(metrics.Timeout)
- csr.backoffNodeGroup(scaleUpRequest.NodeGroup, cloudprovider.OtherErrorClass, "timeout", currentTime)
+ csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.OtherErrorClass, "timeout", currentTime)
delete(csr.scaleUpRequests, nodeGroupName)
}
}
@@ -280,6 +291,7 @@ func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.N
}
func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorClass cloudprovider.InstanceErrorClass, errorCode string, currentTime time.Time) {
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
metrics.RegisterFailedScaleUp(reason)
csr.backoffNodeGroup(nodeGroup, errorClass, errorCode, currentTime)
}
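A side note on the Lock/NoLock naming seen above: the exported RegisterFailedScaleUp wrapper takes the registry mutex itself, while the *NoLock variant is intended for call sites that already hold it (such as updateScaleRequests). The sketch below illustrates that convention with a trivial stand-in type; it is not the actual ClusterStateRegistry code.

package main

import (
    "fmt"
    "sync"
)

// counter is an illustrative stand-in for a struct that embeds sync.Mutex,
// the way ClusterStateRegistry does.
type counter struct {
    sync.Mutex
    n int
}

// Register is the exported entry point: it acquires the lock itself.
func (c *counter) Register() {
    c.Lock()
    defer c.Unlock()
    c.registerNoLock()
}

// registerNoLock assumes the caller already holds the mutex, so code paths
// that are already locked can reuse it without deadlocking on Go's
// non-reentrant sync.Mutex.
func (c *counter) registerNoLock() {
    c.n++
}

func main() {
    c := &counter{}
    c.Register()
    fmt.Println(c.n) // 1
}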
@@ -1121,3 +1133,28 @@ func fakeNode(instance cloudprovider.Instance) *apiv1.Node {
},
}
}
// PeriodicCleanup performs clean-ups that should be done periodically, e.g.
// each Autoscaler loop.
func (csr *ClusterStateRegistry) PeriodicCleanup() {
// Clear the scale-up failures info so they don't accumulate.
csr.clearScaleUpFailures()
}
// clearScaleUpFailures clears the scale-up failures map.
func (csr *ClusterStateRegistry) clearScaleUpFailures() {
csr.Lock()
defer csr.Unlock()
csr.scaleUpFailures = make(map[string][]ScaleUpFailure)
}
// GetScaleUpFailures returns the scale-up failures map.
func (csr *ClusterStateRegistry) GetScaleUpFailures() map[string][]ScaleUpFailure {
csr.Lock()
defer csr.Unlock()
result := make(map[string][]ScaleUpFailure)
for nodeGroupId, failures := range csr.scaleUpFailures {
result[nodeGroupId] = failures
}
return result
}
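One detail of GetScaleUpFailures above worth spelling out: it copies the map entry by entry so callers can iterate the snapshot without holding the registry lock, while the failure slices themselves are shared until clearScaleUpFailures swaps in a fresh map. A small standard-library demonstration of those copy semantics (illustrative only):

package main

import "fmt"

func main() {
    // orig plays the role of csr.scaleUpFailures.
    orig := map[string][]string{"ng1": {"Timeout"}}

    // Entry-by-entry copy, as in GetScaleUpFailures: the caller gets an
    // independent map, but each slice header still points at the same
    // backing array as the registry's.
    snapshot := make(map[string][]string)
    for k, v := range orig {
        snapshot[k] = v
    }

    // Mutating one map's structure does not affect the other...
    delete(orig, "ng1")
    fmt.Println(snapshot["ng1"]) // [Timeout]

    // ...and clearScaleUpFailures replaces the whole map rather than
    // truncating the slices, so previously returned snapshots keep reading
    // the old data safely.
    orig = map[string][]string{}
    fmt.Println(len(orig), snapshot["ng1"]) // 0 [Timeout]
}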

cluster-autoscaler/clusterstate/clusterstate_test.go

@@ -20,6 +20,8 @@ import (
"testing"
"time"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
@@ -60,6 +62,7 @@ func TestOKWithScaleUp(t *testing.T) {
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
status := clusterstate.GetStatus(now)
assert.Equal(t, api.ClusterAutoscalerInProgress,
@@ -100,6 +103,7 @@ func TestEmptyOK(t *testing.T) {
err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second))
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
assert.False(t, clusterstate.IsNodeGroupScalingUp("ng1"))
@@ -139,6 +143,7 @@ func TestOKOneUnreadyNode(t *testing.T) {
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
status := clusterstate.GetStatus(now)
@@ -175,6 +180,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now)
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())
clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{noNgNode}, now)
}
@@ -204,6 +210,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
status := clusterstate.GetStatus(now)
@@ -266,6 +273,7 @@ func TestMissingNodes(t *testing.T) {
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
status := clusterstate.GetStatus(now)
@@ -307,6 +315,7 @@ func TestTooManyUnready(t *testing.T) {
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.False(t, clusterstate.IsClusterHealthy())
assert.Empty(t, clusterstate.GetScaleUpFailures())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
}
@@ -333,6 +342,11 @@ func TestExpiredScaleUp(t *testing.T) {
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
assert.Equal(t, clusterstate.GetScaleUpFailures(), map[string][]ScaleUpFailure{
"ng1": {
{NodeGroup: provider.GetNodeGroup("ng1"), Time: now, Reason: metrics.Timeout},
},
})
}
func TestRegisterScaleDown(t *testing.T) {
@@ -360,6 +374,7 @@ func TestRegisterScaleDown(t *testing.T) {
assert.Equal(t, 1, len(clusterstate.scaleDownRequests))
clusterstate.updateScaleRequests(now.Add(5 * time.Minute))
assert.Equal(t, 0, len(clusterstate.scaleDownRequests))
assert.Empty(t, clusterstate.GetScaleUpFailures())
}
func TestUpcomingNodes(t *testing.T) {
@@ -402,6 +417,7 @@ func TestUpcomingNodes(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now)
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())
upcomingNodes := clusterstate.GetUpcomingNodes()
assert.Equal(t, 6, upcomingNodes["ng1"])
@@ -778,6 +794,37 @@ func TestIsNodeStillStarting(t *testing.T) {
}
}
func TestScaleUpFailures(t *testing.T) {
now := time.Now()
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 0, 10, 0)
provider.AddNodeGroup("ng2", 0, 10, 0)
assert.NotNil(t, provider)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff())
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.APIError, now.Add(time.Minute))
failures := clusterstate.GetScaleUpFailures()
assert.Equal(t, map[string][]ScaleUpFailure{
"ng1": {
{NodeGroup: provider.GetNodeGroup("ng1"), Reason: metrics.Timeout, Time: now},
{NodeGroup: provider.GetNodeGroup("ng1"), Reason: metrics.APIError, Time: now.Add(time.Minute)},
},
"ng2": {
{NodeGroup: provider.GetNodeGroup("ng2"), Reason: metrics.Timeout, Time: now},
},
}, failures)
clusterstate.clearScaleUpFailures()
assert.Empty(t, clusterstate.GetScaleUpFailures())
}
func newBackoff() backoff.Backoff {
return backoff.NewIdBasedExponentialBackoff(InitialNodeGroupBackoffDuration, MaxNodeGroupBackoffDuration, NodeGroupBackoffResetTimeout)
}

cluster-autoscaler/core/static_autoscaler.go

@@ -177,6 +177,7 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
a.cleanUpIfRequired()
a.processorCallbacks.reset()
a.clusterStateRegistry.PeriodicCleanup()
unschedulablePodLister := a.UnschedulablePodLister()
scheduledPodLister := a.ScheduledPodLister()
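Finally, note where the new PeriodicCleanup call sits: at the top of RunOnce, next to processorCallbacks.reset(), rather than at the end of the loop. Assuming the status processing runs near the end of each iteration (which is what the commit message's motivation suggests), clearing at the start of the next iteration means failures collected during one loop are still visible to that loop's status processing and are only dropped afterwards. A toy illustration of that ordering, with stand-in names:

package main

import "fmt"

// recorder stands in for the CSR's scale-up failure map; statusProcessor for
// a consumer that runs at the end of every iteration.
type recorder struct{ failures []string }

func (r *recorder) periodicCleanup() { r.failures = nil }

func statusProcessor(r *recorder) { fmt.Println("status sees:", r.failures) }

func runOnce(r *recorder, fail bool) {
    // Clear at the *start* of the iteration, mirroring RunOnce above, so
    // whatever is recorded later in this iteration survives until the end
    // of it and is only discarded next time around.
    r.periodicCleanup()
    if fail {
        r.failures = append(r.failures, "Timeout")
    }
    statusProcessor(r)
}

func main() {
    r := &recorder{}
    runOnce(r, true)  // status sees: [Timeout]
    runOnce(r, false) // status sees: []
}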