package app

import (
	"context"
	"flag"
	"fmt"
	"net"
	"strconv"

	"github.com/spf13/cobra"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/dynamic"
	kubeclientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/term"
	"k8s.io/klog/v2"
	controllerruntime "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/healthz"

	"github.com/karmada-io/karmada/cmd/agent/app/options"
	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
	"github.com/karmada-io/karmada/pkg/controllers/execution"
	"github.com/karmada-io/karmada/pkg/controllers/mcs"
	"github.com/karmada-io/karmada/pkg/controllers/status"
	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
	"github.com/karmada-io/karmada/pkg/karmadactl"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
	"github.com/karmada-io/karmada/pkg/sharedcli"
	"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
	"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
	"github.com/karmada-io/karmada/pkg/util/gclient"
	"github.com/karmada-io/karmada/pkg/util/helper"
	"github.com/karmada-io/karmada/pkg/util/names"
	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
	"github.com/karmada-io/karmada/pkg/util/restmapper"
	"github.com/karmada-io/karmada/pkg/version"
	"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)

// NewAgentCommand creates a *cobra.Command object with default parameters.
func NewAgentCommand(ctx context.Context) *cobra.Command {
	opts := options.NewOptions()
	karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())

	cmd := &cobra.Command{
		Use:  "karmada-agent",
		Long: `The karmada agent runs the cluster registration agent`,
		RunE: func(cmd *cobra.Command, args []string) error {
			// Validate options before running.
			if errs := opts.Validate(); len(errs) != 0 {
				return errs.ToAggregate()
			}
			return run(ctx, karmadaConfig, opts)
		},
		Args: func(cmd *cobra.Command, args []string) error {
			for _, arg := range args {
				if len(arg) > 0 {
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
				}
			}
			return nil
		},
	}

	fss := cliflag.NamedFlagSets{}

	genericFlagSet := fss.FlagSet("generic")
	genericFlagSet.AddGoFlagSet(flag.CommandLine)
	opts.AddFlags(genericFlagSet, controllers.ControllerNames())

	// Set klog flags.
	logsFlagSet := fss.FlagSet("logs")
	klogflag.Add(logsFlagSet)

	cmd.AddCommand(sharedcommand.NewCmdVersion("karmada-agent"))
	cmd.Flags().AddFlagSet(genericFlagSet)
	cmd.Flags().AddFlagSet(logsFlagSet)

	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
	return cmd
}

// controllers holds the initializers of all controllers the agent can run,
// keyed by controller name.
var controllers = make(controllerscontext.Initializers)

// controllersDisabledByDefault is the set of controller names that are
// disabled unless explicitly enabled via the --controllers flag.
var controllersDisabledByDefault = sets.NewString()

func init() {
	controllers["clusterStatus"] = startClusterStatusController
	controllers["execution"] = startExecutionController
	controllers["workStatus"] = startWorkStatusController
	controllers["serviceExport"] = startServiceExportController
}
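
// run bootstraps the agent: it registers the member cluster with the karmada
// control plane (in Pull mode), then builds and starts a controller-runtime
// manager scoped to the cluster's execution space on the control plane.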
func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *options.Options) error {
	klog.Infof("karmada-agent version: %s", version.Get())

	profileflag.ListenAndServe(opts.ProfileOpts)

	controlPlaneRestConfig, err := karmadaConfig.GetRestConfig(opts.KarmadaContext, opts.KarmadaKubeConfig)
	if err != nil {
		return fmt.Errorf("error building kubeconfig of karmada control plane: %w", err)
	}
	controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst

	clusterConfig, err := controllerruntime.GetConfig()
	if err != nil {
		return fmt.Errorf("error building kubeconfig of member cluster: %w", err)
	}

	clusterKubeClient := kubeclientset.NewForConfigOrDie(clusterConfig)
	controlPlaneKubeClient := kubeclientset.NewForConfigOrDie(controlPlaneRestConfig)

	registerOption := util.ClusterRegisterOption{
		ClusterNamespace:   opts.ClusterNamespace,
		ClusterName:        opts.ClusterName,
		ReportSecrets:      opts.ReportSecrets,
		ClusterAPIEndpoint: opts.ClusterAPIEndpoint,
		ProxyServerAddress: opts.ProxyServerAddress,
		ClusterProvider:    opts.ClusterProvider,
		ClusterRegion:      opts.ClusterRegion,
		DryRun:             false,
		ControlPlaneConfig: controlPlaneRestConfig,
		ClusterConfig:      clusterConfig,
	}

	id, err := util.ObtainClusterID(clusterKubeClient)
	if err != nil {
		return err
	}
	registerOption.ClusterID = id

	clusterSecret, impersonatorSecret, err := util.ObtainCredentialsFromMemberCluster(clusterKubeClient, registerOption)
	if err != nil {
		return err
	}
	registerOption.Secret = *clusterSecret
	registerOption.ImpersonatorSecret = *impersonatorSecret

	err = util.RegisterClusterInControllerPlane(registerOption, controlPlaneKubeClient, generateClusterInControllerPlane)
	if err != nil {
		return fmt.Errorf("failed to register with karmada control plane: %w", err)
	}

	executionSpace, err := names.GenerateExecutionSpaceName(opts.ClusterName)
	if err != nil {
		return fmt.Errorf("failed to generate execution space name for member cluster %s: %w", opts.ClusterName, err)
	}

	controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
		Scheme:                     gclient.NewSchema(),
		SyncPeriod:                 &opts.ResyncPeriod.Duration,
		Namespace:                  executionSpace,
		LeaderElection:             opts.LeaderElection.LeaderElect,
		LeaderElectionID:           fmt.Sprintf("karmada-agent-%s", opts.ClusterName),
		LeaderElectionNamespace:    opts.LeaderElection.ResourceNamespace,
		LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
		LeaseDuration:              &opts.LeaderElection.LeaseDuration.Duration,
		RenewDeadline:              &opts.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:                &opts.LeaderElection.RetryPeriod.Duration,
		HealthProbeBindAddress:     net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
		LivenessEndpointName:       "/healthz",
		MetricsBindAddress:         opts.MetricsBindAddress,
		MapperProvider:             restmapper.MapperProvider,
		Controller: v1alpha1.ControllerConfigurationSpec{
			GroupKindConcurrency: map[string]int{
				workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String():       opts.ConcurrentWorkSyncs,
				clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("failed to build controller manager: %w", err)
	}

	if err := controllerManager.AddHealthzCheck("ping", healthz.Ping); err != nil {
		klog.Errorf("failed to add health check endpoint: %v", err)
		return err
	}

	if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
		return err
	}

	// Start blocks until the context is done.
	if err := controllerManager.Start(ctx); err != nil {
		return fmt.Errorf("controller manager exits unexpectedly: %w", err)
	}

	return nil
}
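
// setupControllers wires the shared dependencies (control-plane informer
// manager, resource interpreter and object watcher) into a controller
// context, then starts every registered controller that is not disabled.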
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error {
	restConfig := mgr.GetConfig()
	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
	controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)

	resourceInterpreter := resourceinterpreter.NewResourceInterpreter("", controlPlaneInformerManager)
	if err := mgr.Add(resourceInterpreter); err != nil {
		return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
	}

	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)

	controllerContext := controllerscontext.Context{
		Mgr:           mgr,
		ObjectWatcher: objectWatcher,
		Opts: controllerscontext.Options{
			Controllers:                       opts.Controllers,
			ClusterName:                       opts.ClusterName,
			ClusterStatusUpdateFrequency:      opts.ClusterStatusUpdateFrequency,
			ClusterLeaseDuration:              opts.ClusterLeaseDuration,
			ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
			ClusterSuccessThreshold:           opts.ClusterSuccessThreshold,
			ClusterFailureThreshold:           opts.ClusterFailureThreshold,
			ClusterCacheSyncTimeout:           opts.ClusterCacheSyncTimeout,
			ClusterAPIQPS:                     opts.ClusterAPIQPS,
			ClusterAPIBurst:                   opts.ClusterAPIBurst,
			ConcurrentWorkSyncs:               opts.ConcurrentWorkSyncs,
			RateLimiterOptions:                opts.RateLimiterOpts,
		},
		StopChan:            stopChan,
		ResourceInterpreter: resourceInterpreter,
	}

	if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
		return fmt.Errorf("error starting controllers: %w", err)
	}

	// Ensure the InformerManager stops when the stop channel closes.
	go func() {
		<-stopChan
		genericmanager.StopInstance()
	}()

	return nil
}

// startClusterStatusController starts the controller that collects the member
// cluster's status and reports it to the control plane.
func startClusterStatusController(ctx controllerscontext.Context) (bool, error) {
	clusterStatusController := &status.ClusterStatusController{
		Client:                            ctx.Mgr.GetClient(),
		KubeClient:                        kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig()),
		EventRecorder:                     ctx.Mgr.GetEventRecorderFor(status.ControllerName),
		PredicateFunc:                     helper.NewClusterPredicateOnAgent(ctx.Opts.ClusterName),
		TypedInformerManager:              typedmanager.GetInstance(),
		GenericInformerManager:            genericmanager.GetInstance(),
		StopChan:                          ctx.StopChan,
		ClusterClientSetFunc:              util.NewClusterClientSetForAgent,
		ClusterDynamicClientSetFunc:       util.NewClusterDynamicClientSetForAgent,
		ClusterClientOption:               &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
		ClusterStatusUpdateFrequency:      ctx.Opts.ClusterStatusUpdateFrequency,
		ClusterLeaseDuration:              ctx.Opts.ClusterLeaseDuration,
		ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
		ClusterSuccessThreshold:           ctx.Opts.ClusterSuccessThreshold,
		ClusterFailureThreshold:           ctx.Opts.ClusterFailureThreshold,
		ClusterCacheSyncTimeout:           ctx.Opts.ClusterCacheSyncTimeout,
		RateLimiterOptions:                ctx.Opts.RateLimiterOptions,
	}
	if err := clusterStatusController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}
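
// startExecutionController starts the controller that syncs the workloads
// wrapped in Work objects to the member cluster.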
func startExecutionController(ctx controllerscontext.Context) (bool, error) {
	executionController := &execution.Controller{
		Client:             ctx.Mgr.GetClient(),
		EventRecorder:      ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
		RESTMapper:         ctx.Mgr.GetRESTMapper(),
		ObjectWatcher:      ctx.ObjectWatcher,
		PredicateFunc:      helper.NewExecutionPredicateOnAgent(),
		InformerManager:    genericmanager.GetInstance(),
		RatelimiterOptions: ctx.Opts.RateLimiterOptions,
	}
	if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

// startWorkStatusController starts the controller that collects the status of
// resources applied to the member cluster and reflects it back into the
// corresponding Work objects.
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
	workStatusController := &status.WorkStatusController{
		Client:                    ctx.Mgr.GetClient(),
		EventRecorder:             ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
		RESTMapper:                ctx.Mgr.GetRESTMapper(),
		InformerManager:           genericmanager.GetInstance(),
		StopChan:                  ctx.StopChan,
		ObjectWatcher:             ctx.ObjectWatcher,
		PredicateFunc:             helper.NewExecutionPredicateOnAgent(),
		ClusterClientSetFunc:      util.NewClusterDynamicClientSetForAgent,
		ClusterCacheSyncTimeout:   ctx.Opts.ClusterCacheSyncTimeout,
		ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
		RateLimiterOptions:        ctx.Opts.RateLimiterOptions,
		ResourceInterpreter:       ctx.ResourceInterpreter,
	}
	workStatusController.RunWorkQueue()
	if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

// startServiceExportController starts the controller that watches
// ServiceExport objects in the member cluster for multi-cluster service
// discovery.
func startServiceExportController(ctx controllerscontext.Context) (bool, error) {
	serviceExportController := &mcs.ServiceExportController{
		Client:                      ctx.Mgr.GetClient(),
		EventRecorder:               ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
		RESTMapper:                  ctx.Mgr.GetRESTMapper(),
		InformerManager:             genericmanager.GetInstance(),
		StopChan:                    ctx.StopChan,
		WorkerNumber:                3,
		PredicateFunc:               helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
		ClusterCacheSyncTimeout:     ctx.Opts.ClusterCacheSyncTimeout,
	}
	serviceExportController.RunWorkQueue()
	if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
		return false, err
	}
	return true, nil
}

// generateClusterInControllerPlane creates or updates the Cluster object that
// represents the member cluster on the karmada control plane, in Pull mode.
func generateClusterInControllerPlane(opts util.ClusterRegisterOption) (*clusterv1alpha1.Cluster, error) {
	clusterObj := &clusterv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: opts.ClusterName}}
	mutateFunc := func(cluster *clusterv1alpha1.Cluster) {
		cluster.Spec.SyncMode = clusterv1alpha1.Pull
		cluster.Spec.APIEndpoint = opts.ClusterAPIEndpoint
		cluster.Spec.ProxyURL = opts.ProxyServerAddress
		cluster.Spec.ID = opts.ClusterID
		if opts.ClusterProvider != "" {
			cluster.Spec.Provider = opts.ClusterProvider
		}
		if opts.ClusterZone != "" {
			cluster.Spec.Zone = opts.ClusterZone
		}
		if opts.ClusterRegion != "" {
			cluster.Spec.Region = opts.ClusterRegion
		}
		if opts.ClusterConfig.TLSClientConfig.Insecure {
			cluster.Spec.InsecureSkipTLSVerification = true
		}
		if opts.ClusterConfig.Proxy != nil {
			url, err := opts.ClusterConfig.Proxy(nil)
			if err != nil {
				klog.Errorf("clusterConfig.Proxy error, %v", err)
			} else {
				cluster.Spec.ProxyURL = url.String()
			}
		}
		if opts.IsKubeCredentialsEnabled() {
			cluster.Spec.SecretRef = &clusterv1alpha1.LocalSecretReference{
				Namespace: opts.Secret.Namespace,
				Name:      opts.Secret.Name,
			}
		}
		if opts.IsKubeImpersonatorEnabled() {
			cluster.Spec.ImpersonatorSecretRef = &clusterv1alpha1.LocalSecretReference{
				Namespace: opts.ImpersonatorSecret.Namespace,
				Name:      opts.ImpersonatorSecret.Name,
			}
		}
	}

	controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(opts.ControlPlaneConfig)
	cluster, err := util.CreateOrUpdateClusterObject(controlPlaneKarmadaClient, clusterObj, mutateFunc)
	if err != nil {
		klog.Errorf("Failed to create cluster(%s) object, error: %v", clusterObj.Name, err)
		return nil, err
	}
	return cluster, nil
}