diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go
index 471afe9191..bdb008baea 100644
--- a/cluster-autoscaler/clusterstate/clusterstate.go
+++ b/cluster-autoscaler/clusterstate/clusterstate.go
@@ -263,6 +263,9 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 	csr.backoff.RemoveStaleBackoffData(currentTime)
 
 	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
+		if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
+			continue
+		}
 		if !csr.areThereUpcomingNodesInNodeGroup(nodeGroupName) {
 			// scale up finished successfully, remove request
 			delete(csr.scaleUpRequests, nodeGroupName)
@@ -450,10 +453,7 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {
 func (csr *ClusterStateRegistry) updateNodeGroupMetrics() {
 	autoscaled := 0
 	autoprovisioned := 0
-	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
-		if !nodeGroup.Exist() {
-			continue
-		}
+	for _, nodeGroup := range csr.getRunningNodeGroups() {
 		if nodeGroup.Autoprovisioned() {
 			autoprovisioned++
 		} else {
@@ -509,6 +509,12 @@ func (csr *ClusterStateRegistry) areThereUpcomingNodesInNodeGroup(nodeGroupName
 	return target > provisioned
 }
 
+// IsNodeGroupRegistered returns true if the node group is registered in cluster state.
+func (csr *ClusterStateRegistry) IsNodeGroupRegistered(nodeGroupName string) bool {
+	_, found := csr.acceptableRanges[nodeGroupName]
+	return found
+}
+
 // IsNodeGroupAtTargetSize returns true if the number of nodes provisioned in the group is equal to the target number of nodes.
 func (csr *ClusterStateRegistry) IsNodeGroupAtTargetSize(nodeGroupName string) bool {
 	provisioned, target, ok := csr.getProvisionedAndTargetSizesForNodeGroup(nodeGroupName)
@@ -555,7 +561,7 @@ type AcceptableRange struct {
 // the expected number of ready nodes is between targetSize and targetSize + 3.
 func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]int) {
 	result := make(map[string]AcceptableRange)
-	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+	for _, nodeGroup := range csr.getRunningNodeGroups() {
 		size := targetSize[nodeGroup.Id()]
 		readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
 		result[nodeGroup.Id()] = AcceptableRange{
@@ -681,17 +687,12 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
 // Calculates which node groups have incorrect size.
 func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.Time) {
 	result := make(map[string]IncorrectNodeGroupSize)
-	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+	for _, nodeGroup := range csr.getRunningNodeGroups() {
 		acceptableRange, found := csr.acceptableRanges[nodeGroup.Id()]
 		if !found {
 			klog.Warningf("Acceptable range for node group %s not found", nodeGroup.Id())
 			continue
 		}
-		if csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
-			// Nodes for upcoming node groups reside in-memory and wait for node group to be fully
-			// created. There is no need to mark their sizes incorrect.
-			continue
-		}
 		readiness, found := csr.perNodeGroupReadiness[nodeGroup.Id()]
 		if !found {
 			// if MinNodes == 0 node group has been scaled to 0 and everything's fine
@@ -781,7 +782,7 @@ func (csr *ClusterStateRegistry) GetStatus(now time.Time) *api.ClusterAutoscaler
 	for _, nodeGroup := range csr.lastStatus.NodeGroups {
 		nodeGroupsLastStatus[nodeGroup.Name] = nodeGroup
 	}
-	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+	for _, nodeGroup := range csr.getRunningNodeGroups() {
 		nodeGroupStatus := api.NodeGroupStatus{
 			Name: nodeGroup.Id(),
 		}
@@ -1014,10 +1015,22 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]i
 	return upcomingCounts, registeredNodeNames
 }
 
+// getRunningNodeGroups returns running node groups, filters out upcoming ones.
+func (csr *ClusterStateRegistry) getRunningNodeGroups() []cloudprovider.NodeGroup {
+	nodeGroups := csr.cloudProvider.NodeGroups()
+	result := make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
+	for _, nodeGroup := range nodeGroups {
+		if !csr.asyncNodeGroupStateChecker.IsUpcoming(nodeGroup) {
+			result = append(result, nodeGroup)
+		}
+	}
+	return result
+}
+
 // getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
 // as returned by NodeGroup.Nodes().
 func (csr *ClusterStateRegistry) getCloudProviderNodeInstances() (map[string][]cloudprovider.Instance, error) {
-	for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
+	for _, nodeGroup := range csr.getRunningNodeGroups() {
 		if csr.IsNodeGroupScalingUp(nodeGroup.Id()) {
 			csr.cloudProviderNodeInstancesCache.InvalidateCacheEntry(nodeGroup)
 		}
@@ -1089,7 +1102,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
 }
 
 func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
-	nodeGroups := csr.cloudProvider.NodeGroups()
+	nodeGroups := csr.getRunningNodeGroups()
 
 	for _, nodeGroup := range nodeGroups {
 		csr.handleInstanceCreationErrorsForNodeGroup(
diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go
index 6d4e683394..5aa8cec3c1 100644
--- a/cluster-autoscaler/clusterstate/clusterstate_test.go
+++ b/cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -1225,10 +1225,11 @@ func TestUpdateAcceptableRanges(t *testing.T) {
 			}
 
 			clusterState := &ClusterStateRegistry{
-				cloudProvider:         provider,
-				perNodeGroupReadiness: tc.readiness,
-				scaleUpRequests:       tc.scaleUpRequests,
-				scaleDownRequests:     scaleDownRequests,
+				cloudProvider:              provider,
+				perNodeGroupReadiness:      tc.readiness,
+				scaleUpRequests:            tc.scaleUpRequests,
+				scaleDownRequests:          scaleDownRequests,
+				asyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
 			}
 
 			clusterState.updateAcceptableRanges(tc.targetSizes)
@@ -1456,6 +1457,43 @@ func TestTruncateIfExceedMaxSize(t *testing.T) {
 	}
 }
 
+func TestIsNodeGroupRegistered(t *testing.T) {
+	provider := testprovider.NewTestCloudProvider(nil, nil)
+	registeredNodeGroupName := "registered-node-group"
+	provider.AddNodeGroup(registeredNodeGroupName, 1, 10, 1)
+	fakeClient := &fake.Clientset{}
+	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "some-map")
+	clusterstate := NewClusterStateRegistry(
+		provider,
+		ClusterStateRegistryConfig{MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1},
+		fakeLogRecorder,
+		newBackoff(),
+		nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}),
+		asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
+	)
+	clusterstate.Recalculate()
+
+	testCases := []struct {
+		nodeGroupName string
+		want          bool
+	}{
+		{
+			nodeGroupName: registeredNodeGroupName,
+			want:          true,
+		},
+		{
+			nodeGroupName: "unregistered-node-group",
+			want:          false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.nodeGroupName, func(t *testing.T) {
+			registered := clusterstate.IsNodeGroupRegistered(tc.nodeGroupName)
+			assert.Equal(t, tc.want, registered)
+		})
+	}
+}
+
 func TestUpcomingNodesFromUpcomingNodeGroups(t *testing.T) {
 	testCases := []struct {
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
index 63f8dbf1b1..ec064dde5d 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -176,10 +176,9 @@ func (e *scaleUpExecutor) executeScaleUp(
 	if increase < 0 {
 		return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase))
 	}
-	if e.asyncNodeGroupStateChecker.IsUpcoming(info.Group) {
+	if !info.Group.Exist() && e.asyncNodeGroupStateChecker.IsUpcoming(info.Group) {
 		// Don't emit scale up event for upcoming node group as it will be generated after
 		// the node group is created, during initial scale up.
-		klog.V(0).Infof("Scale-up: group %s is an upcoming node group, skipping emit scale up event", info.Group.Id())
 		return nil
 	}
 	e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
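
Not part of the diff above: a minimal caller-side sketch of how the new IsNodeGroupRegistered accessor could be combined with the existing IsNodeGroupHealthy check that the first hunk references. The shouldTrackNodeGroup helper and the standalone example package are illustrative assumptions, not code from this change; only the two registry methods come from the autoscaler itself.

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
)

// shouldTrackNodeGroup is a hypothetical helper: a node group is worth
// inspecting only once the registry has registered it (an acceptable range
// was computed for it, which the new getRunningNodeGroups filtering skips
// for upcoming, asynchronously created groups) and it is currently healthy.
func shouldTrackNodeGroup(csr *clusterstate.ClusterStateRegistry, nodeGroupName string) bool {
	if !csr.IsNodeGroupRegistered(nodeGroupName) {
		// Upcoming node groups have no acceptable range yet, so skip them.
		return false
	}
	return csr.IsNodeGroupHealthy(nodeGroupName)
}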