add karmada-descheduler

Signed-off-by: Garrybest <garrybest@foxmail.com>
Garrybest 2022-02-23 16:18:55 +08:00
parent fcbbe3f0a2
commit 85d8a6ccf4
16 changed files with 857 additions and 24 deletions

View File

@ -0,0 +1,142 @@
package app
import (
"context"
"flag"
"fmt"
"net"
"net/http"
"os"
"strconv"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/descheduler/app/options"
"github.com/karmada-io/karmada/pkg/descheduler"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/version"
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)
// NewDeschedulerCommand creates a *cobra.Command object with default parameters
func NewDeschedulerCommand(stopChan <-chan struct{}) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "karmada-descheduler",
Long: `The karmada-descheduler evicts replicas from member clusters
when they fail to be scheduled for a period of time. It relies on the
karmada-scheduler-estimator to get the replica status.`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
if errs := opts.Validate(); len(errs) != 0 {
return errs.ToAggregate()
}
if err := run(opts, stopChan); err != nil {
return err
}
return nil
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
// Init log flags
// TODO(@RainbowMango): Group the flags to "logs" flag set.
klog.InitFlags(flag.CommandLine)
opts.AddFlags(cmd.Flags())
cmd.AddCommand(sharedcommand.NewCmdVersion(os.Stdout, "karmada-descheduler"))
cmd.Flags().AddGoFlagSet(flag.CommandLine)
return cmd
}
func run(opts *options.Options, stopChan <-chan struct{}) error {
klog.Infof("karmada-descheduler version: %s", version.Get())
klog.Infof("Please make sure the karmada-scheduler-estimator of all member clusters has been deployed")
go serveHealthzAndMetrics(net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)))
restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stopChan
cancel()
}()
desched := descheduler.NewDescheduler(karmadaClient, kubeClient, opts)
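// Without leader election, run the descheduler directly; Run blocks until the context is canceled.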
if !opts.LeaderElection.LeaderElect {
desched.Run(ctx)
return fmt.Errorf("descheduler exited")
}
leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
if err != nil {
return err
}
hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + uuid.New().String()
rl, err := resourcelock.New(opts.LeaderElection.ResourceLock,
opts.LeaderElection.ResourceNamespace,
opts.LeaderElection.ResourceName,
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
})
if err != nil {
return fmt.Errorf("couldn't create resource lock: %v", err)
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: opts.LeaderElection.LeaseDuration.Duration,
RenewDeadline: opts.LeaderElection.RenewDeadline.Duration,
RetryPeriod: opts.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: desched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
})
return nil
}
func serveHealthzAndMetrics(address string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
http.Handle("/metrics", promhttp.Handler())
klog.Fatal(http.ListenAndServe(address, nil))
}

View File

@ -0,0 +1,67 @@
package options
import (
"time"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
componentbaseconfig "k8s.io/component-base/config"
"github.com/karmada-io/karmada/pkg/util"
)
const (
defaultBindAddress = "0.0.0.0"
defaultPort = 10358
defaultEstimatorPort = 10352
defaultDeschedulingInterval = 2 * time.Minute
defaultUnschedulableThreshold = 5 * time.Minute
)
// Options contains everything necessary to create and run karmada-descheduler.
type Options struct {
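// LeaderElection defines the configuration of leader election.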
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubeConfig string
Master string
// BindAddress is the IP address on which to listen for the --secure-port port.
BindAddress string
// SecurePort is the port that the server serves at.
SecurePort int
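// KubeAPIQPS is the QPS to use while talking with karmada-apiserver.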
KubeAPIQPS float32
// KubeAPIBurst is the burst to allow while talking with karmada-apiserver.
KubeAPIBurst int
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
SchedulerEstimatorPort int
// DeschedulingInterval specifies the time interval between two consecutive descheduler runs.
DeschedulingInterval metav1.Duration
// UnschedulableThreshold specifies how long a pod may stay unschedulable before its replicas are counted as unschedulable.
UnschedulableThreshold metav1.Duration
}
// NewOptions builds an empty Options.
func NewOptions() *Options {
return &Options{}
}
// AddFlags adds flags of the descheduler to the specified FlagSet
func (o *Options) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Enable leader election, which must be true when running multi instances.")
fs.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", util.NamespaceKarmadaSystem, "The namespace of resource object that is used for locking during leader election.")
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to a KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.DurationVar(&o.DeschedulingInterval.Duration, "descheduling-interval", defaultDeschedulingInterval, "Time interval between two consecutive descheduler executions. Setting this value instructs the descheduler to run in a continuous loop at the interval specified.")
fs.DurationVar(&o.UnschedulableThreshold.Duration, "unschedulable-threshold", defaultUnschedulableThreshold, "The period of pod unschedulable condition. This value is considered as a classification standard of unschedulable replicas.")
}

View File

@ -0,0 +1,39 @@
package options
import (
"net"
"k8s.io/apimachinery/pkg/util/validation/field"
)
// Validate checks Options and returns a slice of found errors.
func (o *Options) Validate() field.ErrorList {
errs := field.ErrorList{}
newPath := field.NewPath("Options")
if net.ParseIP(o.BindAddress) == nil {
errs = append(errs, field.Invalid(newPath.Child("BindAddress"), o.BindAddress, "not a valid textual representation of an IP address"))
}
if o.SecurePort < 0 || o.SecurePort > 65535 {
errs = append(errs, field.Invalid(newPath.Child("SecurePort"), o.SecurePort, "must be a valid port between 0 and 65535 inclusive"))
}
if o.SchedulerEstimatorPort < 0 || o.SchedulerEstimatorPort > 65535 {
errs = append(errs, field.Invalid(newPath.Child("SchedulerEstimatorPort"), o.SchedulerEstimatorPort, "must be a valid port between 0 and 65535 inclusive"))
}
if o.SchedulerEstimatorTimeout.Duration < 0 {
errs = append(errs, field.Invalid(newPath.Child("SchedulerEstimatorTimeout"), o.SchedulerEstimatorTimeout, "must be greater than or equal to 0"))
}
if o.DeschedulingInterval.Duration < 0 {
errs = append(errs, field.Invalid(newPath.Child("DeschedulingInterval"), o.DeschedulingInterval, "must be greater than or equal to 0"))
}
if o.UnschedulableThreshold.Duration < 0 {
errs = append(errs, field.Invalid(newPath.Child("UnschedulableThreshold"), o.UnschedulableThreshold, "must be greater than or equal to 0"))
}
return errs
}

cmd/descheduler/main.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"os"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/logs"
"github.com/karmada-io/karmada/cmd/descheduler/app"
)
func main() {
if err := runDeschedulerCmd(); err != nil {
os.Exit(1)
}
}
func runDeschedulerCmd() error {
logs.InitLogs()
defer logs.FlushLogs()
stopChan := apiserver.SetupSignalHandler()
command := app.NewDeschedulerCommand(stopChan)
if err := command.Execute(); err != nil {
return err
}
return nil
}

View File

@ -20,4 +20,8 @@ const (
EventReasonScheduleBindingFailed = "ScheduleBindingFailed"
// EventReasonScheduleBindingSucceed indicates that schedule binding succeeded.
EventReasonScheduleBindingSucceed = "ScheduleBindingSucceed"
// EventReasonDescheduleBindingFailed indicates that deschedule binding failed.
EventReasonDescheduleBindingFailed = "DescheduleBindingFailed"
// EventReasonDescheduleBindingSucceed indicates that deschedule binding succeeded.
EventReasonDescheduleBindingSucceed = "DescheduleBindingSucceed"
)

View File

@ -2,7 +2,6 @@ package status
import (
"context"
"encoding/json"
"fmt"
"reflect"
@ -10,7 +9,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -285,7 +283,7 @@ func (c *WorkStatusController) reflectStatus(work *workv1alpha1.Work, clusterObj
}
if statusMap != nil {
rawExtension, err := c.buildStatusRawExtension(statusMap)
rawExtension, err := helper.BuildStatusRawExtension(statusMap)
if err != nil {
return err
}
@ -336,18 +334,6 @@ func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, cl
return identifier, nil
}
func (c *WorkStatusController) buildStatusRawExtension(status map[string]interface{}) (*runtime.RawExtension, error) {
statusJSON, err := json.Marshal(status)
if err != nil {
klog.Errorf("Failed to marshal status. Error: %v.", statusJSON)
return nil, err
}
return &runtime.RawExtension{
Raw: statusJSON,
}, nil
}
func (c *WorkStatusController) mergeStatus(statuses []workv1alpha1.ManifestStatus, newStatus workv1alpha1.ManifestStatus) []workv1alpha1.ManifestStatus {
// TODO(RainbowMango): update 'statuses' if 'newStatus' already exist.
// For now, we only have at most one manifest in Work, so just override current 'statuses'.

View File

@ -0,0 +1,55 @@
package core
import (
"encoding/json"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
// TODO(Garrybest): make it an option
var supportedGVKs = []schema.GroupVersionKind{
appsv1.SchemeGroupVersion.WithKind("Deployment"),
}
// FilterBindings will filter ResourceBindings that could be descheduled
// based on their GVK and applied placement.
func FilterBindings(bindings []*workv1alpha2.ResourceBinding) []*workv1alpha2.ResourceBinding {
var res []*workv1alpha2.ResourceBinding
for _, binding := range bindings {
if validateGVK(&binding.Spec.Resource) && validatePlacement(binding) {
res = append(res, binding)
}
}
return res
}
func validateGVK(reference *workv1alpha2.ObjectReference) bool {
gvk := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
for i := range supportedGVKs {
if gvk == supportedGVKs[i] {
return true
}
}
return false
}
func validatePlacement(binding *workv1alpha2.ResourceBinding) bool {
// Check whether the policy allows rescheduling.
appliedPlacement := util.GetLabelValue(binding.Annotations, util.PolicyPlacementAnnotation)
if len(appliedPlacement) == 0 {
return false
}
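// The applied placement is stored as serialized JSON in the binding's annotations; decode it to inspect the replica scheduling strategy.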
placement := &policyv1alpha1.Placement{}
if err := json.Unmarshal([]byte(appliedPlacement), placement); err != nil {
klog.ErrorS(err, "Failed to unmarshal placement when validating", "ResourceBinding", klog.KObj(binding))
return false
}
return helper.IsReplicaDynamicDivided(placement.ReplicaScheduling)
}

View File

@ -0,0 +1,125 @@
package core
import (
"context"
"encoding/json"
"fmt"
"math"
"time"
"k8s.io/klog/v2"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
"github.com/karmada-io/karmada/pkg/util"
)
// SchedulingResultHelper is a helper to wrap the ResourceBinding and its target cluster result.
type SchedulingResultHelper struct {
*workv1alpha2.ResourceBinding
TargetClusters []*TargetClusterWrapper
}
// NewSchedulingResultHelper returns a new SchedulingResultHelper based on ResourceBinding.
func NewSchedulingResultHelper(binding *workv1alpha2.ResourceBinding) *SchedulingResultHelper {
h := &SchedulingResultHelper{ResourceBinding: binding}
readyReplicas := getReadyReplicas(binding)
for i := range binding.Spec.Clusters {
targetCluster := &binding.Spec.Clusters[i]
targetClusterHelper := &TargetClusterWrapper{
ClusterName: targetCluster.Name,
Spec: targetCluster.Replicas,
}
if ready, exist := readyReplicas[targetCluster.Name]; exist {
targetClusterHelper.Ready = ready
} else {
targetClusterHelper.Ready = estimatorclient.UnauthenticReplica
}
h.TargetClusters = append(h.TargetClusters, targetClusterHelper)
}
return h
}
// FillUnschedulableReplicas detects the unschedulable replicas of each member cluster by calling the
// unschedulable replica estimators and fills the Unschedulable field of every TargetClusterWrapper.
func (h *SchedulingResultHelper) FillUnschedulableReplicas(unschedulableThreshold time.Duration) {
reference := &h.Spec.Resource
undesiredClusters, undesiredClusterNames := h.GetUndesiredClusters()
// Set the upper boundary so that the minimum across estimators can be taken below.
for i := range undesiredClusters {
undesiredClusters[i].Unschedulable = math.MaxInt32
}
// Take the minimum unschedulable replica count reported across all estimators.
estimators := estimatorclient.GetUnschedulableReplicaEstimators()
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
fmt.Sprintf("kind=%s, name=%s/%s", reference.Kind, reference.Namespace, reference.Name))
for _, estimator := range estimators {
res, err := estimator.GetUnschedulableReplicas(ctx, undesiredClusterNames, reference, unschedulableThreshold)
if err != nil {
klog.Errorf("Max cluster unschedulable replicas error: %v", err)
continue
}
for i := range res {
if res[i].Replicas == estimatorclient.UnauthenticReplica {
continue
}
if undesiredClusters[i].ClusterName == res[i].Name && undesiredClusters[i].Unschedulable > res[i].Replicas {
undesiredClusters[i].Unschedulable = res[i].Replicas
}
}
}
for i := range undesiredClusters {
if undesiredClusters[i].Unschedulable == math.MaxInt32 {
undesiredClusters[i].Unschedulable = 0
}
}
klog.V(4).Infof("Target undesired cluster of unschedulable replica result: %v", undesiredClusters)
}
// GetUndesiredClusters returns the clusters whose ready replicas have not reached the desired spec replicas, together with their names.
func (h *SchedulingResultHelper) GetUndesiredClusters() ([]*TargetClusterWrapper, []string) {
var clusters []*TargetClusterWrapper
var names []string
for _, cluster := range h.TargetClusters {
if cluster.Ready < cluster.Spec {
clusters = append(clusters, cluster)
names = append(names, cluster.ClusterName)
}
}
return clusters, names
}
// TargetClusterWrapper wraps a target cluster name together with its spec replicas,
// ready replicas and unschedulable replicas.
type TargetClusterWrapper struct {
ClusterName string
Spec int32
Ready int32
Unschedulable int32
}
func getReadyReplicas(binding *workv1alpha2.ResourceBinding) map[string]int32 {
aggregatedStatus := binding.Status.AggregatedStatus
res := make(map[string]int32, len(aggregatedStatus))
for i := range aggregatedStatus {
item := aggregatedStatus[i]
if item.Status == nil {
continue
}
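// The aggregated status carries the raw status reported by the member cluster; decode it to read the readyReplicas field.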
workloadStatus := make(map[string]interface{})
if err := json.Unmarshal(item.Status.Raw, &workloadStatus); err != nil {
klog.ErrorS(err, "Failed to unmarshal workload status when get ready replicas", "ResourceBinding", klog.KObj(binding))
continue
}
readyReplicas := int32(0)
// TODO(Garrybest): cooperate with custom resource interpreter
if r, ok := workloadStatus[util.ReadyReplicasField]; ok {
readyReplicas = int32(r.(float64))
res[item.ClusterName] = readyReplicas
}
}
return res
}

View File

@ -0,0 +1,281 @@
package descheduler
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/descheduler/app/options"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/descheduler/core"
estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/gclient"
)
const (
descheduleSuccessMessage = "Binding has been descheduled"
)
// Descheduler is the core structure of the descheduler, which is used to evict replicas from specific clusters.
type Descheduler struct {
KarmadaClient karmadaclientset.Interface
KubeClient kubernetes.Interface
informerFactory informerfactory.SharedInformerFactory
bindingInformer cache.SharedIndexInformer
bindingLister worklister.ResourceBindingLister
clusterInformer cache.SharedIndexInformer
clusterLister clusterlister.ClusterLister
eventRecorder record.EventRecorder
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
unschedulableThreshold time.Duration
deschedulingInterval time.Duration
deschedulerWorker util.AsyncWorker
}
// NewDescheduler instantiates a descheduler
func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts *options.Options) *Descheduler {
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
desched := &Descheduler{
KarmadaClient: karmadaClient,
KubeClient: kubeClient,
informerFactory: factory,
bindingInformer: factory.Work().V1alpha2().ResourceBindings().Informer(),
bindingLister: factory.Work().V1alpha2().ResourceBindings().Lister(),
clusterInformer: factory.Cluster().V1alpha1().Clusters().Informer(),
clusterLister: factory.Cluster().V1alpha1().Clusters().Lister(),
schedulerEstimatorCache: estimatorclient.NewSchedulerEstimatorCache(),
schedulerEstimatorPort: opts.SchedulerEstimatorPort,
unschedulableThreshold: opts.UnschedulableThreshold.Duration,
deschedulingInterval: opts.DeschedulingInterval.Duration,
}
desched.schedulerEstimatorWorker = util.NewAsyncWorker("scheduler-estimator", nil, desched.reconcileEstimatorConnection)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration)
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
desched.deschedulerWorker = util.NewAsyncWorker("descheduler", util.MetaNamespaceKeyFunc, desched.worker)
desched.clusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: desched.addCluster,
UpdateFunc: desched.updateCluster,
DeleteFunc: desched.deleteCluster,
})
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
desched.eventRecorder = eventBroadcaster.NewRecorder(gclient.NewSchema(), corev1.EventSource{Component: "karmada-descheduler"})
return desched
}
// Run runs the descheduler
func (d *Descheduler) Run(ctx context.Context) {
stopCh := ctx.Done()
klog.Infof("Starting karmada descheduler")
defer klog.Infof("Shutting down karmada descheduler")
// Establish all connections first and then begin scheduling.
d.establishEstimatorConnections()
d.schedulerEstimatorWorker.Run(1, stopCh)
d.informerFactory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, d.bindingInformer.HasSynced, d.clusterInformer.HasSynced) {
klog.Errorf("Failed to wait for cache sync")
}
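// Kick off a full descheduling pass every deschedulingInterval; each pass enqueues candidate bindings for the worker.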
go wait.Until(d.descheduleOnce, d.deschedulingInterval, stopCh)
d.deschedulerWorker.Run(1, stopCh)
<-stopCh
}
func (d *Descheduler) descheduleOnce() {
bindings, err := d.bindingLister.List(labels.Everything())
if err != nil {
klog.Errorf("List all ResourceBindings error: %v", err)
return
}
bindings = core.FilterBindings(bindings)
for _, binding := range bindings {
d.deschedulerWorker.Add(binding)
}
}
func (d *Descheduler) worker(key util.QueueKey) error {
namespacedName, ok := key.(string)
if !ok {
return fmt.Errorf("failed to deschedule as invalid key: %v", key)
}
namespace, name, err := cache.SplitMetaNamespaceKey(namespacedName)
if err != nil {
return fmt.Errorf("invalid resource key: %s", namespacedName)
}
binding, err := d.bindingLister.ResourceBindings(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("ResourceBinding(%s) in work queue no longer exists, ignore.", namespacedName)
return nil
}
return fmt.Errorf("get ResourceBinding(%s) error: %v", namespacedName, err)
}
h := core.NewSchedulingResultHelper(binding)
if _, undesiredClusters := h.GetUndesiredClusters(); len(undesiredClusters) == 0 {
return nil
}
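// Ask the scheduler estimators how many replicas are currently unschedulable in each undesired cluster.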
h.FillUnschedulableReplicas(d.unschedulableThreshold)
klog.V(3).Infof("Unschedulable result of resource(%s): %v", namespacedName, h.TargetClusters)
return d.updateScheduleResult(h)
}
func (d *Descheduler) updateScheduleResult(h *core.SchedulingResultHelper) error {
unschedulableSum := int32(0)
message := descheduleSuccessMessage
binding := h.ResourceBinding.DeepCopy()
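// For every cluster that reports unschedulable replicas, scale its target down by that amount, but never below the currently ready replicas.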
for i, cluster := range h.TargetClusters {
if cluster.Unschedulable > 0 && cluster.Spec >= cluster.Unschedulable {
target := cluster.Spec - cluster.Unschedulable
// The target cluster replicas must not be less than ready replicas.
if target < cluster.Ready && cluster.Ready <= cluster.Spec {
target = cluster.Ready
}
binding.Spec.Clusters[i].Replicas = target
unschedulable := cluster.Spec - target
unschedulableSum += unschedulable
message += fmt.Sprintf(", %d replica(s) in cluster(%s)", unschedulable, cluster.ClusterName)
}
}
if unschedulableSum == 0 {
return nil
}
message = fmt.Sprintf(", %d total descheduled replica(s)", unschedulableSum) + message
var err error
defer func() {
d.recordDescheduleResultEventForResourceBinding(binding, message, err)
}()
binding, err = d.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (d *Descheduler) addCluster(obj interface{}) {
cluster, ok := obj.(*clusterv1alpha1.Cluster)
if !ok {
klog.Errorf("Cannot convert to Cluster: %v", obj)
return
}
klog.V(4).Infof("Receiving add event for cluster %s", cluster.Name)
d.schedulerEstimatorWorker.Add(cluster.Name)
}
func (d *Descheduler) updateCluster(_, newObj interface{}) {
cluster, ok := newObj.(*clusterv1alpha1.Cluster)
if !ok {
klog.Errorf("Cannot convert to Cluster: %v", newObj)
return
}
klog.V(4).Infof("Receiving update event for cluster %s", cluster.Name)
d.schedulerEstimatorWorker.Add(cluster.Name)
}
func (d *Descheduler) deleteCluster(obj interface{}) {
var cluster *clusterv1alpha1.Cluster
switch t := obj.(type) {
case *clusterv1alpha1.Cluster:
cluster = t
case cache.DeletedFinalStateUnknown:
var ok bool
cluster, ok = t.Obj.(*clusterv1alpha1.Cluster)
if !ok {
klog.Errorf("Cannot convert to clusterv1alpha1.Cluster: %v", t.Obj)
return
}
default:
klog.Errorf("Cannot convert to clusterv1alpha1.Cluster: %v", t)
return
}
klog.V(4).Infof("Receiving delete event for cluster %s", cluster.Name)
d.schedulerEstimatorWorker.Add(cluster.Name)
}
func (d *Descheduler) establishEstimatorConnections() {
clusterList, err := d.KarmadaClient.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Cannot list all clusters when establish all cluster estimator connections: %v", err)
return
}
for i := range clusterList.Items {
if err = estimatorclient.EstablishConnection(clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorPort); err != nil {
klog.Error(err)
}
}
}
func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error {
name, ok := key.(string)
if !ok {
return fmt.Errorf("failed to reconcile estimator connection as invalid key: %v", key)
}
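// If the cluster has been removed, drop its estimator connection from the cache; otherwise (re)establish the connection.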
_, err := d.clusterLister.Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
d.schedulerEstimatorCache.DeleteCluster(name)
return nil
}
return err
}
return estimatorclient.EstablishConnection(name, d.schedulerEstimatorCache, d.schedulerEstimatorPort)
}
func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) {
if rb == nil {
return
}
ref := &corev1.ObjectReference{
Kind: rb.Spec.Resource.Kind,
APIVersion: rb.Spec.Resource.APIVersion,
Namespace: rb.Spec.Resource.Namespace,
Name: rb.Spec.Resource.Name,
UID: rb.Spec.Resource.UID,
}
if err == nil {
d.eventRecorder.Event(rb, corev1.EventTypeNormal, workv1alpha2.EventReasonDescheduleBindingSucceed, message)
d.eventRecorder.Event(ref, corev1.EventTypeNormal, workv1alpha2.EventReasonDescheduleBindingSucceed, message)
} else {
d.eventRecorder.Event(rb, corev1.EventTypeNormal, workv1alpha2.EventReasonDescheduleBindingFailed, err.Error())
d.eventRecorder.Event(ref, corev1.EventTypeNormal, workv1alpha2.EventReasonDescheduleBindingFailed, err.Error())
}
}

View File

@ -17,6 +17,7 @@ import (
// RegisterSchedulerEstimator will register a SchedulerEstimator.
func RegisterSchedulerEstimator(se *SchedulerEstimator) {
replicaEstimators["scheduler-estimator"] = se
unschedulableReplicaEstimators["scheduler-estimator"] = se
}
type getClusterReplicasFunc func(ctx context.Context, cluster string) (int32, error)
@ -36,12 +37,32 @@ func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration
}
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by calling karmada-scheduler-estimator.
func (se *SchedulerEstimator) MaxAvailableReplicas(parentCtx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error) {
return getClusterReplicasConcurrently(parentCtx, clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
func (se *SchedulerEstimator) MaxAvailableReplicas(
parentCtx context.Context,
clusters []*clusterv1alpha1.Cluster,
replicaRequirements *workv1alpha2.ReplicaRequirements,
) ([]workv1alpha2.TargetCluster, error) {
clusterNames := make([]string, len(clusters))
for i, cluster := range clusters {
clusterNames[i] = cluster.Name
}
return getClusterReplicasConcurrently(parentCtx, clusterNames, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
return se.maxAvailableReplicas(ctx, cluster, replicaRequirements.DeepCopy())
})
}
// GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator.
func (se *SchedulerEstimator) GetUnschedulableReplicas(
parentCtx context.Context,
clusters []string,
reference *workv1alpha2.ObjectReference,
unschedulableThreshold time.Duration,
) ([]workv1alpha2.TargetCluster, error) {
return getClusterReplicasConcurrently(parentCtx, clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
return se.maxUnschedulableReplicas(ctx, cluster, reference.DeepCopy(), unschedulableThreshold)
})
}
func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster string, replicaRequirements *workv1alpha2.ReplicaRequirements) (int32, error) {
client, err := se.cache.GetClient(cluster)
if err != nil {
@ -64,12 +85,40 @@ func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster
}
res, err := client.MaxAvailableReplicas(ctx, req)
if err != nil {
return UnauthenticReplica, fmt.Errorf("gRPC request cluster(%s) estimator error: %v", cluster, err)
return UnauthenticReplica, fmt.Errorf("gRPC request cluster(%s) estimator error when calling MaxAvailableReplicas: %v", cluster, err)
}
return res.MaxReplicas, nil
}
func getClusterReplicasConcurrently(parentCtx context.Context, clusters []*clusterv1alpha1.Cluster,
func (se *SchedulerEstimator) maxUnschedulableReplicas(
ctx context.Context,
cluster string,
reference *workv1alpha2.ObjectReference,
threshold time.Duration,
) (int32, error) {
client, err := se.cache.GetClient(cluster)
if err != nil {
return UnauthenticReplica, err
}
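// Build the gRPC request that identifies the workload whose unschedulable replicas should be counted.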
req := &pb.UnschedulableReplicasRequest{
Cluster: cluster,
Resource: pb.ObjectReference{
APIVersion: reference.APIVersion,
Kind: reference.Kind,
Namespace: reference.Namespace,
Name: reference.Name,
},
UnschedulableThreshold: threshold,
}
res, err := client.GetUnschedulableReplicas(ctx, req)
if err != nil {
return UnauthenticReplica, fmt.Errorf("gRPC request cluster(%s) estimator error when calling UnschedulableReplicas: %v", cluster, err)
}
return res.UnschedulableReplicas, nil
}
func getClusterReplicasConcurrently(parentCtx context.Context, clusters []string,
timeout time.Duration, getClusterReplicas getClusterReplicasFunc) ([]workv1alpha2.TargetCluster, error) {
// add object information into gRPC metadata
if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
@ -91,7 +140,7 @@ func getClusterReplicasConcurrently(parentCtx context.Context, clusters []*clust
errChan <- err
}
availableTargetClusters[idx] = workv1alpha2.TargetCluster{Name: cluster, Replicas: replicas}
}(i, clusters[i].Name)
}(i, clusters[i])
}
wg.Wait()

View File

@ -2,6 +2,7 @@ package client
import (
"context"
"time"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@ -13,7 +14,8 @@ import (
const UnauthenticReplica = -1
var (
replicaEstimators = map[string]ReplicaEstimator{}
replicaEstimators = map[string]ReplicaEstimator{}
unschedulableReplicaEstimators = map[string]UnschedulableReplicaEstimator{}
)
// ReplicaEstimator is an estimator which estimates the maximum replicas that can be applied to the target cluster.
@ -21,7 +23,17 @@ type ReplicaEstimator interface {
MaxAvailableReplicas(ctx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error)
}
// UnschedulableReplicaEstimator is an estimator which estimates the unschedulable replicas which belong to a specified workload.
type UnschedulableReplicaEstimator interface {
GetUnschedulableReplicas(ctx context.Context, clusters []string, reference *workv1alpha2.ObjectReference, unschedulableThreshold time.Duration) ([]workv1alpha2.TargetCluster, error)
}
// GetReplicaEstimators returns all replica estimators.
func GetReplicaEstimators() map[string]ReplicaEstimator {
return replicaEstimators
}
// GetUnschedulableReplicaEstimators returns all unschedulable replica estimators.
func GetUnschedulableReplicaEstimators() map[string]UnschedulableReplicaEstimator {
return unschedulableReplicaEstimators
}

View File

@ -296,14 +296,14 @@ func traceMaxAvailableReplicas(object string, start time.Time, request *pb.MaxAv
}
func traceGetUnschedulableReplicas(object string, start time.Time, request *pb.UnschedulableReplicasRequest) func(response **pb.UnschedulableReplicasResponse, err *error) {
klog.V(4).Infof("Begin detecting cluster unscheduable replicas of resource(%s), request: %s", object, pretty.Sprint(*request))
klog.V(4).Infof("Begin detecting cluster unschedulable replicas of resource(%s), request: %s", object, pretty.Sprint(*request))
return func(response **pb.UnschedulableReplicasResponse, err *error) {
metrics.CountRequests(*err, metrics.EstimatingTypeGetUnschedulableReplicas)
metrics.UpdateEstimatingAlgorithmLatency(*err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepTotal, start)
if *err != nil {
klog.Errorf("Failed to detect cluster unscheduable replicas: %v", *err)
klog.Errorf("Failed to detect cluster unschedulable replicas: %v", *err)
return
}
klog.Infof("Finish detecting cluster unscheduable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start))
klog.Infof("Finish detecting cluster unschedulable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start))
}
}

View File

@ -111,6 +111,8 @@ const (
SpecField = "spec"
// ReplicasField indicates the 'replicas' field of a resource
ReplicasField = "replicas"
// ReadyReplicasField indicates the 'readyReplicas' field of a resource status
ReadyReplicasField = "readyReplicas"
// ParallelismField indicates the 'parallelism' field of a job
ParallelismField = "parallelism"
// CompletionsField indicates the 'completions' field of a job

View File

@ -120,3 +120,19 @@ func GenerateResourceSelectorForServiceImport(svcImport policyv1alpha1.ResourceS
},
}
}
// IsReplicaDynamicDivided checks whether the replica scheduling strategy divides replicas dynamically.
func IsReplicaDynamicDivided(strategy *policyv1alpha1.ReplicaSchedulingStrategy) bool {
if strategy == nil || strategy.ReplicaSchedulingType != policyv1alpha1.ReplicaSchedulingTypeDivided {
return false
}
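// Only dynamic-weighted and aggregated division react to per-cluster feedback, so only these are treated as dynamic.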
switch strategy.ReplicaDivisionPreference {
case policyv1alpha1.ReplicaDivisionPreferenceWeighted:
return strategy.WeightPreference != nil && len(strategy.WeightPreference.DynamicWeight) != 0
case policyv1alpha1.ReplicaDivisionPreferenceAggregated:
return true
default:
return false
}
}

View File

@ -2,6 +2,7 @@ package helper
import (
"context"
"encoding/json"
"fmt"
"reflect"
@ -9,6 +10,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@ -261,3 +263,16 @@ func IsWorkContains(workStatus *workv1alpha1.WorkStatus, targetResource schema.G
}
return false
}
// BuildStatusRawExtension builds raw JSON by a status map.
func BuildStatusRawExtension(status map[string]interface{}) (*runtime.RawExtension, error) {
statusJSON, err := json.Marshal(status)
if err != nil {
klog.Errorf("Failed to marshal status. Error: %v.", statusJSON)
return nil, err
}
return &runtime.RawExtension{
Raw: statusJSON,
}, nil
}

View File

@ -3,6 +3,7 @@ package util
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
@ -122,3 +123,13 @@ func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) {
w.queue.ShutDown()
}()
}
// MetaNamespaceKeyFunc generates a namespaced key for an object.
func MetaNamespaceKeyFunc(obj interface{}) (QueueKey, error) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
return nil, err
}
return key, nil
}