From c6cf6b83fb1e768be84f453c19d57e700cd9f112 Mon Sep 17 00:00:00 2001 From: pengli Date: Fri, 25 Jun 2021 21:07:22 +0800 Subject: [PATCH] Add new scheduler plugin for checking if required api has installed in target cluster Signed-off-by: pengli --- pkg/scheduler/core/generic_scheduler.go | 10 ++-- pkg/scheduler/framework/interface.go | 5 +- .../plugins/apiinstalled/api_installed.go | 51 +++++++++++++++++++ .../clusteraffinity/cluster_affinity.go | 9 ++-- pkg/scheduler/framework/plugins/registry.go | 2 + .../tainttoleration/taint_toleration.go | 9 ++-- pkg/scheduler/framework/runtime/framework.go | 11 ++-- pkg/scheduler/scheduler.go | 7 +-- 8 files changed, 82 insertions(+), 22 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/apiinstalled/api_installed.go diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 3f9af4ec0..b85e7348c 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -8,6 +8,7 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" lister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/scheduler/framework" @@ -17,7 +18,7 @@ import ( // ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters. type ScheduleAlgorithm interface { - Schedule(context.Context, *policyv1alpha1.Placement) (scheduleResult ScheduleResult, err error) + Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ObjectReference) (scheduleResult ScheduleResult, err error) } // ScheduleResult includes the clusters selected. @@ -45,13 +46,13 @@ func NewGenericScheduler( } } -func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement) (result ScheduleResult, err error) { +func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference) (result ScheduleResult, err error) { clusterInfoSnapshot := g.schedulerCache.Snapshot() if clusterInfoSnapshot.NumOfClusters() == 0 { return result, fmt.Errorf("no clusters available to schedule") } - feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, placement, clusterInfoSnapshot) + feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, placement, resource, clusterInfoSnapshot) if err != nil { return result, fmt.Errorf("failed to findClustersThatFit: %v", err) } @@ -77,11 +78,12 @@ func (g *genericScheduler) findClustersThatFit( ctx context.Context, fwk framework.Framework, placement *policyv1alpha1.Placement, + resource *workv1alpha1.ObjectReference, clusterInfo *cache.Snapshot) ([]*clusterv1alpha1.Cluster, error) { var out []*clusterv1alpha1.Cluster clusters := clusterInfo.GetReadyClusters() for _, c := range clusters { - resMap := fwk.RunFilterPlugins(ctx, placement, c.Cluster()) + resMap := fwk.RunFilterPlugins(ctx, placement, resource, c.Cluster()) res := resMap.Merge() if !res.IsSuccess() { klog.V(4).Infof("cluster %q is not fit", c.Cluster().Name) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index fd6da2194..b0706fba1 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -7,6 +7,7 @@ import ( cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + work "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" ) // Framework manages the set of plugins in use by the scheduling framework. @@ -15,7 +16,7 @@ type Framework interface { // RunFilterPlugins runs the set of configured Filter plugins for resources on // the given cluster. - RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) PluginToResult + RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, resource *work.ObjectReference, cluster *cluster.Cluster) PluginToResult // RunScorePlugins runs the set of configured Score plugins, it returns a map of plugin name to cores RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*cluster.Cluster) (PluginToClusterScores, error) @@ -31,7 +32,7 @@ type Plugin interface { type FilterPlugin interface { Plugin // Filter is called by the scheduling framework. - Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *Result + Filter(ctx context.Context, placement *v1alpha1.Placement, resource *work.ObjectReference, cluster *cluster.Cluster) *Result } // Result indicates the result of running a plugin. It consists of a code, a diff --git a/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go b/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go new file mode 100644 index 000000000..437d83fde --- /dev/null +++ b/pkg/scheduler/framework/plugins/apiinstalled/api_installed.go @@ -0,0 +1,51 @@ +package apiinstalled + +import ( + "context" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/scheduler/framework" +) + +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = "APIInstalled" +) + +// APIInstalled is a plugin that checks if the API(CRD) of the resource is installed in the target cluster. +type APIInstalled struct{} + +var _ framework.FilterPlugin = &APIInstalled{} +var _ framework.ScorePlugin = &APIInstalled{} + +// New instantiates the APIInstalled plugin. +func New() framework.Plugin { + return &APIInstalled{} +} + +// Name returns the plugin name. +func (p *APIInstalled) Name() string { + return Name +} + +// Filter checks if the API(CRD) of the resource is installed in the target cluster. +func (p *APIInstalled) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result { + for _, apiEnablement := range cluster.Status.APIEnablements { + if apiEnablement.GroupVersion == resource.APIVersion { + for _, apiResource := range apiEnablement.Resources { + if apiResource.Kind == resource.Kind { + return framework.NewResult(framework.Success) + } + } + } + } + + return framework.NewResult(framework.Unschedulable, "no such API resource") +} + +// Score calculates the score on the candidate cluster. +func (p *APIInstalled) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { + return 0, framework.NewResult(framework.Success) +} diff --git a/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go b/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go index 01e889982..a527509f2 100644 --- a/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go +++ b/pkg/scheduler/framework/plugins/clusteraffinity/cluster_affinity.go @@ -3,8 +3,9 @@ package clusteraffinity import ( "context" - cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" - "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/scheduler/framework" "github.com/karmada-io/karmada/pkg/util" ) @@ -31,7 +32,7 @@ func (p *ClusterAffinity) Name() string { } // Filter checks if the cluster matched the placement cluster affinity constraint. -func (p *ClusterAffinity) Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *framework.Result { +func (p *ClusterAffinity) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result { affinity := placement.ClusterAffinity if affinity != nil { if util.ClusterMatches(cluster, *affinity) { @@ -45,6 +46,6 @@ func (p *ClusterAffinity) Filter(ctx context.Context, placement *v1alpha1.Placem } // Score calculates the score on the candidate cluster. -func (p *ClusterAffinity) Score(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) (float64, *framework.Result) { +func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { return 0, framework.NewResult(framework.Success) } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 3d05bc602..73706f09b 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -2,6 +2,7 @@ package plugins import ( "github.com/karmada-io/karmada/pkg/scheduler/framework" + "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration" ) @@ -11,5 +12,6 @@ func NewPlugins() map[string]framework.Plugin { return map[string]framework.Plugin{ clusteraffinity.Name: clusteraffinity.New(), tainttoleration.Name: tainttoleration.New(), + apiinstalled.Name: apiinstalled.New(), } } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 86301eed5..a19a7d555 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -7,8 +7,9 @@ import ( v1 "k8s.io/api/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" - "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/scheduler/framework" ) @@ -34,7 +35,7 @@ func (p *TaintToleration) Name() string { } // Filter checks if the given tolerations in placement tolerate cluster's taints. -func (p *TaintToleration) Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *framework.Result { +func (p *TaintToleration) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result { filterPredicate := func(t *v1.Taint) bool { // now only interested in NoSchedule taint which means do not allow new resource to schedule onto the cluster unless they tolerate the taint // todo: supprot NoExecute taint @@ -51,6 +52,6 @@ func (p *TaintToleration) Filter(ctx context.Context, placement *v1alpha1.Placem } // Score calculates the score on the candidate cluster. -func (p *TaintToleration) Score(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) (float64, *framework.Result) { +func (p *TaintToleration) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) { return 0, framework.NewResult(framework.Success) } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 166b42738..58b15f5b4 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -7,8 +7,9 @@ import ( "k8s.io/klog/v2" - cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" - "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/scheduler/framework" plugins2 "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins" ) @@ -52,10 +53,10 @@ func NewFramework(plugins []string) framework.Framework { // RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster. // If any of the result is not success, the cluster is not suited for the resource. -func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) framework.PluginToResult { +func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) framework.PluginToResult { result := make(framework.PluginToResult, len(frw.filterPlugins)) for _, p := range frw.filterPlugins { - pluginResult := p.Filter(ctx, placement, cluster) + pluginResult := p.Filter(ctx, placement, resource, cluster) result[p.Name()] = pluginResult } @@ -64,7 +65,7 @@ func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *v1alp // RunScorePlugins runs the set of configured Filter plugins for resources on the cluster. // If any of the result is not success, the cluster is not suited for the resource. -func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*cluster.Cluster) (framework.PluginToClusterScores, error) { +func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) { result := make(framework.PluginToClusterScores, len(frw.filterPlugins)) for _, p := range frw.scorePlugins { for i, cluster := range clusters { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e141d34c4..837aef39e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -31,6 +31,7 @@ import ( worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha1" schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache" "github.com/karmada-io/karmada/pkg/scheduler/core" + "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration" "github.com/karmada-io/karmada/pkg/util" @@ -107,7 +108,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) schedulerCache := schedulercache.NewCache() // TODO: make plugins as a flag - algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name}) + algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name}) sched := &Scheduler{ DynamicClient: dynamicClient, KarmadaClient: karmadaClient, @@ -476,7 +477,7 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.Resour return err } - scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement) + scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec.Resource) if err != nil { klog.V(2).Infof("failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err) return err @@ -503,7 +504,7 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.Resour } func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha1.ClusterResourceBinding, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) { - scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement) + scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec.Resource) if err != nil { klog.V(2).Infof("failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err) return err