WIP update to include unschedulable nodes

This commit is contained in:
elmiko 2025-08-29 14:32:50 -04:00
parent b8f910fdf4
commit a0ebb28234
26 changed files with 156 additions and 99 deletions

View File

@ -1228,7 +1228,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
t.Fatalf("Couldn't create daemonset lister")
}
registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
@ -1541,7 +1541,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, pdbLister, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)

View File

@ -139,7 +139,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", n1)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)

View File

@ -146,7 +146,7 @@ func TestScheduleDeletion(t *testing.T) {
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}
registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)

View File

@ -67,7 +67,7 @@ func TestSoftTaintUpdate(t *testing.T) {
MaxBulkSoftTaintCount: 1,
MaxBulkSoftTaintTime: 3 * time.Second,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
actx, err := test.NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
@ -151,7 +151,7 @@ func TestSoftTaintTimeLimit(t *testing.T) {
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: maxSoftTaintDuration,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
actx, err := test.NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)

View File

@ -125,7 +125,7 @@ func TestReplicasCounter(t *testing.T) {
jobLister, _ := kube_util.NewTestJobLister([]*batchv1.Job{job, unsetJob, jobWithSucceededReplicas})
rsLister, _ := kube_util.NewTestReplicaSetLister([]*appsv1.ReplicaSet{rs, unsetRs})
ssLister, _ := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{sS})
listers := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, rcLister, jobLister, rsLister, ssLister)
listers := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, rcLister, jobLister, rsLister, ssLister)
testCases := []struct {
name string
ownerRef metav1.OwnerReference

View File

@ -484,7 +484,7 @@ func TestUpdateClusterState(t *testing.T) {
}
rsLister, err := kube_util.NewTestReplicaSetLister(tc.replicasSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
provider := testprovider.NewTestCloudProviderBuilder().Build()
provider.AddNodeGroup("ng1", 0, 0, 0)
for _, node := range tc.nodes {

View File

@ -197,7 +197,7 @@ func TestRemovableAt(t *testing.T) {
rsLister, err := kube_util.NewTestReplicaSetLister(nil)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 5 * time.Minute}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)

View File

@ -92,7 +92,7 @@ func TestNodePoolAsyncInitialization(t *testing.T) {
},
},
}
listers := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
upcomingNodeGroup := provider.BuildNodeGroup("upcoming-ng", 0, 100, 0, false, true, "T1", nil)
options := config.AutoscalingOptions{AsyncNodeGroupsEnabled: true}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)

View File

@ -977,7 +977,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
extraPods[i] = buildTestPod(p)
}
podLister := kube_util.NewTestPodLister(pods)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
// setup node groups
var provider *testprovider.TestCloudProvider
@ -1046,7 +1046,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), nil)
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -1135,7 +1135,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
pods := []*apiv1.Pod{p1, p2}
podLister := kube_util.NewTestPodLister(pods)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().WithOnScaleUp(func(nodeGroup string, increase int) error {
t.Fatalf("No expansion is expected, but increased %s by %d", nodeGroup, increase)
@ -1155,7 +1155,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@ -1181,7 +1181,7 @@ func TestBinpackingLimiter(t *testing.T) {
nodes := []*apiv1.Node{n1, n2}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().WithOnScaleUp(func(nodeGroup string, increase int) error {
return nil
@ -1199,7 +1199,7 @@ func TestBinpackingLimiter(t *testing.T) {
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@ -1239,7 +1239,7 @@ func TestScaleUpNoHelp(t *testing.T) {
pods := []*apiv1.Pod{p1}
podLister := kube_util.NewTestPodLister(pods)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().WithOnScaleUp(func(nodeGroup string, increase int) error {
t.Fatalf("No expansion is expected")
@ -1258,7 +1258,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, pods, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@ -1408,12 +1408,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
nodeGroupSetProcessor.similarNodeGroups = append(nodeGroupSetProcessor.similarNodeGroups, provider.GetNodeGroup(ng))
}
listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now()))
@ -1485,7 +1485,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
podLister := kube_util.NewTestPodLister(podList)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
@ -1497,7 +1497,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, podList, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -1557,7 +1557,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
MaxMemoryTotal: 5000 * 64 * 20,
}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
assert.NoError(t, err)
@ -1568,7 +1568,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -1608,7 +1608,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
MaxMemoryTotal: 5000 * 64 * 20,
}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
assert.NoError(t, err)
@ -1619,7 +1619,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
@ -1638,7 +1638,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProviderBuilder().WithOnScaleUp(func(nodeGroup string, increase int) error {
assert.Equal(t, "ng1", nodeGroup)
assert.Equal(t, 1, increase)
@ -1673,7 +1673,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
nodes := []*apiv1.Node{n1, n2}
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -1756,7 +1756,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) {
AsyncNodeGroupsEnabled: true,
}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
assert.NoError(t, err)
@ -1768,7 +1768,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) {
processors.AsyncNodeGroupStateChecker = &asyncnodegroups.MockAsyncNodeGroupStateChecker{IsUpcomingNodeGroup: tc.isUpcomingMockMap}
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})

View File

@ -73,7 +73,7 @@ func TestDeltaForNode(t *testing.T) {
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*corev1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
rm := NewManager(processors.CustomResourcesProcessor)
delta, err := rm.DeltaForNode(&ctx, nodeInfos[ng.Name], group)
@ -116,7 +116,7 @@ func TestResourcesLeft(t *testing.T) {
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*corev1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
rm := NewManager(processors.CustomResourcesProcessor)
left, err := rm.ResourcesLeft(&ctx, nodeInfos, nodes)
@ -169,7 +169,7 @@ func TestApplyLimits(t *testing.T) {
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*corev1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
rm := NewManager(processors.CustomResourcesProcessor)
newCount, err := rm.ApplyLimits(&ctx, testCase.newNodeCount, testCase.resourcesLeft, nodeInfos[testCase.nodeGroupConfig.Name], group)
@ -236,7 +236,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
nodes := []*corev1.Node{n1}
err = context.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*corev1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
rm := NewManager(processors.CustomResourcesProcessor)
@ -271,7 +271,7 @@ func newCloudProvider(t *testing.T, cpu, mem int64) *testprovider.TestCloudProvi
func newContext(t *testing.T, provider cloudprovider.CloudProvider) context.AutoscalingContext {
podLister := kube_util.NewTestPodLister([]*corev1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
context, err := test.NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
return context

View File

@ -286,7 +286,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
// Get nodes and pods currently living on cluster
allNodes, readyNodes, typedErr := a.obtainNodeLists(draSnapshot)
allNodes, readyNodes, readyUnschedulableNodes, typedErr := a.obtainNodeLists(draSnapshot)
if typedErr != nil {
klog.Errorf("Failed to get node list: %v", typedErr)
return typedErr
@ -353,7 +353,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return typedErr.AddPrefix("failed to initialize RemainingPdbTracker: ")
}
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.taintConfig, currentTime)
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, readyUnschedulableNodes, daemonsets, a.taintConfig, currentTime)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
@ -995,16 +995,21 @@ func (a *StaticAutoscaler) ExitCleanUp() {
a.clusterStateRegistry.Stop()
}
func (a *StaticAutoscaler) obtainNodeLists(draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) {
func (a *StaticAutoscaler) obtainNodeLists(draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) {
allNodes, err := a.AllNodeLister().List()
if err != nil {
klog.Errorf("Failed to list all nodes: %v", err)
return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
return nil, nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
readyNodes, err := a.ReadyNodeLister().List()
if err != nil {
klog.Errorf("Failed to list ready nodes: %v", err)
return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
return nil, nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
readyUnschedulableNodes, err := a.ReadyUnschedulableNodeLister().List()
if err != nil {
klog.Errorf("Failed to list ready unschedulable nodes: %v", err)
return nil, nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
a.reportTaintsCount(allNodes)
@ -1015,7 +1020,11 @@ func (a *StaticAutoscaler) obtainNodeLists(draSnapshot *drasnapshot.Snapshot) ([
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes, draSnapshot)
allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes)
return allNodes, readyNodes, nil
// Filter the ready unschedulable nodes for custom processors and startup taints
_, readyUnschedulableNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, []*apiv1.Node{}, readyUnschedulableNodes, draSnapshot)
_, readyUnschedulableNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, []*apiv1.Node{}, readyUnschedulableNodes)
return allNodes, readyNodes, readyUnschedulableNodes, nil
}
func filterNodesFromSelectedGroups(cp cloudprovider.CloudProvider, nodes ...*apiv1.Node) []*apiv1.Node {

View File

@ -205,6 +205,7 @@ func (l *fakeAllObjectsLister[T]) ListAll() ([]T, error) {
type commonMocks struct {
readyNodeLister *kube_util.TestNodeLister
readyUnschedNodeLister *kube_util.TestNodeLister
allNodeLister *kube_util.TestNodeLister
allPodLister *podListerMock
podDisruptionBudgetLister *podDisruptionBudgetListerMock
@ -222,6 +223,7 @@ type commonMocks struct {
func newCommonMocks() *commonMocks {
return &commonMocks{
readyNodeLister: kubernetes.NewTestNodeLister(nil),
readyUnschedNodeLister: kubernetes.NewTestNodeLister(nil),
allNodeLister: kubernetes.NewTestNodeLister(nil),
allPodLister: &podListerMock{},
podDisruptionBudgetLister: &podDisruptionBudgetListerMock{},
@ -294,7 +296,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
// Create all necessary autoscaler dependencies, applying the mocks from config.
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.allPodLister,
listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.readyUnschedNodeLister, config.mocks.allPodLister,
config.mocks.podDisruptionBudgetLister, config.mocks.daemonSetLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(config.autoscalingOptions, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil)
if err != nil {
@ -343,6 +345,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
func TestStaticAutoscalerRunOnce(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -400,7 +403,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -628,6 +631,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -659,7 +663,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -740,6 +744,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -811,7 +816,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
processors.NodeGroupManager = nodeGroupManager
processors.NodeGroupListProcessor = nodeGroupListProcessor
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -893,6 +898,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
for _, forceDeleteLongUnregisteredNodes := range []bool{false, true} {
t.Run(fmt.Sprintf("forceDeleteLongUnregisteredNodes=%v", forceDeleteLongUnregisteredNodes), func(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -954,7 +960,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -1031,6 +1037,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -1118,7 +1125,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -1192,6 +1199,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -1249,7 +1257,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -1290,6 +1298,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
@ -1347,7 +1356,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
setUpScaleDownActuator(&context, options)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
@ -2240,9 +2249,10 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
// Create fake listers for the generated nodes, nothing returned by the rest (but the ones used in the tested path have to be defined).
allNodeLister := kubernetes.NewTestNodeLister(allNodes)
readyNodeLister := kubernetes.NewTestNodeLister(readyNodes)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister,
kubernetes.NewTestPodLister(nil),
kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil)
@ -3035,10 +3045,11 @@ func buildStaticAutoscaler(t *testing.T, provider cloudprovider.CloudProvider, a
allNodeLister := kubernetes.NewTestNodeLister(allNodes)
readyNodeLister := kubernetes.NewTestNodeLister(readyNodes)
readyUnschedNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{})
daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister,
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedNodeLister,
kubernetes.NewTestPodLister(nil),
kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil)

View File

@ -47,8 +47,8 @@ func NewCustomAnnotationNodeInfoProvider(templateNodeInfoProvider TemplateNodeIn
}
// Process returns the nodeInfos set for this cluster.
func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
nodeInfos, err := p.templateNodeInfoProvider.Process(ctx, nodes, daemonsets, taintConfig, currentTime)
func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, unschedulableNodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
nodeInfos, err := p.templateNodeInfoProvider.Process(ctx, nodes, unschedulableNodes, daemonsets, taintConfig, currentTime)
if err != nil {
return nil, err
}

View File

@ -40,8 +40,8 @@ func NewAsgTagResourceNodeInfoProvider(t *time.Duration, forceDaemonSets bool) *
}
// Process returns the nodeInfos set for this cluster.
func (p *AsgTagResourceNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
nodeInfos, err := p.mixedTemplateNodeInfoProvider.Process(ctx, nodes, daemonsets, taintConfig, currentTime)
func (p *AsgTagResourceNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, unschedulableNodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
nodeInfos, err := p.mixedTemplateNodeInfoProvider.Process(ctx, nodes, unschedulableNodes, daemonsets, taintConfig, currentTime)
if err != nil {
return nil, err
}

View File

@ -72,7 +72,7 @@ func (p *MixedTemplateNodeInfoProvider) CleanUp() {
}
// Process returns the nodeInfos set for this cluster
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, now time.Time) (map[string]*framework.NodeInfo, caerror.AutoscalerError) {
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, unschedulableNodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, now time.Time) (map[string]*framework.NodeInfo, caerror.AutoscalerError) {
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
result := make(map[string]*framework.NodeInfo)
@ -156,7 +156,8 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
}
// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// we want to check not only the ready nodes, but also ready unschedulable nodes.
for _, node := range append(nodes, unschedulableNodes...) {
// Allowing broken nodes
if isNodeGoodTemplateCandidate(node, now) {
continue

View File

@ -81,7 +81,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
provider2.AddNodeGroup("ng7", 1, 10, 1) // Nodegroup without nodes.
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1, ready7, readyToBeDeleted6}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
@ -95,7 +95,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
assert.Equal(t, 6, len(res))
info, found := res["ng1"]
@ -125,7 +125,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{}, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
@ -168,7 +168,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
provider1.AddNode("ng4", ready6)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
nodes := []*apiv1.Node{unready4, unready3, ready2, ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
@ -184,7 +184,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
},
}
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl, false)
res, err := niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err := niProcessor.Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 4, len(res))
@ -218,7 +218,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
assert.Equal(t, "ng3", lastDeletedGroup)
// Check cache with all nodes removed
res, err = niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err = niProcessor.Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 2, len(res))
@ -239,7 +239,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
// Fill cache manually
infoNg4Node6 := framework.NewTestNodeInfo(ready6.DeepCopy())
niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}}
res, err = niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err = niProcessor.Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
// Check if cache was used
assert.NoError(t, err)
assert.Equal(t, 2, len(res))
@ -259,7 +259,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
// Cloud provider with TemplateNodeInfo not implemented.
provider := testprovider.NewTestCloudProviderBuilder().Build()
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
nodes := []*apiv1.Node{ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
@ -285,7 +285,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
provider.AddNode("ng1", ready1)
assert.Equal(t, 2, len(niProcessor1.nodeInfoCache))
_, err = niProcessor1.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
_, err = niProcessor1.Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(niProcessor1.nodeInfoCache))
@ -296,7 +296,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
}
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
_, err = niProcessor1.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
_, err = niProcessor1.Process(&ctx, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
@ -319,7 +319,7 @@ func TestProcessHandlesTemplateNodeInfoErrors(t *testing.T) {
ClusterSnapshot: testsnapshot.NewTestSnapshotOrDie(t),
}
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{}, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
// Should not fail despite ng1 error - continues processing
assert.NoError(t, err)

View File

@ -31,7 +31,7 @@ import (
// TemplateNodeInfoProvider is provides the initial nodeInfos set.
type TemplateNodeInfoProvider interface {
// Process returns a map of nodeInfos for node groups.
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError)
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, unschedulableNodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError)
// CleanUp cleans up processor's internal structures.
CleanUp()
}

View File

@ -121,7 +121,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
assert.NoError(t, err)
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
},
ClusterSnapshot: clusterSnapshot,
}
@ -285,7 +285,7 @@ func TestGroupPods(t *testing.T) {
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
},
}
controllers := listControllers(&ctx)

View File

@ -480,7 +480,7 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no
}
podLister := kube_util.NewTestPodLister(nil)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
listers := kube_util.NewListerRegistry(nil, nil, nil, podLister, nil, nil, nil, nil, nil, nil)
options := config.AutoscalingOptions{}
if batchProcessing {
@ -498,7 +498,7 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}
}
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*apiv1.Node{}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)
estimatorBuilder, _ := estimator.NewEstimatorBuilder(

View File

@ -108,7 +108,7 @@ func TestFindNodesToRemove(t *testing.T) {
}
rsLister, err := kube_util.NewTestReplicaSetLister(replicaSets)
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
ownerRefs := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "")

View File

@ -806,7 +806,7 @@ func TestGetPodsToMove(t *testing.T) {
ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset})
assert.NoError(t, err)
registry = kube_util.NewListerRegistry(nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
registry = kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
}
deleteOptions := options.NodeDeleteOptions{

View File

@ -329,7 +329,7 @@ func TestDrainable(t *testing.T) {
ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
drainCtx := &drainability.DrainContext{
Listers: registry,

View File

@ -225,7 +225,7 @@ func TestDrainable(t *testing.T) {
ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
drainCtx := &drainability.DrainContext{
Listers: registry,

View File

@ -115,6 +115,8 @@ func createSanitizedNode(node *apiv1.Node, newName string, taintConfig *taints.T
}
newNode.Labels[apiv1.LabelHostname] = newName
newNode.Spec.Unschedulable = false
if taintConfig != nil {
newNode.Spec.Taints = taints.SanitizeTaints(newNode.Spec.Taints, *taintConfig)
}

View File

@ -37,6 +37,7 @@ import (
type ListerRegistry interface {
AllNodeLister() NodeLister
ReadyNodeLister() NodeLister
ReadyUnschedulableNodeLister() NodeLister
AllPodLister() PodLister
PodDisruptionBudgetLister() PodDisruptionBudgetLister
DaemonSetLister() v1appslister.DaemonSetLister
@ -47,32 +48,34 @@ type ListerRegistry interface {
}
type listerRegistryImpl struct {
allNodeLister NodeLister
readyNodeLister NodeLister
allPodLister PodLister
podDisruptionBudgetLister PodDisruptionBudgetLister
daemonSetLister v1appslister.DaemonSetLister
replicationControllerLister v1lister.ReplicationControllerLister
jobLister v1batchlister.JobLister
replicaSetLister v1appslister.ReplicaSetLister
statefulSetLister v1appslister.StatefulSetLister
allNodeLister NodeLister
readyNodeLister NodeLister
readyUnschedulableNodeLister NodeLister
allPodLister PodLister
podDisruptionBudgetLister PodDisruptionBudgetLister
daemonSetLister v1appslister.DaemonSetLister
replicationControllerLister v1lister.ReplicationControllerLister
jobLister v1batchlister.JobLister
replicaSetLister v1appslister.ReplicaSetLister
statefulSetLister v1appslister.StatefulSetLister
}
// NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions
func NewListerRegistry(allNode NodeLister, readyNode NodeLister, allPodLister PodLister, podDisruptionBudgetLister PodDisruptionBudgetLister,
func NewListerRegistry(allNode NodeLister, readyNode NodeLister, readyUnschedulableNode NodeLister, allPodLister PodLister, podDisruptionBudgetLister PodDisruptionBudgetLister,
daemonSetLister v1appslister.DaemonSetLister, replicationControllerLister v1lister.ReplicationControllerLister,
jobLister v1batchlister.JobLister, replicaSetLister v1appslister.ReplicaSetLister,
statefulSetLister v1appslister.StatefulSetLister) ListerRegistry {
return listerRegistryImpl{
allNodeLister: allNode,
readyNodeLister: readyNode,
allPodLister: allPodLister,
podDisruptionBudgetLister: podDisruptionBudgetLister,
daemonSetLister: daemonSetLister,
replicationControllerLister: replicationControllerLister,
jobLister: jobLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
allNodeLister: allNode,
readyNodeLister: readyNode,
readyUnschedulableNodeLister: readyUnschedulableNode,
allPodLister: allPodLister,
podDisruptionBudgetLister: podDisruptionBudgetLister,
daemonSetLister: daemonSetLister,
replicationControllerLister: replicationControllerLister,
jobLister: jobLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
}
@ -80,6 +83,7 @@ func NewListerRegistry(allNode NodeLister, readyNode NodeLister, allPodLister Po
func NewListerRegistryWithDefaultListers(informerFactory informers.SharedInformerFactory) ListerRegistry {
allPodLister := NewAllPodLister(informerFactory.Core().V1().Pods().Lister())
readyNodeLister := NewReadyNodeLister(informerFactory.Core().V1().Nodes().Lister())
readyUnschedulableNodeLister := NewReadyUnschedulableNodeLister(informerFactory.Core().V1().Nodes().Lister())
allNodeLister := NewAllNodeLister(informerFactory.Core().V1().Nodes().Lister())
podDisruptionBudgetLister := NewPodDisruptionBudgetLister(informerFactory.Policy().V1().PodDisruptionBudgets().Lister())
@ -88,7 +92,7 @@ func NewListerRegistryWithDefaultListers(informerFactory informers.SharedInforme
jobLister := informerFactory.Batch().V1().Jobs().Lister()
replicaSetLister := informerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := informerFactory.Apps().V1().StatefulSets().Lister()
return NewListerRegistry(allNodeLister, readyNodeLister, allPodLister,
return NewListerRegistry(allNodeLister, readyNodeLister, readyUnschedulableNodeLister, allPodLister,
podDisruptionBudgetLister, daemonSetLister, replicationControllerLister,
jobLister, replicaSetLister, statefulSetLister)
}
@ -108,6 +112,11 @@ func (r listerRegistryImpl) ReadyNodeLister() NodeLister {
return r.readyNodeLister
}
// ReadyUnschedulableNodeLister returns the ReadyUnschedulableNodeLister registered to this registry
func (r listerRegistryImpl) ReadyUnschedulableNodeLister() NodeLister {
return r.readyUnschedulableNodeLister
}
// PodDisruptionBudgetLister returns the podDisruptionBudgetLister registered to this registry
func (r listerRegistryImpl) PodDisruptionBudgetLister() PodDisruptionBudgetLister {
return r.podDisruptionBudgetLister
@ -266,6 +275,11 @@ func NewReadyNodeLister(nl v1lister.NodeLister) NodeLister {
return NewNodeLister(nl, IsNodeReadyAndSchedulable)
}
// NewReadyUnschedulableNodeLister builds a node lister that returns only ready nodes that are also unschedulable.
func NewReadyUnschedulableNodeLister(nl v1lister.NodeLister) NodeLister {
return NewNodeLister(nl, IsNodeReadyAndUnschedulable)
}
// NewNodeLister builds a node lister.
func NewNodeLister(nl v1lister.NodeLister, filter func(*apiv1.Node) bool) NodeLister {
return &nodeListerImpl{

View File

@ -41,12 +41,20 @@ const (
StartupNodes NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/startup-taint"
)
// IsNodeReadyAndSchedulable returns true if the node is ready and schedulable.
func IsNodeReadyAndSchedulable(node *apiv1.Node) bool {
// IsNodeReady returns true if the node is ready.
func IsNodeReady(node *apiv1.Node) bool {
ready, _, _ := GetReadinessState(node)
if !ready {
return false
}
return true
}
// IsNodeReadyAndSchedulable returns true if the node is ready and schedulable.
func IsNodeReadyAndSchedulable(node *apiv1.Node) bool {
if !IsNodeReady(node) {
return false
}
// Ignore nodes that are marked unschedulable
if node.Spec.Unschedulable {
return false
@ -54,6 +62,18 @@ func IsNodeReadyAndSchedulable(node *apiv1.Node) bool {
return true
}
// IsNodeReadyAndUnschedulable returns true if the node is ready and has its .spec.unschedulable set true.
func IsNodeReadyAndUnschedulable(node *apiv1.Node) bool {
if !IsNodeReady(node) {
return false
}
// Ignore nodes that are marked schedulable
if !node.Spec.Unschedulable {
return false
}
return true
}
// NodeReadiness represents the last known node readiness.
type NodeReadiness struct {
// Is the node ready or not.