CA: Make CSR's Readiness keep lists of node names instead of just their count

This does make us call len() in a bunch of places within CSR, but allows
for greater flexibility - it's possible to act on the sets of nodes determined
by Readiness.
Kuba Tużnik 2023-02-03 17:40:09 +01:00
parent 0ae555cbee
commit 6978ff8829
3 changed files with 66 additions and 69 deletions
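The commit message's point about flexibility can be illustrated with a short, self-contained sketch. The trimmed-down Readiness and the unreadyOutsideAllowlist helper below are hypothetical stand-ins, not code from this commit; only the switch from per-category counts to per-category name lists mirrors the change.

package main

import "fmt"

// Readiness mirrors only the shape of the change: name lists instead of counts.
type Readiness struct {
	Ready   []string
	Unready []string
}

// unreadyOutsideAllowlist is a hypothetical consumer. With name lists it can
// make a per-node decision, which a bare count could not express.
func unreadyOutsideAllowlist(r Readiness, allowlist map[string]bool) []string {
	var out []string
	for _, name := range r.Unready {
		if !allowlist[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	r := Readiness{Ready: []string{"ng1-1"}, Unready: []string{"ng1-2", "ng2-1"}}
	fmt.Println("unready count:", len(r.Unready)) // 2
	fmt.Println("unready outside allowlist:", unreadyOutsideAllowlist(r, map[string]bool{"ng1-2": true})) // [ng2-1]
}

The count stays available via len(), which is the trade-off the commit message describes.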

@@ -354,7 +354,7 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool {
csr.Lock()
defer csr.Unlock()
totalUnready := csr.totalReadiness.Unready
totalUnready := len(csr.totalReadiness.Unready)
if totalUnready > csr.config.OkTotalUnreadyCount &&
float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) {
@@ -384,14 +384,14 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {
unjustifiedUnready := 0
// Too few nodes, something is missing. Below the expected node count.
if readiness.Ready < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - readiness.Ready
if len(readiness.Ready) < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready)
}
// TODO: verify against max nodes as well.
if unjustifiedUnready > csr.config.OkTotalUnreadyCount &&
float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*
float64(readiness.Ready+readiness.Unready+readiness.NotStarted) {
float64(len(readiness.Ready)+len(readiness.Unready)+len(readiness.NotStarted)) {
return false
}
@@ -444,7 +444,7 @@ func (csr *ClusterStateRegistry) getProvisionedAndTargetSizesForNodeGroup(nodeGr
}
return 0, target, true
}
provisioned = readiness.Registered - readiness.NotStarted
provisioned = len(readiness.Registered) - len(readiness.NotStarted)
return provisioned, target, true
}
@@ -496,7 +496,7 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in
size := targetSize[nodeGroup.Id()]
readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
result[nodeGroup.Id()] = AcceptableRange{
MinNodes: size - readiness.LongUnregistered,
MinNodes: size - len(readiness.LongUnregistered),
MaxNodes: size,
CurrentTarget: size,
}
@@ -516,46 +516,45 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in
// Readiness contains readiness information about a group of nodes.
type Readiness struct {
// Number of ready nodes.
Ready int
// Number of unready nodes that broke down after they started.
Unready int
// Number of nodes that are being currently deleted. They exist in K8S but
// Names of ready nodes.
Ready []string
// Names of unready nodes that broke down after they started.
Unready []string
// Names of nodes that are being currently deleted. They exist in K8S but
// are not included in NodeGroup.TargetSize().
Deleted int
// Number of nodes that are not yet fully started.
NotStarted int
// Number of all registered nodes in the group (ready/unready/deleted/etc).
Registered int
// Number of nodes that failed to register within a reasonable limit.
LongUnregistered int
// Number of nodes that haven't yet registered.
Unregistered int
Deleted []string
// Names of nodes that are not yet fully started.
NotStarted []string
// Names of all registered nodes in the group (ready/unready/deleted/etc).
Registered []string
// Names of nodes that failed to register within a reasonable limit.
LongUnregistered []string
// Names of nodes that haven't yet registered.
Unregistered []string
// Time when the readiness was measured.
Time time.Time
// Number of nodes that are Unready due to missing resources.
// Names of nodes that are Unready due to missing resources.
// This field is only used for exposing information externally and
// doesn't influence CA behavior.
ResourceUnready int
ResourceUnready []string
}
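A side note on why swapping the int counters for slices needs no initialization changes: the zero value of a []string is nil, len reports 0 for a nil slice, and append accepts one, so a zero-valued Readiness (like total := Readiness{Time: currentTime} in updateReadinessStats below) still starts out reporting zero for every category. A minimal sketch:

package main

import "fmt"

func main() {
	// Zero value of a slice field: nil, with length 0.
	var ready []string
	fmt.Println(ready == nil, len(ready)) // true 0

	// append allocates on first use, so no explicit make() is required.
	ready = append(ready, "ng1-1")
	fmt.Println(len(ready), ready) // 1 [ng1-1]
}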
func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
perNodeGroup := make(map[string]Readiness)
total := Readiness{Time: currentTime}
update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
current.Registered++
current.Registered = append(current.Registered, node.Name)
if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted {
current.Deleted++
current.Deleted = append(current.Deleted, node.Name)
} else if nr.Ready {
current.Ready++
current.Ready = append(current.Ready, node.Name)
} else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) {
current.NotStarted++
current.NotStarted = append(current.NotStarted, node.Name)
} else {
current.Unready++
current.Unready = append(current.Unready, node.Name)
if nr.Reason == kube_util.ResourceUnready {
current.ResourceUnready++
current.ResourceUnready = append(current.ResourceUnready, node.Name)
}
}
return current
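The update closure takes and returns Readiness by value: append assigns the grown slice to the copy's field, and the result only survives because the caller keeps the returned struct (total = update(total, node, nr) below). A minimal sketch of that pattern with simplified types, not the actual CSR code:

package main

import "fmt"

type readiness struct {
	Ready []string
}

// add mirrors the shape of the update closure: struct in by value, field grown
// with append, updated copy returned.
func add(r readiness, name string) readiness {
	r.Ready = append(r.Ready, name)
	return r
}

func main() {
	var total readiness
	add(total, "ng1-1")           // return value dropped: the append is lost
	fmt.Println(len(total.Ready)) // 0
	total = add(total, "ng1-1")   // keep the returned copy, as the CSR code does
	fmt.Println(len(total.Ready)) // 1
}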
@@ -579,7 +578,6 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
total = update(total, node, nr)
}
var longUnregisteredNodeNames []string
for _, unregistered := range csr.unregisteredNodes {
nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(unregistered.Node)
if errNg != nil {
@@ -592,17 +590,16 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
}
perNgCopy := perNodeGroup[nodeGroup.Id()]
if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
longUnregisteredNodeNames = append(longUnregisteredNodeNames, unregistered.Node.Name)
perNgCopy.LongUnregistered++
total.LongUnregistered++
perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
} else {
perNgCopy.Unregistered++
total.Unregistered++
perNgCopy.Unregistered = append(perNgCopy.Unregistered, unregistered.Node.Name)
total.Unregistered = append(total.Unregistered, unregistered.Node.Name)
}
perNodeGroup[nodeGroup.Id()] = perNgCopy
}
if total.LongUnregistered > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", longUnregisteredNodeNames)
if len(total.LongUnregistered) > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", total.LongUnregistered)
}
for ngId, ngReadiness := range perNodeGroup {
@@ -630,10 +627,10 @@ func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.
}
continue
}
if readiness.Registered > acceptableRange.MaxNodes ||
readiness.Registered < acceptableRange.MinNodes {
if len(readiness.Registered) > acceptableRange.MaxNodes ||
len(readiness.Registered) < acceptableRange.MinNodes {
incorrect := IncorrectNodeGroupSize{
CurrentSize: readiness.Registered,
CurrentSize: len(readiness.Registered),
ExpectedSize: acceptableRange.CurrentTarget,
FirstObserved: currentTime,
}
@@ -752,12 +749,12 @@ func buildHealthStatusNodeGroup(isReady bool, readiness Readiness, acceptable Ac
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d cloudProviderTarget=%d (minSize=%d, maxSize=%d)",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
acceptable.CurrentTarget,
minSize,
maxSize),
@@ -775,7 +772,7 @@ func buildScaleUpStatusNodeGroup(isScaleUpInProgress bool, isSafeToScaleUp bool,
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d cloudProviderTarget=%d",
readiness.Ready,
len(readiness.Ready),
acceptable.CurrentTarget),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -807,12 +804,12 @@ func buildHealthStatusClusterwide(isReady bool, readiness Readiness) api.Cluster
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -838,8 +835,8 @@ func buildScaleUpStatusClusterwide(nodeGroupStatuses []api.NodeGroupStatus, read
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d registered=%d",
readiness.Ready,
readiness.Registered),
len(readiness.Ready),
len(readiness.Registered)),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
if isScaleUpInProgress {
@@ -930,7 +927,7 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
readiness := csr.perNodeGroupReadiness[id]
ar := csr.acceptableRanges[id]
// newNodes is the number of nodes that
newNodes := ar.CurrentTarget - (readiness.Ready + readiness.Unready + readiness.LongUnregistered)
newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered))
if newNodes <= 0 {
// Negative value is unlikely but theoretically possible.
continue
@@ -1003,7 +1000,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
targetSize += accRange.CurrentTarget
}
for _, readiness := range csr.perNodeGroupReadiness {
currentSize += readiness.Registered - readiness.NotStarted
currentSize += len(readiness.Registered) - len(readiness.NotStarted)
}
return currentSize, targetSize
}
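The GetUpcomingNodes hunk above now derives the per-group count from the lengths of three name lists. A standalone sketch of that arithmetic, using a hypothetical upcoming helper and made-up inputs rather than the real method:

package main

import "fmt"

// upcoming reproduces the formula from GetUpcomingNodes for one node group:
// target size minus the nodes already accounted for as ready, unready, or
// long-unregistered, clamped at zero (a negative value is unlikely but
// theoretically possible).
func upcoming(target int, ready, unready, longUnregistered []string) int {
	n := target - (len(ready) + len(unready) + len(longUnregistered))
	if n <= 0 {
		return 0
	}
	return n
}

func main() {
	fmt.Println(upcoming(5, []string{"ng1-1", "ng1-2"}, []string{"ng1-3"}, nil)) // 2
	fmt.Println(upcoming(2, []string{"ng1-1", "ng1-2"}, []string{"ng1-3"}, nil)) // 0
}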

@@ -359,8 +359,8 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Unready)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
upcoming := clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, upcoming["ng1"])
}
@@ -390,22 +390,22 @@ func TestNotStarted(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))
// node ng2_1 moves condition to ready
SetNodeReadyState(ng2_1, true, now.Add(-4*time.Minute))
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))
// node ng2_1 no longer has the taint
RemoveNodeNotReadyTaint(ng2_1)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 2, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 2, len(clusterstate.GetClusterReadiness().Ready))
}
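The test updates above only wrap the fields in len(), but the name lists also allow asserting which nodes ended up in a state. A hypothetical testify-based sketch (illustrative names and package, not part of this commit); assert.ElementsMatch compares two lists while ignoring order:

package readiness_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestReadinessNames(t *testing.T) {
	notStarted := []string{"ng1-1"}
	ready := []string{"ng2-1"}

	// Count-style assertion, as in the updated tests above.
	assert.Equal(t, 1, len(notStarted))
	// Name-based assertions, which the new representation makes possible.
	assert.ElementsMatch(t, []string{"ng1-1"}, notStarted)
	assert.ElementsMatch(t, []string{"ng2-1"}, ready)
}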
func TestExpiredScaleUp(t *testing.T) {
@@ -686,7 +686,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-2", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))
// The node is removed from Kubernetes
now.Add(time.Minute)
@@ -719,7 +719,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))
// Confirm that previously identified deleted Cloud Provider nodes are still included
// until it is removed from Kubernetes
@@ -729,7 +729,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))
// The node is removed from Kubernetes
now.Add(time.Minute)

@@ -183,7 +183,7 @@ func UpdateClusterStateMetrics(csr *clusterstate.ClusterStateRegistry) {
}
metrics.UpdateClusterSafeToAutoscale(csr.IsClusterHealthy())
readiness := csr.GetClusterReadiness()
metrics.UpdateNodesCount(readiness.Ready, readiness.Unready, readiness.NotStarted, readiness.LongUnregistered, readiness.Unregistered)
metrics.UpdateNodesCount(len(readiness.Ready), len(readiness.Unready), len(readiness.NotStarted), len(readiness.LongUnregistered), len(readiness.Unregistered))
}
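The metrics boundary stays count-based: UpdateNodesCount keeps taking ints and the caller converts with len(). A minimal sketch of that call pattern, with a stand-in updateNodesCount and a trimmed readiness type (assumptions, beyond the fact that the real call takes plain counts):

package main

import "fmt"

// updateNodesCount stands in for a count-based metrics call.
func updateNodesCount(ready, unready, notStarted, longUnregistered, unregistered int) {
	fmt.Printf("ready=%d unready=%d notStarted=%d longUnregistered=%d unregistered=%d\n",
		ready, unready, notStarted, longUnregistered, unregistered)
}

type readiness struct {
	Ready, Unready, NotStarted, LongUnregistered, Unregistered []string
}

func main() {
	r := readiness{Ready: []string{"ng1-1", "ng1-2"}, Unready: []string{"ng2-1"}}
	// Name lists are converted back to counts only at the metrics boundary.
	updateNodesCount(len(r.Ready), len(r.Unready), len(r.NotStarted), len(r.LongUnregistered), len(r.Unregistered))
}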
// GetOldestCreateTime returns oldest creation time out of the pods in the set