Merge pull request #1382 from kerthcet/feature/add-outoftree-plugins

feat: introduce registry to initialize framework
This commit is contained in:
karmada-bot 2022-04-15 15:53:57 +08:00 committed by GitHub
commit 9a99433b65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 103 additions and 51 deletions

View File

@ -95,7 +95,11 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
cancel() cancel()
}() }()
sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, opts) sched, err := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, opts)
if err != nil {
return fmt.Errorf("couldn't create scheduler: %w", err)
}
if !opts.LeaderElection.LeaderElect { if !opts.LeaderElection.LeaderElect {
sched.Run(ctx) sched.Run(ctx)
return fmt.Errorf("scheduler exited") return fmt.Errorf("scheduler exited")

View File

@ -35,12 +35,16 @@ type genericScheduler struct {
// NewGenericScheduler creates a genericScheduler object. // NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler( func NewGenericScheduler(
schedCache cache.Cache, schedCache cache.Cache,
plugins []string, registry runtime.Registry,
) ScheduleAlgorithm { ) (ScheduleAlgorithm, error) {
f, err := runtime.NewFramework(registry)
if err != nil {
return nil, err
}
return &genericScheduler{ return &genericScheduler{
schedulerCache: schedCache, schedulerCache: schedCache,
scheduleFramework: runtime.NewFramework(plugins), scheduleFramework: f,
} }, nil
} }
func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) { func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) {

View File

@ -23,8 +23,8 @@ type APIInstalled struct{}
var _ framework.FilterPlugin = &APIInstalled{} var _ framework.FilterPlugin = &APIInstalled{}
// New instantiates the APIInstalled plugin. // New instantiates the APIInstalled plugin.
func New() framework.Plugin { func New() (framework.Plugin, error) {
return &APIInstalled{} return &APIInstalled{}, nil
} }
// Name returns the plugin name. // Name returns the plugin name.

View File

@ -22,8 +22,8 @@ var _ framework.FilterPlugin = &ClusterAffinity{}
var _ framework.ScorePlugin = &ClusterAffinity{} var _ framework.ScorePlugin = &ClusterAffinity{}
// New instantiates the clusteraffinity plugin. // New instantiates the clusteraffinity plugin.
func New() framework.Plugin { func New() (framework.Plugin, error) {
return &ClusterAffinity{} return &ClusterAffinity{}, nil
} }
// Name returns the plugin name. // Name returns the plugin name.

View File

@ -21,8 +21,8 @@ type ClusterLocality struct{}
var _ framework.ScorePlugin = &ClusterLocality{} var _ framework.ScorePlugin = &ClusterLocality{}
// New instantiates the clusteraffinity plugin. // New instantiates the clusteraffinity plugin.
func New() framework.Plugin { func New() (framework.Plugin, error) {
return &ClusterLocality{} return &ClusterLocality{}, nil
} }
// Name returns the plugin name. // Name returns the plugin name.

View File

@ -1,21 +1,21 @@
package plugins package plugins
import ( import (
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusterlocality" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusterlocality"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/spreadconstraint" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/spreadconstraint"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration" "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
) )
// NewPlugins builds all the scheduling plugins. // NewInTreeRegistry builds the registry with all the in-tree plugins.
func NewPlugins() map[string]framework.Plugin { func NewInTreeRegistry() runtime.Registry {
return map[string]framework.Plugin{ return runtime.Registry{
clusteraffinity.Name: clusteraffinity.New(), apiinstalled.Name: apiinstalled.New,
tainttoleration.Name: tainttoleration.New(), tainttoleration.Name: tainttoleration.New,
apiinstalled.Name: apiinstalled.New(), clusteraffinity.Name: clusteraffinity.New,
clusterlocality.Name: clusterlocality.New(), spreadconstraint.Name: spreadconstraint.New,
spreadconstraint.Name: spreadconstraint.New(), clusterlocality.Name: clusterlocality.New,
} }
} }

View File

@ -20,8 +20,8 @@ type SpreadConstraint struct{}
var _ framework.FilterPlugin = &SpreadConstraint{} var _ framework.FilterPlugin = &SpreadConstraint{}
// New instantiates the spreadconstraint plugin. // New instantiates the spreadconstraint plugin.
func New() framework.Plugin { func New() (framework.Plugin, error) {
return &SpreadConstraint{} return &SpreadConstraint{}, nil
} }
// Name returns the plugin name. // Name returns the plugin name.

View File

@ -24,8 +24,8 @@ type TaintToleration struct{}
var _ framework.FilterPlugin = &TaintToleration{} var _ framework.FilterPlugin = &TaintToleration{}
// New instantiates the TaintToleration plugin. // New instantiates the TaintToleration plugin.
func New() framework.Plugin { func New() (framework.Plugin, error) {
return &TaintToleration{} return &TaintToleration{}, nil
} }
// Name returns the plugin name. // Name returns the plugin name.

View File

@ -5,13 +5,10 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/framework" "github.com/karmada-io/karmada/pkg/scheduler/framework"
plugins2 "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
) )
// frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler // frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler
@ -24,27 +21,25 @@ type frameworkImpl struct {
var _ framework.Framework = &frameworkImpl{} var _ framework.Framework = &frameworkImpl{}
// NewFramework creates a scheduling framework. // NewFramework creates a scheduling framework by registry.
func NewFramework(plugins []string) framework.Framework { func NewFramework(r Registry) (framework.Framework, error) {
pluginsMap := plugins2.NewPlugins() f := &frameworkImpl{}
out := &frameworkImpl{} filterPluginsList := reflect.ValueOf(&f.filterPlugins).Elem()
filterPluginsList := reflect.ValueOf(&out.filterPlugins).Elem() scorePluginsList := reflect.ValueOf(&f.scorePlugins).Elem()
scorePluginsList := reflect.ValueOf(&out.scorePlugins).Elem()
filterType := filterPluginsList.Type().Elem() filterType := filterPluginsList.Type().Elem()
scoreType := scorePluginsList.Type().Elem() scoreType := scorePluginsList.Type().Elem()
for _, p := range plugins { for name, factory := range r {
plugin := pluginsMap[p] p, err := factory()
if plugin == nil { if err != nil {
klog.Warningf("scheduling plugin %s not exists", p) return nil, fmt.Errorf("failed to initialize plugin %q: %w", name, err)
continue
} }
addPluginToList(plugin, filterType, &filterPluginsList)
addPluginToList(plugin, scoreType, &scorePluginsList) addPluginToList(p, filterType, &filterPluginsList)
addPluginToList(p, scoreType, &scorePluginsList)
} }
return out return f, nil
} }
// RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster. // RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster.

View File

@ -0,0 +1,45 @@
package runtime
import (
"fmt"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)
// PluginFactory is a function that builds a plugin.
type PluginFactory = func() (framework.Plugin, error)
// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
type Registry map[string]PluginFactory
// Register adds a new plugin to the registry. If a plugin with the same name
// exists, it returns an error.
func (r Registry) Register(name string, factory PluginFactory) error {
if _, ok := r[name]; ok {
return fmt.Errorf("a plugin named %v already exists", name)
}
r[name] = factory
return nil
}
// Unregister removes an existing plugin from the registry. If no plugin with
// the provided name exists, it returns an error.
func (r Registry) Unregister(name string) error {
if _, ok := r[name]; !ok {
return fmt.Errorf("no plugin named %v exists", name)
}
delete(r, name)
return nil
}
// Merge merges the provided registry to the current one.
func (r Registry) Merge(in Registry) error {
for name, factory := range in {
if err := r.Register(name, factory); err != nil {
return err
}
}
return nil
}

View File

@ -34,11 +34,7 @@ import (
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2" worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache" schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core" "github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled" frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusterlocality"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/spreadconstraint"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util"
utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics" utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics"
@ -91,7 +87,7 @@ type Scheduler struct {
} }
// NewScheduler instantiates a scheduler // NewScheduler instantiates a scheduler
func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) *Scheduler { func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) (*Scheduler, error) {
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister() bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister()
policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister() policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister()
@ -100,8 +96,16 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
schedulerCache := schedulercache.NewCache(clusterLister) schedulerCache := schedulercache.NewCache(clusterLister)
// TODO: make plugins as a flag
algorithm := core.NewGenericScheduler(schedulerCache, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name, clusterlocality.Name, spreadconstraint.Name}) // algorithm := core.NewGenericScheduler(schedulerCache, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name, clusterlocality.Name, spreadconstraint.Name})
// TODO(kerthcet): make plugins configurable via config file
registry := frameworkplugins.NewInTreeRegistry()
algorithm, err := core.NewGenericScheduler(schedulerCache, registry)
if err != nil {
return nil, err
}
sched := &Scheduler{ sched := &Scheduler{
DynamicClient: dynamicClient, DynamicClient: dynamicClient,
KarmadaClient: karmadaClient, KarmadaClient: karmadaClient,
@ -131,7 +135,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
} }
sched.addAllEventHandlers() sched.addAllEventHandlers()
return sched return sched, nil
} }
// Run runs the scheduler // Run runs the scheduler