diff --git a/cmd/scheduler/app/scheduler.go b/cmd/scheduler/app/scheduler.go index 7ecb9c4ed..b60f311f4 100644 --- a/cmd/scheduler/app/scheduler.go +++ b/cmd/scheduler/app/scheduler.go @@ -95,7 +95,11 @@ func run(opts *options.Options, stopChan <-chan struct{}) error { cancel() }() - sched, err := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, opts) + sched, err := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet, + scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator), + scheduler.WithSchedulerEstimatorPort(opts.SchedulerEstimatorPort), + scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout), + ) if err != nil { return fmt.Errorf("couldn't create scheduler: %w", err) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 82a5099b1..cb1851cb0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -21,7 +21,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "github.com/karmada-io/karmada/cmd/scheduler/app/options" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" @@ -86,8 +85,41 @@ type Scheduler struct { schedulerEstimatorWorker util.AsyncWorker } +type schedulerOptions struct { + // enableSchedulerEstimator represents whether the accurate scheduler estimator should be enabled. + enableSchedulerEstimator bool + // schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service. + schedulerEstimatorTimeout metav1.Duration + // schedulerEstimatorPort is the port that the accurate scheduler estimator server serves at. + schedulerEstimatorPort int +} + +// Option configures a Scheduler +type Option func(*schedulerOptions) + +// WithEnableSchedulerEstimator sets the enableSchedulerEstimator for scheduler +func WithEnableSchedulerEstimator(enableSchedulerEstimator bool) Option { + return func(o *schedulerOptions) { + o.enableSchedulerEstimator = enableSchedulerEstimator + } +} + +// WithSchedulerEstimatorTimeout sets the schedulerEstimatorTimeout for scheduler +func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Option { + return func(o *schedulerOptions) { + o.schedulerEstimatorTimeout = schedulerEstimatorTimeout + } +} + +// WithSchedulerEstimatorPort sets the schedulerEstimatorPort for scheduler +func WithSchedulerEstimatorPort(schedulerEstimatorPort int) Option { + return func(o *schedulerOptions) { + o.schedulerEstimatorPort = schedulerEstimatorPort + } +} + // NewScheduler instantiates a scheduler -func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) (*Scheduler, error) { +func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts ...Option) (*Scheduler, error) { factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister() policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister() @@ -97,6 +129,11 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) schedulerCache := schedulercache.NewCache(clusterLister) + options := schedulerOptions{} + for _, opt := range opts { + opt(&options) + } + // TODO(kerthcet): make plugins configurable via config file registry := frameworkplugins.NewInTreeRegistry() algorithm, err := core.NewGenericScheduler(schedulerCache, registry) @@ -105,30 +142,31 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse } sched := &Scheduler{ - DynamicClient: dynamicClient, - KarmadaClient: karmadaClient, - KubeClient: kubeClient, - bindingLister: bindingLister, - policyLister: policyLister, - clusterBindingLister: clusterBindingLister, - clusterPolicyLister: clusterPolicyLister, - clusterLister: clusterLister, - informerFactory: factory, - queue: queue, - Algorithm: algorithm, - schedulerCache: schedulerCache, - enableSchedulerEstimator: opts.EnableSchedulerEstimator, + DynamicClient: dynamicClient, + KarmadaClient: karmadaClient, + KubeClient: kubeClient, + bindingLister: bindingLister, + policyLister: policyLister, + clusterBindingLister: clusterBindingLister, + clusterPolicyLister: clusterPolicyLister, + clusterLister: clusterLister, + informerFactory: factory, + queue: queue, + Algorithm: algorithm, + schedulerCache: schedulerCache, } - if opts.EnableSchedulerEstimator { + + if options.enableSchedulerEstimator { + sched.enableSchedulerEstimator = options.enableSchedulerEstimator + sched.schedulerEstimatorPort = options.schedulerEstimatorPort sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache() - sched.schedulerEstimatorPort = opts.SchedulerEstimatorPort schedulerEstimatorWorkerOptions := util.Options{ Name: "scheduler-estimator", KeyFunc: nil, ReconcileFunc: sched.reconcileEstimatorConnection, } sched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions) - schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration) + schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, options.schedulerEstimatorTimeout.Duration) estimatorclient.RegisterSchedulerEstimator(schedulerEstimator) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go new file mode 100644 index 000000000..35a09a6cc --- /dev/null +++ b/pkg/scheduler/scheduler_test.go @@ -0,0 +1,65 @@ +package scheduler + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/fake" + + karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" +) + +func TestCreateScheduler(t *testing.T) { + dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()) + karmadaClient := karmadafake.NewSimpleClientset() + kubeClient := fake.NewSimpleClientset() + port := 10025 + + testcases := []struct { + name string + opts []Option + enableSchedulerEstimator bool + schedulerEstimatorPort int + }{ + { + name: "scheduler with default configuration", + opts: nil, + enableSchedulerEstimator: false, + }, + { + name: "scheduler with enableSchedulerEstimator enabled", + opts: []Option{ + WithEnableSchedulerEstimator(true), + WithSchedulerEstimatorPort(port), + }, + enableSchedulerEstimator: true, + schedulerEstimatorPort: port, + }, + { + name: "scheduler with enableSchedulerEstimator disabled, WithSchedulerEstimatorPort enabled", + opts: []Option{ + WithEnableSchedulerEstimator(false), + WithSchedulerEstimatorPort(port), + }, + enableSchedulerEstimator: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient, tc.opts...) + if err != nil { + t.Errorf("create scheduler error: %s", err) + } + + if tc.enableSchedulerEstimator != sche.enableSchedulerEstimator { + t.Errorf("unexpected enableSchedulerEstimator want %v, got %v", tc.enableSchedulerEstimator, sche.enableSchedulerEstimator) + } + + if tc.schedulerEstimatorPort != sche.schedulerEstimatorPort { + t.Errorf("unexpected schedulerEstimatorPort want %v, got %v", tc.schedulerEstimatorPort, sche.schedulerEstimatorPort) + } + }) + } +}