// karmada/cmd/scheduler/app/scheduler.go

package app

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"

	"github.com/google/uuid"
	"github.com/spf13/cobra"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/cmd/scheduler/app/options"
	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
	"github.com/karmada-io/karmada/pkg/scheduler"
)

// NewSchedulerCommand creates a *cobra.Command object with default parameters.
func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use:  "scheduler",
		Long: `The karmada scheduler binds resources to the clusters it manages.`,
		Run: func(cmd *cobra.Command, args []string) {
			if err := run(opts, stopChan); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}

	opts.AddFlags(cmd.Flags())
	cmd.Flags().AddGoFlagSet(flag.CommandLine)
	return cmd
}
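
// run builds clients from the configured kubeconfig, starts the health
// endpoint, and runs the scheduler either directly or under leader election
// until stopChan is closed.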
func run(opts *options.Options, stopChan <-chan struct{}) error {
	// Serve the liveness endpoint in the background.
	go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))

	restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %s", err.Error())
	}

	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
	karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
	kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)

	// Cancel the scheduler's context as soon as the stop channel is closed.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-stopChan
		cancel()
	}()
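
	// scheduler.Failover is a package-level option; propagate the flag value
	// before the scheduler is constructed.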
	scheduler.Failover = opts.Failover
	sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet)

	// Without leader election, run the scheduler directly; Run only returns
	// once the context is cancelled.
	if !opts.LeaderElection.LeaderElect {
		sched.Run(ctx)
		return fmt.Errorf("scheduler exited")
	}
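
	// Leader election is enabled: build a dedicated client and a resource lock
	// so that only one scheduler replica is active at a time.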
	leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
	if err != nil {
		return err
	}

	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("unable to get hostname: %v", err)
	}
	// Add a uniquifier so that two processes on the same host don't accidentally both become active.
	id := hostname + "_" + uuid.New().String()

	rl, err := resourcelock.New(opts.LeaderElection.ResourceLock,
		opts.LeaderElection.ResourceNamespace,
		"karmada-scheduler",
		leaderElectionClient.CoreV1(),
		leaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		})
	if err != nil {
		return fmt.Errorf("couldn't create resource lock: %v", err)
	}
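
	// Block in the leader-election loop; the scheduler starts only after this
	// instance acquires the lock, and the process exits if leadership is lost.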
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: opts.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: opts.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   opts.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
	})
	return nil
}
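
// serveHealthz exposes a minimal /healthz endpoint and blocks serving HTTP,
// so a liveness probe can reach the process on the configured address.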
func serveHealthz(address string) {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	klog.Fatal(http.ListenAndServe(address, nil))
}
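
// Usage (a minimal sketch, not part of this file): a main package would wire
// the command up roughly as follows, assuming the stop channel is closed by a
// signal handler or similar helper:
//
//	func main() {
//		stopChan := make(chan struct{}) // e.g. closed on SIGTERM by a signal handler
//		if err := app.NewSchedulerCommand(stopChan).Execute(); err != nil {
//			os.Exit(1)
//		}
//	}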