Support multiple schedulers

Signed-off-by: Poor12 <shentiecheng@huawei.com>
This commit is contained in:
Poor12 2023-01-06 15:00:12 +08:00
parent 43c1a8c753
commit bec9ca1f6a
13 changed files with 106 additions and 11 deletions

View File

@ -16676,6 +16676,10 @@
"description": "Resource represents the Kubernetes resource to be propagated.",
"default": {},
"$ref": "#/definitions/com.github.karmada-io.karmada.pkg.apis.work.v1alpha2.ObjectReference"
},
"schedulerName": {
"description": "SchedulerName represents which scheduler to proceed the scheduling. It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).",
"type": "string"
}
}
},

View File

@ -531,6 +531,7 @@ spec:
minItems: 1
type: array
schedulerName:
default: default-scheduler
description: SchedulerName represents which scheduler to proceed the
scheduling. If specified, the policy will be dispatched by specified
scheduler. If not specified, the policy will be dispatched by default

View File

@ -527,6 +527,7 @@ spec:
minItems: 1
type: array
schedulerName:
default: default-scheduler
description: SchedulerName represents which scheduler to proceed the
scheduling. If specified, the policy will be dispatched by specified
scheduler. If not specified, the policy will be dispatched by default

View File

@ -564,6 +564,11 @@ spec:
- kind
- name
type: object
schedulerName:
description: SchedulerName represents which scheduler to proceed the
scheduling. It inherits directly from the associated PropagationPolicy(or
ClusterPropagationPolicy).
type: string
required:
- resource
type: object

View File

@ -564,6 +564,11 @@ spec:
- kind
- name
type: object
schedulerName:
description: SchedulerName represents which scheduler to proceed the
scheduling. It inherits directly from the associated PropagationPolicy(or
ClusterPropagationPolicy).
type: string
required:
- resource
type: object

View File

@ -11,6 +11,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/scheduler"
frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util"
@ -63,6 +64,10 @@ type Options struct {
// 'foo' means "enable 'foo'"
// '*,-foo' means "disable 'foo'"
Plugins []string
// SchedulerName represents the name of the scheduler.
// default is "default-scheduler".
SchedulerName string
}
// NewOptions builds an default scheduler options.
@ -87,6 +92,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
}
fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Enable leader election, which must be true when running multi instances.")
fs.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "karmada-scheduler", "The name of resource object that is used for locking during leader election.")
fs.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", util.NamespaceKarmadaSystem, "The namespace of resource object that is used for locking during leader election.")
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to karmada control plane kubeconfig file.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
@ -102,6 +108,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableEmptyWorkloadPropagation, "enable-empty-workload-propagation", false, "Enable workload with replicas 0 to be propagated to member clusters.")
fs.StringSliceVar(&o.Plugins, "plugins", []string{"*"},
fmt.Sprintf("A list of plugins to enable. '*' enables all build-in and customized plugins, 'foo' enables the plugin named 'foo', '*,-foo' disables the plugin named 'foo'.\nAll build-in plugins: %s.", strings.Join(frameworkplugins.NewInTreeRegistry().FactoryNames(), ",")))
fs.StringVar(&o.SchedulerName, "scheduler-name", scheduler.DefaultScheduler, "SchedulerName represents the name of the scheduler. default is 'default-scheduler'.")
features.FeatureGate.AddFlag(fs)
o.ProfileOpts.AddFlags(fs)
}

View File

@ -142,6 +142,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout),
scheduler.WithEnableEmptyWorkloadPropagation(opts.EnableEmptyWorkloadPropagation),
scheduler.WithEnableSchedulerPlugin(opts.Plugins),
scheduler.WithSchedulerName(opts.SchedulerName),
)
if err != nil {
return fmt.Errorf("couldn't create scheduler: %w", err)

View File

@ -106,6 +106,7 @@ type PropagationSpec struct {
// SchedulerName represents which scheduler to proceed the scheduling.
// If specified, the policy will be dispatched by specified scheduler.
// If not specified, the policy will be dispatched by default scheduler.
// +kubebuilder:default="default-scheduler"
// +optional
SchedulerName string `json:"schedulerName,omitempty"`
}

View File

@ -91,6 +91,11 @@ type ResourceBindingSpec struct {
// RequiredBy represents the list of Bindings that depend on the referencing resource.
// +optional
RequiredBy []BindingSnapshot `json:"requiredBy,omitempty"`
// SchedulerName represents which scheduler to proceed the scheduling.
// It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).
// +optional
SchedulerName string `json:"schedulerName,omitempty"`
}
// ObjectReference contains enough information to locate the referenced object inside current cluster.

View File

@ -392,7 +392,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
policyv1alpha1.PropagationPolicyNameLabel: policy.GetName(),
}
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, &policy.Spec)
if err != nil {
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
return err
@ -414,6 +414,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
bindingCopy.Spec.Replicas = binding.Spec.Replicas
bindingCopy.Spec.PropagateDeps = binding.Spec.PropagateDeps
bindingCopy.Spec.SchedulerName = binding.Spec.SchedulerName
return nil
})
if err != nil {
@ -465,7 +466,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
// For namespace-scoped resources, which namespace is not empty, building `ResourceBinding`.
// For cluster-scoped resources, which namespace is empty, building `ClusterResourceBinding`.
if object.GetNamespace() != "" {
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, &policy.Spec)
if err != nil {
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
return err
@ -486,6 +487,8 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Resource = binding.Spec.Resource
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
bindingCopy.Spec.Replicas = binding.Spec.Replicas
bindingCopy.Spec.PropagateDeps = binding.Spec.PropagateDeps
bindingCopy.Spec.SchedulerName = binding.Spec.SchedulerName
return nil
})
if err != nil {
@ -507,7 +510,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
klog.V(2).Infof("ResourceBinding(%s) is up to date.", binding.GetName())
}
} else {
binding, err := d.BuildClusterResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
binding, err := d.BuildClusterResourceBinding(object, objectKey, policyLabels, &policy.Spec)
if err != nil {
klog.Errorf("Failed to build clusterResourceBinding for object: %s. error: %v", objectKey, err)
return err
@ -527,6 +530,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Resource = binding.Spec.Resource
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
bindingCopy.Spec.Replicas = binding.Spec.Replicas
bindingCopy.Spec.SchedulerName = binding.Spec.SchedulerName
return nil
})
if err != nil {
@ -609,7 +613,7 @@ func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unst
}
// BuildResourceBinding builds a desired ResourceBinding for object.
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ResourceBinding, error) {
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, policySpec *policyv1alpha1.PropagationSpec) (*workv1alpha2.ResourceBinding, error) {
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
propagationBinding := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
@ -622,7 +626,8 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
Finalizers: []string{util.BindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: propagateDeps,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
@ -648,7 +653,7 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
}
// BuildClusterResourceBinding builds a desired ClusterResourceBinding for object.
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ClusterResourceBinding, error) {
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, policySpec *policyv1alpha1.PropagationSpec) (*workv1alpha2.ClusterResourceBinding, error) {
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
binding := &workv1alpha2.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
@ -660,7 +665,8 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: propagateDeps,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),

View File

@ -5237,6 +5237,13 @@ func schema_pkg_apis_work_v1alpha2_ResourceBindingSpec(ref common.ReferenceCallb
},
},
},
"schedulerName": {
SchemaProps: spec.SchemaProps{
Description: "SchedulerName represents which scheduler to proceed the scheduling. It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).",
Type: []string{"string"},
Format: "",
},
},
},
Required: []string{"resource"},
},

View File

@ -35,8 +35,11 @@ func (s *Scheduler) addAllEventHandlers() {
}
policyInformer := s.informerFactory.Policy().V1alpha1().PropagationPolicies().Informer()
_, err = policyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: s.onPropagationPolicyUpdate,
_, err = policyInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: s.policyEventFilter,
Handler: cache.ResourceEventHandlerFuncs{
UpdateFunc: s.onPropagationPolicyUpdate,
},
})
if err != nil {
klog.Errorf("Failed to add handlers for PropagationPolicies: %v", err)
@ -55,8 +58,11 @@ func (s *Scheduler) addAllEventHandlers() {
}
clusterPolicyInformer := s.informerFactory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
_, err = clusterPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: s.onClusterPropagationPolicyUpdate,
_, err = clusterPolicyInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: s.policyEventFilter,
Handler: cache.ResourceEventHandlerFuncs{
UpdateFunc: s.onClusterPropagationPolicyUpdate,
},
})
if err != nil {
klog.Errorf("Failed to add handlers for ClusterPropagationPolicies: %v", err)
@ -93,10 +99,32 @@ func (s *Scheduler) resourceBindingEventFilter(obj interface{}) bool {
return false
}
switch t := obj.(type) {
case *workv1alpha2.ResourceBinding:
if !schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName) {
return false
}
case *workv1alpha2.ClusterResourceBinding:
if !schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName) {
return false
}
}
return util.GetLabelValue(accessor.GetLabels(), policyv1alpha1.PropagationPolicyNameLabel) != "" ||
util.GetLabelValue(accessor.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel) != ""
}
func (s *Scheduler) policyEventFilter(obj interface{}) bool {
switch t := obj.(type) {
case *policyv1alpha1.PropagationPolicy:
return schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName)
case *policyv1alpha1.ClusterPropagationPolicy:
return schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName)
}
return true
}
func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
@ -328,3 +356,11 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
}
func schedulerNameFilter(schedulerNameFromOptions, schedulerName string) bool {
if schedulerName == "" {
schedulerName = DefaultScheduler
}
return schedulerNameFromOptions == schedulerName
}

View File

@ -61,6 +61,11 @@ const (
scheduleSuccessMessage = "Binding has been scheduled"
)
const (
// DefaultScheduler defines the name of default scheduler.
DefaultScheduler = "default-scheduler"
)
// Scheduler is the scheduler schema, which is used to schedule a specific resource to specific clusters
type Scheduler struct {
DynamicClient dynamic.Interface
@ -87,6 +92,7 @@ type Scheduler struct {
schedulerEstimatorServicePrefix string
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
schedulerName string
enableEmptyWorkloadPropagation bool
}
@ -102,6 +108,8 @@ type schedulerOptions struct {
schedulerEstimatorServicePrefix string
// schedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
schedulerEstimatorPort int
// schedulerName is the name of the scheduler. Default is "default-scheduler".
schedulerName string
//enableEmptyWorkloadPropagation represents whether allow workload with replicas 0 propagated to member clusters should be enabled
enableEmptyWorkloadPropagation bool
// outOfTreeRegistry represents the registry of out-of-tree plugins
@ -148,6 +156,13 @@ func WithSchedulerEstimatorPort(schedulerEstimatorPort int) Option {
}
}
// WithSchedulerName sets the schedulerName for scheduler
func WithSchedulerName(schedulerName string) Option {
return func(o *schedulerOptions) {
o.schedulerName = schedulerName
}
}
// WithEnableEmptyWorkloadPropagation sets the enablePropagateEmptyWorkLoad for scheduler
func WithEnableEmptyWorkloadPropagation(enableEmptyWorkloadPropagation bool) Option {
return func(o *schedulerOptions) {
@ -227,6 +242,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
}
sched.enableEmptyWorkloadPropagation = options.enableEmptyWorkloadPropagation
sched.schedulerName = options.schedulerName
sched.addAllEventHandlers()
return sched, nil