Add default PodListProcessor wrapper

This commit is contained in:
Bartłomiej Wróblewski 2022-12-01 09:32:55 +00:00
parent c3d8e81b98
commit 2e1b04ff69
7 changed files with 120 additions and 9 deletions

View File

@ -0,0 +1,39 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)
type currentlyDrainedNodesPodListProcessor struct {
}
// NewCurrentlyDrainedNodesPodListProcessor returns a new processor adding pods
// from currently drained nodes to the unschedulable pods.
func NewCurrentlyDrainedNodesPodListProcessor() *currentlyDrainedNodesPodListProcessor {
return &currentlyDrainedNodesPodListProcessor{}
}
// Process adds recreatable pods from currently drained nodes
func (p *currentlyDrainedNodesPodListProcessor) Process(_ *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
return unschedulablePods, nil
}
func (p *currentlyDrainedNodesPodListProcessor) CleanUp() {
}

View File

@ -0,0 +1,17 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package filteroutschedulable
package podlistprocessor
import (
"sort"
@ -43,9 +43,7 @@ func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.P
}
// Process filters out pods which are schedulable from list of unschedulable pods.
func (p *filterOutSchedulablePodListProcessor) Process(
context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
func (p *filterOutSchedulablePodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
// We need to check whether pods marked as unschedulable are actually unschedulable.
// It's likely we added a new node and the scheduler just haven't managed to put the
// pod on in yet. In this situation we don't want to trigger another scale-up.

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package filteroutschedulable
package podlistprocessor
import (
"fmt"

View File

@ -0,0 +1,51 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podlistprocessor
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)
type defaultPodListProcessor struct {
currentlyDrainedNodes *currentlyDrainedNodesPodListProcessor
filterOutSchedulable *filterOutSchedulablePodListProcessor
}
// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(currentlyDrainedNodes *currentlyDrainedNodesPodListProcessor, filterOutSchedulable *filterOutSchedulablePodListProcessor) *defaultPodListProcessor {
return &defaultPodListProcessor{
currentlyDrainedNodes: currentlyDrainedNodes,
filterOutSchedulable: filterOutSchedulable,
}
}
// Process runs sub-processors sequentially
func (p *defaultPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
unschedulablePods, err := p.currentlyDrainedNodes.Process(ctx, unschedulablePods)
if err != nil {
return nil, err
}
return p.filterOutSchedulable.Process(ctx, unschedulablePods)
}
func (p *defaultPodListProcessor) CleanUp() {
p.currentlyDrainedNodes.CleanUp()
p.filterOutSchedulable.CleanUp()
}

View File

@ -30,7 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/filteroutschedulable"
"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
@ -135,7 +135,10 @@ func ExtractPodNames(pods []*apiv1.Pod) []string {
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: filteroutschedulable.NewFilterOutSchedulablePodListProcessor(context.PredicateChecker),
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(
podlistprocessor.NewCurrentlyDrainedNodesPodListProcessor(),
podlistprocessor.NewFilterOutSchedulablePodListProcessor(context.PredicateChecker),
),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}),
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),

View File

@ -41,7 +41,7 @@ import (
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core"
"k8s.io/autoscaler/cluster-autoscaler/core/filteroutschedulable"
"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
@ -375,7 +375,10 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.PodListProcessor = filteroutschedulable.NewFilterOutSchedulablePodListProcessor(opts.PredicateChecker)
opts.Processors.PodListProcessor = podlistprocessor.NewDefaultPodListProcessor(
podlistprocessor.NewCurrentlyDrainedNodesPodListProcessor(),
podlistprocessor.NewFilterOutSchedulablePodListProcessor(opts.PredicateChecker),
)
var nodeInfoComparator nodegroupset.NodeInfoComparator
if len(autoscalingOptions.BalancingLabels) > 0 {