Extract PredicateChecker interface

This commit is contained in:
Łukasz Osipiuk 2019-12-31 16:08:00 +01:00
parent fd01b98670
commit 373c558303
14 changed files with 93 additions and 31 deletions

View File

@ -41,7 +41,7 @@ type AutoscalingContext struct {
CloudProvider cloudprovider.CloudProvider CloudProvider cloudprovider.CloudProvider
// TODO(kgolab) - move away too as it's not config // TODO(kgolab) - move away too as it's not config
// PredicateChecker to check if a pod can fit into a node. // PredicateChecker to check if a pod can fit into a node.
PredicateChecker *simulator.PredicateChecker PredicateChecker simulator.PredicateChecker
// ExpanderStrategy is the strategy used to choose which node group to expand when scaling up // ExpanderStrategy is the strategy used to choose which node group to expand when scaling up
ExpanderStrategy expander.Strategy ExpanderStrategy expander.Strategy
// EstimatorBuilder is the builder function for node count estimator to be used. // EstimatorBuilder is the builder function for node count estimator to be used.
@ -83,7 +83,7 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
} }
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments // NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
func NewAutoscalingContext(options config.AutoscalingOptions, predicateChecker *simulator.PredicateChecker, func NewAutoscalingContext(options config.AutoscalingOptions, predicateChecker simulator.PredicateChecker,
autoscalingKubeClients *AutoscalingKubeClients, cloudProvider cloudprovider.CloudProvider, autoscalingKubeClients *AutoscalingKubeClients, cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy, estimatorBuilder estimator.EstimatorBuilder, expanderStrategy expander.Strategy, estimatorBuilder estimator.EstimatorBuilder,
processorCallbacks processor_callbacks.ProcessorCallbacks) *AutoscalingContext { processorCallbacks processor_callbacks.ProcessorCallbacks) *AutoscalingContext {

View File

@ -41,7 +41,7 @@ type AutoscalerOptions struct {
EventsKubeClient kube_client.Interface EventsKubeClient kube_client.Interface
AutoscalingKubeClients *context.AutoscalingKubeClients AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider CloudProvider cloudprovider.CloudProvider
PredicateChecker *simulator.PredicateChecker PredicateChecker simulator.PredicateChecker
ExpanderStrategy expander.Strategy ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder EstimatorBuilder estimator.EstimatorBuilder
Processors *ca_processors.AutoscalingProcessors Processors *ca_processors.AutoscalingProcessors
@ -86,7 +86,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
} }
if opts.PredicateChecker == nil { if opts.PredicateChecker == nil {
predicateCheckerStopChannel := make(chan struct{}) predicateCheckerStopChannel := make(chan struct{})
predicateChecker, err := simulator.NewPredicateChecker(opts.KubeClient, predicateCheckerStopChannel) predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel)
if err != nil { if err != nil {
return err return err
} }

View File

@ -92,7 +92,7 @@ func (filterOutSchedulablePodListProcessor) CleanUp() {
// tries to pack the higher priority pods first. It takes into account pods that are bound to node // tries to pack the higher priority pods first. It takes into account pods that are bound to node
// and will be scheduled after lower priority pod preemption. // and will be scheduled after lower priority pod preemption.
func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node,
allScheduled []*apiv1.Pod, predicateChecker *simulator.PredicateChecker, allScheduled []*apiv1.Pod, predicateChecker simulator.PredicateChecker,
expendablePodsPriorityCutoff int, nodesExist bool) []*apiv1.Pod { expendablePodsPriorityCutoff int, nodesExist bool) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod var unschedulablePods []*apiv1.Pod
nonExpendableScheduled := utils.FilterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff) nonExpendableScheduled := utils.FilterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)

View File

@ -109,7 +109,7 @@ func (callbacks *staticAutoscalerProcessorCallbacks) reset() {
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters // NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
func NewStaticAutoscaler( func NewStaticAutoscaler(
opts config.AutoscalingOptions, opts config.AutoscalingOptions,
predicateChecker *simulator.PredicateChecker, predicateChecker simulator.PredicateChecker,
autoscalingKubeClients *context.AutoscalingKubeClients, autoscalingKubeClients *context.AutoscalingKubeClients,
processors *ca_processors.AutoscalingProcessors, processors *ca_processors.AutoscalingProcessors,
cloudProvider cloudprovider.CloudProvider, cloudProvider cloudprovider.CloudProvider,

View File

@ -63,7 +63,7 @@ func TestPodSchedulableMap(t *testing.T) {
assert.True(t, found) assert.True(t, found)
assert.Nil(t, err) assert.Nil(t, err)
cpuErr := &simulator.PredicateError{} cpuErr := simulator.GenericPredicateError()
// Pod in different RC // Pod in different RC
_, found = pMap.Get(podInRc2) _, found = pMap.Get(podInRc2)

View File

@ -50,7 +50,7 @@ const (
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulernodeinfo.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry, func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulernodeinfo.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key. // TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors. // TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker, ignoredTaints TaintKeySet) (map[string]*schedulernodeinfo.NodeInfo, errors.AutoscalerError) { daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints TaintKeySet) (map[string]*schedulernodeinfo.NodeInfo, errors.AutoscalerError) {
result := make(map[string]*schedulernodeinfo.NodeInfo) result := make(map[string]*schedulernodeinfo.NodeInfo)
seenGroups := make(map[string]bool) seenGroups := make(map[string]bool)
@ -173,7 +173,7 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
} }
// GetNodeInfoFromTemplate returns NodeInfo object built base on TemplateNodeInfo returned by NodeGroup.TemplateNodeInfo(). // GetNodeInfoFromTemplate returns NodeInfo object built base on TemplateNodeInfo returned by NodeGroup.TemplateNodeInfo().
func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker, ignoredTaints TaintKeySet) (*schedulernodeinfo.NodeInfo, errors.AutoscalerError) { func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints TaintKeySet) (*schedulernodeinfo.NodeInfo, errors.AutoscalerError) {
id := nodeGroup.Id() id := nodeGroup.Id()
baseNodeInfo, err := nodeGroup.TemplateNodeInfo() baseNodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil { if err != nil {

View File

@ -34,11 +34,11 @@ type podInfo struct {
// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods. // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
type BinpackingNodeEstimator struct { type BinpackingNodeEstimator struct {
predicateChecker *simulator.PredicateChecker predicateChecker simulator.PredicateChecker
} }
// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(predicateChecker *simulator.PredicateChecker) *BinpackingNodeEstimator { func NewBinpackingNodeEstimator(predicateChecker simulator.PredicateChecker) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{ return &BinpackingNodeEstimator{
predicateChecker: predicateChecker, predicateChecker: predicateChecker,
} }

View File

@ -38,13 +38,13 @@ type Estimator interface {
} }
// EstimatorBuilder creates a new estimator object. // EstimatorBuilder creates a new estimator object.
type EstimatorBuilder func(*simulator.PredicateChecker) Estimator type EstimatorBuilder func(simulator.PredicateChecker) Estimator
// NewEstimatorBuilder creates a new estimator object from flag. // NewEstimatorBuilder creates a new estimator object from flag.
func NewEstimatorBuilder(name string) (EstimatorBuilder, error) { func NewEstimatorBuilder(name string) (EstimatorBuilder, error) {
switch name { switch name {
case BinpackingEstimatorName: case BinpackingEstimatorName:
return func(predicateChecker *simulator.PredicateChecker) Estimator { return func(predicateChecker simulator.PredicateChecker) Estimator {
return NewBinpackingNodeEstimator(predicateChecker) return NewBinpackingNodeEstimator(predicateChecker)
}, nil }, nil
} }

View File

@ -71,7 +71,7 @@ type UtilizationInfo struct {
// FindNodesToRemove finds nodes that can be removed. Returns also an information about good // FindNodesToRemove finds nodes that can be removed. Returns also an information about good
// rescheduling location for each of the pods. // rescheduling location for each of the pods.
func FindNodesToRemove(candidates []*apiv1.Node, destinationNodes []*apiv1.Node, pods []*apiv1.Pod, func FindNodesToRemove(candidates []*apiv1.Node, destinationNodes []*apiv1.Node, pods []*apiv1.Pod,
listers kube_util.ListerRegistry, predicateChecker *PredicateChecker, maxCount int, listers kube_util.ListerRegistry, predicateChecker PredicateChecker, maxCount int,
fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker, fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time, timestamp time.Time,
podDisruptionBudgets []*policyv1.PodDisruptionBudget, podDisruptionBudgets []*policyv1.PodDisruptionBudget,
@ -218,7 +218,7 @@ func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulernodeinf
// TODO: We don't need to pass list of nodes here as they are already available in nodeInfos. // TODO: We don't need to pass list of nodes here as they are already available in nodeInfos.
func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes []*apiv1.Node, nodeInfos map[string]*schedulernodeinfo.NodeInfo, func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes []*apiv1.Node, nodeInfos map[string]*schedulernodeinfo.NodeInfo,
predicateChecker *PredicateChecker, oldHints map[string]string, newHints map[string]string, usageTracker *UsageTracker, predicateChecker PredicateChecker, oldHints map[string]string, newHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time) error { timestamp time.Time) error {
newNodeInfos := make(map[string]*schedulernodeinfo.NodeInfo) newNodeInfos := make(map[string]*schedulernodeinfo.NodeInfo)

View File

@ -0,0 +1,29 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package simulator
import (
apiv1 "k8s.io/api/core/v1"
scheduler_nodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// PredicateChecker checks whether all required predicates pass for given Pod and Node.
type PredicateChecker interface {
SnapshotClusterState() error
FitsAny(pod *apiv1.Pod, nodeInfos map[string]*scheduler_nodeinfo.NodeInfo) (string, error)
CheckPredicates(pod *apiv1.Pod, nodeInfo *scheduler_nodeinfo.NodeInfo) *PredicateError
}

View File

@ -35,8 +35,9 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
) )
// PredicateChecker checks whether all required predicates pass for given Pod and Node. // SchedulerBasedPredicateChecker checks whether all required predicates pass for given Pod and Node.
type PredicateChecker struct { // The verification is done by calling out to scheduler code.
type SchedulerBasedPredicateChecker struct {
framework scheduler_framework.Framework framework scheduler_framework.Framework
snapshot scheduler_listers.SharedLister snapshot scheduler_listers.SharedLister
} }
@ -46,8 +47,8 @@ type PredicateChecker struct {
// There are no const arrays in Go, this is meant to be used as a const. // There are no const arrays in Go, this is meant to be used as a const.
var priorityPredicates = []string{"PodFitsResources", "PodToleratesNodeTaints", "GeneralPredicates", "ready"} var priorityPredicates = []string{"PodFitsResources", "PodToleratesNodeTaints", "GeneralPredicates", "ready"}
// NewPredicateChecker builds PredicateChecker. // NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*PredicateChecker, error) { func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
providerRegistry := algorithmprovider.NewRegistry(1) // 1 here is hardPodAffinityWeight not relevant for CA providerRegistry := algorithmprovider.NewRegistry(1) // 1 here is hardPodAffinityWeight not relevant for CA
config := providerRegistry[scheduler_apis_config.SchedulerDefaultProviderName] config := providerRegistry[scheduler_apis_config.SchedulerDefaultProviderName]
@ -82,27 +83,21 @@ func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{})
// informerFactory....Lister()/informerFactory....Informer() methods // informerFactory....Lister()/informerFactory....Informer() methods
informerFactory.Start(stop) informerFactory.Start(stop)
return &PredicateChecker{ return &SchedulerBasedPredicateChecker{
framework: framework, framework: framework,
snapshot: snapshot, snapshot: snapshot,
}, nil }, nil
} }
// NewTestPredicateChecker builds test version of PredicateChecker.
func NewTestPredicateChecker() *PredicateChecker {
// TODO(scheduler_framework)
return nil
}
// SnapshotClusterState updates cluster snapshot used by the predicate checker. // SnapshotClusterState updates cluster snapshot used by the predicate checker.
// It should be called every CA loop iteration. // It should be called every CA loop iteration.
func (p *PredicateChecker) SnapshotClusterState() error { func (p *SchedulerBasedPredicateChecker) SnapshotClusterState() error {
// TODO(scheduler_framework rebuild snapshot // TODO(scheduler_framework rebuild snapshot
return nil return nil
} }
// FitsAny checks if the given pod can be place on any of the given nodes. // FitsAny checks if the given pod can be place on any of the given nodes.
func (p *PredicateChecker) FitsAny(pod *apiv1.Pod, nodeInfos map[string]*scheduler_nodeinfo.NodeInfo) (string, error) { func (p *SchedulerBasedPredicateChecker) FitsAny(pod *apiv1.Pod, nodeInfos map[string]*scheduler_nodeinfo.NodeInfo) (string, error) {
state := scheduler_framework.NewCycleState() state := scheduler_framework.NewCycleState()
preFilterStatus := p.framework.RunPreFilterPlugins(context.TODO(), state, pod) preFilterStatus := p.framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
@ -131,7 +126,7 @@ func (p *PredicateChecker) FitsAny(pod *apiv1.Pod, nodeInfos map[string]*schedul
} }
// CheckPredicates checks if the given pod can be placed on the given node. // CheckPredicates checks if the given pod can be placed on the given node.
func (p *PredicateChecker) CheckPredicates(pod *apiv1.Pod, nodeInfo *scheduler_nodeinfo.NodeInfo) *PredicateError { func (p *SchedulerBasedPredicateChecker) CheckPredicates(pod *apiv1.Pod, nodeInfo *scheduler_nodeinfo.NodeInfo) *PredicateError {
state := scheduler_framework.NewCycleState() state := scheduler_framework.NewCycleState()
preFilterStatus := p.framework.RunPreFilterPlugins(context.TODO(), state, pod) preFilterStatus := p.framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
@ -165,7 +160,7 @@ func (p *PredicateChecker) CheckPredicates(pod *apiv1.Pod, nodeInfo *scheduler_n
return nil return nil
} }
func (p *PredicateChecker) buildDebugInfo(filterName string, nodeInfo *scheduler_nodeinfo.NodeInfo) func() string { func (p *SchedulerBasedPredicateChecker) buildDebugInfo(filterName string, nodeInfo *scheduler_nodeinfo.NodeInfo) func() string {
switch filterName { switch filterName {
case "TaintToleration": case "TaintToleration":
taints := nodeInfo.Node().Spec.Taints taints := nodeInfo.Node().Spec.Taints

View File

@ -0,0 +1,38 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package simulator
import scheduler_predicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
// PredicateInfo assigns a name to a predicate
type PredicateInfo struct {
Name string
Predicate scheduler_predicates.FitPredicate
}
// NewTestPredicateChecker builds test version of PredicateChecker.
func NewTestPredicateChecker() PredicateChecker {
// TODO(scheduler_framework)
return nil
}
// NewCustomTestPredicateChecker builds test version of PredicateChecker with additional predicates.
// Helps with benchmarking different ordering of predicates.
func NewCustomTestPredicateChecker(predicateInfos []PredicateInfo) PredicateChecker {
// TODO(scheduler_framework)
return nil
}

View File

@ -28,7 +28,7 @@ import (
) )
// GetDaemonSetPodsForNode returns daemonset nodes for the given pod. // GetDaemonSetPodsForNode returns daemonset nodes for the given pod.
func GetDaemonSetPodsForNode(nodeInfo *schedulernodeinfo.NodeInfo, daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) []*apiv1.Pod { func GetDaemonSetPodsForNode(nodeInfo *schedulernodeinfo.NodeInfo, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker) []*apiv1.Pod {
result := make([]*apiv1.Pod, 0) result := make([]*apiv1.Pod, 0)
for _, ds := range daemonsets { for _, ds := range daemonsets {
pod := newPod(ds, nodeInfo.Node().Name) pod := newPod(ds, nodeInfo.Node().Name)