package app

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/spf13/cobra"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/term"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/cmd/descheduler/app/options"
	"github.com/karmada-io/karmada/pkg/descheduler"
	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
	"github.com/karmada-io/karmada/pkg/sharedcli"
	"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
	"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
	"github.com/karmada-io/karmada/pkg/version"
	"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)

const (
	// ReadHeaderTimeout is the amount of time allowed to read
	// request headers.
	// HTTP timeouts are necessary to expire inactive connections,
	// and failing to set them may make the application vulnerable
	// to attacks like slowloris, which work by sending data very
	// slowly to keep connections open indefinitely, eventually
	// leading to a denial-of-service (DoS) condition.
	// References:
	// - https://en.wikipedia.org/wiki/Slowloris_(computer_security)
	ReadHeaderTimeout = 32 * time.Second
)

// NewDeschedulerCommand creates a *cobra.Command object with default parameters.
func NewDeschedulerCommand(stopChan <-chan struct{}) *cobra.Command {
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use: "karmada-descheduler",
		Long: `The karmada-descheduler evicts replicas from member clusters if they fail to be scheduled for a period of time.
It relies on karmada-scheduler-estimator to get replica status.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			// Validate options before starting.
			if errs := opts.Validate(); len(errs) != 0 {
				return errs.ToAggregate()
			}
			if err := run(opts, stopChan); err != nil {
				return err
			}
			return nil
		},
		Args: func(cmd *cobra.Command, args []string) error {
			// The command takes no positional arguments; reject any non-empty ones.
			for _, arg := range args {
				if len(arg) > 0 {
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
				}
			}
			return nil
		},
	}

	fss := cliflag.NamedFlagSets{}

	genericFlagSet := fss.FlagSet("generic")
	opts.AddFlags(genericFlagSet)

	// Set klog flags.
	logsFlagSet := fss.FlagSet("logs")
	klogflag.Add(logsFlagSet)

	cmd.AddCommand(sharedcommand.NewCmdVersion("karmada-descheduler"))
	cmd.Flags().AddFlagSet(genericFlagSet)
	cmd.Flags().AddFlagSet(logsFlagSet)

	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
	return cmd
}
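
// NewDeschedulerCommand is typically driven by a small main package. A minimal
// sketch of that wiring, assuming controller-runtime's signal handler supplies
// the stop channel (illustrative only; not part of this file):
//
//	package main
//
//	import (
//		"os"
//
//		"k8s.io/component-base/cli"
//		controllerruntime "sigs.k8s.io/controller-runtime"
//
//		"github.com/karmada-io/karmada/cmd/descheduler/app"
//	)
//
//	func main() {
//		// SetupSignalHandler returns a context that is cancelled on
//		// SIGTERM/SIGINT; its Done channel serves as the stop channel.
//		stopChan := controllerruntime.SetupSignalHandler().Done()
//		command := app.NewDeschedulerCommand(stopChan)
//		os.Exit(cli.Run(command))
//	}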

func run(opts *options.Options, stopChan <-chan struct{}) error {
	klog.Infof("karmada-descheduler version: %s", version.Get())
	klog.Infof("Please make sure the karmada-scheduler-estimator has been deployed in all member clusters")

	// Expose /healthz and /metrics on the configured bind address in the background.
	go serveHealthzAndMetrics(net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)))

	profileflag.ListenAndServe(opts.ProfileOpts)

	restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %s", err.Error())
	}
	restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst

	karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
	kubeClient := kubernetes.NewForConfigOrDie(restConfig)

	// Cancel the root context once the stop channel closes so all components shut down together.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-stopChan
		cancel()
	}()

	desched := descheduler.NewDescheduler(karmadaClient, kubeClient, opts)
	// When leader election is disabled, run directly; Run blocks until the context is cancelled.
	if !opts.LeaderElection.LeaderElect {
		desched.Run(ctx)
		return fmt.Errorf("descheduler exited")
	}

	leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
	if err != nil {
		return err
	}

	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("unable to get hostname: %v", err)
	}
	// Add a uniquifier so that two processes on the same host don't accidentally both become active.
	id := hostname + "_" + uuid.New().String()

	rl, err := resourcelock.New(opts.LeaderElection.ResourceLock,
		opts.LeaderElection.ResourceNamespace,
		opts.LeaderElection.ResourceName,
		leaderElectionClient.CoreV1(),
		leaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		})
	if err != nil {
		return fmt.Errorf("couldn't create resource lock: %v", err)
	}

	// Run the descheduler only while holding the lease; exit on lost leadership so a standby can take over.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: opts.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: opts.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   opts.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: desched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
	})
	return nil
}

func serveHealthzAndMetrics(address string) {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	mux.Handle("/metrics", promhttp.Handler())

	httpServer := http.Server{
		Addr:              address,
		Handler:           mux,
		ReadHeaderTimeout: ReadHeaderTimeout,
	}
	if err := httpServer.ListenAndServe(); err != nil {
		klog.Errorf("Failed to serve healthz and metrics: %v", err)
		os.Exit(1)
	}
}
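
// The endpoints registered above can be smoke-tested once the process is up.
// A minimal sketch (the port shown is an assumption and must match whatever
// --bind-address/--secure-port the process was started with):
//
//	resp, err := http.Get("http://127.0.0.1:10358/healthz")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer resp.Body.Close()
//	body, _ := io.ReadAll(resp.Body)
//	fmt.Printf("%s\n", body) // expected: "ok"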