Pass ready nodes list to PodListProcessor

This commit is contained in:
Łukasz Osipiuk 2019-04-10 22:01:12 +02:00
parent c6115b826e
commit 5c09c50774
3 changed files with 19 additions and 14 deletions

View File

@ -276,19 +276,19 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}
metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods))
allScheduled, err := scheduledPodLister.List()
allScheduledPods, err := scheduledPodLister.List()
if err != nil {
klog.Errorf("Failed to list scheduled pods: %v", err)
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
allUnschedulablePods, allScheduled, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduled, allNodes)
allUnschedulablePods, allScheduledPods, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduledPods, allNodes, readyNodes)
if err != nil {
klog.Errorf("Failed to process pod list: %v", err)
return errors.ToAutoscalerError(errors.InternalError, err)
}
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduled, a.PredicateChecker)
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduledPods, a.PredicateChecker)
// We need to check whether pods marked as unschedulable are actually unschedulable.
// It's likely we added a new node and the scheduler just haven't managed to put the
@ -317,10 +317,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
filterOutSchedulableStart := time.Now()
var unschedulablePodsToHelp []*apiv1.Pod
if a.FilterOutSchedulablePodsUsesPacking {
unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduled,
unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff)
} else {
unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduled,
unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff)
}
@ -390,7 +390,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDown.CleanUp(currentTime)
potentiallyUnneeded := getPotentiallyUnneededNodes(autoscalingContext, allNodes)
typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, append(allScheduled, unschedulableWaitingForLowerPriorityPreemption...), currentTime, pdbs)
typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, append(allScheduledPods, unschedulableWaitingForLowerPriorityPreemption...), currentTime, pdbs)
if typedErr != nil {
scaleDownStatus.Result = status.ScaleDownError
klog.Errorf("Failed to scale down: %v", typedErr)
@ -431,7 +431,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs, currentTime)
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduledPods, pdbs, currentTime)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
if scaleDownStatus.Result == status.ScaleDownNodeDeleted {

View File

@ -23,7 +23,9 @@ import (
// PodListProcessor processes lists of unschedulable and scheduled pods before scaling of the cluster.
type PodListProcessor interface {
Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error)
Process(context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error)
CleanUp()
}
@ -37,8 +39,10 @@ func NewDefaultPodListProcessor() PodListProcessor {
}
// Process processes lists of unschedulable and scheduled pods before scaling of the cluster.
func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduled []*apiv1.Pod, nodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
return unschedulablePods, allScheduled, nil
func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
return unschedulablePods, allScheduledPods, nil
}
// CleanUp cleans up the processor's internal structures.

View File

@ -32,13 +32,14 @@ func TestPodListProcessor(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
n2 := BuildTestNode("n1", 100, 1000)
unschedulablePods := []*apiv1.Pod{p1}
allScheduled := []*apiv1.Pod{p2}
nodes := []*apiv1.Node{n1, n2}
allScheduledPods := []*apiv1.Pod{p2}
allNodes := []*apiv1.Node{n1, n2}
readyNodes := []*apiv1.Node{n1, n2}
podListProcessor := NewDefaultPodListProcessor()
gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduled, nodes)
gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduledPods, allNodes, readyNodes)
if len(gotUnschedulablePods) != 1 || len(gotAllScheduled) != 1 || err != nil {
t.Errorf("Error podListProcessor.Process() = %v, %v, %v want %v, %v, nil ",
gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduled)
gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduledPods)
}
}