From 89e3673fbbecfa9d1213e4c26f8fb3d4f4d3b503 Mon Sep 17 00:00:00 2001 From: lxtywypc Date: Wed, 16 Aug 2023 16:36:22 +0800 Subject: [PATCH] only update object in member cluster by execution controller Signed-off-by: lxtywypc --- cmd/agent/app/agent.go | 58 +- .../app/controllermanager.go | 57 +- pkg/controllers/context/context.go | 4 + .../execution/execution_controller.go | 201 +++- .../execution/execution_controller_test.go | 907 ++++++++++++++++++ .../status/work_status_controller.go | 250 +---- .../status/work_status_controller_test.go | 449 ++------- pkg/resourceinterpreter/fake/fake.go | 158 +++ pkg/util/fedinformer/handlers.go | 30 - pkg/util/lifted/objectwatcher.go | 14 +- .../memberclusterinformer.go | 156 +++ .../memberclusterinformer_test.go | 187 ++++ pkg/util/objectwatcher/objectwatcher.go | 193 ++-- pkg/util/objectwatcher/objectwatcher_test.go | 173 ++++ test/helper/resource.go | 42 + 15 files changed, 2087 insertions(+), 792 deletions(-) create mode 100644 pkg/controllers/execution/execution_controller_test.go create mode 100644 pkg/resourceinterpreter/fake/fake.go create mode 100644 pkg/util/memberclusterinformer/memberclusterinformer.go create mode 100644 pkg/util/memberclusterinformer/memberclusterinformer_test.go create mode 100644 pkg/util/objectwatcher/objectwatcher_test.go diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 0a9caa7e4..8bb5a25c5 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -44,6 +44,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/restmapper" @@ -217,7 +218,7 @@ func run(ctx context.Context, opts *options.Options) error { crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...) crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...) - if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil { + if err = setupControllers(ctx, controllerManager, opts); err != nil { return err } @@ -229,18 +230,18 @@ func run(ctx context.Context, opts *options.Options) error { return nil } -func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error { +func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) error { restConfig := mgr.GetConfig() dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) - controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan) + controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done()) controlPlaneKubeClientSet := kubeclientset.NewForConfigOrDie(restConfig) // We need a service lister to build a resource interpreter with `ClusterIPServiceResolver` // witch allows connection to the customized interpreter webhook without a cluster DNS service. 
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0) serviceLister := sharedFactory.Core().V1().Services().Lister() - sharedFactory.Start(stopChan) - sharedFactory.WaitForCacheSync(stopChan) + sharedFactory.Start(ctx.Done()) + sharedFactory.WaitForCacheSync(ctx.Done()) resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister) if err := mgr.Add(resourceInterpreter); err != nil { @@ -248,8 +249,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter) + memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSetForAgent) + controllerContext := controllerscontext.Context{ Mgr: mgr, + Ctx: ctx, ObjectWatcher: objectWatcher, Opts: controllerscontext.Options{ Controllers: opts.Controllers, @@ -269,8 +273,9 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop CertRotationRemainingTimeThreshold: opts.CertRotationRemainingTimeThreshold, KarmadaKubeconfigNamespace: opts.KarmadaKubeconfigNamespace, }, - StopChan: stopChan, - ResourceInterpreter: resourceInterpreter, + StopChan: ctx.Done(), + ResourceInterpreter: resourceInterpreter, + MemberClusterInformer: memberClusterInformer, } if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil { @@ -279,7 +284,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop // Ensure the InformerManager stops when the stop channel closes go func() { - <-stopChan + <-ctx.Done() genericmanager.StopInstance() }() @@ -315,14 +320,17 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error) func startExecutionController(ctx controllerscontext.Context) (bool, error) { executionController := &execution.Controller{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicateOnAgent(), - InformerManager: genericmanager.GetInstance(), - RatelimiterOptions: ctx.Opts.RateLimiterOptions, + Ctx: ctx.Ctx, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicateOnAgent(), + RatelimiterOptions: ctx.Opts.RateLimiterOptions, + ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs, + StopChan: ctx.StopChan, + MemberClusterInformer: ctx.MemberClusterInformer, } + executionController.RunWorkQueue() if err := executionController.SetupWithManager(ctx.Mgr); err != nil { return false, err } @@ -331,18 +339,14 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) { func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: genericmanager.GetInstance(), - StopChan: ctx.StopChan, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicateOnAgent(), - ClusterDynamicClientSetFunc: 
util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, - ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs, - RateLimiterOptions: ctx.Opts.RateLimiterOptions, - ResourceInterpreter: ctx.ResourceInterpreter, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + StopChan: ctx.StopChan, + PredicateFunc: helper.NewExecutionPredicateOnAgent(), + ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs, + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + ResourceInterpreter: ctx.ResourceInterpreter, + MemberClusterInformer: ctx.MemberClusterInformer, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 854f4928b..0635d7eb8 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -63,6 +63,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" "github.com/karmada-io/karmada/pkg/util/restmapper" @@ -170,7 +171,7 @@ func Run(ctx context.Context, opts *options.Options) error { crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...) crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...) - setupControllers(controllerManager, opts, ctx.Done()) + setupControllers(ctx, controllerManager, opts) // blocks until the context is done. 
if err := controllerManager.Start(ctx); err != nil { @@ -369,14 +370,17 @@ func startBindingStatusController(ctx controllerscontext.Context) (enabled bool, func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) { executionController := &execution.Controller{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), - InformerManager: genericmanager.GetInstance(), - RatelimiterOptions: ctx.Opts.RateLimiterOptions, + Ctx: ctx.Ctx, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), + ObjectWatcher: ctx.ObjectWatcher, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + RatelimiterOptions: ctx.Opts.RateLimiterOptions, + ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs, + StopChan: ctx.StopChan, + MemberClusterInformer: ctx.MemberClusterInformer, } + executionController.RunWorkQueue() if err := executionController.SetupWithManager(ctx.Mgr); err != nil { return false, err } @@ -386,18 +390,14 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) { opts := ctx.Opts workStatusController := &status.WorkStatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: genericmanager.GetInstance(), - StopChan: ctx.StopChan, - ObjectWatcher: ctx.ObjectWatcher, - PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, - ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, - RateLimiterOptions: ctx.Opts.RateLimiterOptions, - ResourceInterpreter: ctx.ResourceInterpreter, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName), + StopChan: ctx.StopChan, + PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), + ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + ResourceInterpreter: ctx.ResourceInterpreter, + MemberClusterInformer: ctx.MemberClusterInformer, } workStatusController.RunWorkQueue() if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil { @@ -592,7 +592,7 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext. } // setupControllers initialize controllers and setup one by one. 
-func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { +func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) { restConfig := mgr.GetConfig() dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig) @@ -605,13 +605,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop return } - controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan) + controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done()) // We need a service lister to build a resource interpreter with `ClusterIPServiceResolver` // witch allows connection to the customized interpreter webhook without a cluster DNS service. sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, 0) serviceLister := sharedFactory.Core().V1().Services().Lister() - sharedFactory.Start(stopChan) - sharedFactory.WaitForCacheSync(stopChan) + sharedFactory.Start(ctx.Done()) + sharedFactory.WaitForCacheSync(ctx.Done()) resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister) if err := mgr.Add(resourceInterpreter); err != nil { @@ -619,6 +619,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) + memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSet) resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, @@ -653,9 +654,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup dependencies distributor: %v", err) } } - setupClusterAPIClusterDetector(mgr, opts, stopChan) + setupClusterAPIClusterDetector(mgr, opts, ctx.Done()) controllerContext := controllerscontext.Context{ Mgr: mgr, + Ctx: ctx, ObjectWatcher: objectWatcher, Opts: controllerscontext.Options{ Controllers: opts.Controllers, @@ -679,12 +681,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop EnableClusterResourceModeling: opts.EnableClusterResourceModeling, HPAControllerConfiguration: opts.HPAControllerConfiguration, }, - StopChan: stopChan, + StopChan: ctx.Done(), DynamicClientSet: dynamicClientSet, KubeClientSet: kubeClientSet, OverrideManager: overrideManager, ControlPlaneInformerManager: controlPlaneInformerManager, ResourceInterpreter: resourceInterpreter, + MemberClusterInformer: memberClusterInformer, } if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil { @@ -693,7 +696,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop // Ensure the InformerManager stops when the stop channel closes go func() { - <-stopChan + <-ctx.Done() genericmanager.StopInstance() }() } diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 683128029..5ffdf40a4 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -1,6 +1,7 @@ package context import ( + "context" "regexp" "time" @@ -15,6 +16,7 @@ import ( "github.com/karmada-io/karmada/pkg/resourceinterpreter" 
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" ) @@ -88,6 +90,7 @@ type Options struct { // Context defines the context object for controller. type Context struct { Mgr controllerruntime.Manager + Ctx context.Context ObjectWatcher objectwatcher.ObjectWatcher Opts Options StopChan <-chan struct{} @@ -96,6 +99,7 @@ type Context struct { OverrideManager overridemanager.OverrideManager ControlPlaneInformerManager genericmanager.SingleClusterInformerManager ResourceInterpreter resourceinterpreter.ResourceInterpreter + MemberClusterInformer memberclusterinformer.MemberClusterInformer } // IsControllerEnabled check if a specified controller enabled or not. diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 3a3ba5724..fc96aed95 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -3,6 +3,7 @@ package execution import ( "context" "fmt" + "reflect" "time" corev1 "k8s.io/api/core/v1" @@ -10,7 +11,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -26,9 +29,10 @@ import ( "github.com/karmada-io/karmada/pkg/metrics" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" - "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/fedinformer" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" ) @@ -41,12 +45,19 @@ const ( // Controller is to sync Work. type Controller struct { client.Client // used to operate Work resources. + Ctx context.Context EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper ObjectWatcher objectwatcher.ObjectWatcher PredicateFunc predicate.Predicate - InformerManager genericmanager.MultiClusterInformerManager RatelimiterOptions ratelimiterflag.Options + + // Extend execution controller like work status controller to handle event from member cluster. + worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. + // ConcurrentExecutionSyncs is the number of object that are allowed to sync concurrently. + ConcurrentWorkSyncs int + eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. + StopChan <-chan struct{} + MemberClusterInformer memberclusterinformer.MemberClusterInformer } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -65,6 +76,12 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return controllerruntime.Result{Requeue: true}, err } + // Enqueue to try removing object in member cluster if work deleted. 
+ if !work.DeletionTimestamp.IsZero() { + c.worker.Add(req) + return controllerruntime.Result{}, nil + } + clusterName, err := names.GetClusterName(work.Namespace) if err != nil { klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name) @@ -77,31 +94,76 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return controllerruntime.Result{Requeue: true}, err } - if !work.DeletionTimestamp.IsZero() { - // Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed. - if util.IsClusterReady(&cluster.Status) { - err := c.tryDeleteWorkload(clusterName, work) - if err != nil { - klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) - return controllerruntime.Result{Requeue: true}, err - } - } else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating - return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) - } - - return c.removeFinalizer(work) - } - if !util.IsClusterReady(&cluster.Status) { klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) } - return c.syncWork(clusterName, work) + err = c.MemberClusterInformer.BuildResourceInformers(cluster, work, c.eventHandler) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + c.worker.Add(req) + + return controllerruntime.Result{}, nil +} + +func (c *Controller) onUpdate(old, cur interface{}) { + oldObj := old.(*unstructured.Unstructured) + curObj := cur.(*unstructured.Unstructured) + + oldObjCopy := oldObj.DeepCopy() + curObjCopy := curObj.DeepCopy() + + clusterName, _ := names.GetClusterName(curObjCopy.GetLabels()[workv1alpha1.WorkNamespaceLabel]) + if clusterName == "" { + return + } + + if c.ObjectWatcher.NeedsUpdate(clusterName, oldObjCopy, curObjCopy) { + c.worker.Enqueue(curObj) + } +} + +func (c *Controller) onDelete(obj interface{}) { + curObj := obj.(*unstructured.Unstructured) + + c.worker.Enqueue(curObj) +} + +// RunWorkQueue initializes worker and run it, worker will process resource asynchronously. +func (c *Controller) RunWorkQueue() { + workerOptions := util.Options{ + Name: "work-execution", + KeyFunc: generateKey, + ReconcileFunc: c.syncWork, + } + c.worker = util.NewAsyncWorker(workerOptions) + c.worker.Run(c.ConcurrentWorkSyncs, c.StopChan) +} + +// generateKey generates a key from obj, the key contains cluster, GVK, namespace and name. +func generateKey(obj interface{}) (util.QueueKey, error) { + resource, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("object is not unstructured") + } + + workName := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNameLabel) + workNamespace := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNamespaceLabel) + + if workName == "" || workNamespace == "" { + return nil, nil + } + + return controllerruntime.Request{NamespacedName: types.NamespacedName{Namespace: workNamespace, Name: workName}}, nil } // SetupWithManager creates a controller and register to controller manager. func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { + c.eventHandler = fedinformer.NewHandlerOnEvents(nil, c.onUpdate, c.onDelete) + return controllerruntime.NewControllerManagedBy(mgr). 
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)). WithEventFilter(predicate.GenerationChangedPredicate{}). @@ -111,20 +173,71 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { Complete(c) } -func (c *Controller) syncWork(clusterName string, work *workv1alpha1.Work) (controllerruntime.Result, error) { +func (c *Controller) syncWork(key util.QueueKey) error { + req, ok := key.(controllerruntime.Request) + if !ok { + klog.Warningf("Skip sync work for key(%+v) is not controllerruntime.Request type", key) + return nil + } + + klog.Infof("Begin to sync work %s/%s", req.Namespace, req.Name) + start := time.Now() - err := c.syncToClusters(clusterName, work) + + work := &workv1alpha1.Work{} + if err := c.Client.Get(c.Ctx, req.NamespacedName, work); err != nil { + // The resource may no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + return nil + } + + return err + } + + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name) + return err + } + + cluster, err := util.GetCluster(c.Client, clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return err + } + + if !work.DeletionTimestamp.IsZero() { + // Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed. + if util.IsClusterReady(&cluster.Status) { + err := c.tryDeleteWorkload(clusterName, work) + if err != nil { + klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) + return err + } + } else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating + return fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + + return c.removeFinalizer(work) + } + + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + + err = c.syncToClusters(clusterName, work) metrics.ObserveSyncWorkloadLatency(err, start) if err != nil { msg := fmt.Sprintf("Failed to sync work(%s) to cluster(%s): %v", work.Name, clusterName, err) klog.Errorf(msg) c.EventRecorder.Event(work, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, msg) - return controllerruntime.Result{Requeue: true}, err + return err } - msg := fmt.Sprintf("Sync work (%s) to cluster(%s) successful.", work.Name, clusterName) + msg := fmt.Sprintf("Sync work(%s) to cluster(%s) successfully.", work.Name, clusterName) klog.V(4).Infof(msg) c.EventRecorder.Event(work, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, msg) - return controllerruntime.Result{}, nil + return nil } // tryDeleteWorkload tries to delete resource in the given member cluster. @@ -143,7 +256,7 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo return err } - clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey) + clusterObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -153,7 +266,8 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo } // Avoid deleting resources that not managed by karmada. 
- if util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNameLabel) { + if util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNameLabel) || + util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNamespaceLabel) { klog.Infof("Abort deleting the resource(kind=%s, %s/%s) exists in cluster %v but not managed by karmada", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName) return nil } @@ -169,17 +283,17 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo } // removeFinalizer remove finalizer from the given Work -func (c *Controller) removeFinalizer(work *workv1alpha1.Work) (controllerruntime.Result, error) { +func (c *Controller) removeFinalizer(work *workv1alpha1.Work) error { if !controllerutil.ContainsFinalizer(work, util.ExecutionControllerFinalizer) { - return controllerruntime.Result{}, nil + return nil } controllerutil.RemoveFinalizer(work, util.ExecutionControllerFinalizer) err := c.Client.Update(context.TODO(), work) if err != nil { - return controllerruntime.Result{Requeue: true}, err + return err } - return controllerruntime.Result{}, nil + return nil } // syncToClusters ensures that the state of the given object is synchronized to member clusters. @@ -209,7 +323,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work) if len(errs) > 0 { total := len(work.Spec.Workload.Manifests) message := fmt.Sprintf("Failed to apply all manifests (%d/%d): %s", syncSucceedNum, total, errors.NewAggregate(errs).Error()) - err := c.updateAppliedCondition(work, metav1.ConditionFalse, "AppliedFailed", message) + err := c.updateAppliedConditionIfNeed(work, metav1.ConditionFalse, "AppliedFailed", message) if err != nil { klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) errs = append(errs, err) @@ -217,7 +331,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work) return errors.NewAggregate(errs) } - err := c.updateAppliedCondition(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied") + err := c.updateAppliedConditionIfNeed(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied") if err != nil { klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) return err @@ -233,7 +347,7 @@ func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *uns return err } - clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey) + clusterObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey) if err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err) @@ -255,8 +369,8 @@ func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *uns return nil } -// updateAppliedCondition update the Applied condition for the given Work -func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error { +// updateAppliedConditionIfNeed update the Applied condition for the given Work if the reason or message of Applied condition changed +func (c 
*Controller) updateAppliedConditionIfNeed(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error { newWorkAppliedCondition := metav1.Condition{ Type: workv1alpha1.WorkApplied, Status: status, @@ -265,7 +379,24 @@ func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status meta LastTransitionTime: metav1.Now(), } + // needUpdateCondition judges if the Applied condition needs to update. + needUpdateCondition := func() bool { + lastWorkAppliedCondition := meta.FindStatusCondition(work.Status.Conditions, workv1alpha1.WorkApplied).DeepCopy() + + if lastWorkAppliedCondition != nil { + lastWorkAppliedCondition.LastTransitionTime = newWorkAppliedCondition.LastTransitionTime + + return !reflect.DeepEqual(newWorkAppliedCondition, *lastWorkAppliedCondition) + } + + return true + } + return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { + if !needUpdateCondition() { + return nil + } + meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition) updateErr := c.Status().Update(context.TODO(), work) if updateErr == nil { diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go new file mode 100644 index 000000000..95b645d32 --- /dev/null +++ b/pkg/controllers/execution/execution_controller_test.go @@ -0,0 +1,907 @@ +package execution + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + interpreterfake "github.com/karmada-io/karmada/pkg/resourceinterpreter/fake" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/gclient" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" + "github.com/karmada-io/karmada/pkg/util/objectwatcher" + testhelper "github.com/karmada-io/karmada/test/helper" +) + +func TestExecutionController_RunWorkQueue(t *testing.T) { + c := Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + } + + injectMemberClusterInformer(&c, genericmanager.GetInstance(), util.NewClusterDynamicClientSetForAgent) + + c.RunWorkQueue() +} + +func TestExecutionController_Reconcile(t *testing.T) { + tests := []struct { + name string + c Controller + dFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + work 
*workv1alpha1.Work + ns string + expectRes controllerruntime.Result + existErr bool + }{ + { + name: "normal case", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://127.0.0.1", + SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"}, + InsecureSkipTLSVerification: true, + }, + Status: clusterv1alpha1.ClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: clusterv1alpha1.ClusterConditionReady, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, + Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")}, + }).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", + Namespace: "karmada-es-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSet, + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + existErr: false, + }, + { + name: "work not exists", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work-1", + Namespace: "karmada-es-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSetForAgent, + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + existErr: false, + }, + { + name: "work's DeletionTimestamp isn't zero", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", + Namespace: "karmada-es-cluster", + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSetForAgent, + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + existErr: false, + }, + { + name: "failed to get cluster name", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", + Namespace: "karmada-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: 
[]metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSetForAgent, + ns: "karmada-cluster", + expectRes: controllerruntime.Result{Requeue: true}, + existErr: true, + }, + { + name: "failed to get cluster", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", + Namespace: "karmada-es-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSetForAgent, + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{Requeue: true}, + existErr: true, + }, + { + name: "cluster is not ready", + c: Controller{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + }, + work: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", + Namespace: "karmada-es-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + dFunc: util.NewClusterDynamicClientSetForAgent, + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{Requeue: true}, + existErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + injectMemberClusterInformer(&tt.c, genericmanager.GetInstance(), tt.dFunc) + + tt.c.RunWorkQueue() + + req := controllerruntime.Request{ + NamespacedName: types.NamespacedName{ + Name: "work", + Namespace: tt.ns, + }, + } + + if err := tt.c.Create(context.Background(), tt.work); err != nil { + t.Fatalf("Failed to create work: %v", err) + } + + res, err := tt.c.Reconcile(context.Background(), req) + assert.Equal(t, tt.expectRes, res) + if tt.existErr { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + }) + } +} + +func Test_generateKey(t *testing.T) { + tests := []struct { + name string + obj *unstructured.Unstructured + expectRes controllerruntime.Request + existRes bool + existErr bool + }{ + { + name: "normal case", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test", + "namespace": "default", + "labels": map[string]interface{}{ + workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster", + workv1alpha1.WorkNameLabel: "work", + }, + }, + }, + }, + expectRes: makeRequest("karmada-es-cluster", "work"), + existRes: true, + existErr: false, + }, + { + name: "getWorkNameFromLabel failed", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test", + "namespace": "default", + "labels": map[string]interface{}{ + workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster", + }, + }, + }, + }, + existRes: false, + existErr: false, + }, + { + name: "getWorkNamespaceFromLabel failed", 
+ obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test", + "namespace": "default", + "labels": map[string]interface{}{ + workv1alpha1.WorkNameLabel: "work", + }, + }, + }, + }, + existRes: false, + existErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queueKey, err := generateKey(tt.obj) + if tt.existRes { + assert.Equal(t, tt.expectRes, queueKey) + } else { + assert.Empty(t, queueKey) + } + + if tt.existErr { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + }) + } +} + +func newPodObj(name string, labels map[string]string) *unstructured.Unstructured { + labelsCopy := map[string]interface{}{} + + for k, v := range labels { + labelsCopy[k] = v + } + + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": name, + "namespace": "default", + "labels": labelsCopy, + }, + }, + } +} + +func newPod(name string, labels map[string]string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Labels: labels, + }, + } +} + +func newWorkLabels(workNs, workName string) map[string]string { + labels := map[string]string{} + if workNs != "" { + labels[workv1alpha1.WorkNamespaceLabel] = workNs + } + + if workName != "" { + labels[workv1alpha1.WorkNameLabel] = workName + } + + return labels +} + +func newWorkWithStatus(status metav1.ConditionStatus, reason, message string, t time.Time) *workv1alpha1.Work { + return &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "karmada-es-cluster", + Name: "work-name", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: metav1.NewTime(t), + }, + }, + }, + } +} + +func TestExecutionController_tryDeleteWorkload(t *testing.T) { + raw := []byte(` + { + "apiVersion":"v1", + "kind":"Pod", + "metadata":{ + "name":"pod", + "namespace":"default", + "labels":{ + "work.karmada.io/name":"work-name", + "work.karmada.io/namespace":"karmada-es-cluster" + } + } + }`) + podName := "pod" + workName := "work-name" + workNs := "karmada-es-cluster" + clusterName := "cluster" + podGVR := corev1.SchemeGroupVersion.WithResource("pods") + + tests := []struct { + name string + pod *corev1.Pod + work *workv1alpha1.Work + controllerWithoutInformer bool + expectedError bool + objectNeedDelete bool + }{ + { + name: "failed to GetObjectFromCache, wrong InformerManager in ExecutionController", + pod: newPod(podName, newWorkLabels(workNs, workName)), + work: testhelper.NewWork(workName, workNs, raw), + controllerWithoutInformer: false, + expectedError: false, + objectNeedDelete: false, + }, + { + name: "workload is not managed by karmada, without work-related labels", + pod: newPod(podName, nil), + work: testhelper.NewWork(workName, workNs, raw), + controllerWithoutInformer: true, + expectedError: false, + objectNeedDelete: false, + }, + { + name: "workload is not related to current work", + pod: newPod(podName, newWorkLabels(workNs, "wrong-work")), + work: testhelper.NewWork(workName, workNs, raw), + controllerWithoutInformer: true, + expectedError: false, + objectNeedDelete: false, + }, + { + name: "normal case", + pod: newPod(podName, newWorkLabels(workNs, workName)), + work: testhelper.NewWork(workName, workNs, raw), + 
controllerWithoutInformer: true, + expectedError: false, + objectNeedDelete: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var dynamicClientSet *dynamicfake.FakeDynamicClient + if tt.pod != nil { + dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme, tt.pod) + } else { + dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + } + + fedDynamicClientSet := dynamicClientSet + if !tt.controllerWithoutInformer { + fedDynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + } + + o := newExecutionOptions() + o.dynamicClientSet = fedDynamicClientSet + + c := newExecutionController(ctx, o) + + err := c.tryDeleteWorkload(clusterName, tt.work) + if tt.expectedError { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + + _, err = dynamicClientSet.Resource(podGVR).Namespace("default").Get(context.Background(), "pod", metav1.GetOptions{}) + if tt.objectNeedDelete { + assert.True(t, apierrors.IsNotFound(err)) + } else { + assert.Empty(t, err) + } + }) + } +} + +func TestExecutionController_tryCreateOrUpdateWorkload(t *testing.T) { + podName := "pod" + workName := "work-name" + workNs := "karmada-es-cluster" + clusterName := "cluster" + podGVR := corev1.SchemeGroupVersion.WithResource("pods") + annotations := map[string]string{ + workv1alpha2.ResourceConflictResolutionAnnotation: workv1alpha2.ResourceConflictResolutionOverwrite, + } + + tests := []struct { + name string + pod *corev1.Pod + obj *unstructured.Unstructured + withAnnotation bool + expectedError bool + objectExist bool + labelMatch bool + }{ + { + name: "created workload", + pod: newPod("wrong-pod", nil), + obj: newPodObj(podName, newWorkLabels(workNs, workName)), + withAnnotation: false, + expectedError: false, + objectExist: true, + labelMatch: true, + }, + { + name: "failed to update object, overwrite conflict resolusion not set", + pod: newPod(podName, nil), + obj: newPodObj(podName, newWorkLabels(workNs, workName)), + withAnnotation: false, + expectedError: true, + objectExist: true, + labelMatch: false, + }, + { + name: "updated object", + pod: newPod(podName, nil), + obj: newPodObj(podName, newWorkLabels(workNs, workName)), + withAnnotation: true, + expectedError: false, + objectExist: true, + labelMatch: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, tt.pod) + + o := newExecutionOptions() + o.dynamicClientSet = dynamicClientSet + + c := newExecutionController(ctx, o) + + if tt.withAnnotation { + tt.obj.SetAnnotations(annotations) + } + err := c.tryCreateOrUpdateWorkload(clusterName, tt.obj) + if tt.expectedError { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + + resource, err := dynamicClientSet.Resource(podGVR).Namespace("default").Get(context.Background(), "pod", metav1.GetOptions{}) + if tt.objectExist { + assert.Empty(t, err) + } else { + assert.True(t, apierrors.IsNotFound(err)) + return + } + + labels := map[string]string{workv1alpha1.WorkNamespaceLabel: workNs, workv1alpha1.WorkNameLabel: workName} + if tt.labelMatch { + assert.Equal(t, resource.GetLabels(), labels) + } else { + assert.Empty(t, resource.GetLabels()) + } + }) + } +} + +func TestExecutionController_updateAppliedConditionIfNeed(t *testing.T) { + baseTime := time.Now().Add(-time.Minute) + + 
tests := []struct { + name string + work *workv1alpha1.Work + status metav1.ConditionStatus + reason string + message string + updated bool + }{ + { + name: "update condition, from false to true", + work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime), + status: metav1.ConditionTrue, + reason: "", + message: "", + updated: true, + }, + { + name: "update condition, from true to false", + work: newWorkWithStatus(metav1.ConditionTrue, "", "", baseTime), + status: metav1.ConditionFalse, + reason: "reason1", + message: "message1", + updated: true, + }, + { + name: "update condition, for reason changed", + work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime), + status: metav1.ConditionFalse, + reason: "reason2", + message: "message1", + updated: true, + }, + { + name: "update condition, for message changed", + work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime), + status: metav1.ConditionFalse, + reason: "reason1", + message: "message2", + updated: true, + }, + { + name: "not update condition, for nothing changed", + work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime), + status: metav1.ConditionFalse, + reason: "reason1", + message: "message1", + updated: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + o := newExecutionOptions() + o.objects = append(o.objects, tt.work) + o.objectsWithStatus = append(o.objectsWithStatus, &workv1alpha1.Work{}) + + c := newExecutionController(ctx, o) + + workOld := &workv1alpha1.Work{} + err := c.Get(ctx, types.NamespacedName{Namespace: tt.work.Namespace, Name: tt.work.Name}, workOld) + if err != nil { + t.Errorf("failed to get created work: %v", err) + return + } + + t.Logf("Got work: %+v", *workOld) + + appliedCopy := meta.FindStatusCondition(workOld.DeepCopy().Status.Conditions, workv1alpha1.WorkApplied).DeepCopy() + err = c.updateAppliedConditionIfNeed(workOld, tt.status, tt.reason, tt.message) + if err != nil { + t.Errorf("failed to update work: %v", err) + return + } + + workNew := &workv1alpha1.Work{} + err = c.Get(ctx, types.NamespacedName{Namespace: tt.work.Namespace, Name: tt.work.Name}, workNew) + if err != nil { + t.Errorf("failed to get updated work: %v", err) + return + } + + newApplied := meta.FindStatusCondition(workNew.Status.Conditions, workv1alpha1.WorkApplied) + if tt.updated { + if appliedCopy.Status != newApplied.Status { + assert.NotEqual(t, newApplied.LastTransitionTime, appliedCopy.LastTransitionTime) + } + + appliedCopy.LastTransitionTime = newApplied.LastTransitionTime + assert.NotEqual(t, newApplied, appliedCopy) + } else { + assert.Equal(t, newApplied, appliedCopy) + } + }) + } +} + +func TestExecutionController_syncWork(t *testing.T) { + basePod := newPod("pod", nil) + workName := "work" + workNs := "karmada-es-cluster" + podGVR := corev1.SchemeGroupVersion.WithResource("pods") + podRaw := []byte(` + { + "apiVersion":"v1", + "kind":"Pod", + "metadata":{ + "name":"pod", + "namespace":"default", + "annotations":{ + "work.karmada.io/conflict-resolution": "overwrite" + }, + "labels":{ + "work.karmada.io/name":"work", + "work.karmada.io/namespace":"karmada-es-cluster" + } + } + }`) + + tests := []struct { + name string + workNamespace string + controllerWithoutInformer bool + expectedError bool + clusterNotReady bool + updated bool + }{ + { + name: "failed to GetObjectFromCache, wrong InformerManager in 
ExecutionController", + workNamespace: workNs, + controllerWithoutInformer: false, + expectedError: true, + }, + { + name: "obj not found in informer, wrong dynamicClientSet without pod", + workNamespace: workNs, + controllerWithoutInformer: true, + expectedError: false, + }, + { + name: "workNamespace is zero", + workNamespace: "", + controllerWithoutInformer: true, + expectedError: true, + }, + { + name: "failed to exec Client.Get, set wrong cluster name in work", + workNamespace: "karmada-es-wrong-cluster", + controllerWithoutInformer: true, + expectedError: true, + }, + { + name: "cluster is not ready", + workNamespace: workNs, + controllerWithoutInformer: true, + expectedError: true, + clusterNotReady: true, + }, + { + name: "normal case", + workNamespace: workNs, + controllerWithoutInformer: true, + expectedError: false, + updated: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + o := newExecutionOptions() + if tt.clusterNotReady { + o = newExecutionOptions(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)) + } + + pod := basePod.DeepCopy() + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod) + + if tt.controllerWithoutInformer { + o.dynamicClientSet = dynamicClientSet + } + + work := testhelper.NewWork(workName, tt.workNamespace, podRaw) + o.objects = append(o.objects, work) + o.objectsWithStatus = append(o.objectsWithStatus, &workv1alpha1.Work{}) + + key := makeRequest(work.Namespace, work.Name) + + c := newExecutionController(ctx, o) + + err := c.syncWork(key) + if tt.expectedError { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + + if tt.updated { + resource, err := dynamicClientSet.Resource(podGVR).Namespace(basePod.Namespace).Get(ctx, basePod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v", err) + } + + expectedLabels := newWorkLabels(workNs, workName) + assert.Equal(t, resource.GetLabels(), expectedLabels) + } + }) + } +} + +func injectMemberClusterInformer(c *Controller, + informerManager genericmanager.MultiClusterInformerManager, + clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error), +) { + r := memberclusterinformer.NewMemberClusterInformer(c.Client, newRESTMapper(), informerManager, metav1.Duration{}, clusterDynamicClientSetFunc) + c.MemberClusterInformer = r +} + +func newRESTMapper() meta.RESTMapper { + m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) + return m +} + +type executionOptions struct { + dynamicClientSet dynamic.Interface + cluster *clusterv1alpha1.Cluster + objects []client.Object + objectsWithStatus []client.Object +} + +func newExecutionOptions(cluster ...*clusterv1alpha1.Cluster) *executionOptions { + o := &executionOptions{} + + if len(cluster) > 0 { + o.cluster = cluster[0] + } else { + o.cluster = testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) + } + + o.objects = append(o.objects, o.cluster) + o.objectsWithStatus = append(o.objectsWithStatus, &clusterv1alpha1.Cluster{}) + + return o +} + +func newExecutionController(ctx context.Context, opt *executionOptions) Controller { + c := Controller{ + Ctx: ctx, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()). 
+ WithObjects(opt.objects...).WithStatusSubresource(opt.objectsWithStatus...).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RatelimiterOptions: ratelimiterflag.Options{}, + EventRecorder: record.NewFakeRecorder(1024), + } + + informerManager := genericmanager.GetInstance() + clusterDynamicClientSetFunc := util.NewClusterDynamicClientSetForAgent + + if opt.dynamicClientSet != nil { + clusterName := opt.cluster.Name + + // Generate ResourceInterpreter and ObjectWatcher + resourceInterpreter := interpreterfake.NewFakeInterpreter() + clusterDynamicClientSetFunc = newClusterDynamicClientSetForAgent(clusterName, opt.dynamicClientSet) + + c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, newRESTMapper(), clusterDynamicClientSetFunc, resourceInterpreter) + + // Generate InformerManager + m := genericmanager.NewMultiClusterInformerManager(ctx.Done()) + m.ForCluster(clusterName, opt.dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer + m.Start(clusterName) + m.WaitForCacheSync(clusterName) + informerManager = m + } + + injectMemberClusterInformer(&c, informerManager, clusterDynamicClientSetFunc) + + return c +} + +func newClusterDynamicClientSetForAgent(clusterName string, dynamicClientSet dynamic.Interface) func(string, client.Client) (*util.DynamicClusterClient, error) { + return func(string, client.Client) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ + ClusterName: clusterName, + DynamicClientSet: dynamicClientSet, + }, nil + } +} + +func makeRequest(workNamespace, workName string) controllerruntime.Request { + return controllerruntime.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workNamespace, + Name: workName, + }, + } +} diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go index 1daa96411..50b3a8f4a 100644 --- a/pkg/controllers/status/work_status_controller.go +++ b/pkg/controllers/status/work_status_controller.go @@ -7,9 +7,8 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -21,19 +20,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer" - "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" "github.com/karmada-io/karmada/pkg/util/names" - "github.com/karmada-io/karmada/pkg/util/objectwatcher" - "github.com/karmada-io/karmada/pkg/util/restmapper" ) // WorkStatusControllerName is the controller name that will be used when reporting events. @@ -41,21 +37,17 @@ const WorkStatusControllerName = "work-status-controller" // WorkStatusController is to sync status of Work. 
 type WorkStatusController struct {
-	client.Client                       // used to operate Work resources.
-	EventRecorder record.EventRecorder
-	RESTMapper    meta.RESTMapper
-	InformerManager genericmanager.MultiClusterInformerManager
-	eventHandler    cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
-	StopChan        <-chan struct{}
-	worker          util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
+	client.Client // used to operate Work resources.
+	EventRecorder record.EventRecorder
+	eventHandler  cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
+	StopChan      <-chan struct{}
+	worker        util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
 	// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
-	ConcurrentWorkStatusSyncs int
-	ObjectWatcher             objectwatcher.ObjectWatcher
-	PredicateFunc             predicate.Predicate
-	ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
-	ClusterCacheSyncTimeout metav1.Duration
-	RateLimiterOptions      ratelimiterflag.Options
-	ResourceInterpreter     resourceinterpreter.ResourceInterpreter
+	ConcurrentWorkStatusSyncs int
+	PredicateFunc             predicate.Predicate
+	RateLimiterOptions        ratelimiterflag.Options
+	ResourceInterpreter       resourceinterpreter.ResourceInterpreter
+	MemberClusterInformer     memberclusterinformer.MemberClusterInformer
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -99,26 +91,25 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
 		return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
 	}
 
-	return c.buildResourceInformers(cluster, work)
-}
-
-// buildResourceInformers builds informer dynamically for managed resources in member cluster.
-// The created informer watches resource change and then sync to the relevant Work object.
-func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) {
-	err := c.registerInformersAndStart(cluster, work)
+	err = c.MemberClusterInformer.BuildResourceInformers(cluster, work, c.eventHandler)
 	if err != nil {
-		klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
 		return controllerruntime.Result{Requeue: true}, err
 	}
+
 	return controllerruntime.Result{}, nil
 }
 
-// getEventHandler return callback function that knows how to handle events from the member cluster.
-func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
-	if c.eventHandler == nil {
-		c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
+func (c *WorkStatusController) onAdd(obj interface{}) {
+	curObj := obj.(runtime.Object)
+	c.worker.Enqueue(curObj)
+}
+
+func (c *WorkStatusController) onUpdate(old, cur interface{}) {
+	// Still need to compare the whole object because the interpreter is able to take any part of the object to reflect status.
+	if !reflect.DeepEqual(old, cur) {
+		curObj := cur.(runtime.Object)
+		c.worker.Enqueue(curObj)
 	}
-	return c.eventHandler
 }
 
 // RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
@@ -150,7 +141,7 @@ func generateKey(obj interface{}) (util.QueueKey, error) {
 
 // getClusterNameFromLabel gets cluster name from ownerLabel, if label not exist, means resource is not created by karmada.
 func getClusterNameFromLabel(resource *unstructured.Unstructured) (string, error) {
 	workNamespace := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNamespaceLabel)
-	if len(workNamespace) == 0 {
+	if workNamespace == "" {
 		klog.V(4).Infof("Ignore resource(%s/%s/%s) which not managed by karmada", resource.GetKind(), resource.GetNamespace(), resource.GetName())
 		return "", nil
 	}
@@ -171,17 +162,23 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
 		return fmt.Errorf("invalid key")
 	}
 
-	observedObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
+	klog.Infof("Begin to sync status to Work of object(%s)", fedKey.String())
+
+	observedObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey)
 	if err != nil {
 		if apierrors.IsNotFound(err) {
-			return c.handleDeleteEvent(fedKey)
+			executionSpace := names.GenerateExecutionSpaceName(fedKey.Cluster)
+			workName := names.GenerateWorkName(fedKey.Kind, fedKey.Name, fedKey.Namespace)
+			klog.Warningf("Skip reflecting %s(%s/%s) status to Work(%s/%s) because it has not been applied successfully yet.",
+				fedKey.Kind, fedKey.Namespace, fedKey.Name, executionSpace, workName)
+			return nil
 		}
 		return err
 	}
 
 	workNamespace := util.GetLabelValue(observedObj.GetLabels(), workv1alpha1.WorkNamespaceLabel)
 	workName := util.GetLabelValue(observedObj.GetLabels(), workv1alpha1.WorkNameLabel)
-	if len(workNamespace) == 0 || len(workName) == 0 {
+	if workNamespace == "" || workName == "" {
 		klog.Infof("Ignore object(%s) which not managed by karmada.", fedKey.String())
 		return nil
 	}
@@ -202,85 +199,17 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
 		return nil
 	}
 
-	desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj)
-	if err != nil {
+	if !helper.IsResourceApplied(&workObject.Status) {
+		err = fmt.Errorf("work has not been applied yet")
+		klog.Errorf("Failed to reflect status of %s(%s/%s) to Work(%s/%s): %v.",
+			observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName, err)
 		return err
 	}
 
-	clusterName, err := names.GetClusterName(workNamespace)
-	if err != nil {
-		klog.Errorf("Failed to get member cluster name: %v", err)
-		return err
-	}
-
-	// we should check if the observed status is consistent with the declaration to prevent accidental changes made
-	// in member clusters.
-	needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desiredObj, observedObj)
-	if err != nil {
-		return err
-	}
-
-	if needUpdate {
-		if err := c.ObjectWatcher.Update(clusterName, desiredObj, observedObj); err != nil {
-			klog.Errorf("Update %s failed: %v", fedKey.String(), err)
-			return err
-		}
-		// We can't return even after a success updates, because that might lose the chance to collect status.
-		// Not all updates are real, they might be no change, in that case there will be no more event for this update,
-		// this usually happens with those resources not enables 'metadata.generation', like 'Service'.
-		// When a Service's status changes, it's 'metadata.resourceVersion' will be increased, but 'metadata.generation'
-		// not increased(defaults to 0), the ObjectWatcher can't easily tell what happened to the object, so ObjectWatcher
-		// also needs to update again. The update operation will be a non-operation if the event triggered by Service's
-		// status changes.
- } - klog.Infof("reflecting %s(%s/%s) status to Work(%s/%s)", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName) return c.reflectStatus(workObject, observedObj) } -func (c *WorkStatusController) handleDeleteEvent(key keys.FederatedKey) error { - executionSpace := names.GenerateExecutionSpaceName(key.Cluster) - - // Given the workload might has been deleted from informer cache, so that we can't get work object by it's label, - // we have to get work by naming rule as the work's name is generated by the workload's kind, name and namespace. - workName := names.GenerateWorkName(key.Kind, key.Name, key.Namespace) - work := &workv1alpha1.Work{} - if err := c.Client.Get(context.TODO(), client.ObjectKey{Namespace: executionSpace, Name: workName}, work); err != nil { - // stop processing as the work object has been removed, assume it's a normal delete operation. - if apierrors.IsNotFound(err) { - return nil - } - - klog.Errorf("Failed to get Work from cache: %v", err) - return err - } - - // stop processing as the work object being deleting. - if !work.DeletionTimestamp.IsZero() { - return nil - } - - return c.recreateResourceIfNeeded(work, key) -} - -func (c *WorkStatusController) recreateResourceIfNeeded(work *workv1alpha1.Work, workloadKey keys.FederatedKey) error { - for _, rawManifest := range work.Spec.Workload.Manifests { - manifest := &unstructured.Unstructured{} - if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { - return err - } - - desiredGVK := schema.FromAPIVersionAndKind(manifest.GetAPIVersion(), manifest.GetKind()) - if reflect.DeepEqual(desiredGVK, workloadKey.GroupVersionKind()) && - manifest.GetNamespace() == workloadKey.Namespace && - manifest.GetName() == workloadKey.Name { - klog.Infof("recreating %s", workloadKey.String()) - return c.ObjectWatcher.Create(workloadKey.Cluster, manifest) - } - } - return nil -} - // reflectStatus grabs cluster object's running status then updates to its owner object(Work). func (c *WorkStatusController) reflectStatus(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) error { statusRaw, err := c.ResourceInterpreter.ReflectStatus(clusterObj) @@ -374,109 +303,10 @@ func (c *WorkStatusController) mergeStatus(_ []workv1alpha1.ManifestStatus, newS return []workv1alpha1.ManifestStatus{newStatus} } -func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - for _, rawManifest := range manifests { - manifest := &unstructured.Unstructured{} - if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { - return nil, err - } - - if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() && - manifest.GetKind() == clusterObj.GetKind() && - manifest.GetNamespace() == clusterObj.GetNamespace() && - manifest.GetName() == clusterObj.GetName() { - return manifest, nil - } - } - - return nil, fmt.Errorf("no such manifest exist") -} - -// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr -// and start it. 
-func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error { - singleClusterInformerManager, err := c.getSingleClusterManager(cluster) - if err != nil { - return err - } - - gvrTargets, err := c.getGVRsFromWork(work) - if err != nil { - return err - } - - allSynced := true - for gvr := range gvrTargets { - if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) { - allSynced = false - singleClusterInformerManager.ForResource(gvr, c.getEventHandler()) - } - } - if allSynced { - return nil - } - - c.InformerManager.Start(cluster.Name) - - if err := func() error { - synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration) - if synced == nil { - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } - for gvr := range gvrTargets { - if !synced[gvr] { - return fmt.Errorf("informer for %s hasn't synced", gvr) - } - } - return nil - }(); err != nil { - klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) - c.InformerManager.Stop(cluster.Name) - return err - } - - return nil -} - -// getGVRsFromWork traverses the manifests in work to find groupVersionResource list. -func (c *WorkStatusController) getGVRsFromWork(work *workv1alpha1.Work) (map[schema.GroupVersionResource]bool, error) { - gvrTargets := map[schema.GroupVersionResource]bool{} - for _, manifest := range work.Spec.Workload.Manifests { - workload := &unstructured.Unstructured{} - err := workload.UnmarshalJSON(manifest.Raw) - if err != nil { - klog.Errorf("Failed to unmarshal workload. Error: %v.", err) - return nil, err - } - gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind()) - if err != nil { - klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err) - return nil, err - } - gvrTargets[gvr] = true - } - return gvrTargets, nil -} - -// getSingleClusterManager gets singleClusterInformerManager with clusterName. -// If manager is not exist, create it, otherwise gets it from map. -func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) { - // TODO(chenxianpao): If cluster A is removed, then a new cluster that name also is A joins karmada, - // the cache in informer manager should be updated. - singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) - if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) - if err != nil { - klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) - return nil, err - } - singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0) - } - return singleClusterInformerManager, nil -} - // SetupWithManager creates a controller and register to controller manager. func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { + c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, nil) + return controllerruntime.NewControllerManagedBy(mgr). For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)). 
WithOptions(controller.Options{ diff --git a/pkg/controllers/status/work_status_controller_test.go b/pkg/controllers/status/work_status_controller_test.go index b9e9dc1c1..a4053243e 100644 --- a/pkg/controllers/status/work_status_controller_test.go +++ b/pkg/controllers/status/work_status_controller_test.go @@ -20,6 +20,7 @@ import ( kubernetesfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -28,33 +29,17 @@ import ( "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" - "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" - "github.com/karmada-io/karmada/pkg/util/objectwatcher" + "github.com/karmada-io/karmada/pkg/util/memberclusterinformer" + testhelper "github.com/karmada-io/karmada/test/helper" ) -func newCluster(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster { - return &clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: clusterv1alpha1.ClusterSpec{}, - Status: clusterv1alpha1.ClusterStatus{ - Conditions: []metav1.Condition{ - { - Type: clusterType, - Status: clusterStatus, - }, - }, - }, - } -} - func TestWorkStatusController_Reconcile(t *testing.T) { tests := []struct { name string c WorkStatusController + dFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) work *workv1alpha1.Work ns string expectRes controllerruntime.Result @@ -84,11 +69,8 @@ func TestWorkStatusController_Reconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")}, }).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -104,6 +86,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSet, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, @@ -111,12 +94,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "work not exists", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + 
RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -132,6 +112,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, @@ -139,12 +120,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "work's DeletionTimestamp isn't zero", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -161,6 +139,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, @@ -168,12 +147,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "work's status is not applied", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -189,6 +165,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, @@ -196,12 +173,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "failed to get cluster name", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: 
ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -217,6 +191,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-cluster", expectRes: controllerruntime.Result{Requeue: true}, existErr: true, @@ -224,12 +199,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "failed to get cluster", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -245,6 +217,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{Requeue: true}, existErr: true, @@ -252,12 +225,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { { name: "cluster is not ready", c: WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -273,6 +243,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, }, }, + dFunc: util.NewClusterDynamicClientSetForAgent, ns: "karmada-es-cluster", expectRes: controllerruntime.Result{Requeue: true}, existErr: true, @@ -281,6 +252,8 @@ func TestWorkStatusController_Reconcile(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + injectMemberClusterInformer(&tt.c, genericmanager.GetInstance(), tt.dFunc) + req := controllerruntime.Request{ NamespacedName: types.NamespacedName{ Name: "work", @@ -288,7 +261,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, } - if err := tt.c.Client.Create(context.Background(), tt.work); err != nil { + if err := tt.c.Create(context.Background(), tt.work); err != nil { t.Fatalf("Failed to create cluster: %v", err) } @@ -303,39 +276,16 @@ func TestWorkStatusController_Reconcile(t *testing.T) { } } -func TestWorkStatusController_getEventHandler(t *testing.T) { - opt := util.Options{ - Name: "opt", - KeyFunc: nil, - ReconcileFunc: nil, - } - - c := WorkStatusController{ - Client: 
fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - eventHandler: nil, - worker: util.NewAsyncWorker(opt), - } - - eventHandler := c.getEventHandler() - assert.NotEmpty(t, eventHandler) -} - func TestWorkStatusController_RunWorkQueue(t *testing.T) { c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - eventHandler: nil, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, + eventHandler: nil, } + injectMemberClusterInformer(&c, genericmanager.GetInstance(), util.NewClusterDynamicClientSetForAgent) + c.RunWorkQueue() } @@ -543,7 +493,7 @@ func newPod(workNs, workName string, wrongLabel ...bool) *corev1.Pod { } func TestWorkStatusController_syncWorkStatus(t *testing.T) { - cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) + cluster := testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) workName := "work" workNs := "karmada-es-cluster" @@ -555,18 +505,8 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { controllerWithoutInformer bool workWithRigntNS bool expectedError bool - workWithDeletionTimestamp bool wrongWorkNS bool }{ - { - name: "failed to exec NeedUpdate", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - workWithRigntNS: true, - expectedError: true, - }, { name: "invalid key, wrong WorkNamespaceLabel in obj", obj: newPodObj("karmada-cluster"), @@ -611,25 +551,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { workWithRigntNS: false, expectedError: false, }, - { - name: "failed to getRawManifest, wrong Manifests in work", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`), - controllerWithoutInformer: true, - workWithRigntNS: true, - expectedError: true, - }, - { - name: "failed to exec GetClusterName, wrong workNamespace", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - workWithRigntNS: true, - expectedError: true, - wrongWorkNS: true, - }, } for _, tt := range tests { @@ -655,13 +576,9 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { var work 
*workv1alpha1.Work if tt.workWithRigntNS { - work = newWork(workName, workNs, tt.raw) + work = testhelper.NewWork(workName, workNs, tt.raw) } else { - work = newWork(workName, fmt.Sprintf("%v-test", workNs), tt.raw) - } - - if tt.workWithDeletionTimestamp { - work = newWork(workName, workNs, tt.raw, false) + work = testhelper.NewWork(workName, fmt.Sprintf("%v-test", workNs), tt.raw) } key, _ := generateKey(tt.obj) @@ -680,22 +597,30 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { } } +func injectMemberClusterInformer(c *WorkStatusController, + informerManager genericmanager.MultiClusterInformerManager, + clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error), +) { + r := memberclusterinformer.NewMemberClusterInformer(c.Client, newRESTMapper(), informerManager, metav1.Duration{}, clusterDynamicClientSetFunc) + c.MemberClusterInformer = r +} + +func newRESTMapper() meta.RESTMapper { + m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) + return m +} + func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController { c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - eventHandler: nil, - RESTMapper: func() meta.RESTMapper { - m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) - m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) - return m - }(), + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(), + PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + RateLimiterOptions: ratelimiterflag.Options{}, + eventHandler: nil, } + informerManager := genericmanager.GetInstance() + if len(dynamicClientSets) > 0 { clusterName := cluster.Name dynamicClientSet := dynamicClientSets[0] @@ -709,194 +634,22 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets serviceLister := sharedFactory.Core().V1().Services().Lister() c.ResourceInterpreter = resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister) - c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter) // Generate InformerManager m := genericmanager.NewMultiClusterInformerManager(stopCh) m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer m.Start(clusterName) m.WaitForCacheSync(clusterName) - c.InformerManager = m + informerManager = m } + injectMemberClusterInformer(&c, informerManager, util.NewClusterDynamicClientSetForAgent) + return c } -func newWork(workName, workNs string, raw []byte, deletionTimestampIsZero ...bool) *workv1alpha1.Work { - work := &workv1alpha1.Work{ - ObjectMeta: metav1.ObjectMeta{ - Name: workName, - Namespace: workNs, - }, - Spec: workv1alpha1.WorkSpec{ - Workload: workv1alpha1.WorkloadTemplate{ - Manifests: []workv1alpha1.Manifest{ - {RawExtension: runtime.RawExtension{ - Raw: raw, - }, - }, - }, - }, - }, - } - - if 
len(deletionTimestampIsZero) > 0 && !deletionTimestampIsZero[0] { - work.DeletionTimestamp = &metav1.Time{Time: time.Now()} - } - - return work -} - -func TestWorkStatusController_getSingleClusterManager(t *testing.T) { - clusterName := "cluster" - cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) - - // Generate InformerManager - stopCh := make(chan struct{}) - defer close(stopCh) - - dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) - - tests := []struct { - name string - rightClusterName bool - expectInformer bool - expectError bool - wrongClusterDynamicClientSetFunc bool - }{ - { - name: "normal case", - rightClusterName: true, - expectInformer: true, - expectError: false, - }, - { - name: "failed to build dynamic cluster client", - rightClusterName: false, - expectInformer: false, - expectError: true, - wrongClusterDynamicClientSetFunc: true, - }, - { - name: "failed to get single cluster", - rightClusterName: false, - expectInformer: true, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := newWorkStatusController(cluster) - m := genericmanager.NewMultiClusterInformerManager(stopCh) - if tt.rightClusterName { - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) - } else { - m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) - } - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - if tt.wrongClusterDynamicClientSetFunc { - c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError - } else { - c.ClusterDynamicClientSetFunc = util.NewClusterDynamicClientSet - c.Client = fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( - &clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://127.0.0.1", - SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"}, - InsecureSkipTLSVerification: true, - }, - Status: clusterv1alpha1.ClusterStatus{ - Conditions: []metav1.Condition{ - { - Type: clusterv1alpha1.ClusterConditionReady, - Status: metav1.ConditionTrue, - }, - }, - }, - }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, - Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")}, - }).Build() - } - - informerManager, err := c.getSingleClusterManager(cluster) - - if tt.expectInformer { - assert.NotEmpty(t, informerManager) - } else { - assert.Empty(t, informerManager) - } - - if tt.expectError { - assert.NotEmpty(t, err) - } else { - assert.Empty(t, err) - } - }) - } -} - -func TestWorkStatusController_recreateResourceIfNeeded(t *testing.T) { - c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - } - - raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) - work := newWork("work", "default", raw) - - obj := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": 
"Pod", - "metadata": map[string]interface{}{ - "name": "pod1", - "namespace": "default", - "labels": map[string]interface{}{ - workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster", - }, - }, - }, - } - - key, _ := generateKey(obj) - - fedKey, ok := key.(keys.FederatedKey) - if !ok { - t.Fatalf("Invalid key, key: %v", key) - } - - t.Run("normal case", func(t *testing.T) { - err := c.recreateResourceIfNeeded(work, fedKey) - assert.Empty(t, err) - }) - - t.Run("failed to UnmarshalJSON", func(t *testing.T) { - work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`) - err := c.recreateResourceIfNeeded(work, fedKey) - assert.NotEmpty(t, err) - }) -} - func TestWorkStatusController_buildStatusIdentifier(t *testing.T) { - c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - } + c := WorkStatusController{} work := &workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -936,7 +689,7 @@ func TestWorkStatusController_buildStatusIdentifier(t *testing.T) { }) t.Run("failed to GetManifestIndex", func(t *testing.T) { - wrongClusterObj, _ := helper.ToUnstructured(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)) + wrongClusterObj, _ := helper.ToUnstructured(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)) wrongClusterJson, _ := json.Marshal(wrongClusterObj) work.Spec.Workload.Manifests = []workv1alpha1.Manifest{ { @@ -950,14 +703,7 @@ func TestWorkStatusController_buildStatusIdentifier(t *testing.T) { } func TestWorkStatusController_mergeStatus(t *testing.T) { - c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - PredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - } + c := WorkStatusController{} newStatus := workv1alpha1.ManifestStatus{ Health: "health", @@ -965,60 +711,3 @@ func TestWorkStatusController_mergeStatus(t *testing.T) { actual := c.mergeStatus([]workv1alpha1.ManifestStatus{}, newStatus) assert.Equal(t, []workv1alpha1.ManifestStatus{newStatus}, actual) } - -func TestWorkStatusController_registerInformersAndStart(t *testing.T) { - clusterName := "cluster" - cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) - - // Generate InformerManager - stopCh := make(chan struct{}) - defer close(stopCh) - dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) - c := newWorkStatusController(cluster) - opt := util.Options{ - Name: "opt", - KeyFunc: nil, - ReconcileFunc: nil, - } - c.worker = util.NewAsyncWorker(opt) - - raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) - work := newWork("work", "default", raw) - - t.Run("normal case", func(t 
*testing.T) { - m := genericmanager.NewMultiClusterInformerManager(stopCh) - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - err := c.registerInformersAndStart(cluster, work) - assert.Empty(t, err) - }) - - t.Run("failed to getSingleClusterManager", func(t *testing.T) { - c := newWorkStatusController(cluster) - m := genericmanager.NewMultiClusterInformerManager(stopCh) - m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError - - err := c.registerInformersAndStart(cluster, work) - assert.NotEmpty(t, err) - }) - - t.Run("failed to getGVRsFromWork", func(t *testing.T) { - work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`) - - m := genericmanager.NewMultiClusterInformerManager(stopCh) - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - err := c.registerInformersAndStart(cluster, work) - assert.NotEmpty(t, err) - }) -} diff --git a/pkg/resourceinterpreter/fake/fake.go b/pkg/resourceinterpreter/fake/fake.go new file mode 100644 index 000000000..7ace16e41 --- /dev/null +++ b/pkg/resourceinterpreter/fake/fake.go @@ -0,0 +1,158 @@ +package fake + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/resourceinterpreter" +) + +var _ resourceinterpreter.ResourceInterpreter = &FakeInterpreter{} + +type ( + GetReplicasFunc = func(*unstructured.Unstructured) (int32, *workv1alpha2.ReplicaRequirements, error) + ReviseReplicaFunc = func(*unstructured.Unstructured, int64) (*unstructured.Unstructured, error) + RetainFunc = func(*unstructured.Unstructured, *unstructured.Unstructured) (*unstructured.Unstructured, error) + AggregateStatusFunc = func(*unstructured.Unstructured, []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) + GetDependenciesFunc = func(*unstructured.Unstructured) ([]configv1alpha1.DependentObjectReference, error) + ReflectStatusFunc = func(*unstructured.Unstructured) (*runtime.RawExtension, error) + InterpretHealthFunc = func(*unstructured.Unstructured) (bool, error) +) + +type FakeInterpreter struct { + getReplicasFunc map[schema.GroupVersionKind]GetReplicasFunc + reviseReplicaFunc map[schema.GroupVersionKind]ReviseReplicaFunc + retainFunc map[schema.GroupVersionKind]RetainFunc + aggregateStatusFunc map[schema.GroupVersionKind]AggregateStatusFunc + getDependenciesFunc map[schema.GroupVersionKind]GetDependenciesFunc + reflectStatusFunc map[schema.GroupVersionKind]ReflectStatusFunc + interpretHealthFunc map[schema.GroupVersionKind]InterpretHealthFunc +} + +func (f *FakeInterpreter) Start(ctx context.Context) (err error) { + return nil +} + +func (f *FakeInterpreter) HookEnabled(objGVK schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) 
bool { + var exist bool + switch operationType { + case configv1alpha1.InterpreterOperationInterpretReplica: + _, exist = f.getReplicasFunc[objGVK] + case configv1alpha1.InterpreterOperationReviseReplica: + _, exist = f.reviseReplicaFunc[objGVK] + case configv1alpha1.InterpreterOperationRetain: + _, exist = f.retainFunc[objGVK] + case configv1alpha1.InterpreterOperationAggregateStatus: + _, exist = f.aggregateStatusFunc[objGVK] + case configv1alpha1.InterpreterOperationInterpretDependency: + _, exist = f.getDependenciesFunc[objGVK] + case configv1alpha1.InterpreterOperationInterpretStatus: + _, exist = f.reflectStatusFunc[objGVK] + case configv1alpha1.InterpreterOperationInterpretHealth: + _, exist = f.interpretHealthFunc[objGVK] + default: + exist = false + } + + return exist +} + +func (f *FakeInterpreter) GetReplicas(object *unstructured.Unstructured) (replica int32, requires *workv1alpha2.ReplicaRequirements, err error) { + return f.getReplicasFunc[object.GetObjectKind().GroupVersionKind()](object) +} + +func (f *FakeInterpreter) ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error) { + return f.reviseReplicaFunc[object.GetObjectKind().GroupVersionKind()](object, replica) +} + +func (f *FakeInterpreter) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) { + return f.retainFunc[observed.GetObjectKind().GroupVersionKind()](desired, observed) +} + +func (f *FakeInterpreter) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + return f.aggregateStatusFunc[object.GetObjectKind().GroupVersionKind()](object, aggregatedStatusItems) +} + +func (f *FakeInterpreter) GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error) { + return f.getDependenciesFunc[object.GetObjectKind().GroupVersionKind()](object) +} + +func (f *FakeInterpreter) ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, err error) { + return f.reflectStatusFunc[object.GetObjectKind().GroupVersionKind()](object) +} + +func (f *FakeInterpreter) InterpretHealth(object *unstructured.Unstructured) (healthy bool, err error) { + return f.interpretHealthFunc[object.GetObjectKind().GroupVersionKind()](object) +} + +func NewFakeInterpreter() *FakeInterpreter { + return &FakeInterpreter{} +} + +func (f *FakeInterpreter) WithGetReplicas(objGVK schema.GroupVersionKind, iFunc GetReplicasFunc) *FakeInterpreter { + if f.getReplicasFunc == nil { + f.getReplicasFunc = make(map[schema.GroupVersionKind]GetReplicasFunc) + } + f.getReplicasFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithReviseReplica(objGVK schema.GroupVersionKind, iFunc ReviseReplicaFunc) *FakeInterpreter { + if f.reviseReplicaFunc == nil { + f.reviseReplicaFunc = make(map[schema.GroupVersionKind]ReviseReplicaFunc) + } + f.reviseReplicaFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithRetain(objGVK schema.GroupVersionKind, iFunc RetainFunc) *FakeInterpreter { + if f.retainFunc == nil { + f.retainFunc = make(map[schema.GroupVersionKind]RetainFunc) + } + f.retainFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithAggregateStatus(objGVK schema.GroupVersionKind, iFunc AggregateStatusFunc) *FakeInterpreter { + if f.aggregateStatusFunc == nil { + f.aggregateStatusFunc = make(map[schema.GroupVersionKind]AggregateStatusFunc) + } + 
f.aggregateStatusFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithGetDependencies(objGVK schema.GroupVersionKind, iFunc GetDependenciesFunc) *FakeInterpreter { + if f.getDependenciesFunc == nil { + f.getDependenciesFunc = make(map[schema.GroupVersionKind]GetDependenciesFunc) + } + f.getDependenciesFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithReflectStatus(objGVK schema.GroupVersionKind, iFunc ReflectStatusFunc) *FakeInterpreter { + if f.reflectStatusFunc == nil { + f.reflectStatusFunc = make(map[schema.GroupVersionKind]ReflectStatusFunc) + } + f.reflectStatusFunc[objGVK] = iFunc + + return f +} + +func (f *FakeInterpreter) WithInterpretHealth(objGVK schema.GroupVersionKind, iFunc InterpretHealthFunc) *FakeInterpreter { + if f.interpretHealthFunc == nil { + f.interpretHealthFunc = make(map[schema.GroupVersionKind]InterpretHealthFunc) + } + f.interpretHealthFunc[objGVK] = iFunc + + return f +} diff --git a/pkg/util/fedinformer/handlers.go b/pkg/util/fedinformer/handlers.go index 7ff76b092..c5b630ac5 100644 --- a/pkg/util/fedinformer/handlers.go +++ b/pkg/util/fedinformer/handlers.go @@ -1,39 +1,9 @@ package fedinformer import ( - "reflect" - - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" ) -// NewHandlerOnAllEvents builds a ResourceEventHandler that the function 'fn' will be called on all events(add/update/delete). -func NewHandlerOnAllEvents(fn func(runtime.Object)) cache.ResourceEventHandler { - return &cache.ResourceEventHandlerFuncs{ - AddFunc: func(cur interface{}) { - curObj := cur.(runtime.Object) - fn(curObj) - }, - UpdateFunc: func(old, cur interface{}) { - curObj := cur.(runtime.Object) - if !reflect.DeepEqual(old, cur) { - fn(curObj) - } - }, - DeleteFunc: func(old interface{}) { - if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok { - // This object might be stale but ok for our current usage. - old = deleted.Obj - if old == nil { - return - } - } - oldObj := old.(runtime.Object) - fn(oldObj) - }, - } -} - // NewHandlerOnEvents builds a ResourceEventHandler. func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler { return &cache.ResourceEventHandlerFuncs{ diff --git a/pkg/util/lifted/objectwatcher.go b/pkg/util/lifted/objectwatcher.go index 574ee652b..1da18659a 100644 --- a/pkg/util/lifted/objectwatcher.go +++ b/pkg/util/lifted/objectwatcher.go @@ -40,21 +40,21 @@ const ( // ObjectVersion retrieves the field type-prefixed value used for // determining currency of the given cluster object. -func ObjectVersion(clusterObj *unstructured.Unstructured) string { - generation := clusterObj.GetGeneration() +func ObjectVersion(obj *unstructured.Unstructured) string { + generation := obj.GetGeneration() if generation != 0 { return fmt.Sprintf("%s%d", generationPrefix, generation) } - return fmt.Sprintf("%s%s", resourceVersionPrefix, clusterObj.GetResourceVersion()) + return fmt.Sprintf("%s%s", resourceVersionPrefix, obj.GetResourceVersion()) } // +lifted:source=https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59 // ObjectNeedsUpdate determines whether the 2 objects provided cluster -// object needs to be updated according to the desired object and the +// object needs to be updated according to the old object and the // recorded version. 
-func ObjectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool { - targetVersion := ObjectVersion(clusterObj) +func ObjectNeedsUpdate(oldObj, currentObj *unstructured.Unstructured, recordedVersion string) bool { + targetVersion := ObjectVersion(currentObj) if recordedVersion != targetVersion { return true @@ -63,7 +63,7 @@ func ObjectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, record // If versions match and the version is sourced from the // generation field, a further check of metadata equivalency is // required. - return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(desiredObj, clusterObj) + return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(oldObj, currentObj) } // +lifted:source=https://github.com/kubernetes-retired/kubefed/blob/master/pkg/controller/util/meta.go#L82-L103 diff --git a/pkg/util/memberclusterinformer/memberclusterinformer.go b/pkg/util/memberclusterinformer/memberclusterinformer.go new file mode 100644 index 000000000..9da08ecfb --- /dev/null +++ b/pkg/util/memberclusterinformer/memberclusterinformer.go @@ -0,0 +1,156 @@ +package memberclusterinformer + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/restmapper" +) + +var _ MemberClusterInformer = &memberClusterInformerImpl{} + +type MemberClusterInformer interface { + // BuildResourceInformers builds informer dynamically for managed resources in member cluster. + // The created informer watches resource change and then sync to the relevant Work object. + BuildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error + + // GetObjectFromCache returns object in cache by federated key. + GetObjectFromCache(fedKey keys.FederatedKey) (*unstructured.Unstructured, error) +} + +func NewMemberClusterInformer( + c client.Client, + restMapper meta.RESTMapper, + informerManager genericmanager.MultiClusterInformerManager, + clusterCacheSyncTimeout metav1.Duration, + clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error), +) MemberClusterInformer { + return &memberClusterInformerImpl{ + Client: c, + restMapper: restMapper, + informerManager: informerManager, + clusterDynamicClientSetFunc: clusterDynamicClientSetFunc, + clusterCacheSyncTimeout: clusterCacheSyncTimeout, + } +} + +type memberClusterInformerImpl struct { + client.Client // used to get Cluster and Secret resources. 
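+	// restMapper is used to map the GVKs parsed from Work manifests to GVRs when registering informers
+	// and when reading objects back from the informer cache.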
+	restMapper                  meta.RESTMapper
+	informerManager             genericmanager.MultiClusterInformerManager
+	clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
+	clusterCacheSyncTimeout     metav1.Duration
+}
+
+// BuildResourceInformers builds informer dynamically for managed resources in member cluster.
+// The created informer watches resource change and then sync to the relevant Work object.
+func (m *memberClusterInformerImpl) BuildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error {
+	err := m.registerInformersAndStart(cluster, work, handler)
+	if err != nil {
+		klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
+		return err
+	}
+	return nil
+}
+
+// registerInformersAndStart builds the informer manager for the cluster if it doesn't exist, then constructs informers
+// for the GVRs referenced by the Work and starts them.
+func (m *memberClusterInformerImpl) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error {
+	singleClusterInformerManager, err := m.getSingleClusterManager(cluster)
+	if err != nil {
+		return err
+	}
+
+	gvrTargets, err := m.getGVRsFromWork(work)
+	if err != nil {
+		return err
+	}
+
+	allSynced := true
+	for _, gvr := range gvrTargets {
+		if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, handler) {
+			allSynced = false
+			singleClusterInformerManager.ForResource(gvr, handler)
+		}
+	}
+	if allSynced {
+		return nil
+	}
+
+	m.informerManager.Start(cluster.Name)
+
+	if err := func() error {
+		synced := m.informerManager.WaitForCacheSyncWithTimeout(cluster.Name, m.clusterCacheSyncTimeout.Duration)
+		if synced == nil {
+			return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
+		}
+		for _, gvr := range gvrTargets {
+			if !synced[gvr] {
+				return fmt.Errorf("informer for %s hasn't synced", gvr)
+			}
+		}
+		return nil
+	}(); err != nil {
+		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
+		m.informerManager.Stop(cluster.Name)
+		return err
+	}
+
+	return nil
+}
+
+// getSingleClusterManager gets singleClusterInformerManager with clusterName.
+// If the manager does not exist, create it; otherwise, get it from the map.
+func (m *memberClusterInformerImpl) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) {
+	// TODO(chenxianpao): If cluster A is removed, then a new cluster that name also is A joins karmada,
+	// the cache in informer manager should be updated.
+	singleClusterInformerManager := m.informerManager.GetSingleClusterManager(cluster.Name)
+	if singleClusterInformerManager == nil {
+		dynamicClusterClient, err := m.clusterDynamicClientSetFunc(cluster.Name, m.Client)
+		if err != nil {
+			klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
+			return nil, err
+		}
+		singleClusterInformerManager = m.informerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
+	}
+	return singleClusterInformerManager, nil
+}
+
+// getGVRsFromWork traverses the manifests in the Work to build the GroupVersionResource list.
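+// Duplicate GVRs derived from multiple manifests are collapsed into a single entry before the list is returned.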
+func (m *memberClusterInformerImpl) getGVRsFromWork(work *workv1alpha1.Work) ([]schema.GroupVersionResource, error) { + gvrTargets := sets.New[schema.GroupVersionResource]() + for _, manifest := range work.Spec.Workload.Manifests { + workload := &unstructured.Unstructured{} + err := workload.UnmarshalJSON(manifest.Raw) + if err != nil { + klog.Errorf("Failed to unmarshal workload. Error: %v.", err) + return nil, err + } + gvr, err := restmapper.GetGroupVersionResource(m.restMapper, workload.GroupVersionKind()) + if err != nil { + klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err) + return nil, err + } + gvrTargets.Insert(gvr) + } + return gvrTargets.UnsortedList(), nil +} + +// GetObjectFromCache returns object in cache by federated key. +func (m *memberClusterInformerImpl) GetObjectFromCache(fedKey keys.FederatedKey) (*unstructured.Unstructured, error) { + return helper.GetObjectFromCache(m.restMapper, m.informerManager, fedKey) +} diff --git a/pkg/util/memberclusterinformer/memberclusterinformer_test.go b/pkg/util/memberclusterinformer/memberclusterinformer_test.go new file mode 100644 index 000000000..1bf7d0539 --- /dev/null +++ b/pkg/util/memberclusterinformer/memberclusterinformer_test.go @@ -0,0 +1,187 @@ +package memberclusterinformer + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/gclient" + testhelper "github.com/karmada-io/karmada/test/helper" +) + +func Test_getSingleClusterManager(t *testing.T) { + clusterName := "cluster" + cluster := testhelper.NewClusterWithTypeAndStatus(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) + + // Generate InformerManager + stopCh := make(chan struct{}) + defer close(stopCh) + + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + + tests := []struct { + name string + rightClusterName bool + expectInformer bool + expectError bool + wrongClusterDynamicClientSetFunc bool + }{ + { + name: "normal case", + rightClusterName: true, + expectInformer: true, + expectError: false, + }, + { + name: "failed to build dynamic cluster client", + rightClusterName: false, + expectInformer: false, + expectError: true, + wrongClusterDynamicClientSetFunc: true, + }, + { + name: "failed to get single cluster", + rightClusterName: false, + expectInformer: true, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newMemberClusterInformer(cluster) + m := genericmanager.NewMultiClusterInformerManager(stopCh) + if tt.rightClusterName { + m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) + } else { + m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) + } + m.Start(clusterName) + m.WaitForCacheSync(clusterName) + c.informerManager = m + + if 
tt.wrongClusterDynamicClientSetFunc { + c.clusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError + } else { + c.clusterDynamicClientSetFunc = util.NewClusterDynamicClientSet + c.Client = fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://127.0.0.1", + SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"}, + InsecureSkipTLSVerification: true, + }, + Status: clusterv1alpha1.ClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: clusterv1alpha1.ClusterConditionReady, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, + Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")}, + }).Build() + } + + informerManager, err := c.getSingleClusterManager(cluster) + + if tt.expectInformer { + assert.NotEmpty(t, informerManager) + } else { + assert.Empty(t, informerManager) + } + + if tt.expectError { + assert.NotEmpty(t, err) + } else { + assert.Empty(t, err) + } + }) + } +} + +func Test_registerInformersAndStart(t *testing.T) { + clusterName := "cluster" + cluster := testhelper.NewClusterWithTypeAndStatus(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) + + // Generate InformerManager + stopCh := make(chan struct{}) + defer close(stopCh) + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + c := newMemberClusterInformer(cluster) + + raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) + work := testhelper.NewWork("work", "default", raw) + + eventHandler := fedinformer.NewHandlerOnEvents(nil, nil, nil) + + t.Run("normal case", func(t *testing.T) { + m := genericmanager.NewMultiClusterInformerManager(stopCh) + m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer + m.Start(clusterName) + m.WaitForCacheSync(clusterName) + c.informerManager = m + + err := c.registerInformersAndStart(cluster, work, eventHandler) + assert.Empty(t, err) + }) + + t.Run("failed to getSingleClusterManager", func(t *testing.T) { + c := newMemberClusterInformer(cluster) + m := genericmanager.NewMultiClusterInformerManager(stopCh) + m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer + m.Start(clusterName) + m.WaitForCacheSync(clusterName) + c.informerManager = m + c.clusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError + + err := c.registerInformersAndStart(cluster, work, eventHandler) + assert.NotEmpty(t, err) + }) + + t.Run("failed to getGVRsFromWork", func(t *testing.T) { + work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`) + + m := genericmanager.NewMultiClusterInformerManager(stopCh) + m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer + m.Start(clusterName) + m.WaitForCacheSync(clusterName) + c.informerManager = m + + err := c.registerInformersAndStart(cluster, work, eventHandler) + assert.NotEmpty(t, err) + }) +} + +func NewClusterDynamicClientSetForAgentWithError(clusterName string, client client.Client) (*util.DynamicClusterClient, error) { + return nil, fmt.Errorf("err") +} + +func 
newMemberClusterInformer(cluster *clusterv1alpha1.Cluster) *memberClusterInformerImpl { + mapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + mapper.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) + + return &memberClusterInformerImpl{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(), + restMapper: mapper, + clusterCacheSyncTimeout: metav1.Duration{}, + clusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, + } +} diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 2c3c3f364..4d9b76f0c 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -17,9 +18,6 @@ import ( workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/util" - "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" - "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" - "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/lifted" "github.com/karmada-io/karmada/pkg/util/restmapper" ) @@ -29,31 +27,36 @@ type ObjectWatcher interface { Create(clusterName string, desireObj *unstructured.Unstructured) error Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error Delete(clusterName string, desireObj *unstructured.Unstructured) error - NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) + NeedsUpdate(clusterName string, oldObj, currentObj *unstructured.Unstructured) bool } // ClientSetFunc is used to generate client set of member cluster type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error) +type versionFunc func() (objectVersion string, err error) + +type versionWithLock struct { + lock sync.RWMutex + version string +} + type objectWatcherImpl struct { Lock sync.RWMutex RESTMapper meta.RESTMapper KubeClientSet client.Client - VersionRecord map[string]map[string]string + VersionRecord map[string]map[string]*versionWithLock ClusterClientSetFunc ClientSetFunc resourceInterpreter resourceinterpreter.ResourceInterpreter - InformerManager genericmanager.MultiClusterInformerManager } // NewObjectWatcher returns an instance of ObjectWatcher func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher { return &objectWatcherImpl{ KubeClientSet: kubeClientSet, - VersionRecord: make(map[string]map[string]string), + VersionRecord: make(map[string]map[string]*versionWithLock), RESTMapper: restMapper, ClusterClientSetFunc: clusterClientSetFunc, resourceInterpreter: interpreter, - InformerManager: genericmanager.GetInstance(), } } @@ -70,30 +73,16 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U return err } - fedKey, err := keys.FederatedKeyFunc(clusterName, desireObj) + clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{}) if err != nil { - klog.Errorf("Failed 
to get FederatedKey %s, error: %v", desireObj.GetName(), err) + klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } - _, err = helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey) - if err != nil { - if !apierrors.IsNotFound(err) { - klog.Errorf("Failed to get resource %v from member cluster, err is %v ", desireObj.GetName(), err) - return err - } - clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{}) - if err != nil { - klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) - return err - } + klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) - klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) - // record version - o.recordVersion(clusterObj, dynamicClusterClient.ClusterName) - } - - return nil + // record version + return o.recordVersionWithVersionFunc(clusterObj, dynamicClusterClient.ClusterName, func() (string, error) { return lifted.ObjectVersion(clusterObj), nil }) } func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) { @@ -144,22 +133,48 @@ func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *un return err } - desireObj, err = o.retainClusterFields(desireObj, clusterObj) - if err != nil { - klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName, err) - return err - } + var errMsg string + var desireCopy, resource *unstructured.Unstructured + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + desireCopy = desireObj.DeepCopy() + if err != nil { + clusterObj, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{}) + if err != nil { + errMsg = fmt.Sprintf("Failed to get resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) + return err + } + } + + desireCopy, err = o.retainClusterFields(desireCopy, clusterObj) + if err != nil { + errMsg = fmt.Sprintf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName, err) + return err + } + + versionFuncWithUpdate := func() (string, error) { + resource, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireCopy, metav1.UpdateOptions{}) + if err != nil { + return "", err + } + + return lifted.ObjectVersion(resource), nil + } + err = o.recordVersionWithVersionFunc(desireCopy, clusterName, versionFuncWithUpdate) + if err == nil { + return nil + } + + errMsg = fmt.Sprintf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) + return err + }) - resource, err := 
dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{}) if err != nil { - klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) + klog.Errorf(errMsg) return err } klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) - // record version - o.recordVersion(resource, clusterName) return nil } @@ -193,8 +208,7 @@ func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.U } klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) - objectKey := o.genObjectKey(desireObj) - o.deleteVersionRecord(dynamicClusterClient.ClusterName, objectKey) + o.deleteVersionRecord(desireObj, dynamicClusterClient.ClusterName) return nil } @@ -203,71 +217,98 @@ func (o *objectWatcherImpl) genObjectKey(obj *unstructured.Unstructured) string return obj.GroupVersionKind().String() + "/" + obj.GetNamespace() + "/" + obj.GetName() } -// recordVersion will add or update resource version records -func (o *objectWatcherImpl) recordVersion(clusterObj *unstructured.Unstructured, clusterName string) { - objVersion := lifted.ObjectVersion(clusterObj) - objectKey := o.genObjectKey(clusterObj) - if o.isClusterVersionRecordExist(clusterName) { - o.updateVersionRecord(clusterName, objectKey, objVersion) - } else { - o.addVersionRecord(clusterName, objectKey, objVersion) - } -} - -// isClusterVersionRecordExist checks if the version record map of given member cluster exist -func (o *objectWatcherImpl) isClusterVersionRecordExist(clusterName string) bool { - o.Lock.RLock() - defer o.Lock.RUnlock() - - _, exist := o.VersionRecord[clusterName] - - return exist +// recordVersion will add or update resource version records with the version returned by versionFunc +func (o *objectWatcherImpl) recordVersionWithVersionFunc(obj *unstructured.Unstructured, clusterName string, fn versionFunc) error { + objectKey := o.genObjectKey(obj) + return o.addOrUpdateVersionRecordWithVersionFunc(clusterName, objectKey, fn) } // getVersionRecord will return the recorded version of given resource(if exist) func (o *objectWatcherImpl) getVersionRecord(clusterName, resourceName string) (string, bool) { + versionLock, exist := o.getVersionWithLockRecord(clusterName, resourceName) + if !exist { + return "", false + } + + versionLock.lock.RLock() + defer versionLock.lock.RUnlock() + + return versionLock.version, true +} + +// getVersionRecordWithLock will return the recorded versionWithLock of given resource(if exist) +func (o *objectWatcherImpl) getVersionWithLockRecord(clusterName, resourceName string) (*versionWithLock, bool) { o.Lock.RLock() defer o.Lock.RUnlock() - version, exist := o.VersionRecord[clusterName][resourceName] - return version, exist + versionLock, exist := o.VersionRecord[clusterName][resourceName] + return versionLock, exist } -// addVersionRecord will add new version record of given resource -func (o *objectWatcherImpl) addVersionRecord(clusterName, resourceName, version string) { +// newVersionWithLockRecord will add new versionWithLock record of given resource +func (o *objectWatcherImpl) newVersionWithLockRecord(clusterName, resourceName string) *versionWithLock { o.Lock.Lock() defer o.Lock.Unlock() - o.VersionRecord[clusterName] = 
map[string]string{resourceName: version} + + v, exist := o.VersionRecord[clusterName][resourceName] + if exist { + return v + } + + v = &versionWithLock{} + if o.VersionRecord[clusterName] == nil { + o.VersionRecord[clusterName] = map[string]*versionWithLock{} + } + + o.VersionRecord[clusterName][resourceName] = v + + return v } -// updateVersionRecord will update the recorded version of given resource -func (o *objectWatcherImpl) updateVersionRecord(clusterName, resourceName, version string) { - o.Lock.Lock() - defer o.Lock.Unlock() - o.VersionRecord[clusterName][resourceName] = version +// addOrUpdateVersionRecordWithVersionFunc will add or update the recorded version of given resource with version returned by versionFunc +func (o *objectWatcherImpl) addOrUpdateVersionRecordWithVersionFunc(clusterName, resourceName string, fn versionFunc) error { + versionLock, exist := o.getVersionWithLockRecord(clusterName, resourceName) + + if !exist { + versionLock = o.newVersionWithLockRecord(clusterName, resourceName) + } + + versionLock.lock.Lock() + defer versionLock.lock.Unlock() + + version, err := fn() + if err != nil { + return err + } + + versionLock.version = version + + return nil } // deleteVersionRecord will delete the recorded version of given resource -func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string) { +func (o *objectWatcherImpl) deleteVersionRecord(obj *unstructured.Unstructured, clusterName string) { + objectKey := o.genObjectKey(obj) + o.Lock.Lock() defer o.Lock.Unlock() - delete(o.VersionRecord[clusterName], resourceName) + delete(o.VersionRecord[clusterName], objectKey) } -func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) { - // get resource version - version, exist := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName()) +func (o *objectWatcherImpl) NeedsUpdate(clusterName string, oldObj, currentObj *unstructured.Unstructured) bool { + // get resource version, and if no recorded version, that means the object hasn't been processed yet since last restart, it should be processed now. + version, exist := o.getVersionRecord(clusterName, o.genObjectKey(oldObj)) if !exist { - klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName) - return false, fmt.Errorf("failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName) + return true } - return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version), nil + return lifted.ObjectNeedsUpdate(oldObj, currentObj, version) } func (o *objectWatcherImpl) allowUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool { // If the existing resource is managed by Karmada, then the updating is allowed. 
- if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) { + if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) && + util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) { return true } diff --git a/pkg/util/objectwatcher/objectwatcher_test.go b/pkg/util/objectwatcher/objectwatcher_test.go new file mode 100644 index 000000000..581a1c32c --- /dev/null +++ b/pkg/util/objectwatcher/objectwatcher_test.go @@ -0,0 +1,173 @@ +package objectwatcher + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + interpreterfake "github.com/karmada-io/karmada/pkg/resourceinterpreter/fake" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/gclient" + "github.com/karmada-io/karmada/pkg/util/names" +) + +func TestObjectWatcher_NeedsUpdate(t *testing.T) { + deployGVR := appsv1.SchemeGroupVersion.WithResource("deployments") + tests := []struct { + name string + createInternal bool + updateInternal bool + expected bool + }{ + { + name: "true, empty record", + createInternal: false, + updateInternal: false, + expected: true, + }, + { + name: "false, update from internal", + createInternal: true, + updateInternal: true, + expected: false, + }, + { + name: "true, update from client", + createInternal: true, + updateInternal: false, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resChan := make(chan bool) + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + o := newObjectWatcher(dynamicClientSet) + + informer := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClientSet, 0) + _, err := informer.ForResource(deployGVR).Informer().AddEventHandler(newEventHandlerWithResultChan(o, resChan)) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + informer.Start(ctx.Done()) + informer.WaitForCacheSync(ctx.Done()) + + clusterDeploy := newDeploymentObj(1, 1) + if tt.createInternal { + err = o.Create("cluster", clusterDeploy) + if err != nil { + t.Fatalf("Failed to create deploy from internal: %v", err) + } + } else { + _, err = dynamicClientSet.Resource(deployGVR).Namespace(clusterDeploy.GetNamespace()).Create(ctx, clusterDeploy, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create deploy from client: %v", err) + } + } + + newDeploy := newDeploymentObj(2, 2) + if tt.updateInternal { + err = o.Update("cluster", newDeploy, clusterDeploy) + if err != nil { + t.Fatalf("Failed to update deploy from internal: %v", err) + } + } else { + _, err = 
dynamicClientSet.Resource(deployGVR).Namespace(newDeploy.GetNamespace()).Update(ctx, newDeploy, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update deploy from client: %v", err) + } + } + + res := <-resChan + assert.Equal(t, tt.expected, res) + }) + } +} + +func newDeploymentObj(replicas int32, generation int64) *unstructured.Unstructured { + deployObj := map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "deployment", + "namespace": "default", + "labels": map[string]interface{}{ + workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster", + workv1alpha1.WorkNameLabel: "work", + }, + "generation": generation, + }, + "spec": map[string]interface{}{ + "replicas": &replicas, + }, + } + + deployBytes, _ := json.Marshal(deployObj) + deployUnstructured := &unstructured.Unstructured{} + _ = deployUnstructured.UnmarshalJSON(deployBytes) + + return deployUnstructured +} + +func newRESTMapper() meta.RESTMapper { + m := meta.NewDefaultRESTMapper([]schema.GroupVersion{appsv1.SchemeGroupVersion}) + m.Add(appsv1.SchemeGroupVersion.WithKind("Deployment"), meta.RESTScopeNamespace) + return m +} + +func newObjectWatcher(dynamicClientSets ...dynamic.Interface) ObjectWatcher { + c := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build() + + clusterDynamicClientSetFunc := util.NewClusterDynamicClientSetForAgent + + if len(dynamicClientSets) > 0 { + clusterDynamicClientSetFunc = newClusterDynamicClientSetForAgent("cluster", dynamicClientSets[0]) + } + + return NewObjectWatcher(c, newRESTMapper(), clusterDynamicClientSetFunc, interpreterfake.NewFakeInterpreter()) +} + +func newClusterDynamicClientSetForAgent(clusterName string, dynamicClientSet dynamic.Interface) func(string, client.Client) (*util.DynamicClusterClient, error) { + return func(string, client.Client) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ + ClusterName: clusterName, + DynamicClientSet: dynamicClientSet, + }, nil + } +} + +func newEventHandlerWithResultChan(o ObjectWatcher, resChan chan<- bool) cache.ResourceEventHandler { + return &cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + objOld := oldObj.(*unstructured.Unstructured) + objNew := newObj.(*unstructured.Unstructured) + + res := false + clusterName, _ := names.GetClusterName(util.GetLabelValue(objNew.GetLabels(), workv1alpha1.WorkNamespaceLabel)) + if clusterName != "" { + res = o.NeedsUpdate(clusterName, objOld, objNew) + } + + resChan <- res + }, + } +} diff --git a/test/helper/resource.go b/test/helper/resource.go index adb0cc198..5e8d37433 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" @@ -21,6 +22,7 @@ import ( workloadv1alpha1 "github.com/karmada-io/karmada/examples/customresourceinterpreter/apis/workload/v1alpha1" autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" ) // These are different resource units. 
@@ -608,6 +610,24 @@ func NewClusterWithResource(name string, allocatable, allocating, allocated core } } +// NewClusterWithTypeAndStatus will build a Cluster with type and status. +func NewClusterWithTypeAndStatus(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster { + return &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: clusterv1alpha1.ClusterSpec{}, + Status: clusterv1alpha1.ClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: clusterType, + Status: clusterStatus, + }, + }, + }, + } +} + // NewWorkload will build a workload object. func NewWorkload(namespace, name string) *workloadv1alpha1.Workload { podLabels := map[string]string{"app": "nginx"} @@ -868,3 +888,25 @@ func NewPodDisruptionBudget(namespace, name string, maxUnAvailable intstr.IntOrS }, } } + +// NewWork will build a new Work object. +func NewWork(workName, workNs string, raw []byte) *workv1alpha1.Work { + work := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: workNs, + }, + Spec: workv1alpha1.WorkSpec{ + Workload: workv1alpha1.WorkloadTemplate{ + Manifests: []workv1alpha1.Manifest{ + {RawExtension: runtime.RawExtension{ + Raw: raw, + }, + }, + }, + }, + }, + } + + return work +}
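
The behavioural core of the objectwatcher changes above is the per-object version record combined with the
relaxed NeedsUpdate contract: a missing record no longer returns an error, it simply means the object has not
been processed since the last restart and should be processed now. Below is a minimal, self-contained sketch of
that pattern only; it is illustrative, its names are not from the Karmada code base, and a plain string
comparison stands in for lifted.ObjectNeedsUpdate (which also weighs generation and resourceVersion).

package main

import (
	"fmt"
	"sync"
)

// versionWithLock pairs a recorded object version with its own lock, mirroring the
// per-object entries this patch stores in objectWatcherImpl.VersionRecord.
type versionWithLock struct {
	lock    sync.RWMutex
	version string
}

// versionFunc produces the version to record, typically as a side effect of the
// Create/Update call against the member cluster.
type versionFunc func() (string, error)

type recorder struct {
	lock   sync.RWMutex
	record map[string]map[string]*versionWithLock // cluster -> object key -> version
}

func newRecorder() *recorder {
	return &recorder{record: map[string]map[string]*versionWithLock{}}
}

// entry returns the per-object record, creating it under the global lock if absent.
func (r *recorder) entry(cluster, key string) *versionWithLock {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.record[cluster] == nil {
		r.record[cluster] = map[string]*versionWithLock{}
	}
	if v, ok := r.record[cluster][key]; ok {
		return v
	}
	v := &versionWithLock{}
	r.record[cluster][key] = v
	return v
}

// recordWith runs fn under the per-object lock and stores the version it reports,
// so only the affected object is blocked while the API call runs.
func (r *recorder) recordWith(cluster, key string, fn versionFunc) error {
	v := r.entry(cluster, key)
	v.lock.Lock()
	defer v.lock.Unlock()
	version, err := fn()
	if err != nil {
		return err
	}
	v.version = version
	return nil
}

// needsUpdate reports whether the observed object differs from the recorded version.
// No record means the object has not been processed yet, so it must be processed
// rather than treated as an error.
func (r *recorder) needsUpdate(cluster, key, observedVersion string) bool {
	r.lock.RLock()
	v, ok := r.record[cluster][key]
	r.lock.RUnlock()
	if !ok {
		return true
	}
	v.lock.RLock()
	defer v.lock.RUnlock()
	return v.version != observedVersion
}

func main() {
	r := newRecorder()
	key := "apps/v1, Kind=Deployment/default/nginx"

	fmt.Println(r.needsUpdate("member1", key, "100")) // true: nothing recorded yet
	_ = r.recordWith("member1", key, func() (string, error) { return "100", nil })
	fmt.Println(r.needsUpdate("member1", key, "100")) // false: versions match, skip the update
	fmt.Println(r.needsUpdate("member1", key, "101")) // true: the member cluster object changed
}

The per-object lock is what lets the reworked Update path hold only the affected object's record while the
retry.RetryOnConflict loop re-fetches and re-applies the resource, so concurrent Work syncs against the same
cluster no longer contend on a single map-wide lock.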