From c3458310d4017d444a9297af1a409a1e3539277f Mon Sep 17 00:00:00 2001 From: Yao Weng Date: Fri, 19 Jan 2024 09:14:41 -0500 Subject: [PATCH] [MVP] add resourcequota plugin in scheduler-estimator: create framework for scheulder-estimator (#4534) * [MVP] add resourcequota plugin in scheudler-estimator Signed-off-by: yweng14 * framework only Signed-off-by: yweng14 * fix lint error Signed-off-by: yweng14 * address comments Signed-off-by: yweng14 * add KubeClient in the Handle Signed-off-by: yweng14 * - add snapshot as input argument of RunEstimateReplicasPlugins - add Result to give clearer message Signed-off-by: yweng14 * fix unitest name due Signed-off-by: yweng14 --------- Signed-off-by: yweng14 --- charts/karmada/README.md | 9 +- charts/karmada/templates/_helpers.tpl | 17 ++ .../karmada-scheduler-estimator.yaml | 3 + charts/karmada/values.yaml | 3 + .../app/options/options.go | 3 + .../app/scheduler-estimator.go | 9 +- hack/verify-license.sh | 1 + pkg/estimator/server/estimate.go | 10 +- pkg/estimator/server/framework/interface.go | 196 ++++++++++++++ .../server/framework/plugins/registry.go | 27 ++ .../server/framework/runtime/framework.go | 146 ++++++++++ .../framework/runtime/framework_test.go | 254 ++++++++++++++++++ .../server/framework/runtime/registry.go | 61 +++++ .../server/framework/runtime/registry_test.go | 224 +++++++++++++++ pkg/estimator/server/metrics/metrics.go | 25 ++ pkg/estimator/server/server.go | 37 ++- pkg/estimator/server/server_test.go | 4 +- 17 files changed, 1008 insertions(+), 21 deletions(-) create mode 100644 pkg/estimator/server/framework/interface.go create mode 100644 pkg/estimator/server/framework/plugins/registry.go create mode 100644 pkg/estimator/server/framework/runtime/framework.go create mode 100644 pkg/estimator/server/framework/runtime/framework_test.go create mode 100644 pkg/estimator/server/framework/runtime/registry.go create mode 100644 pkg/estimator/server/framework/runtime/registry_test.go diff --git a/charts/karmada/README.md b/charts/karmada/README.md index b5c2865ae..ec584e79d 100644 --- a/charts/karmada/README.md +++ b/charts/karmada/README.md @@ -200,7 +200,7 @@ helm install karmada-scheduler-estimator -n karmada-system ./charts/karmada | Name | Description | Value | -| ---------------------------------------- |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `installMode` | InstallMode "host", "agent" and "component" are provided, "host" means install karmada in the control-cluster, "agent" means install agent client in the member cluster, "component" means install selected components in the control-cluster | `"host"` | | `clusterDomain` | Default cluster domain for karmada | `"cluster.local"` | | `components` | Selected components list, selectable values: "schedulerEstimator" | `[]` | @@ -300,13 +300,13 @@ helm install karmada-scheduler-estimator -n karmada-system ./charts/karmada | `controllerManager.controllers` | Controllers of the karmada-controller-manager | `""` | | `controllerManager.extraCommandArgs` | extra command args of the karmada-controller-manager | `{}` | | `apiServer.labels` | Labels of the karmada-apiserver deployment | `{"app": "karmada-apiserver"}` | -| `apiServer.serviceAnnotations` | Annotations of the karmada-apiserver service | `{}` | +| `apiServer.serviceAnnotations` | Annotations of the karmada-apiserver service | `{}` | | `apiServer.replicaCount` | Target replicas of the karmada-apiserver | `1` | | `apiServer.podLabels` | Labels of the karmada-apiserver pods | `{}` | | `apiServer.podAnnotations` | Annotations of the karmada-apiserver pods | `{}` | | `apiServer.imagePullSecrets` | Image pull secret of the karmada-apiserver | `[]` | | `apiServer.image.repository` | Image of the karmada-apiserver | `"registry.k8s.io/kube-apiserver"` | -| `apiServer.image.tag` | Image tag of the karmada-apiserver | `"v1.26.12"` | +| `apiServer.image.tag` | Image tag of the karmada-apiserver | `"v1.26.12"` | | `apiServer.image.pullPolicy` | Image pull policy of the karmada-apiserver | `"IfNotPresent"` | | `apiServer.resources` | Resource quota of the karmada-apiserver | `{}` | | `apiServer.hostNetwork` | Deploy karmada-apiserver with hostNetwork. If there are multiple karmadas in one cluster, you'd better set it to "false" | `"false"` | @@ -335,7 +335,7 @@ helm install karmada-scheduler-estimator -n karmada-system ./charts/karmada | `kubeControllerManager.podAnnotations` | Annotations of the kube-controller-manager pods | `{}` | | `kubeControllerManager.imagePullSecrets` | Image pull secret of the kube-controller-manager | `[]` | | `kubeControllerManager.image.repository` | Image of the kube-controller-manager | `"registry.k8s.io/kube-controller-manager"` | -| `kubeControllerManager.image.tag` | Image tag of the kube-controller-manager | `"v1.26.12"` | +| `kubeControllerManager.image.tag` | Image tag of the kube-controller-manager | `"v1.26.12"` | | `kubeControllerManager.image.pullPolicy` | Image pull policy of the kube-controller-manager | `"IfNotPresent"` | | `kubeControllerManager.resources` | Resource quota of the kube-controller-manager | `{}` | | `kubeControllerManager.nodeSelector` | Node selector of the kube-controller-manager | `{}` | @@ -359,6 +359,7 @@ helm install karmada-scheduler-estimator -n karmada-system ./charts/karmada | `schedulerEstimator.nodeSelector` | Node selector of the scheduler-estimator | `{}` | | `schedulerEstimator.affinity` | Affinity of the scheduler-estimator | `{}` | | `schedulerEstimator.tolerations` | Tolerations of the scheduler-estimator | `[]` | +| `schedulerEstimator.featureGates` | FeatureGates of the scheduler-estimator | `{"FeatureGateName": "false"}` | | `search.strategy` | Strategy of the scheduler-estimator | `{"type": "RollingUpdate", "rollingUpdate": {"maxUnavailable": "0", "maxSurge": "50%"} }` | | `descheduler.labels` | Labels of the descheduler deployment | `karmada-descheduler` | | `descheduler.replicaCount` | Target replicas of the descheduler | `2` | diff --git a/charts/karmada/templates/_helpers.tpl b/charts/karmada/templates/_helpers.tpl index bb46fd164..8e237e7bd 100644 --- a/charts/karmada/templates/_helpers.tpl +++ b/charts/karmada/templates/_helpers.tpl @@ -543,6 +543,23 @@ Return the proper karmada kubectl image name {{- end -}} {{- end -}} +{{- define "karmada.schedulerEstimator.featureGates" -}} + {{- $featureGatesArg := index . "featureGatesArg" -}} + {{- if (not (empty $featureGatesArg)) }} + {{- $featureGatesFlag := "" -}} + {{- range $key, $value := $featureGatesArg -}} + {{- if not (empty (toString $value)) }} + {{- $featureGatesFlag = cat $featureGatesFlag $key "=" $value "," -}} + {{- end -}} + {{- end -}} + + {{- if gt (len $featureGatesFlag) 0 }} + {{- $featureGatesFlag := trimSuffix "," $featureGatesFlag | nospace -}} + {{- printf "%s=%s" "--feature-gates" $featureGatesFlag -}} + {{- end -}} + {{- end -}} +{{- end -}} + {{- define "karmada.controllerManager.extraCommandArgs" -}} {{- if .Values.controllerManager.extraCommandArgs }} {{- range $key, $value := .Values.controllerManager.extraCommandArgs }} diff --git a/charts/karmada/templates/karmada-scheduler-estimator.yaml b/charts/karmada/templates/karmada-scheduler-estimator.yaml index 4e9dd3117..33bdd7846 100644 --- a/charts/karmada/templates/karmada-scheduler-estimator.yaml +++ b/charts/karmada/templates/karmada-scheduler-estimator.yaml @@ -48,6 +48,9 @@ spec: - /bin/karmada-scheduler-estimator - --kubeconfig=/etc/{{ $clusterName }}-kubeconfig - --cluster-name={{ $clusterName }} + {{- with (include "karmada.schedulerEstimator.featureGates" (dict "featureGatesArg" $.Values.schedulerEstimator.featureGates)) }} + - {{ . }} + {{- end}} livenessProbe: httpGet: path: /healthz diff --git a/charts/karmada/values.yaml b/charts/karmada/values.yaml index b0ef6182d..d72927eae 100644 --- a/charts/karmada/values.yaml +++ b/charts/karmada/values.yaml @@ -828,6 +828,9 @@ schedulerEstimator: maxSurge: 50% ## @param apiServer.podDisruptionBudget podDisruptionBudget: *podDisruptionBudget + ## @param featureGate to schedulerEstimator + # FooPluginName: true + featureGates: {} ## descheduler config descheduler: diff --git a/cmd/scheduler-estimator/app/options/options.go b/cmd/scheduler-estimator/app/options/options.go index 454e344d7..3835f0a83 100644 --- a/cmd/scheduler-estimator/app/options/options.go +++ b/cmd/scheduler-estimator/app/options/options.go @@ -19,6 +19,7 @@ package options import ( "github.com/spf13/pflag" + "github.com/karmada-io/karmada/pkg/features" "github.com/karmada-io/karmada/pkg/sharedcli/profileflag" ) @@ -67,5 +68,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") fs.IntVar(&o.Parallelism, "parallelism", o.Parallelism, "Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.") + features.FeatureGate.AddFlag(fs) + o.ProfileOpts.AddFlags(fs) } diff --git a/cmd/scheduler-estimator/app/scheduler-estimator.go b/cmd/scheduler-estimator/app/scheduler-estimator.go index 56aee8cea..7faf36d2d 100644 --- a/cmd/scheduler-estimator/app/scheduler-estimator.go +++ b/cmd/scheduler-estimator/app/scheduler-estimator.go @@ -128,9 +128,14 @@ func run(ctx context.Context, opts *options.Options) error { dynamicClient := dynamic.NewForConfigOrDie(restConfig) discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig) - e := server.NewEstimatorServer(kubeClient, dynamicClient, discoveryClient, opts, ctx.Done()) + e, err := server.NewEstimatorServer(kubeClient, dynamicClient, discoveryClient, opts, ctx.Done()) + if err != nil { + klog.Errorf("Fail to create estimator server: %v", err) + return err + } + if err = e.Start(ctx); err != nil { - klog.Errorf("estimator server exits unexpectedly: %v", err) + klog.Errorf("Estimator server exits unexpectedly: %v", err) return err } diff --git a/hack/verify-license.sh b/hack/verify-license.sh index 97636c065..6f74c8113 100755 --- a/hack/verify-license.sh +++ b/hack/verify-license.sh @@ -39,6 +39,7 @@ missing_license_header_files="$($ADDLICENSE_BIN \ -ignore "**/*.yaml" \ -ignore "**/*.yml" \ -ignore "**/*.json" \ + -ignore ".idea/**" \ .)" || true if [[ "$missing_license_header_files" ]]; then diff --git a/pkg/estimator/server/estimate.go b/pkg/estimator/server/estimate.go index 8393e22e9..5bee0c746 100644 --- a/pkg/estimator/server/estimate.go +++ b/pkg/estimator/server/estimate.go @@ -18,6 +18,7 @@ package server import ( "context" + "fmt" "sync/atomic" "time" @@ -73,8 +74,11 @@ func (es *AccurateSchedulerEstimatorServer) estimateReplicas( tolerations = requirements.NodeClaim.Tolerations } - // TODO(Garrybest): design a framework and make filter and score plugins var res int32 + replicas, ret := es.estimateFramework.RunEstimateReplicasPlugins(ctx, snapshot, &requirements) + if !ret.IsSuccess() && !ret.IsNoOperation() { + return replicas, fmt.Errorf(fmt.Sprintf("estimate replice plugins fails with %s", ret.Reasons())) + } processNode := func(i int) { node := allNodes[i] if !nodeutil.IsNodeAffinityMatched(node.Node(), affinity) || !nodeutil.IsTolerationMatched(node.Node(), tolerations) { @@ -84,6 +88,10 @@ func (es *AccurateSchedulerEstimatorServer) estimateReplicas( atomic.AddInt32(&res, maxReplica) } es.parallelizer.Until(ctx, len(allNodes), processNode) + + if ret.IsSuccess() && replicas < res { + res = replicas + } return res, nil } diff --git a/pkg/estimator/server/framework/interface.go b/pkg/estimator/server/framework/interface.go new file mode 100644 index 000000000..6f0f4d446 --- /dev/null +++ b/pkg/estimator/server/framework/interface.go @@ -0,0 +1,196 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "errors" + "strings" + + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + + "github.com/karmada-io/karmada/pkg/estimator/pb" + schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache" +) + +// Framework manages the set of plugins in use by the estimator. +type Framework interface { + Handle + // RunEstimateReplicasPlugins runs the set of configured EstimateReplicasPlugins + // for estimating replicas based on the given replicaRequirements. + // It returns an integer and an Result. + // The integer represents the minimum calculated value of estimated replicas from each EstimateReplicasPlugin. + // The Result contains code, reasons and error + // it is merged from all plugins returned result codes + RunEstimateReplicasPlugins(ctx context.Context, snapshot *schedcache.Snapshot, replicaRequirements *pb.ReplicaRequirements) (int32, *Result) + // TODO(wengyao04): we can add filter and score plugin extension points if needed in the future +} + +// Plugin is the parent type for all the scheduling framework plugins. +type Plugin interface { + Name() string +} + +// EstimateReplicasPlugin is an interface for replica estimation plugins. +// These estimators are used to estimate the replicas for a given pb.ReplicaRequirements +type EstimateReplicasPlugin interface { + Plugin + // Estimate is called for each MaxAvailableReplicas request. + // It returns an integer and an error + // The integer representing the number of calculated replica for the given replicaRequirements + // The Result contains code, reasons and error + // it is merged from all plugins returned result codes + Estimate(ctx context.Context, snapshot *schedcache.Snapshot, replicaRequirements *pb.ReplicaRequirements) (int32, *Result) +} + +// Handle provides data and some tools that plugins can use. It is +// passed to the plugin factories at the time of plugin initialization. Plugins +// must store and use this handle to call framework functions. +// We follow the design pattern as kubernetes scheduler framework +type Handle interface { + ClientSet() clientset.Interface + SharedInformerFactory() informers.SharedInformerFactory +} + +// Code is the Status code/type which is returned from plugins. +type Code int + +// Result indicates the result of running a plugin. It consists of a code, a +// message and (optionally) an error. When the status code is not `Success`, +// the reasons should explain why. +type Result struct { + code Code + reasons []string + err error +} + +// These are predefined codes used in a Status. +const ( + // Success means that plugin ran correctly and found resource schedulable. + // NOTE: A nil status is also considered as "Success". + Success Code = iota + // Unschedulable is used when a plugin finds the resource unschedulable. + // The accompanying status message should explain why the it is unschedulable. + Unschedulable + // Nooperation is used when a plugin is disabled or the plugin list are empty + Noopperation + // Error is used for internal plugin errors, unexpected input, etc. + Error +) + +// This list should be exactly the same as the codes iota defined above in the same order. +var codes = []string{"Success", "Unschedulable", "Nooperation", "Error"} + +func (c Code) String() string { + return codes[c] +} + +// NewResult makes a result out of the given arguments and returns its pointer. +func NewResult(code Code, reasons ...string) *Result { + s := &Result{ + code: code, + reasons: reasons, + } + if code == Error { + s.err = errors.New(strings.Join(reasons, ",")) + } + return s +} + +// PluginToResult maps plugin name to Result. +type PluginToResult map[string]*Result + +// Merge merges the statuses in the map into one. The resulting status code have the following +// precedence: Error, Unschedulable, Disabled. +func (p PluginToResult) Merge() *Result { + if len(p) == 0 { + return NewResult(Noopperation, "plugin results are empty") + } + + finalStatus := NewResult(Success) + var hasUnschedulable bool + hasAllNoOp := true + for _, s := range p { + if s.code == Error { + finalStatus.err = s.err + } else if s.code == Unschedulable { + hasUnschedulable = true + } + if s.code != Noopperation { + hasAllNoOp = false + } + finalStatus.code = s.code + finalStatus.reasons = append(finalStatus.reasons, s.reasons...) + } + + if finalStatus.err != nil { + finalStatus.code = Error + } else if hasUnschedulable { + finalStatus.code = Unschedulable + } else if hasAllNoOp { + finalStatus.code = Noopperation + } else { + finalStatus.code = Success + } + return finalStatus +} + +// IsSuccess returns true if and only if "Result" is nil or Code is "Success". +func (s *Result) IsSuccess() bool { + return s == nil || s.code == Success +} + +// IsNoOperation return true if "Result" is not nil and Code is "Nooperation" +// ToDo (wengyao04): we can remove it once we include node resource estimation as the default plugin in the future +func (s *Result) IsNoOperation() bool { + return s != nil && s.code == Noopperation +} + +// AsError returns nil if the Result is a success; otherwise returns an "error" object +// with a concatenated message on reasons of the Result. +func (s *Result) AsError() error { + if s.IsSuccess() { + return nil + } + if s.err != nil { + return s.err + } + return errors.New(strings.Join(s.reasons, ", ")) +} + +// Reasons returns reasons of the Result. +func (s *Result) Reasons() []string { + return s.reasons +} + +// Code returns code of the Result. +func (s *Result) Code() Code { + if s == nil { + return Success + } + return s.code +} + +// AsResult wraps an error in a Result. +func AsResult(err error) *Result { + return &Result{ + code: Error, + reasons: []string{err.Error()}, + err: err, + } +} diff --git a/pkg/estimator/server/framework/plugins/registry.go b/pkg/estimator/server/framework/plugins/registry.go new file mode 100644 index 000000000..082782ba1 --- /dev/null +++ b/pkg/estimator/server/framework/plugins/registry.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "github.com/karmada-io/karmada/pkg/estimator/server/framework/runtime" +) + +// NewInTreeRegistry builds the registry with all the in-tree plugins. +func NewInTreeRegistry() runtime.Registry { + registry := runtime.Registry{} + return registry +} diff --git a/pkg/estimator/server/framework/runtime/framework.go b/pkg/estimator/server/framework/runtime/framework.go new file mode 100644 index 000000000..9c1ae5ae2 --- /dev/null +++ b/pkg/estimator/server/framework/runtime/framework.go @@ -0,0 +1,146 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "context" + "fmt" + "math" + "reflect" + "time" + + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + + "github.com/karmada-io/karmada/pkg/estimator/pb" + "github.com/karmada-io/karmada/pkg/estimator/server/framework" + "github.com/karmada-io/karmada/pkg/estimator/server/metrics" + schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache" + utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics" +) + +const ( + estimator = "Estimator" +) + +// frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler +// plugins. +type frameworkImpl struct { + estimateReplicasPlugins []framework.EstimateReplicasPlugin + clientSet clientset.Interface + informerFactory informers.SharedInformerFactory +} + +var _ framework.Framework = &frameworkImpl{} + +type frameworkOptions struct { + clientSet clientset.Interface + informerFactory informers.SharedInformerFactory +} + +// Option for the frameworkImpl. +type Option func(*frameworkOptions) + +func defaultFrameworkOptions() frameworkOptions { + return frameworkOptions{} +} + +// WithClientSet sets clientSet for the scheduling frameworkImpl. +func WithClientSet(clientSet clientset.Interface) Option { + return func(o *frameworkOptions) { + o.clientSet = clientSet + } +} + +// WithInformerFactory sets informer factory for the scheduling frameworkImpl. +func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option { + return func(o *frameworkOptions) { + o.informerFactory = informerFactory + } +} + +// NewFramework creates a scheduling framework by registry. +func NewFramework(r Registry, opts ...Option) (framework.Framework, error) { + options := defaultFrameworkOptions() + for _, opt := range opts { + opt(&options) + } + f := &frameworkImpl{ + informerFactory: options.informerFactory, + } + estimateReplicasPluginsList := reflect.ValueOf(&f.estimateReplicasPlugins).Elem() + estimateReplicasType := estimateReplicasPluginsList.Type().Elem() + + for name, factory := range r { + p, err := factory(f) + if err != nil { + return nil, fmt.Errorf("failed to initialize plugin %q: %w", name, err) + } + addPluginToList(p, estimateReplicasType, &estimateReplicasPluginsList) + } + return f, nil +} + +func addPluginToList(plugin framework.Plugin, pluginType reflect.Type, pluginList *reflect.Value) { + if reflect.TypeOf(plugin).Implements(pluginType) { + newPlugins := reflect.Append(*pluginList, reflect.ValueOf(plugin)) + pluginList.Set(newPlugins) + } +} + +// ClientSet returns a kubernetes clientset. +func (frw *frameworkImpl) ClientSet() clientset.Interface { + return frw.clientSet +} + +// SharedInformerFactory returns a shared informer factory. +func (frw *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory { + return frw.informerFactory +} + +// RunEstimateReplicasPlugins runs the set of configured EstimateReplicasPlugins +// for estimating replicas based on the given replicaRequirements. +// It returns an integer and an error. +// The integer represents the minimum calculated value of estimated replicas from each EstimateReplicasPlugin. +func (frw *frameworkImpl) RunEstimateReplicasPlugins(ctx context.Context, snapshot *schedcache.Snapshot, replicaRequirements *pb.ReplicaRequirements) (int32, *framework.Result) { + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(estimator).Observe(utilmetrics.DurationInSeconds(startTime)) + }() + var replica int32 = math.MaxInt32 + results := make(framework.PluginToResult) + for _, pl := range frw.estimateReplicasPlugins { + plReplica, ret := frw.runEstimateReplicasPlugins(ctx, pl, snapshot, replicaRequirements) + if ret.IsSuccess() && plReplica < replica { + replica = plReplica + } + results[pl.Name()] = ret + } + return replica, results.Merge() +} + +func (frw *frameworkImpl) runEstimateReplicasPlugins( + ctx context.Context, + pl framework.EstimateReplicasPlugin, + snapshot *schedcache.Snapshot, + replicaRequirements *pb.ReplicaRequirements, +) (int32, *framework.Result) { + startTime := time.Now() + replica, ret := pl.Estimate(ctx, snapshot, replicaRequirements) + metrics.PluginExecutionDuration.WithLabelValues(pl.Name(), estimator).Observe(utilmetrics.DurationInSeconds(startTime)) + return replica, ret +} diff --git a/pkg/estimator/server/framework/runtime/framework_test.go b/pkg/estimator/server/framework/runtime/framework_test.go new file mode 100644 index 000000000..c6c9fe5d7 --- /dev/null +++ b/pkg/estimator/server/framework/runtime/framework_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "context" + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/karmada-io/karmada/pkg/estimator/pb" + "github.com/karmada-io/karmada/pkg/estimator/server/framework" + schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache" +) + +type estimateReplicaResult struct { + replica int32 + ret *framework.Result +} + +type injectedResult struct { + estimateReplicaResult estimateReplicaResult +} + +// TestPlugin implements all Plugin interfaces. +type TestPlugin struct { + name string + inj injectedResult +} + +func (pl *TestPlugin) Name() string { + return pl.name +} + +func (pl *TestPlugin) Estimate(_ context.Context, _ *schedcache.Snapshot, _ *pb.ReplicaRequirements) (int32, *framework.Result) { + return pl.inj.estimateReplicaResult.replica, pl.inj.estimateReplicaResult.ret +} + +func Test_frameworkImpl_RunEstimateReplicasPlugins(t *testing.T) { + ctx := context.Background() + tests := []struct { + name string + plugins []*TestPlugin + expected estimateReplicaResult + }{ + { + name: "no EstimateReplicasPlugins", + plugins: []*TestPlugin{}, + expected: estimateReplicaResult{ + replica: math.MaxInt32, + ret: framework.NewResult(framework.Noopperation, "plugin results are empty"), + }, + }, + { + name: "one EstimateReplicasPlugin plugin returned success, but another EstimateReplicasPlugin plugin returned error", + plugins: []*TestPlugin{ + { + name: "success", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + { + name: "error", + inj: injectedResult{ + estimateReplicaResult{ + ret: framework.AsResult(fmt.Errorf("plugin 2 failed")), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: 1, + ret: framework.AsResult(fmt.Errorf("plugin 2 failed")), + }, + }, + { + name: "one EstimateReplicasPlugin plugin returned success, but another EstimateReplicasPlugin plugin returned unschedulable", + plugins: []*TestPlugin{ + { + name: "success", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + { + name: "unschedulable", + inj: injectedResult{ + estimateReplicaResult{ + replica: 0, + ret: framework.NewResult(framework.Unschedulable, "plugin 2 is unschedulable"), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Unschedulable, "plugin 2 is unschedulable"), + }, + }, + { + name: "one EstimateReplicasPlugin plugin returned unschedulable, but another EstimateReplicasPlugin plugin returned noop", + plugins: []*TestPlugin{ + { + name: "unschedulable", + inj: injectedResult{ + estimateReplicaResult{ + replica: 0, + ret: framework.NewResult(framework.Unschedulable, "plugin 1 is unschedulable"), + }, + }, + }, + { + name: "noop", + inj: injectedResult{ + estimateReplicaResult{ + replica: math.MaxInt32, + ret: framework.NewResult(framework.Noopperation, "plugin 2 is no operation"), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: math.MaxInt32, + ret: framework.NewResult(framework.Unschedulable, "plugin 1 is unschedulable", "plugin 2 is no operation"), + }, + }, + { + name: "one EstimateReplicasPlugin plugin returned success, but another EstimateReplicasPlugin plugin return no operation", + plugins: []*TestPlugin{ + { + name: "success", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + { + name: "noop", + inj: injectedResult{ + estimateReplicaResult{ + ret: framework.NewResult(framework.Noopperation, "plugin 2 is disabled"), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success, "plugin 2 is disabled"), + }, + }, + { + name: "all EstimateReplicasPlugins returned success and 1 replica", + plugins: []*TestPlugin{ + { + name: "success1", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + { + name: "success2", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + { + name: "all EstimateReplicasPlugins returned success and but different replica", + plugins: []*TestPlugin{ + { + name: "success1", + inj: injectedResult{ + estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + }, + { + name: "success2", + inj: injectedResult{ + estimateReplicaResult{ + replica: 2, + ret: framework.NewResult(framework.Success), + }, + }, + }, + }, + expected: estimateReplicaResult{ + replica: 1, + ret: framework.NewResult(framework.Success), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + for _, p := range tt.plugins { + p := p + if err := r.Register(p.name, func(fh framework.Handle) (framework.Plugin, error) { + return p, nil + }); err != nil { + t.Fatalf("fail to register PreScorePlugins plugin (%s)", p.Name()) + } + } + f, err := NewFramework(r) + if err != nil { + t.Errorf("create frame work error:%v", err) + } + replica, ret := f.RunEstimateReplicasPlugins(ctx, nil, &pb.ReplicaRequirements{}) + + require.Equal(t, tt.expected.ret.Code(), ret.Code()) + assert.ElementsMatch(t, tt.expected.ret.Reasons(), ret.Reasons()) + require.Equal(t, tt.expected.replica, replica) + }) + } +} diff --git a/pkg/estimator/server/framework/runtime/registry.go b/pkg/estimator/server/framework/runtime/registry.go new file mode 100644 index 000000000..250df8bdb --- /dev/null +++ b/pkg/estimator/server/framework/runtime/registry.go @@ -0,0 +1,61 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "fmt" + + "github.com/karmada-io/karmada/pkg/estimator/server/framework" +) + +// PluginFactory is a function that builds a plugin. +type PluginFactory = func(f framework.Handle) (framework.Plugin, error) + +// Registry is a collection of all available plugins. The framework uses a +// registry to enable and initialize configured plugins. +// All plugins must be in the registry before initializing the framework. +type Registry map[string]PluginFactory + +// Register adds a new plugin to the registry. If a plugin with the same name +// exists, it returns an error. +func (r Registry) Register(name string, factory PluginFactory) error { + if _, ok := r[name]; ok { + return fmt.Errorf("a plugin named %v already exists", name) + } + r[name] = factory + return nil +} + +// Unregister removes an existing plugin from the registry. If no plugin with +// the provided name exists, it returns an error. +func (r Registry) Unregister(name string) error { + if _, ok := r[name]; !ok { + return fmt.Errorf("no plugin named %v exists", name) + } + delete(r, name) + return nil +} + +// Merge merges the provided registry to the current one. +func (r Registry) Merge(in Registry) error { + for name, factory := range in { + if err := r.Register(name, factory); err != nil { + return err + } + } + return nil +} diff --git a/pkg/estimator/server/framework/runtime/registry_test.go b/pkg/estimator/server/framework/runtime/registry_test.go new file mode 100644 index 000000000..ec5c04038 --- /dev/null +++ b/pkg/estimator/server/framework/runtime/registry_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "testing" + + "github.com/google/uuid" + + "github.com/karmada-io/karmada/pkg/estimator/server/framework" +) + +type mockNoopPlugin struct { + uuid string +} + +func (p *mockNoopPlugin) Name() string { + return p.uuid +} + +func NewMockNoopPluginFactory() PluginFactory { + uuid := uuid.New().String() + return func(_ framework.Handle) (framework.Plugin, error) { + return &mockNoopPlugin{uuid}, nil + } +} + +// isRegistryEqual compares two registries for equality. This function is used in place of +// reflect.DeepEqual() and cmp() as they don't compare function values. +func isRegistryEqual(registryX, registryY Registry) bool { + for name, pluginFactory := range registryY { + if val, ok := registryX[name]; ok { + p1, _ := pluginFactory(nil) + p2, _ := val(nil) + if p1.Name() != p2.Name() { + // pluginFactory functions are not the same. + return false + } + } else { + // registryY contains an entry that is not present in registryX + return false + } + } + for name := range registryX { + if _, ok := registryY[name]; !ok { + // registryX contains an entry that is not present in registryY + return false + } + } + return true +} + +// TestRegistry_Register is same as https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/runtime/registry_test.go#L174-L223 +// because the framework registry design is same as kubernetes framework register +func TestRegistry_Register(t *testing.T) { + m1 := NewMockNoopPluginFactory() + m2 := NewMockNoopPluginFactory() + tests := []struct { + name string + registry Registry + nameToRegister string + factoryToRegister PluginFactory + expected Registry + shouldError bool + }{ + { + name: "valid Register", + registry: Registry{}, + nameToRegister: "pluginFactory1", + factoryToRegister: m1, + expected: Registry{ + "pluginFactory1": m1, + }, + shouldError: false, + }, + { + name: "Register duplicate factories", + registry: Registry{ + "pluginFactory1": m1, + }, + nameToRegister: "pluginFactory1", + factoryToRegister: m2, + expected: Registry{ + "pluginFactory1": m1, + }, + shouldError: true, + }, + } + + for _, scenario := range tests { + t.Run(scenario.name, func(t *testing.T) { + err := scenario.registry.Register(scenario.nameToRegister, scenario.factoryToRegister) + + if (err == nil) == scenario.shouldError { + t.Errorf("Register() shouldError is: %v however err is: %v.", scenario.shouldError, err) + return + } + + if !isRegistryEqual(scenario.expected, scenario.registry) { + t.Errorf("Register(). Expected %v. Got %v instead.", scenario.expected, scenario.registry) + } + }) + } +} + +// TestRegistry_Unregister is same as https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/runtime/registry_test.go#L225-L270 +// because the framework registry design is same as kubernetes framework register +func TestRegistry_Unregister(t *testing.T) { + m1 := NewMockNoopPluginFactory() + m2 := NewMockNoopPluginFactory() + tests := []struct { + name string + registry Registry + nameToUnregister string + expected Registry + shouldError bool + }{ + { + name: "valid Unregister", + registry: Registry{ + "pluginFactory1": m1, + "pluginFactory2": m2, + }, + nameToUnregister: "pluginFactory1", + expected: Registry{ + "pluginFactory2": m2, + }, + shouldError: false, + }, + { + name: "Unregister non-existent plugin factory", + registry: Registry{}, + nameToUnregister: "pluginFactory1", + expected: Registry{}, + shouldError: true, + }, + } + + for _, scenario := range tests { + t.Run(scenario.name, func(t *testing.T) { + err := scenario.registry.Unregister(scenario.nameToUnregister) + + if (err == nil) == scenario.shouldError { + t.Errorf("Unregister() shouldError is: %v however err is: %v.", scenario.shouldError, err) + return + } + + if !isRegistryEqual(scenario.expected, scenario.registry) { + t.Errorf("Unregister(). Expected %v. Got %v instead.", scenario.expected, scenario.registry) + } + }) + } +} + +// TestRegistry_Merge is same as https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/runtime/registry_test.go#L119-L172 +// because the framework registry design is same as kubernetes framework register +func TestRegistry_Merge(t *testing.T) { + m1 := NewMockNoopPluginFactory() + m2 := NewMockNoopPluginFactory() + tests := []struct { + name string + primaryRegistry Registry + registryToMerge Registry + expected Registry + shouldError bool + }{ + { + name: "valid Merge", + primaryRegistry: Registry{ + "pluginFactory1": m1, + }, + registryToMerge: Registry{ + "pluginFactory2": m2, + }, + expected: Registry{ + "pluginFactory1": m1, + "pluginFactory2": m2, + }, + shouldError: false, + }, + { + name: "Merge duplicate factories", + primaryRegistry: Registry{ + "pluginFactory1": m1, + }, + registryToMerge: Registry{ + "pluginFactory1": m2, + }, + expected: Registry{ + "pluginFactory1": m1, + }, + shouldError: true, + }, + } + + for _, scenario := range tests { + t.Run(scenario.name, func(t *testing.T) { + err := scenario.primaryRegistry.Merge(scenario.registryToMerge) + + if (err == nil) == scenario.shouldError { + t.Errorf("Merge() shouldError is: %v, however err is: %v.", scenario.shouldError, err) + return + } + + if !isRegistryEqual(scenario.expected, scenario.primaryRegistry) { + t.Errorf("Merge(). Expected %v. Got %v instead.", scenario.expected, scenario.primaryRegistry) + } + }) + } +} diff --git a/pkg/estimator/server/metrics/metrics.go b/pkg/estimator/server/metrics/metrics.go index c3ded713a..abbc74324 100644 --- a/pkg/estimator/server/metrics/metrics.go +++ b/pkg/estimator/server/metrics/metrics.go @@ -64,9 +64,34 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), }, []string{"result", "type", "step"}) + // FrameworkExtensionPointDuration is the metrics which indicates the latency for running all plugins of a specific extension point. + FrameworkExtensionPointDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerEstimatorSubsystem, + Name: "estimating_plugin_extension_point_duration_seconds", + Help: "Latency for running all plugins of a specific extension point.", + // Start with 0.1ms with the last bucket being [~200ms, Inf) + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 12), + }, + []string{"estimating_plugin_extension_point"}) + + // PluginExecutionDuration is the metrics which indicates the duration for running a plugin at a specific extension point. + PluginExecutionDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: SchedulerEstimatorSubsystem, + Name: "estimating_plugin_execution_duration_seconds", + Help: "Duration for running a plugin at a specific extension point.", + // Start with 0.01ms with the last bucket being [~22ms, Inf). We use a small factor (1.5) + // so that we have better granularity since plugin latency is very sensitive. + Buckets: prometheus.ExponentialBuckets(0.00001, 1.5, 20), + }, + []string{"estimating_plugin", "estimating_plugin_extension_point"}) + metrics = []prometheus.Collector{ requestCount, estimatingAlgorithmLatency, + FrameworkExtensionPointDuration, + PluginExecutionDuration, } ) diff --git a/pkg/estimator/server/server.go b/pkg/estimator/server/server.go index a477f3ecc..d8b6156c7 100644 --- a/pkg/estimator/server/server.go +++ b/pkg/estimator/server/server.go @@ -43,6 +43,9 @@ import ( "github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options" "github.com/karmada-io/karmada/pkg/estimator/pb" + "github.com/karmada-io/karmada/pkg/estimator/server/framework" + frameworkplugins "github.com/karmada-io/karmada/pkg/estimator/server/framework/plugins" + frameworkruntime "github.com/karmada-io/karmada/pkg/estimator/server/framework/runtime" "github.com/karmada-io/karmada/pkg/estimator/server/metrics" "github.com/karmada-io/karmada/pkg/estimator/server/replica" estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service" @@ -70,15 +73,16 @@ var ( // AccurateSchedulerEstimatorServer is the gRPC server of a cluster accurate scheduler estimator. // Please see https://github.com/karmada-io/karmada/pull/580 (#580). type AccurateSchedulerEstimatorServer struct { - port int - clusterName string - kubeClient kubernetes.Interface - restMapper meta.RESTMapper - informerFactory informers.SharedInformerFactory - nodeLister listv1.NodeLister - replicaLister *replica.ListerWrapper - informerManager genericmanager.SingleClusterInformerManager - parallelizer parallelize.Parallelizer + port int + clusterName string + kubeClient kubernetes.Interface + restMapper meta.RESTMapper + informerFactory informers.SharedInformerFactory + nodeLister listv1.NodeLister + replicaLister *replica.ListerWrapper + informerManager genericmanager.SingleClusterInformerManager + parallelizer parallelize.Parallelizer + estimateFramework framework.Framework Cache schedcache.Cache } @@ -90,7 +94,7 @@ func NewEstimatorServer( discoveryClient discovery.DiscoveryInterface, opts *options.Options, stopChan <-chan struct{}, -) *AccurateSchedulerEstimatorServer { +) (*AccurateSchedulerEstimatorServer, error) { cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoveryClient) restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient) informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) @@ -120,9 +124,19 @@ func NewEstimatorServer( es.informerManager.Lister(gvr) } + registry := frameworkplugins.NewInTreeRegistry() + estimateFramework, err := frameworkruntime.NewFramework(registry, + frameworkruntime.WithClientSet(kubeClient), + frameworkruntime.WithInformerFactory(informerFactory), + ) + if err != nil { + return es, err + } + es.estimateFramework = estimateFramework + addAllEventHandlers(es, informerFactory) - return es + return es, nil } // Start runs the accurate replica estimator server. @@ -193,7 +207,6 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con if request.Cluster != es.clusterName { return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName) } - maxReplicas, err := es.EstimateReplicas(ctx, object, request) if err != nil { return nil, fmt.Errorf("failed to estimate replicas: %v", err) diff --git a/pkg/estimator/server/server_test.go b/pkg/estimator/server/server_test.go index ce67f5d41..ae6c0de49 100644 --- a/pkg/estimator/server/server_test.go +++ b/pkg/estimator/server/server_test.go @@ -244,7 +244,7 @@ func TestAccurateSchedulerEstimatorServer_MaxAvailableReplicas(t *testing.T) { }, } - es := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), dynamicClient, discoveryClient, opt, ctx.Done()) + es, _ := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), dynamicClient, discoveryClient, opt, ctx.Done()) es.informerFactory.Start(ctx.Done()) es.informerFactory.WaitForCacheSync(ctx.Done()) @@ -396,7 +396,7 @@ func BenchmarkAccurateSchedulerEstimatorServer_MaxAvailableReplicas(b *testing.B objs = append(objs, pod) } - es := NewEstimatorServer(fake.NewSimpleClientset(objs...), dynamicClient, discoveryClient, opt, ctx.Done()) + es, _ := NewEstimatorServer(fake.NewSimpleClientset(objs...), dynamicClient, discoveryClient, opt, ctx.Done()) es.informerFactory.Start(ctx.Done()) es.informerFactory.WaitForCacheSync(ctx.Done())