Merge pull request #470 from vincent-pli/new-scheduler-plugin-for-api-target-cluster

Add new scheduler plugin for checking if required api has installed in target cluster
This commit is contained in:
karmada-bot 2021-07-02 15:01:17 +08:00 committed by GitHub
commit 0545509cd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 82 additions and 22 deletions

View File

@ -8,6 +8,7 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
lister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
@ -17,7 +18,7 @@ import (
// ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
type ScheduleAlgorithm interface {
Schedule(context.Context, *policyv1alpha1.Placement) (scheduleResult ScheduleResult, err error)
Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ObjectReference) (scheduleResult ScheduleResult, err error)
}
// ScheduleResult includes the clusters selected.
@ -45,13 +46,13 @@ func NewGenericScheduler(
}
}
func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement) (result ScheduleResult, err error) {
func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference) (result ScheduleResult, err error) {
clusterInfoSnapshot := g.schedulerCache.Snapshot()
if clusterInfoSnapshot.NumOfClusters() == 0 {
return result, fmt.Errorf("no clusters available to schedule")
}
feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, placement, clusterInfoSnapshot)
feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, placement, resource, clusterInfoSnapshot)
if err != nil {
return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
}
@ -77,11 +78,12 @@ func (g *genericScheduler) findClustersThatFit(
ctx context.Context,
fwk framework.Framework,
placement *policyv1alpha1.Placement,
resource *workv1alpha1.ObjectReference,
clusterInfo *cache.Snapshot) ([]*clusterv1alpha1.Cluster, error) {
var out []*clusterv1alpha1.Cluster
clusters := clusterInfo.GetReadyClusters()
for _, c := range clusters {
resMap := fwk.RunFilterPlugins(ctx, placement, c.Cluster())
resMap := fwk.RunFilterPlugins(ctx, placement, resource, c.Cluster())
res := resMap.Merge()
if !res.IsSuccess() {
klog.V(4).Infof("cluster %q is not fit", c.Cluster().Name)

View File

@ -7,6 +7,7 @@ import (
cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
work "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)
// Framework manages the set of plugins in use by the scheduling framework.
@ -15,7 +16,7 @@ type Framework interface {
// RunFilterPlugins runs the set of configured Filter plugins for resources on
// the given cluster.
RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) PluginToResult
RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, resource *work.ObjectReference, cluster *cluster.Cluster) PluginToResult
// RunScorePlugins runs the set of configured Score plugins, it returns a map of plugin name to cores
RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*cluster.Cluster) (PluginToClusterScores, error)
@ -31,7 +32,7 @@ type Plugin interface {
type FilterPlugin interface {
Plugin
// Filter is called by the scheduling framework.
Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *Result
Filter(ctx context.Context, placement *v1alpha1.Placement, resource *work.ObjectReference, cluster *cluster.Cluster) *Result
}
// Result indicates the result of running a plugin. It consists of a code, a

View File

@ -0,0 +1,51 @@
package apiinstalled
import (
"context"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "APIInstalled"
)
// APIInstalled is a plugin that checks if the API(CRD) of the resource is installed in the target cluster.
type APIInstalled struct{}
var _ framework.FilterPlugin = &APIInstalled{}
var _ framework.ScorePlugin = &APIInstalled{}
// New instantiates the APIInstalled plugin.
func New() framework.Plugin {
return &APIInstalled{}
}
// Name returns the plugin name.
func (p *APIInstalled) Name() string {
return Name
}
// Filter checks if the API(CRD) of the resource is installed in the target cluster.
func (p *APIInstalled) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
for _, apiEnablement := range cluster.Status.APIEnablements {
if apiEnablement.GroupVersion == resource.APIVersion {
for _, apiResource := range apiEnablement.Resources {
if apiResource.Kind == resource.Kind {
return framework.NewResult(framework.Success)
}
}
}
}
return framework.NewResult(framework.Unschedulable, "no such API resource")
}
// Score calculates the score on the candidate cluster.
func (p *APIInstalled) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}

View File

@ -3,8 +3,9 @@ package clusteraffinity
import (
"context"
cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/util"
)
@ -31,7 +32,7 @@ func (p *ClusterAffinity) Name() string {
}
// Filter checks if the cluster matched the placement cluster affinity constraint.
func (p *ClusterAffinity) Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *framework.Result {
func (p *ClusterAffinity) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
affinity := placement.ClusterAffinity
if affinity != nil {
if util.ClusterMatches(cluster, *affinity) {
@ -45,6 +46,6 @@ func (p *ClusterAffinity) Filter(ctx context.Context, placement *v1alpha1.Placem
}
// Score calculates the score on the candidate cluster.
func (p *ClusterAffinity) Score(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) (float64, *framework.Result) {
func (p *ClusterAffinity) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}

View File

@ -2,6 +2,7 @@ package plugins
import (
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
)
@ -11,5 +12,6 @@ func NewPlugins() map[string]framework.Plugin {
return map[string]framework.Plugin{
clusteraffinity.Name: clusteraffinity.New(),
tainttoleration.Name: tainttoleration.New(),
apiinstalled.Name: apiinstalled.New(),
}
}

View File

@ -7,8 +7,9 @@ import (
v1 "k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)
@ -34,7 +35,7 @@ func (p *TaintToleration) Name() string {
}
// Filter checks if the given tolerations in placement tolerate cluster's taints.
func (p *TaintToleration) Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) *framework.Result {
func (p *TaintToleration) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
filterPredicate := func(t *v1.Taint) bool {
// now only interested in NoSchedule taint which means do not allow new resource to schedule onto the cluster unless they tolerate the taint
// todo: supprot NoExecute taint
@ -51,6 +52,6 @@ func (p *TaintToleration) Filter(ctx context.Context, placement *v1alpha1.Placem
}
// Score calculates the score on the candidate cluster.
func (p *TaintToleration) Score(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) (float64, *framework.Result) {
func (p *TaintToleration) Score(ctx context.Context, placement *policyv1alpha1.Placement, cluster *clusterv1alpha1.Cluster) (float64, *framework.Result) {
return 0, framework.NewResult(framework.Success)
}

View File

@ -7,8 +7,9 @@ import (
"k8s.io/klog/v2"
cluster "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
plugins2 "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
)
@ -52,10 +53,10 @@ func NewFramework(plugins []string) framework.Framework {
// RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster.
// If any of the result is not success, the cluster is not suited for the resource.
func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *cluster.Cluster) framework.PluginToResult {
func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha1.ObjectReference, cluster *clusterv1alpha1.Cluster) framework.PluginToResult {
result := make(framework.PluginToResult, len(frw.filterPlugins))
for _, p := range frw.filterPlugins {
pluginResult := p.Filter(ctx, placement, cluster)
pluginResult := p.Filter(ctx, placement, resource, cluster)
result[p.Name()] = pluginResult
}
@ -64,7 +65,7 @@ func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *v1alp
// RunScorePlugins runs the set of configured Filter plugins for resources on the cluster.
// If any of the result is not success, the cluster is not suited for the resource.
func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*cluster.Cluster) (framework.PluginToClusterScores, error) {
func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *policyv1alpha1.Placement, clusters []*clusterv1alpha1.Cluster) (framework.PluginToClusterScores, error) {
result := make(framework.PluginToClusterScores, len(frw.filterPlugins))
for _, p := range frw.scorePlugins {
for i, cluster := range clusters {

View File

@ -31,6 +31,7 @@ import (
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha1"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/util"
@ -107,7 +108,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
schedulerCache := schedulercache.NewCache()
// TODO: make plugins as a flag
algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name})
algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name})
sched := &Scheduler{
DynamicClient: dynamicClient,
KarmadaClient: karmadaClient,
@ -476,7 +477,7 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.Resour
return err
}
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement)
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec.Resource)
if err != nil {
klog.V(2).Infof("failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
@ -503,7 +504,7 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.Resour
}
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha1.ClusterResourceBinding, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement)
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec.Resource)
if err != nil {
klog.V(2).Infof("failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
return err