package app

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"

	"github.com/google/uuid"
	"github.com/spf13/cobra"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/cmd/scheduler/app/options"
	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
	"github.com/karmada-io/karmada/pkg/scheduler"
)

// NewSchedulerCommand creates a *cobra.Command object with default parameters.
func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use:  "scheduler",
		Long: `The karmada scheduler binds resources to the clusters it manages.`,
		Run: func(cmd *cobra.Command, args []string) {
			if err := run(opts, stopChan); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}

	opts.AddFlags(cmd.Flags())
	cmd.Flags().AddGoFlagSet(flag.CommandLine)
	return cmd
}

func run(opts *options.Options, stopChan <-chan struct{}) error {
	// Serve the liveness endpoint early so probes succeed even while the
	// scheduler is still waiting to acquire leadership.
	go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))

	restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %s", err.Error())
	}

	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
	karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
	kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)

	// Translate the stop channel into a cancellable context shared by the
	// scheduler and the leader-election loop.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-stopChan
		cancel()
	}()

	scheduler.Failover = opts.Failover
	sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet)

	// Without leader election, run the scheduler directly; Run only returns
	// once the context is cancelled, so reaching the return below means exit.
	if !opts.LeaderElection.LeaderElect {
		sched.Run(ctx)
		return fmt.Errorf("scheduler exited")
	}

	leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
	if err != nil {
		return err
	}

	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("unable to get hostname: %v", err)
	}
	// Add a uniquifier so that two processes on the same host don't accidentally both become active.
	id := hostname + "_" + uuid.New().String()

	rl, err := resourcelock.New(opts.LeaderElection.ResourceLock,
		opts.LeaderElection.ResourceNamespace,
		"karmada-scheduler",
		leaderElectionClient.CoreV1(),
		leaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		})
	if err != nil {
		return fmt.Errorf("couldn't create resource lock: %v", err)
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: opts.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: opts.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   opts.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
	})

	return nil
}

// serveHealthz starts a minimal HTTP server that answers liveness probes on /healthz.
func serveHealthz(address string) {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	klog.Fatal(http.ListenAndServe(address, nil))
}
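
// Usage example (a minimal sketch, not part of this file): a hypothetical main
// package wiring NewSchedulerCommand to a stop channel that is closed on
// SIGINT/SIGTERM. The signal handling below is an illustrative assumption,
// not the actual karmada entrypoint.
//
//	package main
//
//	import (
//		"os"
//		"os/signal"
//		"syscall"
//
//		"github.com/karmada-io/karmada/cmd/scheduler/app"
//	)
//
//	func main() {
//		// Close stopChan on the first SIGINT/SIGTERM so run() cancels its context.
//		stopChan := make(chan struct{})
//		sigCh := make(chan os.Signal, 1)
//		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
//		go func() {
//			<-sigCh
//			close(stopChan)
//		}()
//
//		if err := app.NewSchedulerCommand(stopChan).Execute(); err != nil {
//			os.Exit(1)
//		}
//	}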