399 lines
16 KiB
Go
399 lines
16 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
|
|
"github.com/spf13/cobra"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/dynamic"
|
|
kubeclientset "k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
cliflag "k8s.io/component-base/cli/flag"
|
|
"k8s.io/component-base/term"
|
|
"k8s.io/klog/v2"
|
|
controllerruntime "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/cache"
|
|
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
|
|
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
|
|
|
"github.com/karmada-io/karmada/cmd/agent/app/options"
|
|
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
|
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
|
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
|
|
"github.com/karmada-io/karmada/pkg/controllers/execution"
|
|
"github.com/karmada-io/karmada/pkg/controllers/mcs"
|
|
"github.com/karmada-io/karmada/pkg/controllers/status"
|
|
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
|
|
"github.com/karmada-io/karmada/pkg/karmadactl"
|
|
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
|
|
"github.com/karmada-io/karmada/pkg/sharedcli"
|
|
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
|
|
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
|
|
"github.com/karmada-io/karmada/pkg/util"
|
|
"github.com/karmada-io/karmada/pkg/util/fedinformer"
|
|
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
|
|
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
|
|
"github.com/karmada-io/karmada/pkg/util/gclient"
|
|
"github.com/karmada-io/karmada/pkg/util/helper"
|
|
"github.com/karmada-io/karmada/pkg/util/names"
|
|
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
|
|
"github.com/karmada-io/karmada/pkg/util/restmapper"
|
|
"github.com/karmada-io/karmada/pkg/version"
|
|
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
|
|
)
|
|
|
|
// NewAgentCommand creates a *cobra.Command object with default parameters
|
|
func NewAgentCommand(ctx context.Context) *cobra.Command {
|
|
opts := options.NewOptions()
|
|
karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "karmada-agent",
|
|
Long: `The karmada agent runs the cluster registration agent`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
// validate options
|
|
if errs := opts.Validate(); len(errs) != 0 {
|
|
return errs.ToAggregate()
|
|
}
|
|
if err := run(ctx, karmadaConfig, opts); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
},
|
|
Args: func(cmd *cobra.Command, args []string) error {
|
|
for _, arg := range args {
|
|
if len(arg) > 0 {
|
|
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
fss := cliflag.NamedFlagSets{}
|
|
|
|
genericFlagSet := fss.FlagSet("generic")
|
|
genericFlagSet.AddGoFlagSet(flag.CommandLine)
|
|
opts.AddFlags(genericFlagSet, controllers.ControllerNames())
|
|
|
|
// Set klog flags
|
|
logsFlagSet := fss.FlagSet("logs")
|
|
klogflag.Add(logsFlagSet)
|
|
|
|
cmd.AddCommand(sharedcommand.NewCmdVersion("karmada-agent"))
|
|
cmd.Flags().AddFlagSet(genericFlagSet)
|
|
cmd.Flags().AddFlagSet(logsFlagSet)
|
|
|
|
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
|
|
sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
|
|
return cmd
|
|
}
|
|
|
|
var controllers = make(controllerscontext.Initializers)
|
|
|
|
var controllersDisabledByDefault = sets.NewString()
|
|
|
|
func init() {
|
|
controllers["clusterStatus"] = startClusterStatusController
|
|
controllers["execution"] = startExecutionController
|
|
controllers["workStatus"] = startWorkStatusController
|
|
controllers["serviceExport"] = startServiceExportController
|
|
}
|
|
|
|
func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *options.Options) error {
|
|
klog.Infof("karmada-agent version: %s", version.Get())
|
|
|
|
profileflag.ListenAndServe(opts.ProfileOpts)
|
|
|
|
controlPlaneRestConfig, err := karmadaConfig.GetRestConfig(opts.KarmadaContext, opts.KarmadaKubeConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("error building kubeconfig of karmada control plane: %w", err)
|
|
}
|
|
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
|
|
|
|
clusterConfig, err := controllerruntime.GetConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("error building kubeconfig of member cluster: %w", err)
|
|
}
|
|
clusterKubeClient := kubeclientset.NewForConfigOrDie(clusterConfig)
|
|
controlPlaneKubeClient := kubeclientset.NewForConfigOrDie(controlPlaneRestConfig)
|
|
karmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)
|
|
|
|
registerOption := util.ClusterRegisterOption{
|
|
ClusterNamespace: opts.ClusterNamespace,
|
|
ClusterName: opts.ClusterName,
|
|
ReportSecrets: opts.ReportSecrets,
|
|
ClusterAPIEndpoint: opts.ClusterAPIEndpoint,
|
|
ProxyServerAddress: opts.ProxyServerAddress,
|
|
ClusterProvider: opts.ClusterProvider,
|
|
ClusterRegion: opts.ClusterRegion,
|
|
DryRun: false,
|
|
ControlPlaneConfig: controlPlaneRestConfig,
|
|
ClusterConfig: clusterConfig,
|
|
}
|
|
|
|
id, err := util.ObtainClusterID(clusterKubeClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ok, name, err := util.IsClusterIdentifyUnique(karmadaClient, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !ok && opts.ClusterName != name {
|
|
return fmt.Errorf("the same cluster has been registered with name %s", name)
|
|
}
|
|
|
|
registerOption.ClusterID = id
|
|
|
|
clusterSecret, impersonatorSecret, err := util.ObtainCredentialsFromMemberCluster(clusterKubeClient, registerOption)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
registerOption.Secret = *clusterSecret
|
|
registerOption.ImpersonatorSecret = *impersonatorSecret
|
|
err = util.RegisterClusterInControllerPlane(registerOption, controlPlaneKubeClient, generateClusterInControllerPlane)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register with karmada control plane: %w", err)
|
|
}
|
|
|
|
executionSpace, err := names.GenerateExecutionSpaceName(opts.ClusterName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate execution space name for member cluster %s, err is %v", opts.ClusterName, err)
|
|
}
|
|
|
|
controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
|
|
Scheme: gclient.NewSchema(),
|
|
SyncPeriod: &opts.ResyncPeriod.Duration,
|
|
Namespace: executionSpace,
|
|
LeaderElection: opts.LeaderElection.LeaderElect,
|
|
LeaderElectionID: fmt.Sprintf("karmada-agent-%s", opts.ClusterName),
|
|
LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
|
|
LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
|
|
LeaseDuration: &opts.LeaderElection.LeaseDuration.Duration,
|
|
RenewDeadline: &opts.LeaderElection.RenewDeadline.Duration,
|
|
RetryPeriod: &opts.LeaderElection.RetryPeriod.Duration,
|
|
HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
|
|
LivenessEndpointName: "/healthz",
|
|
MetricsBindAddress: opts.MetricsBindAddress,
|
|
MapperProvider: restmapper.MapperProvider,
|
|
Controller: v1alpha1.ControllerConfigurationSpec{
|
|
GroupKindConcurrency: map[string]int{
|
|
workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
|
|
clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
|
|
},
|
|
},
|
|
NewCache: cache.BuilderWithOptions(cache.Options{
|
|
DefaultTransform: fedinformer.StripUnusedFields,
|
|
}),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build controller manager: %w", err)
|
|
}
|
|
|
|
if err := controllerManager.AddHealthzCheck("ping", healthz.Ping); err != nil {
|
|
klog.Errorf("failed to add health check endpoint: %v", err)
|
|
return err
|
|
}
|
|
|
|
if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
|
|
return err
|
|
}
|
|
|
|
// blocks until the context is done.
|
|
if err := controllerManager.Start(ctx); err != nil {
|
|
return fmt.Errorf("controller manager exits unexpectedly: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error {
|
|
restConfig := mgr.GetConfig()
|
|
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
|
|
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
|
|
resourceInterpreter := resourceinterpreter.NewResourceInterpreter("", controlPlaneInformerManager)
|
|
if err := mgr.Add(resourceInterpreter); err != nil {
|
|
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
|
|
}
|
|
|
|
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
|
|
controllerContext := controllerscontext.Context{
|
|
Mgr: mgr,
|
|
ObjectWatcher: objectWatcher,
|
|
Opts: controllerscontext.Options{
|
|
Controllers: opts.Controllers,
|
|
ClusterName: opts.ClusterName,
|
|
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
|
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
|
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
|
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
|
|
ClusterFailureThreshold: opts.ClusterFailureThreshold,
|
|
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
|
ClusterAPIQPS: opts.ClusterAPIQPS,
|
|
ClusterAPIBurst: opts.ClusterAPIBurst,
|
|
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
|
|
RateLimiterOptions: opts.RateLimiterOpts,
|
|
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
|
|
},
|
|
StopChan: stopChan,
|
|
ResourceInterpreter: resourceInterpreter,
|
|
}
|
|
|
|
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
|
|
return fmt.Errorf("error starting controllers: %w", err)
|
|
}
|
|
|
|
// Ensure the InformerManager stops when the stop channel closes
|
|
go func() {
|
|
<-stopChan
|
|
genericmanager.StopInstance()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func startClusterStatusController(ctx controllerscontext.Context) (bool, error) {
|
|
clusterStatusController := &status.ClusterStatusController{
|
|
Client: ctx.Mgr.GetClient(),
|
|
KubeClient: kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig()),
|
|
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.ControllerName),
|
|
PredicateFunc: helper.NewClusterPredicateOnAgent(ctx.Opts.ClusterName),
|
|
TypedInformerManager: typedmanager.GetInstance(),
|
|
GenericInformerManager: genericmanager.GetInstance(),
|
|
StopChan: ctx.StopChan,
|
|
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
|
|
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
|
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
|
|
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
|
|
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
|
|
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
|
|
ClusterSuccessThreshold: ctx.Opts.ClusterSuccessThreshold,
|
|
ClusterFailureThreshold: ctx.Opts.ClusterFailureThreshold,
|
|
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
|
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
|
|
EnableClusterResourceModeling: ctx.Opts.EnableClusterResourceModeling,
|
|
}
|
|
if err := clusterStatusController.SetupWithManager(ctx.Mgr); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func startExecutionController(ctx controllerscontext.Context) (bool, error) {
|
|
executionController := &execution.Controller{
|
|
Client: ctx.Mgr.GetClient(),
|
|
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
|
|
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
|
ObjectWatcher: ctx.ObjectWatcher,
|
|
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
|
InformerManager: genericmanager.GetInstance(),
|
|
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
|
|
}
|
|
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
|
|
workStatusController := &status.WorkStatusController{
|
|
Client: ctx.Mgr.GetClient(),
|
|
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
|
|
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
|
InformerManager: genericmanager.GetInstance(),
|
|
StopChan: ctx.StopChan,
|
|
ObjectWatcher: ctx.ObjectWatcher,
|
|
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
|
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
|
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
|
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
|
|
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
|
|
ResourceInterpreter: ctx.ResourceInterpreter,
|
|
}
|
|
workStatusController.RunWorkQueue()
|
|
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func startServiceExportController(ctx controllerscontext.Context) (bool, error) {
|
|
serviceExportController := &mcs.ServiceExportController{
|
|
Client: ctx.Mgr.GetClient(),
|
|
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
|
|
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
|
InformerManager: genericmanager.GetInstance(),
|
|
StopChan: ctx.StopChan,
|
|
WorkerNumber: 3,
|
|
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
|
|
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
|
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
|
}
|
|
serviceExportController.RunWorkQueue()
|
|
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func generateClusterInControllerPlane(opts util.ClusterRegisterOption) (*clusterv1alpha1.Cluster, error) {
|
|
clusterObj := &clusterv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: opts.ClusterName}}
|
|
mutateFunc := func(cluster *clusterv1alpha1.Cluster) {
|
|
cluster.Spec.SyncMode = clusterv1alpha1.Pull
|
|
cluster.Spec.APIEndpoint = opts.ClusterAPIEndpoint
|
|
cluster.Spec.ProxyURL = opts.ProxyServerAddress
|
|
cluster.Spec.ID = opts.ClusterID
|
|
if opts.ClusterProvider != "" {
|
|
cluster.Spec.Provider = opts.ClusterProvider
|
|
}
|
|
|
|
if opts.ClusterZone != "" {
|
|
cluster.Spec.Zone = opts.ClusterZone
|
|
}
|
|
|
|
if opts.ClusterRegion != "" {
|
|
cluster.Spec.Region = opts.ClusterRegion
|
|
}
|
|
|
|
if opts.ClusterConfig.TLSClientConfig.Insecure {
|
|
cluster.Spec.InsecureSkipTLSVerification = true
|
|
}
|
|
if opts.ClusterConfig.Proxy != nil {
|
|
url, err := opts.ClusterConfig.Proxy(nil)
|
|
if err != nil {
|
|
klog.Errorf("clusterConfig.Proxy error, %v", err)
|
|
} else {
|
|
cluster.Spec.ProxyURL = url.String()
|
|
}
|
|
}
|
|
if opts.IsKubeCredentialsEnabled() {
|
|
cluster.Spec.SecretRef = &clusterv1alpha1.LocalSecretReference{
|
|
Namespace: opts.Secret.Namespace,
|
|
Name: opts.Secret.Name,
|
|
}
|
|
}
|
|
if opts.IsKubeImpersonatorEnabled() {
|
|
cluster.Spec.ImpersonatorSecretRef = &clusterv1alpha1.LocalSecretReference{
|
|
Namespace: opts.ImpersonatorSecret.Namespace,
|
|
Name: opts.ImpersonatorSecret.Name,
|
|
}
|
|
}
|
|
}
|
|
controlPlaneKarmadaClient := karmadaclientset.NewForConfigOrDie(opts.ControlPlaneConfig)
|
|
cluster, err := util.CreateOrUpdateClusterObject(controlPlaneKarmadaClient, clusterObj, mutateFunc)
|
|
if err != nil {
|
|
klog.Errorf("Failed to create cluster(%s) object, error: %v", clusterObj.Name, err)
|
|
return nil, err
|
|
}
|
|
|
|
return cluster, nil
|
|
}
|