Merge pull request #1464 from losipiuk/lo/stockouts2

Better quota-exceeded/stockout handling
Kubernetes Prow Robot 2018-12-31 05:28:08 -08:00 committed by GitHub
commit ab7f1e69be
9 changed files with 1071 additions and 102 deletions
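For orientation only (this sketch is not part of the commit): the handling below assumes the cloud provider reports creation failures on individual instances through the cloudprovider.Instance status, using the OutOfResourcesErrorClass error class and provider-specific error codes; "STOCKOUT" and "QUOTA" are the codes used in the tests in this change, while the id and message here are made up.

// Hypothetical instance as a provider could report it during a failing scale-up.
instance := cloudprovider.Instance{
    Id: "example-instance", // hypothetical provider id
    Status: &cloudprovider.InstanceStatus{
        State: cloudprovider.InstanceCreating,
        ErrorInfo: &cloudprovider.InstanceErrorInfo{
            ErrorClass:   cloudprovider.OutOfResourcesErrorClass,
            ErrorCode:    "STOCKOUT", // or "QUOTA"
            ErrorMessage: "zone has no remaining capacity for this machine type", // hypothetical
        },
    },
}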


@ -0,0 +1,203 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import cloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
import errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
import mock "github.com/stretchr/testify/mock"
import resource "k8s.io/apimachinery/pkg/api/resource"
import v1 "k8s.io/api/core/v1"
// CloudProvider is an autogenerated mock type for the CloudProvider type
type CloudProvider struct {
mock.Mock
}
// Cleanup provides a mock function with given fields:
func (_m *CloudProvider) Cleanup() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// GetAvailableMachineTypes provides a mock function with given fields:
func (_m *CloudProvider) GetAvailableMachineTypes() ([]string, error) {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetResourceLimiter provides a mock function with given fields:
func (_m *CloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
ret := _m.Called()
var r0 *cloudprovider.ResourceLimiter
if rf, ok := ret.Get(0).(func() *cloudprovider.ResourceLimiter); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*cloudprovider.ResourceLimiter)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Name provides a mock function with given fields:
func (_m *CloudProvider) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// NewNodeGroup provides a mock function with given fields: machineType, labels, systemLabels, taints, extraResources
func (_m *CloudProvider) NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string, taints []v1.Taint, extraResources map[string]resource.Quantity) (cloudprovider.NodeGroup, error) {
ret := _m.Called(machineType, labels, systemLabels, taints, extraResources)
var r0 cloudprovider.NodeGroup
if rf, ok := ret.Get(0).(func(string, map[string]string, map[string]string, []v1.Taint, map[string]resource.Quantity) cloudprovider.NodeGroup); ok {
r0 = rf(machineType, labels, systemLabels, taints, extraResources)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cloudprovider.NodeGroup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, map[string]string, map[string]string, []v1.Taint, map[string]resource.Quantity) error); ok {
r1 = rf(machineType, labels, systemLabels, taints, extraResources)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NodeGroupForNode provides a mock function with given fields: _a0
func (_m *CloudProvider) NodeGroupForNode(_a0 *v1.Node) (cloudprovider.NodeGroup, error) {
ret := _m.Called(_a0)
var r0 cloudprovider.NodeGroup
if rf, ok := ret.Get(0).(func(*v1.Node) cloudprovider.NodeGroup); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cloudprovider.NodeGroup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.Node) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NodeGroups provides a mock function with given fields:
func (_m *CloudProvider) NodeGroups() []cloudprovider.NodeGroup {
ret := _m.Called()
var r0 []cloudprovider.NodeGroup
if rf, ok := ret.Get(0).(func() []cloudprovider.NodeGroup); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]cloudprovider.NodeGroup)
}
}
return r0
}
// Pricing provides a mock function with given fields:
func (_m *CloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
ret := _m.Called()
var r0 cloudprovider.PricingModel
if rf, ok := ret.Get(0).(func() cloudprovider.PricingModel); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cloudprovider.PricingModel)
}
}
var r1 errors.AutoscalerError
if rf, ok := ret.Get(1).(func() errors.AutoscalerError); ok {
r1 = rf()
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(errors.AutoscalerError)
}
}
return r0, r1
}
// Refresh provides a mock function with given fields:
func (_m *CloudProvider) Refresh() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
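A minimal usage sketch, not part of the generated file: in a test the mock is stubbed with testify's On/Return and verified with AssertExpectations; the provider name below and the testify assert package are assumed for illustration.

// Inside a test function with t *testing.T:
provider := &CloudProvider{}
provider.On("Name").Return("test-provider")
provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{})
assert.Equal(t, "test-provider", provider.Name())
assert.Empty(t, provider.NodeGroups())
provider.AssertExpectations(t)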


@ -0,0 +1,257 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import cache "k8s.io/kubernetes/pkg/scheduler/cache"
import cloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
import mock "github.com/stretchr/testify/mock"
import v1 "k8s.io/api/core/v1"
// NodeGroup is an autogenerated mock type for the NodeGroup type
type NodeGroup struct {
mock.Mock
}
// Autoprovisioned provides a mock function with given fields:
func (_m *NodeGroup) Autoprovisioned() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Create provides a mock function with given fields:
func (_m *NodeGroup) Create() (cloudprovider.NodeGroup, error) {
ret := _m.Called()
var r0 cloudprovider.NodeGroup
if rf, ok := ret.Get(0).(func() cloudprovider.NodeGroup); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cloudprovider.NodeGroup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Debug provides a mock function with given fields:
func (_m *NodeGroup) Debug() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// DecreaseTargetSize provides a mock function with given fields: delta
func (_m *NodeGroup) DecreaseTargetSize(delta int) error {
ret := _m.Called(delta)
var r0 error
if rf, ok := ret.Get(0).(func(int) error); ok {
r0 = rf(delta)
} else {
r0 = ret.Error(0)
}
return r0
}
// Delete provides a mock function with given fields:
func (_m *NodeGroup) Delete() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// DeleteNodes provides a mock function with given fields: _a0
func (_m *NodeGroup) DeleteNodes(_a0 []*v1.Node) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func([]*v1.Node) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Exist provides a mock function with given fields:
func (_m *NodeGroup) Exist() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Id provides a mock function with given fields:
func (_m *NodeGroup) Id() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// IncreaseSize provides a mock function with given fields: delta
func (_m *NodeGroup) IncreaseSize(delta int) error {
ret := _m.Called(delta)
var r0 error
if rf, ok := ret.Get(0).(func(int) error); ok {
r0 = rf(delta)
} else {
r0 = ret.Error(0)
}
return r0
}
// MaxSize provides a mock function with given fields:
func (_m *NodeGroup) MaxSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MinSize provides a mock function with given fields:
func (_m *NodeGroup) MinSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// Nodes provides a mock function with given fields:
func (_m *NodeGroup) Nodes() ([]cloudprovider.Instance, error) {
ret := _m.Called()
var r0 []cloudprovider.Instance
if rf, ok := ret.Get(0).(func() []cloudprovider.Instance); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]cloudprovider.Instance)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// TargetSize provides a mock function with given fields:
func (_m *NodeGroup) TargetSize() (int, error) {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// TemplateNodeInfo provides a mock function with given fields:
func (_m *NodeGroup) TemplateNodeInfo() (*cache.NodeInfo, error) {
ret := _m.Called()
var r0 *cache.NodeInfo
if rf, ok := ret.Get(0).(func() *cache.NodeInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*cache.NodeInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}


@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import mock "github.com/stretchr/testify/mock"
import time "time"
import v1 "k8s.io/api/core/v1"
// PricingModel is an autogenerated mock type for the PricingModel type
type PricingModel struct {
mock.Mock
}
// NodePrice provides a mock function with given fields: node, startTime, endTime
func (_m *PricingModel) NodePrice(node *v1.Node, startTime time.Time, endTime time.Time) (float64, error) {
ret := _m.Called(node, startTime, endTime)
var r0 float64
if rf, ok := ret.Get(0).(func(*v1.Node, time.Time, time.Time) float64); ok {
r0 = rf(node, startTime, endTime)
} else {
r0 = ret.Get(0).(float64)
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.Node, time.Time, time.Time) error); ok {
r1 = rf(node, startTime, endTime)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PodPrice provides a mock function with given fields: pod, startTime, endTime
func (_m *PricingModel) PodPrice(pod *v1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
ret := _m.Called(pod, startTime, endTime)
var r0 float64
if rf, ok := ret.Get(0).(func(*v1.Pod, time.Time, time.Time) float64); ok {
r0 = rf(pod, startTime, endTime)
} else {
r0 = ret.Get(0).(float64)
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.Pod, time.Time, time.Time) error); ok {
r1 = rf(pod, startTime, endTime)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
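A similar usage sketch, not part of the generated file: NodePrice can be stubbed with testify argument matchers; the price value is an arbitrary example.

// Inside a test function with t *testing.T:
pricing := &PricingModel{}
pricing.On("NodePrice", mock.Anything, mock.Anything, mock.Anything).Return(0.5, nil)
price, err := pricing.NodePrice(&v1.Node{}, time.Now(), time.Now().Add(time.Hour))
assert.NoError(t, err)
assert.Equal(t, 0.5, price)
pricing.AssertExpectations(t)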


@ -19,6 +19,7 @@ package clusterstate
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
@ -114,21 +115,23 @@ type UnregisteredNode struct {
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
type ClusterStateRegistry struct {
sync.Mutex
config ClusterStateRegistryConfig
scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
nodes []*apiv1.Node
cloudProvider cloudprovider.CloudProvider
perNodeGroupReadiness map[string]Readiness
totalReadiness Readiness
acceptableRanges map[string]AcceptableRange
incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize
unregisteredNodes map[string]UnregisteredNode
candidatesForScaleDown map[string][]string
nodeGroupBackoffInfo backoff.Backoff
lastStatus *api.ClusterAutoscalerStatus
lastScaleDownUpdateTime time.Time
logRecorder *utils.LogEventRecorder
cloudProviderNodeInstances map[string][]cloudprovider.Instance
previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
}
// NewClusterStateRegistry creates new ClusterStateRegistry.
@ -154,21 +157,49 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
}
}
// RegisterOrUpdateScaleUp registers scale-up for given node group or changes requested node increase
// count.
// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime
// are reset.
// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are
// left intact.
func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
}
func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
scaleUpRequest, found := csr.scaleUpRequests[nodeGroup.Id()]
if !found && delta > 0 {
scaleUpRequest = &ScaleUpRequest{
NodeGroup: nodeGroup,
Increase: delta,
Time: currentTime,
ExpectedAddTime: currentTime.Add(csr.config.MaxNodeProvisionTime),
}
csr.scaleUpRequests[nodeGroup.Id()] = scaleUpRequest
return
}
if !found {
// delta <= 0
return
}
// update the old request
if scaleUpRequest.Increase+delta <= 0 {
// increase <= 0 means that there is no scale-up intent really
delete(csr.scaleUpRequests, nodeGroup.Id())
return
}
scaleUpRequest.Increase += delta
if delta > 0 {
// if we are actually adding new nodes shift Time and ExpectedAddTime
scaleUpRequest.Time = currentTime
scaleUpRequest.ExpectedAddTime = currentTime.Add(csr.config.MaxNodeProvisionTime)
}
}
// RegisterScaleDown registers node scale down.
@ -224,12 +255,15 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
// RegisterFailedScaleUp should be called after getting error from cloudprovider
// when trying to scale-up node group. It will mark this group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerFailedScaleUpNoLock(nodeGroup, reason, currentTime)
}
func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, currentTime time.Time) {
metrics.RegisterFailedScaleUp(reason)
csr.backoffNodeGroup(nodeGroup, currentTime)
}
// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
@ -239,15 +273,19 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti
if err != nil {
return err
}
cloudProviderNodeInstances, err := getCloudProviderNodeInstances(csr.cloudProvider)
if err != nil {
return err
}
notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)
csr.Lock()
defer csr.Unlock()
csr.nodes = nodes
csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
csr.cloudProviderNodeInstances = cloudProviderNodeInstances
csr.updateUnregisteredNodes(notRegistered)
csr.updateReadinessStats(currentTime)
@ -256,6 +294,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti
// updateScaleRequests relies on acceptableRanges being up to date
csr.updateAcceptableRanges(targetSizes)
csr.updateScaleRequests(currentTime)
csr.handleOutOfResourcesErrors(currentTime)
// recalculate acceptable ranges after removing timed out requests
csr.updateAcceptableRanges(targetSizes)
csr.updateIncorrectNodeGroupSizes(currentTime)
@ -874,35 +913,38 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
return result
}
// getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
// as returned by NodeGroup.Nodes().
func getCloudProviderNodeInstances(cloudProvider cloudprovider.CloudProvider) (map[string][]cloudprovider.Instance, error) {
allInstances := make(map[string][]cloudprovider.Instance)
for _, nodeGroup := range cloudProvider.NodeGroups() {
nodeGroupInstances, err := nodeGroup.Nodes()
if err != nil {
return nil, err
}
allInstances[nodeGroup.Id()] = nodeGroupInstances
}
return allInstances, nil
}
// Calculates which of the existing cloud provider nodes are not registered in Kubernetes.
func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances map[string][]cloudprovider.Instance, time time.Time) []UnregisteredNode {
registered := sets.NewString()
for _, node := range allNodes {
registered.Insert(node.Spec.ProviderID)
}
notRegistered := make([]UnregisteredNode, 0)
for _, instances := range cloudProviderNodeInstances {
for _, instance := range instances {
if !registered.Has(instance.Id) {
notRegistered = append(notRegistered, UnregisteredNode{
Node: fakeNode(instance),
UnregisteredSince: time,
})
}
}
}
return notRegistered
}
// GetClusterSize calculates and returns cluster's current size and target size. The current size is the
@ -917,3 +959,130 @@ func (csr *ClusterStateRegistry) GetClusterSize() (currentSize, targetSize int)
currentSize = csr.totalReadiness.Registered - csr.totalReadiness.NotStarted - csr.totalReadiness.LongNotStarted
return currentSize, targetSize
}
func (csr *ClusterStateRegistry) handleOutOfResourcesErrors(currentTime time.Time) {
nodeGroups := csr.cloudProvider.NodeGroups()
for _, nodeGroup := range nodeGroups {
csr.handleOutOfResourcesErrorsForNodeGroup(
nodeGroup,
csr.cloudProviderNodeInstances[nodeGroup.Id()],
csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
currentTime)
}
}
func (csr *ClusterStateRegistry) handleOutOfResourcesErrorsForNodeGroup(
nodeGroup cloudprovider.NodeGroup,
currentInstances []cloudprovider.Instance,
previousInstances []cloudprovider.Instance,
currentTime time.Time) {
_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToOutOfResourcesErrorCodeMappings(currentInstances)
previousInstanceToErrorCode, _, _ := csr.buildInstanceToOutOfResourcesErrorCodeMappings(previousInstances)
// If node group is scaling up and there are new node-create requests which cannot be satisfied because of
// out-of-resources errors we:
// - emit event
// - alter the scale-up
// - increase scale-up failure metric
// - backoff the node group
for errorCode, instances := range currentErrorCodeToInstance {
unseenInstanceIds := make([]string, 0)
for _, instance := range instances {
if _, seen := previousInstanceToErrorCode[instance.Id]; !seen {
unseenInstanceIds = append(unseenInstanceIds, instance.Id)
}
}
klog.V(1).Infof("Failed adding %v nodes (%v unseen previously) to group %v due to %v", len(instances), len(unseenInstanceIds), nodeGroup.Id(), errorCode)
if len(unseenInstanceIds) > 0 && csr.IsNodeGroupScalingUp(nodeGroup.Id()) {
csr.logRecorder.Eventf(
apiv1.EventTypeWarning,
"ScaleUpFailed",
"Failed adding %v nodes to group %v due to %v; source errors: %v",
len(unseenInstanceIds),
nodeGroup.Id(),
errorCode,
csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]))
// Decrease the scale up request by the number of deleted nodes
csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)
csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode), currentTime)
}
}
}
func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {
var sb strings.Builder
maxErrors := 3
for i, errorMessage := range uniqErrorMessages {
if i >= maxErrors {
break
}
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString(errorMessage)
}
if len(uniqErrorMessages) > maxErrors {
sb.WriteString(", ...")
}
return sb.String()
}
func (csr *ClusterStateRegistry) buildInstanceToOutOfResourcesErrorCodeMappings(instances []cloudprovider.Instance) (instanceToErrorCode map[string]string, uniqueErrorMessagesForErrorCode map[string][]string, errorCodeToInstance map[string][]cloudprovider.Instance) {
instanceToErrorCode = make(map[string]string)
uniqueErrorMessagesForErrorCode = make(map[string][]string)
errorCodeToInstance = make(map[string][]cloudprovider.Instance)
uniqErrorMessagesForErrorCodeTmp := make(map[string]map[string]bool)
for _, instance := range instances {
if instance.Status != nil && instance.Status.State == cloudprovider.InstanceCreating && instance.Status.ErrorInfo != nil {
errorInfo := instance.Status.ErrorInfo
if errorInfo.ErrorClass == cloudprovider.OutOfResourcesErrorClass {
if _, found := uniqErrorMessagesForErrorCodeTmp[errorInfo.ErrorCode]; !found {
uniqErrorMessagesForErrorCodeTmp[errorInfo.ErrorCode] = make(map[string]bool)
}
instanceToErrorCode[instance.Id] = errorInfo.ErrorCode
uniqErrorMessagesForErrorCodeTmp[errorInfo.ErrorCode][errorInfo.ErrorMessage] = true
errorCodeToInstance[errorInfo.ErrorCode] = append(errorCodeToInstance[errorInfo.ErrorCode], instance)
}
}
}
for errorCode, uniqueErrorMessages := range uniqErrorMessagesForErrorCodeTmp {
for errorMessage := range uniqueErrorMessages {
uniqueErrorMessagesForErrorCode[errorCode] = append(uniqueErrorMessagesForErrorCode[errorCode], errorMessage)
}
}
return
}
// GetCreatedNodesWithOutOfResourcesErrors returns list of nodes being created which reported create error of "out of resources" class.
func (csr *ClusterStateRegistry) GetCreatedNodesWithOutOfResourcesErrors() []*apiv1.Node {
csr.Lock()
defer csr.Unlock()
nodesWithCreateErrors := make([]*apiv1.Node, 0)
for _, nodeGroupInstances := range csr.cloudProviderNodeInstances {
_, _, instancesByErrorCode := csr.buildInstanceToOutOfResourcesErrorCodeMappings(nodeGroupInstances)
for _, instances := range instancesByErrorCode {
for _, instance := range instances {
nodesWithCreateErrors = append(nodesWithCreateErrors, fakeNode(instance))
}
}
}
return nodesWithCreateErrors
}
func fakeNode(instance cloudprovider.Instance) *apiv1.Node {
return &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: instance.Id,
},
Spec: apiv1.NodeSpec{
ProviderID: instance.Id,
},
}
}
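A short worked example of the RegisterOrUpdateScaleUp semantics documented above (editorial sketch; csr, ng and t0..t2 are hypothetical, and MaxNodeProvisionTime is assumed to be 10 seconds). TestUpdateScaleUp below exercises the same behaviour.

csr.RegisterOrUpdateScaleUp(ng, 5, t0)  // new request: Increase=5, Time=t0, ExpectedAddTime=t0+10s
csr.RegisterOrUpdateScaleUp(ng, -2, t1) // Increase=3; Time and ExpectedAddTime left intact
csr.RegisterOrUpdateScaleUp(ng, 4, t1)  // Increase=7; Time=t1, ExpectedAddTime=t1+10s
csr.RegisterOrUpdateScaleUp(ng, -7, t2) // Increase would drop to 0, so the request is removed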


@ -54,13 +54,9 @@ func TestOKWithScaleUp(t *testing.T) {
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: time.Minute,
}, fakeLogRecorder, newBackoff())
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@ -99,6 +95,7 @@ func TestEmptyOK(t *testing.T) {
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: time.Minute,
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{}, now.Add(-5*time.Second))
assert.NoError(t, err)
@ -107,12 +104,9 @@ func TestEmptyOK(t *testing.T) {
assert.False(t, clusterstate.IsNodeGroupScalingUp("ng1"))
provider.AddNodeGroup("ng1", 0, 10, 3)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
// clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second)
// clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{}, now)
assert.NoError(t, err)
@ -332,13 +326,9 @@ func TestExpiredScaleUp(t *testing.T) {
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 2 * time.Minute,
}, fakeLogRecorder, newBackoff())
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@ -619,15 +609,11 @@ func TestScaleUpBackoff(t *testing.T) {
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 120 * time.Second,
}, fakeLogRecorder, newBackoff())
// After failed scale-up, node group should be still healthy, but should backoff from scale-ups
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@ -641,12 +627,8 @@ func TestScaleUpBackoff(t *testing.T) {
assert.True(t, clusterstate.IsNodeGroupSafeToScaleUp(ng1, now))
// Another failed scale up should cause longer backoff
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@ -657,12 +639,7 @@ func TestScaleUpBackoff(t *testing.T) {
assert.False(t, clusterstate.IsNodeGroupSafeToScaleUp(ng1, now))
// The backoff should be cleared after a successful scale-up
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now)
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
@ -725,6 +702,50 @@ func TestGetClusterSize(t *testing.T) {
assert.Equal(t, 30, targetSize)
}
func TestUpdateScaleUp(t *testing.T) {
now := time.Now()
later := now.Add(time.Minute)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 5)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterstate := NewClusterStateRegistry(
provider,
ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
},
fakeLogRecorder,
newBackoff())
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
// expect no change of times on negative delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
// update times on positive delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))
// if we get below 0 the scale-up request is deleted
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
// If a new scale-up is registered with negative delta nothing should happen
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
}
func TestIsNodeStillStarting(t *testing.T) {
testCases := []struct {
desc string


@ -532,7 +532,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(nodeInfo.Node(), nil), now)
if typedErr != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, typedErr
}
@ -686,24 +686,21 @@ groupsloop:
return result
}
func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, info nodegroupset.ScaleUpInfo, gpuType string, now time.Time) errors.AutoscalerError {
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
context.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.APIError, now)
return errors.NewAutoscalerError(errors.CloudProviderError,
"failed to increase node group size: %v", err)
}
clusterStateRegistry.RegisterOrUpdateScaleUp(
info.Group,
increase,
time.Now())
metrics.RegisterScaleUp(increase, gpuType)
context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: group %s size set to %d", info.Group.Id(), info.NewSize)


@ -520,13 +520,12 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, fakeClient, provider)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute},
context.LogRecorder,
newBackoff())
clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@ -575,13 +574,14 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(defaultOptions, fakeClient, provider)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{
MaxNodeProvisionTime: 5 * time.Minute,
},
context.LogRecorder,
newBackoff())
clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
p3 := BuildTestPod("p-new", 550, 0)


@ -17,6 +17,7 @@ limitations under the License.
package core
import (
"fmt"
"time"
apiv1 "k8s.io/api/core/v1"
@ -192,6 +193,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return nil
}
} }
if !a.clusterStateRegistry.IsClusterHealthy() {
klog.Warning("Cluster is not ready for autoscaling")
scaleDown.CleanUpUnneededNodes()
@ -199,6 +201,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return nil
}
a.deleteCreatedNodesWithErrors()
// Check if there has been a constant difference between the number of nodes in k8s and
// the number of nodes on the cloud provider side.
// TODO: andrewskim - add protection for ready AWS nodes.
@ -398,6 +402,52 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return nil
}
func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() {
// We always schedule deleting of incoming erroneous nodes
// TODO[lukaszos] Consider adding logic to not retry delete every loop iteration
nodes := a.clusterStateRegistry.GetCreatedNodesWithOutOfResourcesErrors()
nodeGroups := a.nodeGroupsById()
nodesToBeDeletedByNodeGroupId := make(map[string][]*apiv1.Node)
for _, node := range nodes {
nodeGroup, err := a.CloudProvider.NodeGroupForNode(node)
if err != nil {
id := "<nil>"
if node != nil {
id = node.Spec.ProviderID
}
klog.Warningf("Cannot determine nodeGroup for node %v; %v", id, err)
continue
}
nodesToBeDeletedByNodeGroupId[nodeGroup.Id()] = append(nodesToBeDeletedByNodeGroupId[nodeGroup.Id()], node)
}
for nodeGroupId, nodesToBeDeleted := range nodesToBeDeletedByNodeGroupId {
var err error
klog.V(1).Infof("Deleting %v from %v node group because of create errors", len(nodesToBeDeleted), nodeGroupId)
nodeGroup := nodeGroups[nodeGroupId]
if nodeGroup == nil {
err = fmt.Errorf("Node group %s not found", nodeGroup)
} else {
err = nodeGroup.DeleteNodes(nodesToBeDeleted)
}
if err != nil {
klog.Warningf("Error while trying to delete nodes from %v: %v", nodeGroup.Id(), err)
}
}
}
func (a *StaticAutoscaler) nodeGroupsById() map[string]cloudprovider.NodeGroup {
nodeGroups := make(map[string]cloudprovider.NodeGroup)
for _, nodeGroup := range a.CloudProvider.NodeGroups() {
nodeGroups[nodeGroup.Id()] = nodeGroup
}
return nodeGroups
}
// don't consider pods newer than newPodScaleUpDelay seconds old as unschedulable
func (a *StaticAutoscaler) filterOutYoungPods(allUnschedulablePods []*apiv1.Pod, currentTime time.Time) []*apiv1.Pod {
var oldUnschedulablePods []*apiv1.Pod


@ -17,10 +17,13 @@ limitations under the License.
package core
import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"reflect"
"strings"
"testing"
"time"
mockprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mocks"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
@ -692,3 +695,204 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}
func TestStaticAutoscalerOutOfResources(t *testing.T) {
// setup
provider := &mockprovider.CloudProvider{}
// Create context with mocked lister registry.
options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUnneededTime: time.Minute,
ExpendablePodsPriorityCutoff: 10,
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, provider)
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff())
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
}
nodeGroupA := &mockprovider.NodeGroup{}
nodeGroupB := &mockprovider.NodeGroup{}
// Three nodes with out-of-resources errors
nodeGroupA.On("Exist").Return(true)
nodeGroupA.On("Autoprovisioned").Return(false)
nodeGroupA.On("TargetSize").Return(5, nil)
nodeGroupA.On("Id").Return("A")
nodeGroupA.On("DeleteNodes", mock.Anything).Return(nil)
nodeGroupA.On("Nodes").Return([]cloudprovider.Instance{
{
"A1",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceRunning,
},
},
{
"A2",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
},
},
{
"A3",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OutOfResourcesErrorClass,
ErrorCode: "STOCKOUT",
},
},
},
{
"A4",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OutOfResourcesErrorClass,
ErrorCode: "STOCKOUT",
},
},
},
{
"A5",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OutOfResourcesErrorClass,
ErrorCode: "QUOTA",
},
},
},
}, nil).Twice()
nodeGroupB.On("Exist").Return(true)
nodeGroupB.On("Autoprovisioned").Return(false)
nodeGroupB.On("TargetSize").Return(5, nil)
nodeGroupB.On("Id").Return("B")
nodeGroupB.On("DeleteNodes", mock.Anything).Return(nil)
nodeGroupB.On("Nodes").Return([]cloudprovider.Instance{
{
"B1",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceRunning,
},
},
}, nil)
provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{nodeGroupA})
provider.On("NodeGroupForNode", mock.Anything).Return(
func(node *apiv1.Node) cloudprovider.NodeGroup {
if strings.HasPrefix(node.Spec.ProviderID, "A") {
return nodeGroupA
}
if strings.HasPrefix(node.Spec.ProviderID, "B") {
return nodeGroupB
}
return nil
}, nil)
now := time.Now()
// propagate nodes info in cluster state
clusterState.UpdateNodes([]*apiv1.Node{}, now)
// delete nodes with create errors
autoscaler.deleteCreatedNodesWithErrors()
// check delete was called on correct nodes
nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
func(nodes []*apiv1.Node) bool {
if len(nodes) != 3 {
return false
}
names := make(map[string]bool)
for _, node := range nodes {
names[node.Spec.ProviderID] = true
}
return names["A3"] && names["A4"] && names["A5"]
}))
// TODO assert that scaleup was failed (separately for QUOTA and STOCKOUT)
// propagate nodes info in cluster state again
// no changes in what provider returns
clusterState.UpdateNodes([]*apiv1.Node{}, now)
// delete nodes with create errors
autoscaler.deleteCreatedNodesWithErrors()
// nodes should be deleted again
nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy(
func(nodes []*apiv1.Node) bool {
if len(nodes) != 3 {
return false
}
names := make(map[string]bool)
for _, node := range nodes {
names[node.Spec.ProviderID] = true
}
return names["A3"] && names["A4"] && names["A5"]
}))
// TODO assert that scaleup is not failed again
// restub node group A so nodes are no longer reporting errors
nodeGroupA.On("Nodes").Return([]cloudprovider.Instance{
{
"A1",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceRunning,
},
},
{
"A2",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
},
},
{
"A3",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
},
{
"A4",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
},
{
"A5",
&cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
},
}, nil)
// update cluster state
clusterState.UpdateNodes([]*apiv1.Node{}, now)
// delete nodes with create errors
autoscaler.deleteCreatedNodesWithErrors()
// we expect no more Delete Nodes
nodeGroupA.AssertNumberOfCalls(t, "DeleteNodes", 2)
}