/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package clusterstate

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
	"sync"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"

	klog "k8s.io/klog/v2"
)

const (
	// MaxNodeStartupTime is the maximum time from the moment the node is registered to the time the node is ready.
	MaxNodeStartupTime = 15 * time.Minute
	// maxErrorMessageSize is the maximum size of error messages displayed in the status ConfigMap, as the maximum size of a ConfigMap is 1MB.
	maxErrorMessageSize = 500
	// messageTruncated is appended at the end of a truncated message.
	messageTruncated = "<truncated>"
)

var (
	errMaxNodeProvisionTimeProviderNotSet = errors.New("MaxNodeProvisionTimeProvider was not set in cluster state")
)

// ScaleUpRequest contains information about the requested node group scale-up.
type ScaleUpRequest struct {
	// NodeGroup is the node group to be scaled up.
	NodeGroup cloudprovider.NodeGroup
	// Time is the time when the request was submitted.
	Time time.Time
	// ExpectedAddTime is the time by which the request should be fulfilled.
	ExpectedAddTime time.Time
	// Increase is the number of nodes by which the node group is expanded.
	Increase int
}

// ScaleDownRequest contains information about the requested node deletion.
type ScaleDownRequest struct {
	// NodeName is the name of the node to be deleted.
	NodeName string
	// NodeGroup is the node group of the deleted node.
	NodeGroup cloudprovider.NodeGroup
	// Time is the time when the node deletion was requested.
	Time time.Time
	// ExpectedDeleteTime is the time when the node is expected to be deleted.
	ExpectedDeleteTime time.Time
}

// ClusterStateRegistryConfig contains configuration information for ClusterStateRegistry.
type ClusterStateRegistryConfig struct {
	// MaxTotalUnreadyPercentage is the maximum percentage of unready nodes in total;
	// it only applies once the number of unready nodes exceeds OkTotalUnreadyCount.
	MaxTotalUnreadyPercentage float64
	// OkTotalUnreadyCount is the minimum number of nodes that must be unready for MaxTotalUnreadyPercentage to apply.
	// This is to ensure that in very small clusters (e.g. 2 nodes) a single node's failure doesn't disable autoscaling.
	OkTotalUnreadyCount int
}

// IncorrectNodeGroupSize contains information about how much the current size of the node group
// differs from the expected size. A prolonged, stable mismatch is an indication of quota
// or startup issues.
type IncorrectNodeGroupSize struct {
	// ExpectedSize is the size of the node group measured on the cloud provider side.
	ExpectedSize int
	// CurrentSize is the size of the node group measured on the Kubernetes side.
	CurrentSize int
	// FirstObserved is the time when the given difference was first observed.
	FirstObserved time.Time
}

// UnregisteredNode contains information about nodes that are present on the cloud provider side
// but failed to register in Kubernetes.
type UnregisteredNode struct {
	// Node is a dummy node that contains only the name of the node.
	Node *apiv1.Node
	// UnregisteredSince is the time when the node was first spotted.
	UnregisteredSince time.Time
}

// ScaleUpFailure contains information about a failure of a scale-up.
type ScaleUpFailure struct {
	NodeGroup cloudprovider.NodeGroup
	Reason    metrics.FailedScaleUpReason
	Time      time.Time
}

// ClusterStateRegistry is a structure to keep track of the current state of the cluster.
type ClusterStateRegistry struct {
	sync.Mutex
	config                             ClusterStateRegistryConfig
	scaleUpRequests                    map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
	scaleDownRequests                  []*ScaleDownRequest
	nodes                              []*apiv1.Node
	nodeInfosForGroups                 map[string]*framework.NodeInfo
	cloudProvider                      cloudprovider.CloudProvider
	perNodeGroupReadiness              map[string]Readiness
	totalReadiness                     Readiness
	acceptableRanges                   map[string]AcceptableRange
	incorrectNodeGroupSizes            map[string]IncorrectNodeGroupSize
	unregisteredNodes                  map[string]UnregisteredNode
	deletedNodes                       map[string]struct{}
	candidatesForScaleDown             map[string][]string
	backoff                            backoff.Backoff
	lastStatus                         *api.ClusterAutoscalerStatus
	lastScaleDownUpdateTime            time.Time
	logRecorder                        *utils.LogEventRecorder
	cloudProviderNodeInstances         map[string][]cloudprovider.Instance
	previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
	cloudProviderNodeInstancesCache    *utils.CloudProviderNodeInstancesCache
	interrupt                          chan struct{}
	nodeGroupConfigProcessor           nodegroupconfig.NodeGroupConfigProcessor
	asyncNodeGroupStateChecker         asyncnodegroups.AsyncNodeGroupStateChecker

	// scaleUpFailures contains information about scale-up failures for each node group. It should be
	// cleared periodically to avoid unnecessary accumulation.
	scaleUpFailures map[string][]ScaleUpFailure
}

// NodeGroupScalingSafety contains information about the safety of scaling the node group up or down.
type NodeGroupScalingSafety struct {
	SafeToScale   bool
	Healthy       bool
	BackoffStatus backoff.Status
}

// NewClusterStateRegistry creates a new ClusterStateRegistry.
func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker) *ClusterStateRegistry {
	return &ClusterStateRegistry{
		scaleUpRequests:                 make(map[string]*ScaleUpRequest),
		scaleDownRequests:               make([]*ScaleDownRequest, 0),
		nodes:                           make([]*apiv1.Node, 0),
		cloudProvider:                   cloudProvider,
		config:                          config,
		perNodeGroupReadiness:           make(map[string]Readiness),
		acceptableRanges:                make(map[string]AcceptableRange),
		incorrectNodeGroupSizes:         make(map[string]IncorrectNodeGroupSize),
		unregisteredNodes:               make(map[string]UnregisteredNode),
		deletedNodes:                    make(map[string]struct{}),
		candidatesForScaleDown:          make(map[string][]string),
		backoff:                         backoff,
		lastStatus:                      utils.EmptyClusterAutoscalerStatus(),
		logRecorder:                     logRecorder,
		cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
		interrupt:                       make(chan struct{}),
		scaleUpFailures:                 make(map[string][]ScaleUpFailure),
		nodeGroupConfigProcessor:        nodeGroupConfigProcessor,
		asyncNodeGroupStateChecker:      asyncNodeGroupStateChecker,
	}
}

// Start starts components running in background.
func (csr *ClusterStateRegistry) Start() {
	csr.cloudProviderNodeInstancesCache.Start(csr.interrupt)
}

// Stop stops components running in background.
func (csr *ClusterStateRegistry) Stop() {
	close(csr.interrupt)
}

// RegisterScaleUp registers a scale-up for the given node group.
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
	csr.Lock()
	defer csr.Unlock()
	csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
}

// MaxNodeProvisionTime returns the MaxNodeProvisionTime value that should be used for the given NodeGroup.
// TODO(BigDarkClown): remove this method entirely, it is a redundant wrapper
func (csr *ClusterStateRegistry) MaxNodeProvisionTime(nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
	return csr.nodeGroupConfigProcessor.GetMaxNodeProvisionTime(nodeGroup)
}

func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
	maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
	if err != nil {
		klog.Warningf("Couldn't update scale up request: failed to get maxNodeProvisionTime for node group %s: %v", nodeGroup.Id(), err)
		return
	}

	scaleUpRequest, found := csr.scaleUpRequests[nodeGroup.Id()]
	if !found && delta > 0 {
		scaleUpRequest = &ScaleUpRequest{
			NodeGroup:       nodeGroup,
			Increase:        delta,
			Time:            currentTime,
			ExpectedAddTime: currentTime.Add(maxNodeProvisionTime),
		}
		csr.scaleUpRequests[nodeGroup.Id()] = scaleUpRequest
		return
	}

	if !found {
		// delta <= 0
		return
	}

	// Update the old request.
	if scaleUpRequest.Increase+delta <= 0 {
		// A total increase of <= 0 means that there is no real scale-up intent anymore.
		delete(csr.scaleUpRequests, nodeGroup.Id())
		return
	}

	scaleUpRequest.Increase += delta
	if delta > 0 {
		// If we are actually adding new nodes, shift Time and ExpectedAddTime.
		scaleUpRequest.Time = currentTime
		scaleUpRequest.ExpectedAddTime = currentTime.Add(maxNodeProvisionTime)
	}
}

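// Example (illustrative): two consecutive RegisterScaleUp calls with delta=3 and delta=2
// merge into a single request with Increase=5 and freshly shifted Time/ExpectedAddTime,
// while a later call with delta=-5 cancels the request entirely.
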
// RegisterScaleDown registers a node scale-down.
func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
	nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
	request := &ScaleDownRequest{
		NodeGroup:          nodeGroup,
		NodeName:           nodeName,
		Time:               currentTime,
		ExpectedDeleteTime: expectedDeleteTime,
	}
	csr.Lock()
	defer csr.Unlock()
	csr.scaleDownRequests = append(csr.scaleDownRequests, request)
}

// To be executed under a lock.
func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
	// Clean up stale backoff info.
	csr.backoff.RemoveStaleBackoffData(currentTime)

	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
		if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
			continue
		}
		if !csr.areThereUpcomingNodesInNodeGroup(nodeGroupName) {
			// Scale-up finished successfully, remove the request.
			delete(csr.scaleUpRequests, nodeGroupName)
			klog.V(4).Infof("Scale up in group %v finished successfully in %v",
				nodeGroupName, currentTime.Sub(scaleUpRequest.Time))
			continue
		}

		if scaleUpRequest.ExpectedAddTime.Before(currentTime) {
			klog.Warningf("Scale-up timed out for node group %v after %v",
				nodeGroupName, currentTime.Sub(scaleUpRequest.Time))
			csr.logRecorder.Eventf(apiv1.EventTypeWarning, "ScaleUpTimedOut",
				"Nodes added to group %s failed to register within %v",
				scaleUpRequest.NodeGroup.Id(), currentTime.Sub(scaleUpRequest.Time))
			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
			gpuResource, gpuType := "", ""
			nodeInfo, err := scaleUpRequest.NodeGroup.TemplateNodeInfo()
			if err != nil {
				klog.Warningf("Failed to get template node info for a node group: %s", err)
			} else {
				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), scaleUpRequest.NodeGroup)
			}
			csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.InstanceErrorInfo{
				ErrorClass:   cloudprovider.OtherErrorClass,
				ErrorCode:    "timeout",
				ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
			}, gpuResource, gpuType, currentTime)
			delete(csr.scaleUpRequests, nodeGroupName)
		}
	}

	newScaleDownRequests := make([]*ScaleDownRequest, 0)
	for _, scaleDownRequest := range csr.scaleDownRequests {
		if scaleDownRequest.ExpectedDeleteTime.After(currentTime) {
			newScaleDownRequests = append(newScaleDownRequests, scaleDownRequest)
		}
	}
	csr.scaleDownRequests = newScaleDownRequests
}

// To be executed under a lock.
func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
	nodeGroupInfo := csr.nodeInfosForGroups[nodeGroup.Id()]
	backoffUntil := csr.backoff.Backoff(nodeGroup, nodeGroupInfo, errorInfo, currentTime)
	klog.Warningf("Disabling scale-up for node group %v until %v; errorClass=%v; errorCode=%v", nodeGroup.Id(), backoffUntil, errorInfo.ErrorClass, errorInfo.ErrorCode)
}

// RegisterFailedScaleUp should be called after getting an error from the cloud provider
// when trying to scale up a node group. It will mark the group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
	csr.Lock()
	defer csr.Unlock()
	csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
		ErrorClass:   cloudprovider.OtherErrorClass,
		ErrorCode:    string(reason),
		ErrorMessage: errorMessage,
	}, gpuResourceName, gpuType, currentTime)
}

// RegisterFailedScaleDown records a failed scale-down for a node group.
// We don't need to implement this function for the cluster state registry.
func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
}

func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
	metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
}

// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats.
func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) error {
	csr.updateNodeGroupMetrics()
	targetSizes, err := getTargetSizes(csr.cloudProvider)
	if err != nil {
		return err
	}
	metrics.UpdateNodeGroupTargetSize(targetSizes)

	cloudProviderNodeInstances, err := csr.getCloudProviderNodeInstances()
	if err != nil {
		return err
	}
	cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes)
	notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)

	csr.Lock()
	defer csr.Unlock()

	csr.nodes = nodes
	csr.nodeInfosForGroups = nodeInfosForGroups
	csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
	csr.cloudProviderNodeInstances = cloudProviderNodeInstances

	csr.updateUnregisteredNodes(notRegistered)
	csr.updateCloudProviderDeletedNodes(cloudProviderNodesRemoved)
	csr.updateReadinessStats(currentTime)

	// Update the acceptable ranges based on the requests from the last loop and targetSizes;
	// updateScaleRequests relies on acceptableRanges being up to date.
	csr.updateAcceptableRanges(targetSizes)
	csr.updateScaleRequests(currentTime)
	csr.handleInstanceCreationErrors(currentTime)
	// Recalculate the acceptable ranges after removing timed-out requests.
	csr.updateAcceptableRanges(targetSizes)
	csr.updateIncorrectNodeGroupSizes(currentTime)
	return nil
}

// Recalculate cluster state after scale-ups or scale-downs were registered.
func (csr *ClusterStateRegistry) Recalculate() {
	targetSizes, err := getTargetSizes(csr.cloudProvider)
	if err != nil {
		klog.Warningf("Failed to get target sizes, when trying to recalculate cluster state: %v", err)
	}

	csr.Lock()
	defer csr.Unlock()
	csr.updateAcceptableRanges(targetSizes)
}

// getTargetSizes gets the target sizes of the node groups.
func getTargetSizes(cp cloudprovider.CloudProvider) (map[string]int, error) {
	result := make(map[string]int)
	for _, ng := range cp.NodeGroups() {
		size, err := ng.TargetSize()
		if err != nil {
			return map[string]int{}, err
		}
		result[ng.Id()] = size
	}
	return result, nil
}

// IsClusterHealthy returns true if the cluster health is within the acceptable limits.
func (csr *ClusterStateRegistry) IsClusterHealthy() bool {
	csr.Lock()
	defer csr.Unlock()

	totalUnready := len(csr.totalReadiness.Unready)

	if totalUnready > csr.config.OkTotalUnreadyCount &&
		float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) {
		return false
	}

	return true
}

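// Example (illustrative): with OkTotalUnreadyCount=3 and MaxTotalUnreadyPercentage=45,
// a 10-node cluster with 4 unready nodes is still healthy (4 > 3, but 4 <= 4.5),
// while 5 unready nodes would make it unhealthy (5 > 3 and 5 > 4.5).
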
// IsNodeGroupHealthy returns true if the node group health is within the acceptable limits.
func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {
	acceptable, found := csr.acceptableRanges[nodeGroupName]
	if !found {
		klog.V(5).Infof("Failed to find acceptable ranges for %v", nodeGroupName)
		return false
	}

	readiness, found := csr.perNodeGroupReadiness[nodeGroupName]
	if !found {
		// No nodes, but the target is 0 or the group is just scaling up.
		if acceptable.CurrentTarget == 0 || (acceptable.MinNodes == 0 && acceptable.CurrentTarget > 0) {
			return true
		}
		klog.V(5).Infof("Failed to find readiness information for %v", nodeGroupName)
		return false
	}

	unjustifiedUnready := 0
	// Too few nodes, something is missing: we are below the expected node count.
	if len(readiness.Ready) < acceptable.MinNodes {
		unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready)
	}
	// TODO: verify against max nodes as well.
	if unjustifiedUnready > csr.config.OkTotalUnreadyCount &&
		float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*
			float64(len(readiness.Ready)+len(readiness.Unready)+len(readiness.NotStarted)) {
		return false
	}

	return true
}

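// Example (illustrative): an acceptable range with MinNodes=9 and only 6 Ready nodes yields
// unjustifiedUnready=3; with OkTotalUnreadyCount=3 the group still counts as healthy,
// because 3 is not strictly greater than 3.
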
// updateNodeGroupMetrics looks at the NodeGroups provided by the cloud provider and updates the corresponding metrics.
func (csr *ClusterStateRegistry) updateNodeGroupMetrics() {
	autoscaled := 0
	autoprovisioned := 0
	for _, nodeGroup := range csr.getRunningNodeGroups() {
		if nodeGroup.Autoprovisioned() {
			autoprovisioned++
		} else {
			autoscaled++
		}
	}
	metrics.UpdateNodeGroupsCount(autoscaled, autoprovisioned)
}

// BackoffStatusForNodeGroup queries the backoff status of the node group.
func (csr *ClusterStateRegistry) BackoffStatusForNodeGroup(nodeGroup cloudprovider.NodeGroup, now time.Time) backoff.Status {
	return csr.backoff.BackoffStatus(nodeGroup, csr.nodeInfosForGroups[nodeGroup.Id()], now)
}

// NodeGroupScaleUpSafety returns information about whether the node group is safe to be scaled up now.
func (csr *ClusterStateRegistry) NodeGroupScaleUpSafety(nodeGroup cloudprovider.NodeGroup, now time.Time) NodeGroupScalingSafety {
	isHealthy := csr.IsNodeGroupHealthy(nodeGroup.Id())
	backoffStatus := csr.backoff.BackoffStatus(nodeGroup, csr.nodeInfosForGroups[nodeGroup.Id()], now)
	return NodeGroupScalingSafety{SafeToScale: isHealthy && !backoffStatus.IsBackedOff, Healthy: isHealthy, BackoffStatus: backoffStatus}
}

func (csr *ClusterStateRegistry) getProvisionedAndTargetSizesForNodeGroup(nodeGroupName string) (provisioned, target int, ok bool) {
	if len(csr.acceptableRanges) == 0 {
		klog.Warningf("AcceptableRanges have not been populated yet. Skip checking")
		return 0, 0, false
	}

	acceptable, found := csr.acceptableRanges[nodeGroupName]
	if !found {
		klog.Warningf("Failed to find acceptable ranges for %v", nodeGroupName)
		return 0, 0, false
	}
	target = acceptable.CurrentTarget

	readiness, found := csr.perNodeGroupReadiness[nodeGroupName]
	if !found {
		// No need to warn if the node group has size 0 (it was scaled to 0 before).
		if acceptable.MinNodes != 0 {
			klog.Warningf("Failed to find readiness information for %v", nodeGroupName)
		}
		return 0, target, true
	}
	provisioned = len(readiness.Registered) - len(readiness.NotStarted)

	return provisioned, target, true
}

func (csr *ClusterStateRegistry) areThereUpcomingNodesInNodeGroup(nodeGroupName string) bool {
	provisioned, target, ok := csr.getProvisionedAndTargetSizesForNodeGroup(nodeGroupName)
	if !ok {
		return false
	}
	return target > provisioned
}

// IsNodeGroupRegistered returns true if the node group is registered in the cluster state.
func (csr *ClusterStateRegistry) IsNodeGroupRegistered(nodeGroupName string) bool {
	_, found := csr.acceptableRanges[nodeGroupName]
	return found
}

// IsNodeGroupAtTargetSize returns true if the number of nodes provisioned in the group is equal to the target number of nodes.
func (csr *ClusterStateRegistry) IsNodeGroupAtTargetSize(nodeGroupName string) bool {
	provisioned, target, ok := csr.getProvisionedAndTargetSizesForNodeGroup(nodeGroupName)
	if !ok {
		return false
	}
	return target == provisioned
}

// IsNodeGroupScalingUp returns true if the node group is currently scaling up.
func (csr *ClusterStateRegistry) IsNodeGroupScalingUp(nodeGroupName string) bool {
	if !csr.areThereUpcomingNodesInNodeGroup(nodeGroupName) {
		return false
	}
	_, found := csr.scaleUpRequests[nodeGroupName]
	return found
}

// HasNodeGroupStartedScaleUp returns true if the node group has started a scale-up, regardless
// of whether there are any upcoming nodes. This is useful when the node group's
// size reverts back to its previous size before the next UpdateNodes call and we want to know
// whether a scale-up for the node group has started.
func (csr *ClusterStateRegistry) HasNodeGroupStartedScaleUp(nodeGroupName string) bool {
	csr.Lock()
	defer csr.Unlock()
	_, found := csr.scaleUpRequests[nodeGroupName]
	return found
}

// AcceptableRange contains information about the acceptable size of a node group.
type AcceptableRange struct {
	// MinNodes is the minimum number of nodes in the group.
	MinNodes int
	// MaxNodes is the maximum number of nodes in the group.
	MaxNodes int
	// CurrentTarget is the current target size of the group.
	CurrentTarget int
}

// updateAcceptableRanges updates the cluster state registry with how many nodes can be in a cluster.
// The function assumes that nodeGroup.TargetSize() is the desired number of nodes.
// So if there has been a recent scale-up of size 5, then there should be between targetSize-5 and
// targetSize nodes in ready state. In the same way, if 3 nodes have been removed recently, then
// the expected number of ready nodes is between targetSize and targetSize + 3.
func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]int) {
	result := make(map[string]AcceptableRange)
	for _, nodeGroup := range csr.getRunningNodeGroups() {
		size := targetSize[nodeGroup.Id()]
		readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
		result[nodeGroup.Id()] = AcceptableRange{
			MinNodes:      size - len(readiness.LongUnregistered),
			MaxNodes:      size,
			CurrentTarget: size,
		}
	}
	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
		acceptableRange := result[nodeGroupName]
		acceptableRange.MinNodes -= scaleUpRequest.Increase
		result[nodeGroupName] = acceptableRange
	}
	for _, scaleDownRequest := range csr.scaleDownRequests {
		acceptableRange := result[scaleDownRequest.NodeGroup.Id()]
		acceptableRange.MaxNodes++
		result[scaleDownRequest.NodeGroup.Id()] = acceptableRange
	}
	csr.acceptableRanges = result
}

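// Example (illustrative): a group with target size 10, 2 long-unregistered nodes and a pending
// scale-up of 3 gets AcceptableRange{MinNodes: 5, MaxNodes: 10, CurrentTarget: 10}; each pending
// scale-down request for the group raises MaxNodes by one.
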
// Readiness contains readiness information about a group of nodes.
type Readiness struct {
	// Names of ready nodes.
	Ready []string
	// Names of unready nodes that broke down after they started.
	Unready []string
	// Names of nodes that are being currently deleted. They exist in K8S but
	// are not included in NodeGroup.TargetSize().
	Deleted []string
	// Names of nodes that are not yet fully started.
	NotStarted []string
	// Names of all registered nodes in the group (ready/unready/deleted/etc).
	Registered []string
	// Names of nodes that failed to register within a reasonable limit.
	LongUnregistered []string
	// Names of nodes that haven't yet registered.
	Unregistered []string
	// Time when the readiness was measured.
	Time time.Time
	// Names of nodes that are Unready due to missing resources.
	// This field is only used for exposing information externally and
	// doesn't influence CA behavior.
	ResourceUnready []string
}

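// updateReadinessStats recomputes the per-node-group and cluster-wide readiness. Each registered
// node falls into exactly one bucket, checked in order: Deleted (already removed on the cloud
// provider side), Ready, NotStarted (created less than MaxNodeStartupTime ago), or Unready
// (with ResourceUnready additionally tracking unreadiness caused by missing resources).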
func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
	perNodeGroup := make(map[string]Readiness)
	total := Readiness{Time: currentTime}

	update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
		current.Registered = append(current.Registered, node.Name)
		if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted {
			current.Deleted = append(current.Deleted, node.Name)
		} else if nr.Ready {
			current.Ready = append(current.Ready, node.Name)
		} else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) {
			current.NotStarted = append(current.NotStarted, node.Name)
		} else {
			current.Unready = append(current.Unready, node.Name)
			if nr.Reason == kube_util.ResourceUnready {
				current.ResourceUnready = append(current.ResourceUnready, node.Name)
			}
		}
		return current
	}

	for _, node := range csr.nodes {
		nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(node)
		nr, errReady := kube_util.GetNodeReadiness(node)

		// The node is most likely not autoscaled; still check the errors.
		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
			if errNg != nil {
				klog.Warningf("Failed to get nodegroup for %s: %v", node.Name, errNg)
			}
			if errReady != nil {
				klog.Warningf("Failed to get readiness info for %s: %v", node.Name, errReady)
			}
		} else {
			perNodeGroup[nodeGroup.Id()] = update(perNodeGroup[nodeGroup.Id()], node, nr)
		}
		total = update(total, node, nr)
	}

	for _, unregistered := range csr.unregisteredNodes {
		nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(unregistered.Node)
		if errNg != nil {
			klog.Warningf("Failed to get nodegroup for %s: %v", unregistered.Node.Name, errNg)
			continue
		}
		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
			klog.Warningf("Nodegroup is nil for %s", unregistered.Node.Name)
			continue
		}
		perNgCopy := perNodeGroup[nodeGroup.Id()]
		maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
		if err != nil {
			klog.Warningf("Failed to get maxNodeProvisionTime for node %s in node group %s: %v", unregistered.Node.Name, nodeGroup.Id(), err)
			continue
		}
		if unregistered.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
			perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
			total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
		} else {
			perNgCopy.Unregistered = append(perNgCopy.Unregistered, unregistered.Node.Name)
			total.Unregistered = append(total.Unregistered, unregistered.Node.Name)
		}
		perNodeGroup[nodeGroup.Id()] = perNgCopy
	}
	if len(total.LongUnregistered) > 0 {
		klog.V(3).Infof("Found longUnregistered Nodes %s", total.LongUnregistered)
	}

	for ngId, ngReadiness := range perNodeGroup {
		ngReadiness.Time = currentTime
		perNodeGroup[ngId] = ngReadiness
	}
	csr.perNodeGroupReadiness = perNodeGroup
	csr.totalReadiness = total
}

// Calculates which node groups have an incorrect size.
func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.Time) {
	result := make(map[string]IncorrectNodeGroupSize)
	for _, nodeGroup := range csr.getRunningNodeGroups() {
		acceptableRange, found := csr.acceptableRanges[nodeGroup.Id()]
		if !found {
			klog.Warningf("Acceptable range for node group %s not found", nodeGroup.Id())
			continue
		}
		readiness, found := csr.perNodeGroupReadiness[nodeGroup.Id()]
		if !found {
			// If MinNodes == 0, the node group has been scaled to 0 and everything is fine.
			if acceptableRange.MinNodes != 0 {
				klog.Warningf("Readiness for node group %s not found", nodeGroup.Id())
			}
			continue
		}
		unregisteredNodes := len(readiness.Unregistered) + len(readiness.LongUnregistered)
		if len(readiness.Registered) > acceptableRange.CurrentTarget ||
			len(readiness.Registered) < acceptableRange.CurrentTarget-unregisteredNodes {
			incorrect := IncorrectNodeGroupSize{
				CurrentSize:   len(readiness.Registered),
				ExpectedSize:  acceptableRange.CurrentTarget,
				FirstObserved: currentTime,
			}
			existing, found := csr.incorrectNodeGroupSizes[nodeGroup.Id()]
			if found {
				if incorrect.CurrentSize == existing.CurrentSize &&
					incorrect.ExpectedSize == existing.ExpectedSize {
					// The same mismatch as before; keep the original FirstObserved time.
					incorrect = existing
				}
			}
			result[nodeGroup.Id()] = incorrect
		}
	}
	csr.incorrectNodeGroupSizes = result
}

func (csr *ClusterStateRegistry) updateUnregisteredNodes(unregisteredNodes []UnregisteredNode) {
	result := make(map[string]UnregisteredNode)
	for _, unregistered := range unregisteredNodes {
		if prev, found := csr.unregisteredNodes[unregistered.Node.Name]; found {
			// Keep the original UnregisteredSince timestamp for nodes we already track.
			result[unregistered.Node.Name] = prev
		} else {
			result[unregistered.Node.Name] = unregistered
		}
	}
	csr.unregisteredNodes = result
}

// GetUnregisteredNodes returns a list of all unregistered nodes.
func (csr *ClusterStateRegistry) GetUnregisteredNodes() []UnregisteredNode {
	csr.Lock()
	defer csr.Unlock()

	result := make([]UnregisteredNode, 0, len(csr.unregisteredNodes))
	for _, unregistered := range csr.unregisteredNodes {
		result = append(result, unregistered)
	}
	return result
}

func (csr *ClusterStateRegistry) updateCloudProviderDeletedNodes(deletedNodes []*apiv1.Node) {
	result := make(map[string]struct{}, len(deletedNodes))
	for _, deleted := range deletedNodes {
		result[deleted.Name] = struct{}{}
	}
	csr.deletedNodes = result
}

// UpdateScaleDownCandidates updates the scale-down candidates.
func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
	result := make(map[string][]string)
	for _, node := range nodes {
		group, err := csr.cloudProvider.NodeGroupForNode(node)
		if err != nil {
			klog.Warningf("Failed to get node group for %s: %v", node.Name, err)
			continue
		}
		if group == nil || reflect.ValueOf(group).IsNil() {
			continue
		}
		result[group.Id()] = append(result[group.Id()], node.Name)
	}
	csr.candidatesForScaleDown = result
	csr.lastScaleDownUpdateTime = now
}

// GetStatus returns ClusterAutoscalerStatus with the current cluster autoscaler status.
func (csr *ClusterStateRegistry) GetStatus(now time.Time) *api.ClusterAutoscalerStatus {
	result := &api.ClusterAutoscalerStatus{
		AutoscalerStatus: api.ClusterAutoscalerRunning,
		NodeGroups:       make([]api.NodeGroupStatus, 0),
	}
	nodeGroupsLastStatus := make(map[string]api.NodeGroupStatus)
	for _, nodeGroup := range csr.lastStatus.NodeGroups {
		nodeGroupsLastStatus[nodeGroup.Name] = nodeGroup
	}
	for _, nodeGroup := range csr.getRunningNodeGroups() {
		nodeGroupStatus := api.NodeGroupStatus{
			Name: nodeGroup.Id(),
		}
		readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
		acceptable := csr.acceptableRanges[nodeGroup.Id()]

		nodeGroupLastStatus := nodeGroupsLastStatus[nodeGroup.Id()]

		// Health.
		nodeGroupStatus.Health = buildHealthStatusNodeGroup(
			csr.IsNodeGroupHealthy(nodeGroup.Id()), readiness, acceptable, nodeGroup.MinSize(), nodeGroup.MaxSize(), nodeGroupLastStatus.Health)

		// Scale-up.
		nodeGroupStatus.ScaleUp = csr.buildScaleUpStatusNodeGroup(
			nodeGroup,
			readiness,
			acceptable, now, nodeGroupLastStatus.ScaleUp)

		// Scale-down.
		nodeGroupStatus.ScaleDown = buildScaleDownStatusNodeGroup(
			csr.candidatesForScaleDown[nodeGroup.Id()], csr.lastScaleDownUpdateTime, nodeGroupLastStatus.ScaleDown)

		result.NodeGroups = append(result.NodeGroups, nodeGroupStatus)
	}
	result.ClusterWide.Health =
		buildHealthStatusClusterwide(csr.IsClusterHealthy(), csr.totalReadiness, csr.lastStatus.ClusterWide.Health)
	result.ClusterWide.ScaleUp =
		buildScaleUpStatusClusterwide(result.NodeGroups, csr.totalReadiness, csr.lastStatus.ClusterWide.ScaleUp)
	result.ClusterWide.ScaleDown =
		buildScaleDownStatusClusterwide(csr.candidatesForScaleDown, csr.lastScaleDownUpdateTime, csr.lastStatus.ClusterWide.ScaleDown)

	csr.lastStatus = result
	return result
}

// GetClusterReadiness returns the current readiness stats of the cluster.
func (csr *ClusterStateRegistry) GetClusterReadiness() Readiness {
	return csr.totalReadiness
}

func buildNodeCount(readiness Readiness) api.NodeCount {
	return api.NodeCount{
		Registered: api.RegisteredNodeCount{
			Total:        len(readiness.Registered),
			Ready:        len(readiness.Ready),
			NotStarted:   len(readiness.NotStarted),
			BeingDeleted: len(readiness.Deleted),
			Unready: api.RegisteredUnreadyNodeCount{
				Total:           len(readiness.Unready),
				ResourceUnready: len(readiness.ResourceUnready),
			},
		},
		LongUnregistered: len(readiness.LongUnregistered),
		Unregistered:     len(readiness.Unregistered),
	}
}

func buildHealthStatusNodeGroup(isHealthy bool, readiness Readiness, acceptable AcceptableRange, minSize, maxSize int, lastStatus api.NodeGroupHealthCondition) api.NodeGroupHealthCondition {
	condition := api.NodeGroupHealthCondition{
		NodeCounts:          buildNodeCount(readiness),
		CloudProviderTarget: acceptable.CurrentTarget,
		MinSize:             minSize,
		MaxSize:             maxSize,
		LastProbeTime:       metav1.Time{Time: readiness.Time},
	}
	if isHealthy {
		condition.Status = api.ClusterAutoscalerHealthy
	} else {
		condition.Status = api.ClusterAutoscalerUnhealthy
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

func (csr *ClusterStateRegistry) buildScaleUpStatusNodeGroup(nodeGroup cloudprovider.NodeGroup, readiness Readiness, acceptable AcceptableRange, now time.Time, lastStatus api.NodeGroupScaleUpCondition) api.NodeGroupScaleUpCondition {
	isScaleUpInProgress := csr.IsNodeGroupScalingUp(nodeGroup.Id())
	scaleUpSafety := csr.NodeGroupScaleUpSafety(nodeGroup, now)
	condition := api.NodeGroupScaleUpCondition{
		LastProbeTime: metav1.Time{Time: readiness.Time},
	}
	if isScaleUpInProgress {
		condition.Status = api.ClusterAutoscalerInProgress
	} else if !scaleUpSafety.Healthy {
		condition.Status = api.ClusterAutoscalerUnhealthy
	} else if !scaleUpSafety.SafeToScale {
		condition.Status = api.ClusterAutoscalerBackoff
		condition.BackoffInfo = api.BackoffInfo{
			ErrorCode:    scaleUpSafety.BackoffStatus.ErrorInfo.ErrorCode,
			ErrorMessage: truncateIfExceedMaxLength(scaleUpSafety.BackoffStatus.ErrorInfo.ErrorMessage, maxErrorMessageSize),
		}
	} else {
		condition.Status = api.ClusterAutoscalerNoActivity
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

func buildScaleDownStatusNodeGroup(candidates []string, lastProbed time.Time, lastStatus api.ScaleDownCondition) api.ScaleDownCondition {
	condition := api.ScaleDownCondition{
		Candidates:    len(candidates),
		LastProbeTime: metav1.Time{Time: lastProbed},
	}
	if len(candidates) > 0 {
		condition.Status = api.ClusterAutoscalerCandidatesPresent
	} else {
		condition.Status = api.ClusterAutoscalerNoCandidates
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

func buildHealthStatusClusterwide(isHealthy bool, readiness Readiness, lastStatus api.ClusterHealthCondition) api.ClusterHealthCondition {
	condition := api.ClusterHealthCondition{
		NodeCounts:    buildNodeCount(readiness),
		LastProbeTime: metav1.Time{Time: readiness.Time},
	}
	if isHealthy {
		condition.Status = api.ClusterAutoscalerHealthy
	} else {
		condition.Status = api.ClusterAutoscalerUnhealthy
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

func buildScaleUpStatusClusterwide(nodeGroupsStatuses []api.NodeGroupStatus, readiness Readiness, lastStatus api.ClusterScaleUpCondition) api.ClusterScaleUpCondition {
	isScaleUpInProgress := false
	for _, nodeGroupStatus := range nodeGroupsStatuses {
		if nodeGroupStatus.ScaleUp.Status == api.ClusterAutoscalerInProgress {
			isScaleUpInProgress = true
			break
		}
	}
	condition := api.ClusterScaleUpCondition{
		LastProbeTime: metav1.Time{Time: readiness.Time},
	}
	if isScaleUpInProgress {
		condition.Status = api.ClusterAutoscalerInProgress
	} else {
		condition.Status = api.ClusterAutoscalerNoActivity
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

func buildScaleDownStatusClusterwide(candidates map[string][]string, lastProbed time.Time, lastStatus api.ScaleDownCondition) api.ScaleDownCondition {
	totalCandidates := 0
	for _, val := range candidates {
		totalCandidates += len(val)
	}
	condition := api.ScaleDownCondition{
		Candidates:    totalCandidates,
		LastProbeTime: metav1.Time{Time: lastProbed},
	}
	if totalCandidates > 0 {
		condition.Status = api.ClusterAutoscalerCandidatesPresent
	} else {
		condition.Status = api.ClusterAutoscalerNoCandidates
	}
	if condition.Status == lastStatus.Status {
		condition.LastTransitionTime = lastStatus.LastTransitionTime
	} else {
		condition.LastTransitionTime = condition.LastProbeTime
	}
	return condition
}

// GetIncorrectNodeGroupSize gets the IncorrectNodeGroupSize information for the given node group.
func (csr *ClusterStateRegistry) GetIncorrectNodeGroupSize(nodeGroupName string) *IncorrectNodeGroupSize {
	result, found := csr.incorrectNodeGroupSizes[nodeGroupName]
	if !found {
		return nil
	}
	return &result
}

// GetUpcomingNodes returns how many new nodes will be added shortly to the node groups or should become ready soon.
// The function may overestimate the number of nodes. The second return value contains the names of upcoming nodes
// that are already registered in the cluster.
func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]int, registeredNodeNames map[string][]string) {
	csr.Lock()
	defer csr.Unlock()

	upcomingCounts = map[string]int{}
	registeredNodeNames = map[string][]string{}
	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
		id := nodeGroup.Id()
		if csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
			// For an upcoming node group, the whole target size counts as upcoming.
			size, err := nodeGroup.TargetSize()
			if err == nil && size >= 0 {
				upcomingCounts[id] = size
			}
			continue
		}
		readiness := csr.perNodeGroupReadiness[id]
		ar := csr.acceptableRanges[id]
		// newNodes is the number of nodes expected to appear on top of the ones already
		// counted as ready, unready, or long-unregistered.
		newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered))
		if newNodes <= 0 {
			// A negative value is unlikely but theoretically possible.
			continue
		}
		upcomingCounts[id] = newNodes
		// newNodes should include instances that have registered with k8s but aren't ready yet, instances that came up on the cloud provider side
		// but haven't registered with k8s yet, and instances that haven't even come up on the cloud provider side yet (but are reflected in the target
		// size). The first category is categorized as NotStarted in readiness, the other two aren't registered with k8s, so they shouldn't be
		// included.
		registeredNodeNames[id] = readiness.NotStarted
	}
	return upcomingCounts, registeredNodeNames
}

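// Example (illustrative): a group with CurrentTarget=8, 4 Ready, 1 Unready and 1 LongUnregistered
// node yields an upcoming count of 2, and its NotStarted nodes are reported as the already
// registered part of the upcoming set.
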
// getRunningNodeGroups returns the running node groups, filtering out upcoming ones.
func (csr *ClusterStateRegistry) getRunningNodeGroups() []cloudprovider.NodeGroup {
	nodeGroups := csr.cloudProvider.NodeGroups()
	result := make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
	for _, nodeGroup := range nodeGroups {
		if !csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
			result = append(result, nodeGroup)
		}
	}
	return result
}

// getCloudProviderNodeInstances returns a map keyed on node group id whose values are the lists
// of node instances as returned by NodeGroup.Nodes().
func (csr *ClusterStateRegistry) getCloudProviderNodeInstances() (map[string][]cloudprovider.Instance, error) {
	for _, nodeGroup := range csr.getRunningNodeGroups() {
		if csr.IsNodeGroupScalingUp(nodeGroup.Id()) {
			csr.cloudProviderNodeInstancesCache.InvalidateCacheEntry(nodeGroup)
		}
	}
	return csr.cloudProviderNodeInstancesCache.GetCloudProviderNodeInstances()
}

// Calculates which of the existing cloud provider nodes are not yet registered in Kubernetes.
// As we expect those instances to become Ready soon (O(~minutes)), we inject temporary fake nodes
// to speed up the scaling process by continuing to scale based on the in-memory cluster state.
func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances map[string][]cloudprovider.Instance, time time.Time) []UnregisteredNode {
	registered := sets.NewString()
	for _, node := range allNodes {
		registered.Insert(node.Spec.ProviderID)
	}
	notRegistered := make([]UnregisteredNode, 0)
	for _, instances := range cloudProviderNodeInstances {
		for _, instance := range instances {
			if !registered.Has(instance.Id) && expectedToRegister(instance) {
				notRegistered = append(notRegistered, UnregisteredNode{
					Node:              FakeNode(instance, cloudprovider.FakeNodeUnregistered),
					UnregisteredSince: time,
				})
			}
		}
	}
	return notRegistered
}

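// expectedToRegister reports whether a cloud provider instance should eventually show up as a
// Kubernetes node: instances that are being deleted or that carry error info are never expected
// to register.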
func expectedToRegister(instance cloudprovider.Instance) bool {
	return instance.Status == nil || (instance.Status.State != cloudprovider.InstanceDeleting && instance.Status.ErrorInfo == nil)
}

// Calculates which of the nodes registered in Kubernetes no longer exist on the cloud provider side.
func (csr *ClusterStateRegistry) getCloudProviderDeletedNodes(allNodes []*apiv1.Node) []*apiv1.Node {
	nodesRemoved := make([]*apiv1.Node, 0)
	for _, node := range allNodes {
		if !csr.hasCloudProviderInstance(node) {
			nodesRemoved = append(nodesRemoved, node)
		}
	}
	return nodesRemoved
}

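// hasCloudProviderInstance checks whether a node still has a backing instance. If the cloud
// provider does not implement HasInstance, it falls back to assuming the instance exists
// unless the node already carries the to-be-deleted taint.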
func (csr *ClusterStateRegistry) hasCloudProviderInstance(node *apiv1.Node) bool {
	exists, err := csr.cloudProvider.HasInstance(node)
	if err == nil {
		return exists
	}
	if !errors.Is(err, cloudprovider.ErrNotImplemented) {
		klog.Warningf("Failed to check whether cloud provider has instance for %s: %v", node.Name, err)
	}
	return !taints.HasToBeDeletedTaint(node)
}

// GetAutoscaledNodesCount calculates and returns the actual and the target number of nodes
// belonging to autoscaled node groups in the cluster.
func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetSize int) {
	csr.Lock()
	defer csr.Unlock()

	for _, accRange := range csr.acceptableRanges {
		targetSize += accRange.CurrentTarget
	}
	for _, readiness := range csr.perNodeGroupReadiness {
		currentSize += len(readiness.Registered) - len(readiness.NotStarted)
	}
	return currentSize, targetSize
}

func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
	nodeGroups := csr.getRunningNodeGroups()

	for _, nodeGroup := range nodeGroups {
		csr.handleInstanceCreationErrorsForNodeGroup(
			nodeGroup,
			csr.cloudProviderNodeInstances[nodeGroup.Id()],
			csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
			currentTime)
	}
}

func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
	nodeGroup cloudprovider.NodeGroup,
	currentInstances []cloudprovider.Instance,
	previousInstances []cloudprovider.Instance,
	currentTime time.Time) {

	_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToErrorCodeMappings(currentInstances)
	previousInstanceToErrorCode, _, _ := csr.buildInstanceToErrorCodeMappings(previousInstances)

	for errorCode, instances := range currentErrorCodeToInstance {
		if len(instances) > 0 {
			klog.V(4).Infof("Found %v instances with errorCode %v in nodeGroup %v", len(instances), errorCode, nodeGroup.Id())
		}
	}

	// If the node group is scaling up and there are new node-create requests which cannot be
	// satisfied because of out-of-resources errors, we:
	// - emit an event
	// - decrease the scale-up request
	// - increase the scale-up failure metric
	// - back off the node group
	for errorCode, instances := range currentErrorCodeToInstance {
		unseenInstanceIds := make([]string, 0)
		for _, instance := range instances {
			if _, seen := previousInstanceToErrorCode[instance.Id]; !seen {
				unseenInstanceIds = append(unseenInstanceIds, instance.Id)
			}
		}

		klog.V(1).Infof("Failed adding %v nodes (%v unseen previously) to group %v due to %v; errorMessages=%#v", len(instances), len(unseenInstanceIds), nodeGroup.Id(), errorCode, currentUniqueErrorMessagesForErrorCode[errorCode])
		if len(unseenInstanceIds) > 0 && csr.IsNodeGroupScalingUp(nodeGroup.Id()) {
			csr.logRecorder.Eventf(
				apiv1.EventTypeWarning,
				"ScaleUpFailed",
				"Failed adding %v nodes to group %v due to %v; source errors: %v",
				len(unseenInstanceIds),
				nodeGroup.Id(),
				errorCode,
				csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]))

			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
			gpuResource, gpuType := "", ""
			nodeInfo, err := nodeGroup.TemplateNodeInfo()
			if err != nil {
				klog.Warningf("Failed to get template node info for a node group: %s", err)
			} else {
				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
			}
			// Decrease the scale-up request by the number of failed nodes.
			csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)

			csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode.code), cloudprovider.InstanceErrorInfo{
				ErrorClass:   errorCode.class,
				ErrorCode:    errorCode.code,
				ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
			}, gpuResource, gpuType, currentTime)
		}
	}
}

func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {
	var sb strings.Builder
	maxErrors := 3
	for i, errorMessage := range uniqErrorMessages {
		if i >= maxErrors {
			break
		}
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(errorMessage)
	}
	if len(uniqErrorMessages) > maxErrors {
		sb.WriteString(", ...")
	}
	return sb.String()
}

type errorCode struct {
	code  string
	class cloudprovider.InstanceErrorClass
}

func (c errorCode) String() string {
	return fmt.Sprintf("%v.%v", c.class, c.code)
}

func (csr *ClusterStateRegistry) buildInstanceToErrorCodeMappings(instances []cloudprovider.Instance) (instanceToErrorCode map[string]errorCode, uniqueErrorMessagesForErrorCode map[errorCode][]string, errorCodeToInstance map[errorCode][]cloudprovider.Instance) {
	instanceToErrorCode = make(map[string]errorCode)
	uniqueErrorMessagesForErrorCode = make(map[errorCode][]string)
	errorCodeToInstance = make(map[errorCode][]cloudprovider.Instance)

	uniqErrorMessagesForErrorCodeTmp := make(map[errorCode]map[string]bool)
	for _, instance := range instances {
		if instance.Status != nil && instance.Status.State == cloudprovider.InstanceCreating && instance.Status.ErrorInfo != nil {
			errorInfo := instance.Status.ErrorInfo
			errorCode := errorCode{errorInfo.ErrorCode, errorInfo.ErrorClass}

			if _, found := uniqErrorMessagesForErrorCodeTmp[errorCode]; !found {
				uniqErrorMessagesForErrorCodeTmp[errorCode] = make(map[string]bool)
			}
			instanceToErrorCode[instance.Id] = errorCode
			uniqErrorMessagesForErrorCodeTmp[errorCode][errorInfo.ErrorMessage] = true
			errorCodeToInstance[errorCode] = append(errorCodeToInstance[errorCode], instance)
		}
	}

	for errorCode, uniqueErrorMessages := range uniqErrorMessagesForErrorCodeTmp {
		for errorMessage := range uniqueErrorMessages {
			uniqueErrorMessagesForErrorCode[errorCode] = append(uniqueErrorMessagesForErrorCode[errorCode], errorMessage)
		}
	}

	return
}

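// Example (illustrative): three instances stuck in InstanceCreating with the same error class
// and code but two distinct messages produce a single errorCode key that maps to all three
// instances and to the two unique error messages.
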
// GetCreatedNodesWithErrors returns a map from node group id to the list of nodes which reported a create error.
func (csr *ClusterStateRegistry) GetCreatedNodesWithErrors() map[string][]*apiv1.Node {
	csr.Lock()
	defer csr.Unlock()

	nodesWithCreateErrors := make(map[string][]*apiv1.Node)
	for nodeGroupId, nodeGroupInstances := range csr.cloudProviderNodeInstances {
		_, _, instancesByErrorCode := csr.buildInstanceToErrorCodeMappings(nodeGroupInstances)
		for _, instances := range instancesByErrorCode {
			for _, instance := range instances {
				nodesWithCreateErrors[nodeGroupId] = append(nodesWithCreateErrors[nodeGroupId], FakeNode(instance, cloudprovider.FakeNodeCreateError))
			}
		}
	}
	return nodesWithCreateErrors
}

// RefreshCloudProviderNodeInstancesCache refreshes the cloud provider node instances cache.
func (csr *ClusterStateRegistry) RefreshCloudProviderNodeInstancesCache() {
	csr.cloudProviderNodeInstancesCache.Refresh()
}

// InvalidateNodeInstancesCacheEntry removes a node group from the cloud provider node instances cache.
func (csr *ClusterStateRegistry) InvalidateNodeInstancesCacheEntry(nodeGroup cloudprovider.NodeGroup) {
	csr.cloudProviderNodeInstancesCache.InvalidateCacheEntry(nodeGroup)
}

// FakeNode creates a fake node with the Name field populated and the FakeNodeReasonAnnotation added.
func FakeNode(instance cloudprovider.Instance, reason string) *apiv1.Node {
	return &apiv1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: instance.Id,
			Annotations: map[string]string{
				cloudprovider.FakeNodeReasonAnnotation: reason,
			},
		},
		Spec: apiv1.NodeSpec{
			ProviderID: instance.Id,
		},
	}
}

// PeriodicCleanup performs clean-ups that should be done periodically, e.g.
// each Autoscaler loop.
func (csr *ClusterStateRegistry) PeriodicCleanup() {
	// Clear the scale-up failures info so they don't accumulate.
	csr.clearScaleUpFailures()
}

// clearScaleUpFailures clears the scale-up failures map.
func (csr *ClusterStateRegistry) clearScaleUpFailures() {
	csr.Lock()
	defer csr.Unlock()
	csr.scaleUpFailures = make(map[string][]ScaleUpFailure)
}

// GetScaleUpFailures returns the scale-up failures map.
func (csr *ClusterStateRegistry) GetScaleUpFailures() map[string][]ScaleUpFailure {
	csr.Lock()
	defer csr.Unlock()
	result := make(map[string][]ScaleUpFailure)
	for nodeGroupId, failures := range csr.scaleUpFailures {
		result[nodeGroupId] = failures
	}
	return result
}

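// Example (illustrative): with maxLength=15 a 20-character message becomes its first 4
// characters followed by "<truncated>" (15 characters in total); when maxLength is shorter
// than the marker itself, the message is simply cut at maxLength.
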
func truncateIfExceedMaxLength(s string, maxLength int) string {
	if len(s) <= maxLength {
		return s
	}
	untruncatedLen := maxLength - len(messageTruncated)
	if untruncatedLen < 0 {
		return s[:maxLength]
	}
	return s[:untruncatedLen] + messageTruncated
}