Update static_autoscaler tests & handle pod list processor errors as warnings

Mahmoud Atwa 2023-11-21 19:03:31 +00:00
parent a1ae4d3b57
commit 5115f1263e
3 changed files with 152 additions and 153 deletions


@ -17,6 +17,8 @@ limitations under the License.
package podlistprocessor
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@ -36,8 +38,7 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
nodes, err := context.AllNodeLister().List()
if err != nil {
klog.Warningf("Failed to list all nodes while filtering expendable: %v", err)
return nil, err
return nil, fmt.Errorf("Failed to list all nodes while filtering expendable pods: %v", err)
}
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
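The effect of this change is that filterOutExpendable no longer logs and swallows the context of a lister failure; it wraps the error and leaves the decision to the caller. A minimal sketch of that pattern, assuming hypothetical stand-ins (filterPods, listAllNodes) rather than the real autoscaler types:

package sketch

import "fmt"

// filterPods mirrors the shape of the updated Process method: on a lister
// failure it returns a wrapped error instead of logging it and returning the
// raw error, so the caller can decide whether the failure is fatal.
func filterPods(listAllNodes func() ([]string, error), pods []string) ([]string, error) {
    nodes, err := listAllNodes()
    if err != nil {
        return nil, fmt.Errorf("failed to list all nodes while filtering expendable pods: %v", err)
    }
    _ = nodes // expendable-pod filtering against nodes elided in this sketch
    return pods, nil
}

Wrapping rather than logging keeps the processor free of logging side effects and lets RunOnce choose to treat the failure as a warning.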


@ -506,7 +506,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}
unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
if err != nil {
klog.Warningf("Failed to process unschedulable pods: %v", err)
}
// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
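At the RunOnce call site the processor error is deliberately non-fatal: it is logged as a warning and the iteration continues with whatever pod list the processors returned. A rough sketch of that warn-and-continue handling, assuming a generic process callback in place of the real PodListProcessor chain (klog is the logger the autoscaler already uses):

package sketch

import "k8s.io/klog/v2"

// processOrWarn logs a processor failure as a warning and still returns the
// (possibly nil or partially processed) pod list, matching the behaviour
// RunOnce adopts above.
func processOrWarn(process func([]string) ([]string, error), pods []string) []string {
    processed, err := process(pods)
    if err != nil {
        klog.Warningf("Failed to process unschedulable pods: %v", err)
    }
    return processed
}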


@ -42,6 +42,7 @@ import (
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
@ -167,83 +168,111 @@ type scaleCall struct {
ng string
delta int
}
type testCase struct {
nodeGroups []*nodeGroup
pods []*apiv1.Pod
podListerCallTimes int
daemonSets []*appsv1.DaemonSet
daemonSetListerCallTimes int
pdbs []*policyv1.PodDisruptionBudget
pdbListerCallTimes int
expectedScaleUps []*scaleCall
expectedScaleDowns []*scaleCall
now time.Time
lastScaleupTime time.Time
lastScaleDownFailTime time.Time
runAutoscalerAt time.Time
autoscalingOptions config.AutoscalingOptions
OkTotalUnreadyCount int
type commonMocks struct {
readyNodeLister *kube_util.TestNodeLister
allNodeLister *kube_util.TestNodeLister
allPodLister *podListerMock
podDisruptionBudgetLister *podDisruptionBudgetListerMock
daemonSetLister *daemonSetListerMock
onScaleUp *onScaleUpMock
onScaleDown *onScaleDownMock
}
func testAutoscaler(t *testing.T, tc testCase) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)
func newCommonMocks() *commonMocks {
return &commonMocks{
readyNodeLister: kubernetes.NewTestNodeLister(nil),
allNodeLister: kubernetes.NewTestNodeLister(nil),
allPodLister: &podListerMock{},
podDisruptionBudgetLister: &podDisruptionBudgetListerMock{},
daemonSetLister: &daemonSetListerMock{},
onScaleUp: &onScaleUpMock{},
onScaleDown: &onScaleDownMock{},
}
}
type autoscalerSetupConfig struct {
nodeGroups []*nodeGroup
nodeStateUpdateTime time.Time
autoscalingOptions config.AutoscalingOptions
clusterStateConfig clusterstate.ClusterStateRegistryConfig
mocks *commonMocks
nodesDeleted chan bool
}
func setupCloudProvider(config *autoscalerSetupConfig) (*testprovider.TestCloudProvider, error) {
provider := testprovider.NewTestCloudProvider(
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
return config.mocks.onScaleUp.ScaleUp(id, delta)
}, func(id string, name string) error {
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
ret := config.mocks.onScaleDown.ScaleDown(id, name)
config.nodesDeleted <- true
return ret
})
allNodes := make([]*apiv1.Node, 0)
for _, ng := range tc.nodeGroups {
for _, ng := range config.nodeGroups {
provider.AddNodeGroup(ng.name, ng.min, ng.max, len(ng.nodes))
for _, node := range ng.nodes {
allNodes = append(allNodes, node)
provider.AddNode(ng.name, node)
}
reflectedNg := reflect.ValueOf(provider.GetNodeGroup(ng.name)).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, reflectedNg)
if reflectedNg == nil {
return nil, fmt.Errorf("Nodegroup '%v' found as nil after setting up cloud provider", ng.name)
}
}
return provider, nil
}
func setupAutoscalingContext(opts config.AutoscalingOptions, provider cloudprovider.CloudProvider, processorCallbacks callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) {
context, err := NewScaleTestAutoscalingContext(opts, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
if err != nil {
return context, err
}
return context, nil
}
func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
provider, err := setupCloudProvider(config)
if err != nil {
return nil, err
}
allNodes := make([]*apiv1.Node, 0)
for _, ng := range config.nodeGroups {
allNodes = append(allNodes, ng.nodes...)
}
// Create context with mocked lister registry.
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
context, err := NewScaleTestAutoscalingContext(tc.autoscalingOptions, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
context, err := setupAutoscalingContext(config.autoscalingOptions, provider, processorCallbacks)
setUpScaleDownActuator(&context, tc.autoscalingOptions)
if err != nil {
return nil, err
}
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
setUpScaleDownActuator(&context, config.autoscalingOptions)
listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.allPodLister,
config.mocks.podDisruptionBudgetLister, config.mocks.daemonSetLister,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: tc.OkTotalUnreadyCount,
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(tc.autoscalingOptions.NodeGroupDefaults))
ngConfigProcessor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults)
clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcessor)
clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime)
clusterState.UpdateNodes(allNodes, nil, tc.now)
processors := NewTestProcessors(&context)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: tc.lastScaleupTime,
lastScaleDownFailTime: tc.lastScaleDownFailTime,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
@ -251,27 +280,10 @@ func testAutoscaler(t *testing.T, tc testCase) {
processorCallbacks: processorCallbacks,
}
// Assumes all nodes are ready; to be updated when used in tests that need non-ready nodes
readyNodeLister.SetNodes(allNodes)
allNodeLister.SetNodes(allNodes)
allPodListerMock.On("List").Return(tc.pods, nil).Times(tc.podListerCallTimes)
daemonSetListerMock.On("List", labels.Everything()).Return(tc.daemonSets, nil).Times(tc.daemonSetListerCallTimes)
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Times(tc.pdbListerCallTimes)
for _, scaleUpCall := range tc.expectedScaleUps {
onScaleUpMock.On("ScaleUp", scaleUpCall.ng, scaleUpCall.delta).Return(nil).Once()
}
for _, scaleDownCall := range tc.expectedScaleDowns {
onScaleDownMock.On("ScaleDown", scaleDownCall.ng, scaleDownCall.delta).Return(nil).Once()
return autoscaler, nil
}
err = autoscaler.RunOnce(tc.runAutoscalerAt)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}
// TODO: Refactor tests to use testAutoscaler
// TODO: Refactor tests to use setupAutoscaler
func TestStaticAutoscalerRunOnce(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
@ -345,7 +357,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
}
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
@ -554,7 +566,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
@ -704,7 +716,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
processors := NewTestProcessors(&context)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
@ -852,7 +864,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
@ -983,7 +995,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@ -1081,7 +1093,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
@ -1108,7 +1120,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}
func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) {
func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) {
bypassedScheduler := "bypassed-scheduler"
nonBypassedScheduler := "non-bypassed-scheduler"
options := config.AutoscalingOptions{
@ -1128,101 +1140,83 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) {
bypassedScheduler,
}),
}
now := time.Now()
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Now())
SetNodeReadyState(n1, true, now)
ngs := []*nodeGroup{{
name: "ng1",
min: 1,
max: 10,
nodes: []*apiv1.Node{n1},
}}
p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 100, 100, AddSchedulerName(bypassedScheduler))
p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored
p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler, default scheduler is ignored
p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up
p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonBypassedScheduler))
testCases := map[string]testCase{
testSetupConfig := &autoscalerSetupConfig{
autoscalingOptions: options,
nodeGroups: ngs,
nodeStateUpdateTime: now,
mocks: newCommonMocks(),
clusterStateConfig: clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
},
}
testCases := map[string]struct {
setupConfig *autoscalerSetupConfig
pods []*apiv1.Pod
expectedScaleUp *scaleCall
}{
"Unprocessed pod with bypassed scheduler doesn't cause a scale-up when there's capacity": {
pods: []*apiv1.Pod{p1, p2},
podListerCallTimes: 2,
nodeGroups: []*nodeGroup{{
name: "ng1",
min: 1,
max: 10,
nodes: []*apiv1.Node{n1},
}},
daemonSetListerCallTimes: 1,
pdbListerCallTimes: 1,
expectedScaleUps: []*scaleCall{},
now: time.Now(),
lastScaleupTime: time.Now(),
lastScaleDownFailTime: time.Now(),
runAutoscalerAt: time.Now().Add(time.Hour),
autoscalingOptions: options,
setupConfig: testSetupConfig,
},
"Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Default Scheduler": {
pods: []*apiv1.Pod{p1, p3},
podListerCallTimes: 2,
nodeGroups: []*nodeGroup{{
name: "ng1",
min: 1,
max: 10,
nodes: []*apiv1.Node{n1},
}},
daemonSetListerCallTimes: 1,
pdbListerCallTimes: 1,
expectedScaleUps: []*scaleCall{{
expectedScaleUp: &scaleCall{
ng: "ng1",
delta: 1,
}},
now: time.Now(),
lastScaleupTime: time.Now(),
lastScaleDownFailTime: time.Now(),
runAutoscalerAt: time.Now().Add(time.Hour),
autoscalingOptions: options,
},
setupConfig: testSetupConfig,
},
"Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Non-default Scheduler": {
pods: []*apiv1.Pod{p1, p4},
podListerCallTimes: 2,
nodeGroups: []*nodeGroup{{
name: "ng1",
min: 1,
max: 10,
nodes: []*apiv1.Node{n1},
}},
daemonSetListerCallTimes: 1,
pdbListerCallTimes: 1,
expectedScaleUps: []*scaleCall{{
setupConfig: testSetupConfig,
expectedScaleUp: &scaleCall{
ng: "ng1",
delta: 1,
}},
now: time.Now(),
lastScaleupTime: time.Now(),
lastScaleDownFailTime: time.Now(),
runAutoscalerAt: time.Now().Add(time.Hour),
autoscalingOptions: options,
},
},
"Unprocessed pod with non-bypassed scheduler doesn't cause a scale-up when there's no capacity": {
pods: []*apiv1.Pod{p1, p5},
podListerCallTimes: 2,
nodeGroups: []*nodeGroup{{
name: "ng1",
min: 1,
max: 10,
nodes: []*apiv1.Node{n1},
}},
daemonSetListerCallTimes: 1,
pdbListerCallTimes: 1,
expectedScaleUps: []*scaleCall{},
now: time.Now(),
lastScaleupTime: time.Now(),
lastScaleDownFailTime: time.Now(),
runAutoscalerAt: time.Now().Add(time.Hour),
autoscalingOptions: options,
setupConfig: testSetupConfig,
},
}
for tcName, tc := range testCases {
t.Run(tcName, func(t *testing.T) {
testAutoscaler(t, tc)
autoscaler, err := setupAutoscaler(tc.setupConfig)
assert.NoError(t, err)
tc.setupConfig.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1})
tc.setupConfig.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1})
tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice()
tc.setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
tc.setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
if tc.expectedScaleUp != nil {
tc.setupConfig.mocks.onScaleUp.On("ScaleUp", tc.expectedScaleUp.ng, tc.expectedScaleUp.delta).Return(nil).Once()
}
err = autoscaler.RunOnce(now.Add(time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, tc.setupConfig.mocks.allPodLister,
tc.setupConfig.mocks.podDisruptionBudgetLister, tc.setupConfig.mocks.daemonSetLister, tc.setupConfig.mocks.onScaleUp, tc.setupConfig.mocks.onScaleDown)
})
}
@ -2081,7 +2075,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
}
}
func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) {
ctx.MaxScaleDownParallelism = 10
ctx.MaxDrainParallelism = 1
ctx.NodeDeletionBatcherInterval = 0 * time.Second
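For orientation, a condensed sketch of how the new helpers compose in a test, assuming it lives in the same test package as the code above. The empty AutoscalingOptions and the mock call counts are placeholders; real tests set estimator, scale-down, and scheduler-bypass options and the expectations their scenario needs, as the bypassed-scheduler cases do.

func TestSetupAutoscalerSketch(t *testing.T) {
    n1 := BuildTestNode("n1", 1000, 1000)
    SetNodeReadyState(n1, true, time.Now())

    cfg := &autoscalerSetupConfig{
        nodeGroups:          []*nodeGroup{{name: "ng1", min: 1, max: 10, nodes: []*apiv1.Node{n1}}},
        autoscalingOptions:  config.AutoscalingOptions{}, // placeholder; real tests fill this in
        nodeStateUpdateTime: time.Now(),
        clusterStateConfig:  clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1},
        mocks:               newCommonMocks(),
    }
    autoscaler, err := setupAutoscaler(cfg)
    assert.NoError(t, err)

    // setupAutoscaler wires the mocks in, but expectations are still set per test.
    cfg.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1})
    cfg.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1})
    cfg.mocks.allPodLister.On("List").Return([]*apiv1.Pod{}, nil).Twice()
    cfg.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
    cfg.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()

    err = autoscaler.RunOnce(time.Now().Add(time.Hour))
    assert.NoError(t, err)
    mock.AssertExpectationsForObjects(t, cfg.mocks.allPodLister, cfg.mocks.onScaleUp, cfg.mocks.onScaleDown)
}

Compared with the removed testAutoscaler helper, expectations and assertions stay in the test body, so each case only declares the parts it actually exercises.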