Filter upcoming nodes in clusterstate and scale-up executor

mendelski 2024-10-04 16:47:47 +00:00
parent bb94d270d7
commit 72ec806382
GPG Key ID: 381F13B19D5B68EC
3 changed files with 70 additions and 20 deletions
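Summary of the change, as read from the diff below: ClusterStateRegistry loops that previously iterated csr.cloudProvider.NodeGroups() (in one case skipping groups for which Exist() is false) now go through a new getRunningNodeGroups() helper that drops node groups the asyncNodeGroupStateChecker reports as upcoming; updateScaleRequests keeps scale-up requests for upcoming groups instead of treating them as finished; a small IsNodeGroupRegistered() accessor plus a test for it are added; and the scale-up executor skips emitting the scale-up event only when a group is upcoming and does not yet exist. The sketch below shows the filtering pattern in isolation; its NodeGroup and AsyncNodeGroupStateChecker interfaces are reduced, hypothetical stand-ins for the real cloudprovider.NodeGroup and asyncnodegroups types, not the actual autoscaler APIs.

```go
package main

import "fmt"

// Reduced, hypothetical stand-ins for cloudprovider.NodeGroup and the async
// node group state checker used by ClusterStateRegistry.
type NodeGroup interface {
	Id() string
}

type AsyncNodeGroupStateChecker interface {
	// IsUpcoming reports whether the node group is still being created
	// asynchronously and should be ignored by cluster state bookkeeping.
	IsUpcoming(ng NodeGroup) bool
}

type stubGroup string

func (g stubGroup) Id() string { return string(g) }

type stubChecker struct{ upcoming map[string]bool }

func (c stubChecker) IsUpcoming(ng NodeGroup) bool { return c.upcoming[ng.Id()] }

// getRunningNodeGroups mirrors the helper added in this commit: keep only the
// node groups that the checker does not consider upcoming.
func getRunningNodeGroups(all []NodeGroup, checker AsyncNodeGroupStateChecker) []NodeGroup {
	result := make([]NodeGroup, 0, len(all))
	for _, ng := range all {
		if !checker.IsUpcoming(ng) {
			result = append(result, ng)
		}
	}
	return result
}

func main() {
	groups := []NodeGroup{stubGroup("ng-running"), stubGroup("ng-upcoming")}
	checker := stubChecker{upcoming: map[string]bool{"ng-upcoming": true}}
	for _, ng := range getRunningNodeGroups(groups, checker) {
		fmt.Println(ng.Id()) // prints only "ng-running"
	}
}
```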

View File

@@ -263,6 +263,9 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
csr.backoff.RemoveStaleBackoffData(currentTime)
for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
continue
}
if !csr.areThereUpcomingNodesInNodeGroup(nodeGroupName) {
// scale up finished successfully, remove request
delete(csr.scaleUpRequests, nodeGroupName)
@@ -450,10 +453,7 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {
func (csr *ClusterStateRegistry) updateNodeGroupMetrics() {
autoscaled := 0
autoprovisioned := 0
- for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
- if !nodeGroup.Exist() {
- continue
- }
+ for _, nodeGroup := range csr.getRunningNodeGroups() {
if nodeGroup.Autoprovisioned() {
autoprovisioned++
} else {
@@ -509,6 +509,12 @@ func (csr *ClusterStateRegistry) areThereUpcomingNodesInNodeGroup(nodeGroupName
return target > provisioned
}
// IsNodeGroupRegistered returns true if the node group is registered in cluster state.
func (csr *ClusterStateRegistry) IsNodeGroupRegistered(nodeGroupName string) bool {
_, found := csr.acceptableRanges[nodeGroupName]
return found
}
// IsNodeGroupAtTargetSize returns true if the number of nodes provisioned in the group is equal to the target number of nodes.
func (csr *ClusterStateRegistry) IsNodeGroupAtTargetSize(nodeGroupName string) bool {
provisioned, target, ok := csr.getProvisionedAndTargetSizesForNodeGroup(nodeGroupName)
@@ -555,7 +561,7 @@ type AcceptableRange struct {
// the expected number of ready nodes is between targetSize and targetSize + 3.
func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]int) {
result := make(map[string]AcceptableRange)
- for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+ for _, nodeGroup := range csr.getRunningNodeGroups() {
size := targetSize[nodeGroup.Id()]
readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
result[nodeGroup.Id()] = AcceptableRange{
@@ -681,17 +687,12 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
// Calculates which node groups have incorrect size.
func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.Time) {
result := make(map[string]IncorrectNodeGroupSize)
- for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+ for _, nodeGroup := range csr.getRunningNodeGroups() {
acceptableRange, found := csr.acceptableRanges[nodeGroup.Id()]
if !found {
klog.Warningf("Acceptable range for node group %s not found", nodeGroup.Id())
continue
}
- if csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
- // Nodes for upcoming node groups reside in-memory and wait for node group to be fully
- // created. There is no need to mark their sizes incorrect.
- continue
- }
readiness, found := csr.perNodeGroupReadiness[nodeGroup.Id()]
if !found {
// if MinNodes == 0 node group has been scaled to 0 and everything's fine
@@ -781,7 +782,7 @@ func (csr *ClusterStateRegistry) GetStatus(now time.Time) *api.ClusterAutoscaler
for _, nodeGroup := range csr.lastStatus.NodeGroups {
nodeGroupsLastStatus[nodeGroup.Name] = nodeGroup
}
- for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+ for _, nodeGroup := range csr.getRunningNodeGroups() {
nodeGroupStatus := api.NodeGroupStatus{
Name: nodeGroup.Id(),
}
@@ -1014,10 +1015,22 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]i
return upcomingCounts, registeredNodeNames
}
// getRunningNodeGroups returns running node groups, filters out upcoming ones.
func (csr *ClusterStateRegistry) getRunningNodeGroups() []cloudprovider.NodeGroup {
nodeGroups := csr.cloudProvider.NodeGroups()
result := make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
for _, nodeGroup := range nodeGroups {
if !csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
result = append(result, nodeGroup)
}
}
return result
}
// getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
// as returned by NodeGroup.Nodes().
func (csr *ClusterStateRegistry) getCloudProviderNodeInstances() (map[string][]cloudprovider.Instance, error) {
- for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+ for _, nodeGroup := range csr.getRunningNodeGroups() {
if csr.IsNodeGroupScalingUp(nodeGroup.Id()) {
csr.cloudProviderNodeInstancesCache.InvalidateCacheEntry(nodeGroup)
}
@@ -1089,7 +1102,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
}
func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
- nodeGroups := csr.cloudProvider.NodeGroups()
+ nodeGroups := csr.getRunningNodeGroups()
for _, nodeGroup := range nodeGroups {
csr.handleInstanceCreationErrorsForNodeGroup(
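One behavioral detail worth noting from the first hunk above: updateScaleRequests no longer deletes a scale-up request for a node group that the checker still reports as upcoming, even when no upcoming nodes are currently counted for it. A toy model of that control flow, with the real ScaleUpRequest bookkeeping collapsed into plain maps for illustration:

```go
package main

import "fmt"

// pruneScaleUpRequests sketches the updated flow in updateScaleRequests.
// requests maps node group name -> whether the group still has upcoming
// nodes; it stands in for the real map[string]*ScaleUpRequest bookkeeping.
func pruneScaleUpRequests(requests, isUpcomingGroup map[string]bool) {
	for name, hasUpcomingNodes := range requests {
		if isUpcomingGroup[name] {
			// New in this commit: an upcoming group's request is kept alive.
			continue
		}
		if !hasUpcomingNodes {
			// Scale up finished successfully, remove the request.
			delete(requests, name)
		}
	}
}

func main() {
	requests := map[string]bool{"ng-created": false, "ng-upcoming": false}
	upcomingGroups := map[string]bool{"ng-upcoming": true}
	pruneScaleUpRequests(requests, upcomingGroups)
	fmt.Println(requests) // map[ng-upcoming:false] -- only the upcoming group's request survives
}
```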

View File

@@ -1225,10 +1225,11 @@ func TestUpdateAcceptableRanges(t *testing.T) {
}
clusterState := &ClusterStateRegistry{
- cloudProvider:         provider,
- perNodeGroupReadiness: tc.readiness,
- scaleUpRequests:       tc.scaleUpRequests,
- scaleDownRequests:     scaleDownRequests,
+ cloudProvider:              provider,
+ perNodeGroupReadiness:      tc.readiness,
+ scaleUpRequests:            tc.scaleUpRequests,
+ scaleDownRequests:          scaleDownRequests,
+ asyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
}
clusterState.updateAcceptableRanges(tc.targetSizes)
@@ -1456,6 +1457,43 @@ func TestTruncateIfExceedMaxSize(t *testing.T) {
}
}
func TestIsNodeGroupRegistered(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
registeredNodeGroupName := "registered-node-group"
provider.AddNodeGroup(registeredNodeGroupName, 1, 10, 1)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "some-map")
clusterstate := NewClusterStateRegistry(
provider,
ClusterStateRegistryConfig{MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1},
fakeLogRecorder,
newBackoff(),
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}),
asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
)
clusterstate.Recalculate()
testCases := []struct {
nodeGroupName string
want bool
}{
{
nodeGroupName: registeredNodeGroupName,
want: true,
},
{
nodeGroupName: "unregistered-node-group",
want: false,
},
}
for _, tc := range testCases {
t.Run(tc.nodeGroupName, func(t *testing.T) {
registered := clusterstate.IsNodeGroupRegistered(tc.nodeGroupName)
assert.Equal(t, tc.want, registered)
})
}
}
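The new TestIsNodeGroupRegistered above drives registration purely through NewClusterStateRegistry plus Recalculate(). Since updateAcceptableRanges now only covers non-upcoming node groups, IsNodeGroupRegistered presumably reports false for a group that is still upcoming; a toy illustration of that lookup, with made-up group names:

```go
package main

import "fmt"

func main() {
	// "Registered" is just membership in the acceptable-ranges map, which the
	// clusterstate hunks above now populate only for non-upcoming node groups.
	acceptableRanges := map[string]struct{}{"ng-running": {}}
	_, registered := acceptableRanges["ng-upcoming"]
	fmt.Println(registered) // false: an upcoming node group reads as unregistered
}
```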
func TestUpcomingNodesFromUpcomingNodeGroups(t *testing.T) {
testCases := []struct {

View File

@@ -176,10 +176,9 @@ func (e *scaleUpExecutor) executeScaleUp(
if increase < 0 {
return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase))
}
- if e.asyncNodeGroupStateChecker.IsUpcoming(info.Group) {
+ if !info.Group.Exist() && e.asyncNodeGroupStateChecker.IsUpcoming(info.Group) {
// Don't emit scale up event for upcoming node group as it will be generated after
// the node group is created, during initial scale up.
klog.V(0).Infof("Scale-up: group %s is an upcoming node group, skipping emit scale up event", info.Group.Id())
return nil
}
e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
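The last hunk narrows the guard in executeScaleUp: previously the scale-up event was skipped whenever the state checker flagged the group as upcoming, whereas now it is skipped only when the group is upcoming and does not exist yet, so an already-created group still has its scale-up registered with the scaleStateNotifier. A minimal illustration of just that boolean guard, using hypothetical helper names rather than anything from the autoscaler code:

```go
package main

import "fmt"

// Hypothetical predicates isolating the old and new guard in executeScaleUp.
func skipScaleUpEventOld(groupExists, groupUpcoming bool) bool { return groupUpcoming }
func skipScaleUpEventNew(groupExists, groupUpcoming bool) bool { return !groupExists && groupUpcoming }

func main() {
	for _, c := range []struct{ exists, upcoming bool }{
		{false, true}, // group not created yet: skipped before and after this change
		{true, true},  // group already exists but is still flagged upcoming: no longer skipped
		{true, false}, // ordinary existing group: never skipped
	} {
		fmt.Printf("exists=%v upcoming=%v old=%v new=%v\n", c.exists, c.upcoming,
			skipScaleUpEventOld(c.exists, c.upcoming), skipScaleUpEventNew(c.exists, c.upcoming))
	}
}
```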