Merge pull request #4016 from ctripcloud/revert-3808

Revert "Merge pull request #3808 from ctripcloud/refactor-execution-workstatus"
karmada-bot 2023-08-31 17:31:43 +08:00 committed by GitHub
commit e78e86e259
15 changed files with 765 additions and 2075 deletions

View File

@ -44,7 +44,6 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
@ -218,7 +217,7 @@ func run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
if err = setupControllers(ctx, controllerManager, opts); err != nil {
if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
return err
}
@ -230,18 +229,18 @@ func run(ctx context.Context, opts *options.Options) error {
return nil
}
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) error {
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error {
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done())
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
controlPlaneKubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// which allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
sharedFactory.Start(ctx.Done())
sharedFactory.WaitForCacheSync(ctx.Done())
sharedFactory.Start(stopChan)
sharedFactory.WaitForCacheSync(stopChan)
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
if err := mgr.Add(resourceInterpreter); err != nil {
@ -249,11 +248,8 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSetForAgent)
controllerContext := controllerscontext.Context{
Mgr: mgr,
Ctx: ctx,
ObjectWatcher: objectWatcher,
Opts: controllerscontext.Options{
Controllers: opts.Controllers,
@ -273,9 +269,8 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
CertRotationRemainingTimeThreshold: opts.CertRotationRemainingTimeThreshold,
KarmadaKubeconfigNamespace: opts.KarmadaKubeconfigNamespace,
},
StopChan: ctx.Done(),
ResourceInterpreter: resourceInterpreter,
MemberClusterInformer: memberClusterInformer,
StopChan: stopChan,
ResourceInterpreter: resourceInterpreter,
}
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
@ -284,7 +279,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
// Ensure the InformerManager stops when the stop channel closes
go func() {
<-ctx.Done()
<-stopChan
genericmanager.StopInstance()
}()
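
The signature change above is the core of this revert on the agent side: the refactor passed a context.Context through setupControllers, while the restored code threads the ctx.Done() channel explicitly. Both styles observe the same <-chan struct{}; below is a minimal, self-contained sketch (hypothetical names, not code from this repository) of the two wirings:

package main

import (
	"context"
	"fmt"
	"time"
)

// stopWithContext blocks a goroutine on ctx.Done() and then runs cleanup.
func stopWithContext(ctx context.Context, cleanup func()) {
	go func() {
		<-ctx.Done()
		cleanup()
	}()
}

// stopWithChannel takes the raw channel instead; callers simply pass ctx.Done().
func stopWithChannel(stopChan <-chan struct{}, cleanup func()) {
	go func() {
		<-stopChan
		cleanup()
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	stopWithContext(ctx, func() { fmt.Println("context style stopped") })
	stopWithChannel(ctx.Done(), func() { fmt.Println("channel style stopped") })
	cancel()
	time.Sleep(100 * time.Millisecond) // toy example: give both goroutines time to run
}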
@ -320,17 +315,14 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
func startExecutionController(ctx controllerscontext.Context) (bool, error) {
executionController := &execution.Controller{
Ctx: ctx.Ctx,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs,
StopChan: ctx.StopChan,
MemberClusterInformer: ctx.MemberClusterInformer,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
InformerManager: genericmanager.GetInstance(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
}
executionController.RunWorkQueue()
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
@ -339,14 +331,18 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) {
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
workStatusController := &status.WorkStatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
StopChan: ctx.StopChan,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
MemberClusterInformer: ctx.MemberClusterInformer,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {

View File

@ -63,7 +63,6 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/restmapper"
@ -171,7 +170,7 @@ func Run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
setupControllers(ctx, controllerManager, opts)
setupControllers(controllerManager, opts, ctx.Done())
// blocks until the context is done.
if err := controllerManager.Start(ctx); err != nil {
@ -370,17 +369,14 @@ func startBindingStatusController(ctx controllerscontext.Context) (enabled bool,
func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
executionController := &execution.Controller{
Ctx: ctx.Ctx,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
ConcurrentWorkSyncs: ctx.Opts.ConcurrentWorkSyncs,
StopChan: ctx.StopChan,
MemberClusterInformer: ctx.MemberClusterInformer,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
InformerManager: genericmanager.GetInstance(),
RatelimiterOptions: ctx.Opts.RateLimiterOptions,
}
executionController.RunWorkQueue()
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
@ -390,14 +386,18 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
opts := ctx.Opts
workStatusController := &status.WorkStatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
StopChan: ctx.StopChan,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
MemberClusterInformer: ctx.MemberClusterInformer,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
@ -592,7 +592,7 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
}
// setupControllers initializes controllers and sets them up one by one.
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) {
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
@ -605,13 +605,13 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
return
}
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done())
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// which allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
sharedFactory.Start(ctx.Done())
sharedFactory.WaitForCacheSync(ctx.Done())
sharedFactory.Start(stopChan)
sharedFactory.WaitForCacheSync(stopChan)
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
if err := mgr.Add(resourceInterpreter); err != nil {
@ -619,7 +619,6 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
memberClusterInformer := memberclusterinformer.NewMemberClusterInformer(mgr.GetClient(), mgr.GetRESTMapper(), genericmanager.GetInstance(), opts.ClusterCacheSyncTimeout, util.NewClusterDynamicClientSet)
resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
@ -654,10 +653,9 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
}
}
setupClusterAPIClusterDetector(mgr, opts, ctx.Done())
setupClusterAPIClusterDetector(mgr, opts, stopChan)
controllerContext := controllerscontext.Context{
Mgr: mgr,
Ctx: ctx,
ObjectWatcher: objectWatcher,
Opts: controllerscontext.Options{
Controllers: opts.Controllers,
@ -681,13 +679,12 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
HPAControllerConfiguration: opts.HPAControllerConfiguration,
},
StopChan: ctx.Done(),
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
KubeClientSet: kubeClientSet,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
MemberClusterInformer: memberClusterInformer,
}
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
@ -696,7 +693,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
// Ensure the InformerManager stops when the stop channel closes
go func() {
<-ctx.Done()
<-stopChan
genericmanager.StopInstance()
}()
}

View File

@ -1,7 +1,6 @@
package context
import (
"context"
"regexp"
"time"
@ -16,7 +15,6 @@ import (
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)
@ -90,7 +88,6 @@ type Options struct {
// Context defines the context object for controller.
type Context struct {
Mgr controllerruntime.Manager
Ctx context.Context
ObjectWatcher objectwatcher.ObjectWatcher
Opts Options
StopChan <-chan struct{}
@ -99,7 +96,6 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
MemberClusterInformer memberclusterinformer.MemberClusterInformer
}
// IsControllerEnabled checks whether a specified controller is enabled or not.

View File

@ -3,7 +3,6 @@ package execution
import (
"context"
"fmt"
"reflect"
"time"
corev1 "k8s.io/api/core/v1"
@ -11,9 +10,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@ -30,10 +27,9 @@ import (
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)
@ -46,19 +42,12 @@ const (
// Controller is to sync Work.
type Controller struct {
client.Client // used to operate Work resources.
Ctx context.Context
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
PredicateFunc predicate.Predicate
InformerManager genericmanager.MultiClusterInformerManager
RatelimiterOptions ratelimiterflag.Options
// Extend execution controller like work status controller to handle events from member clusters.
worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
// ConcurrentWorkSyncs is the number of objects that are allowed to sync concurrently.
ConcurrentWorkSyncs int
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
MemberClusterInformer memberclusterinformer.MemberClusterInformer
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@ -77,12 +66,6 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return controllerruntime.Result{Requeue: true}, err
}
// Enqueue to try removing the object in the member cluster if the work is deleted.
if !work.DeletionTimestamp.IsZero() {
c.worker.Add(req)
return controllerruntime.Result{}, nil
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
@ -95,76 +78,31 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return controllerruntime.Result{Requeue: true}, err
}
if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if the cluster is unready when unjoining the cluster, otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{Requeue: true}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
return c.removeFinalizer(work)
}
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
err = c.MemberClusterInformer.BuildResourceInformers(cluster, work, c.eventHandler)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
c.worker.Add(req)
return controllerruntime.Result{}, nil
}
func (c *Controller) onUpdate(old, cur interface{}) {
oldObj := old.(*unstructured.Unstructured)
curObj := cur.(*unstructured.Unstructured)
oldObjCopy := oldObj.DeepCopy()
curObjCopy := curObj.DeepCopy()
clusterName, _ := names.GetClusterName(curObjCopy.GetLabels()[workv1alpha1.WorkNamespaceLabel])
if clusterName == "" {
return
}
if c.ObjectWatcher.NeedsUpdate(clusterName, oldObjCopy, curObjCopy) {
c.worker.Enqueue(curObj)
}
}
func (c *Controller) onDelete(obj interface{}) {
curObj := obj.(*unstructured.Unstructured)
c.worker.Enqueue(curObj)
}
// RunWorkQueue initializes the worker and runs it; the worker will process resources asynchronously.
func (c *Controller) RunWorkQueue() {
workerOptions := util.Options{
Name: "work-execution",
KeyFunc: generateKey,
ReconcileFunc: c.syncWork,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.ConcurrentWorkSyncs, c.StopChan)
}
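
The RunWorkQueue method removed above wraps karmada's util.AsyncWorker around a rate-limiting queue. As a rough orientation only, here is a self-contained sketch of the underlying queue-and-workers pattern written against client-go's workqueue package rather than karmada's helper; the function names (reconcile, processNext) and the sample key are illustrative, not taken from this repository:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// reconcile stands in for the ReconcileFunc handed to the async worker.
func reconcile(key string) error {
	fmt.Println("reconciling", key)
	return nil
}

// processNext pops one key, reconciles it, and re-queues it with back-off on failure.
func processNext(queue workqueue.RateLimitingInterface) bool {
	item, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(item)

	if err := reconcile(item.(string)); err != nil {
		queue.AddRateLimited(item) // retry later with back-off
		return true
	}
	queue.Forget(item) // clear rate-limit history on success
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Two concurrent workers, mirroring worker.Run(ConcurrentWorkSyncs, StopChan).
	for i := 0; i < 2; i++ {
		go func() {
			for processNext(queue) {
			}
		}()
	}

	queue.Add("karmada-es-cluster/work") // mirrors worker.Add / worker.Enqueue
	time.Sleep(100 * time.Millisecond)   // toy example: let the workers drain the queue
	queue.ShutDown()
}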
// generateKey generates a key from obj; the key is a controller-runtime Request built from the object's work namespace and name labels.
func generateKey(obj interface{}) (util.QueueKey, error) {
resource, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("object is not unstructured")
}
workName := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNameLabel)
workNamespace := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNamespaceLabel)
if workName == "" || workNamespace == "" {
return nil, nil
}
return controllerruntime.Request{NamespacedName: types.NamespacedName{Namespace: workNamespace, Name: workName}}, nil
return c.syncWork(clusterName, work)
}
// SetupWithManager creates a controller and registers it to the controller manager.
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
c.eventHandler = fedinformer.NewHandlerOnEvents(nil, c.onUpdate, c.onDelete)
return controllerruntime.NewControllerManagedBy(mgr).
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).
WithEventFilter(predicate.GenerationChangedPredicate{}).
@ -174,71 +112,20 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
Complete(c)
}
func (c *Controller) syncWork(key util.QueueKey) error {
req, ok := key.(controllerruntime.Request)
if !ok {
klog.Warningf("Skip sync work for key(%+v) is not controllerruntime.Request type", key)
return nil
}
klog.Infof("Begin to sync work %s/%s", req.Namespace, req.Name)
func (c *Controller) syncWork(clusterName string, work *workv1alpha1.Work) (controllerruntime.Result, error) {
start := time.Now()
work := &workv1alpha1.Work{}
if err := c.Client.Get(c.Ctx, req.NamespacedName, work); err != nil {
// The resource may no longer exist, in which case we stop processing.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
return err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return err
}
if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if the cluster is unready when unjoining the cluster, otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
return c.removeFinalizer(work)
}
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
err = c.syncToClusters(clusterName, work)
err := c.syncToClusters(clusterName, work)
metrics.ObserveSyncWorkloadLatency(err, start)
if err != nil {
msg := fmt.Sprintf("Failed to sync work(%s) to cluster(%s): %v", work.Name, clusterName, err)
klog.Errorf(msg)
c.EventRecorder.Event(work, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, msg)
return err
return controllerruntime.Result{Requeue: true}, err
}
msg := fmt.Sprintf("Sync work(%s) to cluster(%s) successfully.", work.Name, clusterName)
msg := fmt.Sprintf("Sync work (%s) to cluster(%s) successful.", work.Name, clusterName)
klog.V(4).Infof(msg)
c.EventRecorder.Event(work, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, msg)
return nil
return controllerruntime.Result{}, nil
}
// tryDeleteWorkload tries to delete resource in the given member cluster.
@ -257,7 +144,7 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo
return err
}
clusterObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey)
clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
@ -267,8 +154,7 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo
}
// Avoid deleting resources that are not managed by karmada.
if util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNameLabel) ||
util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNamespaceLabel) {
if util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNameLabel) {
klog.Infof("Abort deleting the resource(kind=%s, %s/%s) exists in cluster %v but not managed by karmada", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName)
return nil
}
@ -284,17 +170,17 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo
}
// removeFinalizer removes the finalizer from the given Work
func (c *Controller) removeFinalizer(work *workv1alpha1.Work) error {
func (c *Controller) removeFinalizer(work *workv1alpha1.Work) (controllerruntime.Result, error) {
if !controllerutil.ContainsFinalizer(work, util.ExecutionControllerFinalizer) {
return nil
return controllerruntime.Result{}, nil
}
controllerutil.RemoveFinalizer(work, util.ExecutionControllerFinalizer)
err := c.Client.Update(context.TODO(), work)
if err != nil {
return err
return controllerruntime.Result{Requeue: true}, err
}
return nil
return controllerruntime.Result{}, nil
}
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
@ -324,7 +210,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work)
if len(errs) > 0 {
total := len(work.Spec.Workload.Manifests)
message := fmt.Sprintf("Failed to apply all manifests (%d/%d): %s", syncSucceedNum, total, errors.NewAggregate(errs).Error())
err := c.updateAppliedConditionIfNeed(work, metav1.ConditionFalse, "AppliedFailed", message)
err := c.updateAppliedCondition(work, metav1.ConditionFalse, "AppliedFailed", message)
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
errs = append(errs, err)
@ -332,7 +218,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work)
return errors.NewAggregate(errs)
}
err := c.updateAppliedConditionIfNeed(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied")
err := c.updateAppliedCondition(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied")
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
@ -348,7 +234,7 @@ func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *uns
return err
}
clusterObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey)
clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
@ -370,8 +256,8 @@ func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *uns
return nil
}
// updateAppliedConditionIfNeed updates the Applied condition for the given Work if the reason or message of the Applied condition has changed
func (c *Controller) updateAppliedConditionIfNeed(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
// updateAppliedCondition updates the Applied condition for the given Work
func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
newWorkAppliedCondition := metav1.Condition{
Type: workv1alpha1.WorkApplied,
Status: status,
@ -380,24 +266,7 @@ func (c *Controller) updateAppliedConditionIfNeed(work *workv1alpha1.Work, statu
LastTransitionTime: metav1.Now(),
}
// needUpdateCondition judges whether the Applied condition needs to be updated.
needUpdateCondition := func() bool {
lastWorkAppliedCondition := meta.FindStatusCondition(work.Status.Conditions, workv1alpha1.WorkApplied).DeepCopy()
if lastWorkAppliedCondition != nil {
lastWorkAppliedCondition.LastTransitionTime = newWorkAppliedCondition.LastTransitionTime
return !reflect.DeepEqual(newWorkAppliedCondition, *lastWorkAppliedCondition)
}
return true
}
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
if !needUpdateCondition() {
return nil
}
meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
updateErr := c.Status().Update(context.TODO(), work)
if updateErr == nil {

View File

@ -1,919 +0,0 @@
package execution
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
interpreterfake "github.com/karmada-io/karmada/pkg/resourceinterpreter/fake"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)
func TestExecutionController_RunWorkQueue(t *testing.T) {
c := Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
}
injectMemberClusterInformer(&c, genericmanager.GetInstance(), util.NewClusterDynamicClientSetForAgent)
c.RunWorkQueue()
}
func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
c Controller
dFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
work *workv1alpha1.Work
ns string
expectRes controllerruntime.Result
existErr bool
}{
{
name: "normal case",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(
&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://127.0.0.1",
SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"},
InsecureSkipTLSVerification: true,
},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")},
}).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSet,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
},
{
name: "work not exists",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work-1",
Namespace: "karmada-es-cluster",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
},
{
name: "work's DeletionTimestamp isn't zero",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
},
{
name: "failed to get cluster name",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-cluster",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
},
{
name: "failed to get cluster",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
},
{
name: "cluster is not ready",
c: Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
injectMemberClusterInformer(&tt.c, genericmanager.GetInstance(), tt.dFunc)
tt.c.RunWorkQueue()
req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
Namespace: tt.ns,
},
}
if err := tt.c.Create(context.Background(), tt.work); err != nil {
t.Fatalf("Failed to create work: %v", err)
}
res, err := tt.c.Reconcile(context.Background(), req)
assert.Equal(t, tt.expectRes, res)
if tt.existErr {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
})
}
}
func Test_generateKey(t *testing.T) {
tests := []struct {
name string
obj *unstructured.Unstructured
expectRes controllerruntime.Request
existRes bool
existErr bool
}{
{
name: "normal case",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test",
"namespace": "default",
"labels": map[string]interface{}{
workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster",
workv1alpha1.WorkNameLabel: "work",
},
},
},
},
expectRes: makeRequest("karmada-es-cluster", "work"),
existRes: true,
existErr: false,
},
{
name: "getWorkNameFromLabel failed",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test",
"namespace": "default",
"labels": map[string]interface{}{
workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster",
},
},
},
},
existRes: false,
existErr: false,
},
{
name: "getWorkNamespaceFromLabel failed",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test",
"namespace": "default",
"labels": map[string]interface{}{
workv1alpha1.WorkNameLabel: "work",
},
},
},
},
existRes: false,
existErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queueKey, err := generateKey(tt.obj)
if tt.existRes {
assert.Equal(t, tt.expectRes, queueKey)
} else {
assert.Empty(t, queueKey)
}
if tt.existErr {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
})
}
}
func newPodObj(name string, labels map[string]string) *unstructured.Unstructured {
labelsCopy := map[string]interface{}{}
for k, v := range labels {
labelsCopy[k] = v
}
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": name,
"namespace": "default",
"labels": labelsCopy,
},
},
}
}
func newPod(name string, labels map[string]string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Labels: labels,
},
}
}
func newWorkloadLabels(workNs, workName, workUID string) map[string]string {
labels := map[string]string{}
if workNs != "" {
labels[workv1alpha1.WorkNamespaceLabel] = workNs
}
if workName != "" {
labels[workv1alpha1.WorkNameLabel] = workName
}
if workUID != "" {
labels[workv1alpha2.WorkUIDLabel] = workUID
}
return labels
}
func newWorkWithStatus(status metav1.ConditionStatus, reason, message string, t time.Time) *workv1alpha1.Work {
return &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Namespace: "karmada-es-cluster",
Name: "work-name",
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(t),
},
},
},
}
}
func TestExecutionController_tryDeleteWorkload(t *testing.T) {
raw := []byte(`
{
"apiVersion":"v1",
"kind":"Pod",
"metadata":{
"name":"pod",
"namespace":"default",
"labels":{
"work.karmada.io/name":"work-name",
"work.karmada.io/namespace":"karmada-es-cluster"
}
}
}`)
podName := "pod"
workName := "work-name"
workNs := "karmada-es-cluster"
workUID := "99f1f7c3-1f1f-4f1f-9f1f-7c3f1f1f9f1f"
clusterName := "cluster"
podGVR := corev1.SchemeGroupVersion.WithResource("pods")
tests := []struct {
name string
pod *corev1.Pod
work *workv1alpha1.Work
controllerWithoutInformer bool
expectedError bool
objectNeedDelete bool
}{
{
name: "failed to GetObjectFromCache, wrong InformerManager in ExecutionController",
pod: newPod(podName, newWorkloadLabels(workNs, workName, workUID)),
work: testhelper.NewWork(workName, workNs, workUID, raw),
controllerWithoutInformer: false,
expectedError: false,
objectNeedDelete: false,
},
{
name: "workload is not managed by karmada, without work-related labels",
pod: newPod(podName, nil),
work: testhelper.NewWork(workName, workNs, workUID, raw),
controllerWithoutInformer: true,
expectedError: false,
objectNeedDelete: false,
},
{
name: "workload is not related to current work",
pod: newPod(podName, newWorkloadLabels(workNs, "wrong-work", workUID)),
work: testhelper.NewWork(workName, workNs, workUID, raw),
controllerWithoutInformer: true,
expectedError: false,
objectNeedDelete: false,
},
{
name: "normal case",
pod: newPod(podName, newWorkloadLabels(workNs, workName, workUID)),
work: testhelper.NewWork(workName, workNs, workUID, raw),
controllerWithoutInformer: true,
expectedError: false,
objectNeedDelete: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var dynamicClientSet *dynamicfake.FakeDynamicClient
if tt.pod != nil {
dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme, tt.pod)
} else {
dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
}
fedDynamicClientSet := dynamicClientSet
if !tt.controllerWithoutInformer {
fedDynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
}
o := newExecutionOptions()
o.dynamicClientSet = fedDynamicClientSet
c := newExecutionController(ctx, o)
err := c.tryDeleteWorkload(clusterName, tt.work)
if tt.expectedError {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
_, err = dynamicClientSet.Resource(podGVR).Namespace("default").Get(context.Background(), "pod", metav1.GetOptions{})
if tt.objectNeedDelete {
assert.True(t, apierrors.IsNotFound(err))
} else {
assert.Empty(t, err)
}
})
}
}
func TestExecutionController_tryCreateOrUpdateWorkload(t *testing.T) {
podName := "pod"
workName := "work-name"
workNs := "karmada-es-cluster"
workUID := "99f1f7c3-1f1f-4f1f-9f1f-7c3f1f1f9f1f"
clusterName := "cluster"
podGVR := corev1.SchemeGroupVersion.WithResource("pods")
annotations := map[string]string{
workv1alpha2.ResourceConflictResolutionAnnotation: workv1alpha2.ResourceConflictResolutionOverwrite,
}
tests := []struct {
name string
pod *corev1.Pod
obj *unstructured.Unstructured
withAnnotation bool
expectedError bool
objectExist bool
labelMatch bool
}{
{
name: "created workload",
pod: newPod("wrong-pod", nil),
obj: newPodObj(podName, newWorkloadLabels(workNs, workName, workUID)),
withAnnotation: false,
expectedError: false,
objectExist: true,
labelMatch: true,
},
{
name: "failed to update object, overwrite conflict resolusion not set",
pod: newPod(podName, nil),
obj: newPodObj(podName, newWorkloadLabels(workNs, workName, workUID)),
withAnnotation: false,
expectedError: true,
objectExist: true,
labelMatch: false,
},
{
name: "updated object",
pod: newPod(podName, nil),
obj: newPodObj(podName, newWorkloadLabels(workNs, workName, workUID)),
withAnnotation: true,
expectedError: false,
objectExist: true,
labelMatch: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, tt.pod)
o := newExecutionOptions()
o.dynamicClientSet = dynamicClientSet
c := newExecutionController(ctx, o)
if tt.withAnnotation {
tt.obj.SetAnnotations(annotations)
}
err := c.tryCreateOrUpdateWorkload(clusterName, tt.obj)
if tt.expectedError {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
resource, err := dynamicClientSet.Resource(podGVR).Namespace("default").Get(context.Background(), "pod", metav1.GetOptions{})
if tt.objectExist {
assert.Empty(t, err)
} else {
assert.True(t, apierrors.IsNotFound(err))
return
}
labels := map[string]string{
workv1alpha1.WorkNamespaceLabel: workNs,
workv1alpha1.WorkNameLabel: workName,
workv1alpha2.WorkUIDLabel: workUID,
}
if tt.labelMatch {
assert.Equal(t, resource.GetLabels(), labels)
} else {
assert.Empty(t, resource.GetLabels())
}
})
}
}
func TestExecutionController_updateAppliedConditionIfNeed(t *testing.T) {
baseTime := time.Now().Add(-time.Minute)
tests := []struct {
name string
work *workv1alpha1.Work
status metav1.ConditionStatus
reason string
message string
updated bool
}{
{
name: "update condition, from false to true",
work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime),
status: metav1.ConditionTrue,
reason: "",
message: "",
updated: true,
},
{
name: "update condition, from true to false",
work: newWorkWithStatus(metav1.ConditionTrue, "", "", baseTime),
status: metav1.ConditionFalse,
reason: "reason1",
message: "message1",
updated: true,
},
{
name: "update condition, for reason changed",
work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime),
status: metav1.ConditionFalse,
reason: "reason2",
message: "message1",
updated: true,
},
{
name: "update condition, for message changed",
work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime),
status: metav1.ConditionFalse,
reason: "reason1",
message: "message2",
updated: true,
},
{
name: "not update condition, for nothing changed",
work: newWorkWithStatus(metav1.ConditionFalse, "reason1", "message1", baseTime),
status: metav1.ConditionFalse,
reason: "reason1",
message: "message1",
updated: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
o := newExecutionOptions()
o.objects = append(o.objects, tt.work)
o.objectsWithStatus = append(o.objectsWithStatus, &workv1alpha1.Work{})
c := newExecutionController(ctx, o)
workOld := &workv1alpha1.Work{}
err := c.Get(ctx, types.NamespacedName{Namespace: tt.work.Namespace, Name: tt.work.Name}, workOld)
if err != nil {
t.Errorf("failed to get created work: %v", err)
return
}
t.Logf("Got work: %+v", *workOld)
appliedCopy := meta.FindStatusCondition(workOld.DeepCopy().Status.Conditions, workv1alpha1.WorkApplied).DeepCopy()
err = c.updateAppliedConditionIfNeed(workOld, tt.status, tt.reason, tt.message)
if err != nil {
t.Errorf("failed to update work: %v", err)
return
}
workNew := &workv1alpha1.Work{}
err = c.Get(ctx, types.NamespacedName{Namespace: tt.work.Namespace, Name: tt.work.Name}, workNew)
if err != nil {
t.Errorf("failed to get updated work: %v", err)
return
}
newApplied := meta.FindStatusCondition(workNew.Status.Conditions, workv1alpha1.WorkApplied)
if tt.updated {
if appliedCopy.Status != newApplied.Status {
assert.NotEqual(t, newApplied.LastTransitionTime, appliedCopy.LastTransitionTime)
}
appliedCopy.LastTransitionTime = newApplied.LastTransitionTime
assert.NotEqual(t, newApplied, appliedCopy)
} else {
assert.Equal(t, newApplied, appliedCopy)
}
})
}
}
func TestExecutionController_syncWork(t *testing.T) {
basePod := newPod("pod", nil)
workName := "work"
workNs := "karmada-es-cluster"
workUID := "93162d3c-ee8e-4995-9034-05f4d5d2c2b9"
podGVR := corev1.SchemeGroupVersion.WithResource("pods")
podRaw := []byte(`
{
"apiVersion":"v1",
"kind":"Pod",
"metadata":{
"name":"pod",
"namespace":"default",
"annotations":{
"work.karmada.io/conflict-resolution": "overwrite"
},
"labels":{
"work.karmada.io/name":"work",
"work.karmada.io/namespace":"karmada-es-cluster"
}
}
}`)
tests := []struct {
name string
workNamespace string
controllerWithoutInformer bool
expectedError bool
clusterNotReady bool
updated bool
}{
{
name: "failed to GetObjectFromCache, wrong InformerManager in ExecutionController",
workNamespace: workNs,
controllerWithoutInformer: false,
expectedError: true,
},
{
name: "obj not found in informer, wrong dynamicClientSet without pod",
workNamespace: workNs,
controllerWithoutInformer: true,
expectedError: false,
},
{
name: "workNamespace is zero",
workNamespace: "",
controllerWithoutInformer: true,
expectedError: true,
},
{
name: "failed to exec Client.Get, set wrong cluster name in work",
workNamespace: "karmada-es-wrong-cluster",
controllerWithoutInformer: true,
expectedError: true,
},
{
name: "cluster is not ready",
workNamespace: workNs,
controllerWithoutInformer: true,
expectedError: true,
clusterNotReady: true,
},
{
name: "normal case",
workNamespace: workNs,
controllerWithoutInformer: true,
expectedError: false,
updated: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
o := newExecutionOptions()
if tt.clusterNotReady {
o = newExecutionOptions(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse))
}
pod := basePod.DeepCopy()
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
if tt.controllerWithoutInformer {
o.dynamicClientSet = dynamicClientSet
}
work := testhelper.NewWork(workName, tt.workNamespace, workUID, podRaw)
o.objects = append(o.objects, work)
o.objectsWithStatus = append(o.objectsWithStatus, &workv1alpha1.Work{})
key := makeRequest(work.Namespace, work.Name)
c := newExecutionController(ctx, o)
err := c.syncWork(key)
if tt.expectedError {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
if tt.updated {
resource, err := dynamicClientSet.Resource(podGVR).Namespace(basePod.Namespace).Get(ctx, basePod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod: %v", err)
}
expectedLabels := newWorkloadLabels(workNs, workName, workUID)
assert.Equal(t, resource.GetLabels(), expectedLabels)
}
})
}
}
func injectMemberClusterInformer(c *Controller,
informerManager genericmanager.MultiClusterInformerManager,
clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error),
) {
r := memberclusterinformer.NewMemberClusterInformer(c.Client, newRESTMapper(), informerManager, metav1.Duration{}, clusterDynamicClientSetFunc)
c.MemberClusterInformer = r
}
func newRESTMapper() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
return m
}
type executionOptions struct {
dynamicClientSet dynamic.Interface
cluster *clusterv1alpha1.Cluster
objects []client.Object
objectsWithStatus []client.Object
}
func newExecutionOptions(cluster ...*clusterv1alpha1.Cluster) *executionOptions {
o := &executionOptions{}
if len(cluster) > 0 {
o.cluster = cluster[0]
} else {
o.cluster = testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
}
o.objects = append(o.objects, o.cluster)
o.objectsWithStatus = append(o.objectsWithStatus, &clusterv1alpha1.Cluster{})
return o
}
func newExecutionController(ctx context.Context, opt *executionOptions) Controller {
c := Controller{
Ctx: ctx,
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).
WithObjects(opt.objects...).WithStatusSubresource(opt.objectsWithStatus...).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RatelimiterOptions: ratelimiterflag.Options{},
EventRecorder: record.NewFakeRecorder(1024),
}
informerManager := genericmanager.GetInstance()
clusterDynamicClientSetFunc := util.NewClusterDynamicClientSetForAgent
if opt.dynamicClientSet != nil {
clusterName := opt.cluster.Name
// Generate ResourceInterpreter and ObjectWatcher
resourceInterpreter := interpreterfake.NewFakeInterpreter()
clusterDynamicClientSetFunc = newClusterDynamicClientSetForAgent(clusterName, opt.dynamicClientSet)
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, newRESTMapper(), clusterDynamicClientSetFunc, resourceInterpreter)
// Generate InformerManager
m := genericmanager.NewMultiClusterInformerManager(ctx.Done())
m.ForCluster(clusterName, opt.dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
informerManager = m
}
injectMemberClusterInformer(&c, informerManager, clusterDynamicClientSetFunc)
return c
}
func newClusterDynamicClientSetForAgent(clusterName string, dynamicClientSet dynamic.Interface) func(string, client.Client) (*util.DynamicClusterClient, error) {
return func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
}, nil
}
}
func makeRequest(workNamespace, workName string) controllerruntime.Request {
return controllerruntime.Request{
NamespacedName: types.NamespacedName{
Namespace: workNamespace,
Name: workName,
},
}
}

View File

@ -7,8 +7,9 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -20,16 +21,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
// WorkStatusControllerName is the controller name that will be used when reporting events.
@ -37,17 +41,21 @@ const WorkStatusControllerName = "work-status-controller"
// WorkStatusController is to sync status of Work.
type WorkStatusController struct {
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
InformerManager genericmanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
ConcurrentWorkStatusSyncs int
PredicateFunc predicate.Predicate
RateLimiterOptions ratelimiterflag.Options
ResourceInterpreter resourceinterpreter.ResourceInterpreter
MemberClusterInformer memberclusterinformer.MemberClusterInformer
ConcurrentWorkStatusSyncs int
ObjectWatcher objectwatcher.ObjectWatcher
PredicateFunc predicate.Predicate
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterCacheSyncTimeout metav1.Duration
RateLimiterOptions ratelimiterflag.Options
ResourceInterpreter resourceinterpreter.ResourceInterpreter
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@ -91,25 +99,26 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
err = c.MemberClusterInformer.BuildResourceInformers(cluster, work, c.eventHandler)
return c.buildResourceInformers(cluster, work)
}
// buildResourceInformers dynamically builds informers for managed resources in the member cluster.
// The created informers watch resource changes and sync them to the relevant Work object.
func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) {
err := c.registerInformersAndStart(cluster, work)
if err != nil {
klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
func (c *WorkStatusController) onAdd(obj interface{}) {
curObj := obj.(runtime.Object)
c.worker.Enqueue(curObj)
}
func (c *WorkStatusController) onUpdate(old, cur interface{}) {
// Still need to compare the whole object because the interpreter is able to take any part of the object to reflect status.
if !reflect.DeepEqual(old, cur) {
curObj := cur.(runtime.Object)
c.worker.Enqueue(curObj)
// getEventHandler returns a callback function that knows how to handle events from the member cluster.
func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
if c.eventHandler == nil {
c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
}
return c.eventHandler
}
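
For comparison, the event-handler wiring differs between the two controllers touched by this revert; the fragment below (reusing only constructors and fields that appear elsewhere in this diff, not a standalone program) puts them side by side:

// WorkStatusController (restored above): enqueue every add/update/delete event from the member cluster.
c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)

// Execution controller variant removed by this revert: only update and delete events were handled,
// so the add callback was left nil.
c.eventHandler = fedinformer.NewHandlerOnEvents(nil, c.onUpdate, c.onDelete)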
// RunWorkQueue initializes the worker and runs it; the worker will process resources asynchronously.
@ -141,7 +150,7 @@ func generateKey(obj interface{}) (util.QueueKey, error) {
// getClusterNameFromLabel gets the cluster name from the owner label; if the label does not exist, the resource is not created by karmada.
func getClusterNameFromLabel(resource *unstructured.Unstructured) (string, error) {
workNamespace := util.GetLabelValue(resource.GetLabels(), workv1alpha1.WorkNamespaceLabel)
if workNamespace == "" {
if len(workNamespace) == 0 {
klog.V(4).Infof("Ignore resource(%s/%s/%s) which not managed by karmada", resource.GetKind(), resource.GetNamespace(), resource.GetName())
return "", nil
}
@ -162,23 +171,17 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
return fmt.Errorf("invalid key")
}
klog.Infof("Begin to sync status to Work of object(%s)", fedKey.String())
observedObj, err := c.MemberClusterInformer.GetObjectFromCache(fedKey)
observedObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
if err != nil {
if apierrors.IsNotFound(err) {
executionSpace := names.GenerateExecutionSpaceName(fedKey.Cluster)
workName := names.GenerateWorkName(fedKey.Kind, fedKey.Name, fedKey.Namespace)
klog.Warningf("Skip reflecting %s(%s/%s) status to Work(%s/%s) for not applied successfully yet.",
fedKey.Kind, fedKey.Namespace, fedKey.Name, executionSpace, workName)
return nil
return c.handleDeleteEvent(fedKey)
}
return err
}
workNamespace := util.GetLabelValue(observedObj.GetLabels(), workv1alpha1.WorkNamespaceLabel)
workName := util.GetLabelValue(observedObj.GetLabels(), workv1alpha1.WorkNameLabel)
if workNamespace == "" || workName == "" {
if len(workNamespace) == 0 || len(workName) == 0 {
klog.Infof("Ignore object(%s) which not managed by karmada.", fedKey.String())
return nil
}
@ -199,17 +202,85 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
return nil
}
if !helper.IsResourceApplied(&workObject.Status) {
err = fmt.Errorf("work hadn't been applied yet")
klog.Errorf("Failed to reflect status of %s(%s/%s) to Work(%s/%s): %v.",
observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName, err)
desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj)
if err != nil {
return err
}
clusterName, err := names.GetClusterName(workNamespace)
if err != nil {
klog.Errorf("Failed to get member cluster name: %v", err)
return err
}
// we should check if the observed status is consistent with the declaration to prevent accidental changes made
// in member clusters.
needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desiredObj, observedObj)
if err != nil {
return err
}
if needUpdate {
if err := c.ObjectWatcher.Update(clusterName, desiredObj, observedObj); err != nil {
klog.Errorf("Update %s failed: %v", fedKey.String(), err)
return err
}
// We can't return even after a successful update, because that might lose the chance to collect status.
// Not all updates are real; they might be no-ops, in which case there will be no further event for this update.
// This usually happens with resources that don't enable 'metadata.generation', like 'Service'.
// When a Service's status changes, its 'metadata.resourceVersion' is increased but 'metadata.generation'
// is not (it defaults to 0), so the ObjectWatcher can't easily tell what happened to the object and
// needs to update again. The update is a no-op if the event was triggered only by the Service's
// status change.
}
klog.Infof("reflecting %s(%s/%s) status to Work(%s/%s)", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
return c.reflectStatus(workObject, observedObj)
}
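The comment block above carries the key reasoning of this hunk. Below is a minimal sketch of the Service case it describes; it is not part of the diff, the object names are made up, and it relies on the lifted.ObjectVersion convention shown later in this change:
svc := &unstructured.Unstructured{Object: map[string]interface{}{
	"apiVersion": "v1", "kind": "Service",
	"metadata": map[string]interface{}{"name": "svc", "namespace": "default", "resourceVersion": "100"},
}}
recorded := lifted.ObjectVersion(svc) // generation is 0, so the recorded version is resourceVersion-based
svc.SetResourceVersion("101") // a status-only write bumps resourceVersion but not generation,
// so the recorded version no longer matches and the watcher issues another (effectively no-op) update;
// that is why syncWorkStatus keeps going and still reflects status afterwards.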
func (c *WorkStatusController) handleDeleteEvent(key keys.FederatedKey) error {
executionSpace := names.GenerateExecutionSpaceName(key.Cluster)
// Given that the workload might have been deleted from the informer cache, we can't get the Work object by its label;
// we have to get the Work by the naming rule, as the Work's name is generated from the workload's kind, name and namespace.
workName := names.GenerateWorkName(key.Kind, key.Name, key.Namespace)
work := &workv1alpha1.Work{}
if err := c.Client.Get(context.TODO(), client.ObjectKey{Namespace: executionSpace, Name: workName}, work); err != nil {
// stop processing as the work object has been removed, assume it's a normal delete operation.
if apierrors.IsNotFound(err) {
return nil
}
klog.Errorf("Failed to get Work from cache: %v", err)
return err
}
// stop processing as the work object is being deleted.
if !work.DeletionTimestamp.IsZero() {
return nil
}
return c.recreateResourceIfNeeded(work, key)
}
func (c *WorkStatusController) recreateResourceIfNeeded(work *workv1alpha1.Work, workloadKey keys.FederatedKey) error {
for _, rawManifest := range work.Spec.Workload.Manifests {
manifest := &unstructured.Unstructured{}
if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
return err
}
desiredGVK := schema.FromAPIVersionAndKind(manifest.GetAPIVersion(), manifest.GetKind())
if reflect.DeepEqual(desiredGVK, workloadKey.GroupVersionKind()) &&
manifest.GetNamespace() == workloadKey.Namespace &&
manifest.GetName() == workloadKey.Name {
klog.Infof("recreating %s", workloadKey.String())
return c.ObjectWatcher.Create(workloadKey.Cluster, manifest)
}
}
return nil
}
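As a short sketch of the naming rule that handleDeleteEvent relies on (not part of the diff; the concrete values are hypothetical and ctrlClient stands in for the controller's client):
executionSpace := names.GenerateExecutionSpaceName("member1") // execution namespace derived from the cluster name
workName := names.GenerateWorkName("Pod", "nginx", "default") // deterministic name derived from kind/name/namespace
work := &workv1alpha1.Work{}
err := ctrlClient.Get(context.TODO(), client.ObjectKey{Namespace: executionSpace, Name: workName}, work)
// A NotFound error is treated as a normal delete; otherwise recreateResourceIfNeeded decides
// whether the manifest should be re-created in the member cluster.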
// reflectStatus grabs the cluster object's running status and updates it on its owner object (Work).
func (c *WorkStatusController) reflectStatus(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) error {
statusRaw, err := c.ResourceInterpreter.ReflectStatus(clusterObj)
@ -303,10 +374,109 @@ func (c *WorkStatusController) mergeStatus(_ []workv1alpha1.ManifestStatus, newS
return []workv1alpha1.ManifestStatus{newStatus}
}
func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
for _, rawManifest := range manifests {
manifest := &unstructured.Unstructured{}
if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
return nil, err
}
if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() &&
manifest.GetKind() == clusterObj.GetKind() &&
manifest.GetNamespace() == clusterObj.GetNamespace() &&
manifest.GetName() == clusterObj.GetName() {
return manifest, nil
}
}
return nil, fmt.Errorf("no such manifest exists")
}
// registerInformersAndStart builds an informer manager for the cluster if it doesn't exist, then constructs informers
// for the GVRs and starts them.
func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error {
singleClusterInformerManager, err := c.getSingleClusterManager(cluster)
if err != nil {
return err
}
gvrTargets, err := c.getGVRsFromWork(work)
if err != nil {
return err
}
allSynced := true
for gvr := range gvrTargets {
if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) {
allSynced = false
singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
}
}
if allSynced {
return nil
}
c.InformerManager.Start(cluster.Name)
if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}
return nil
}
// getGVRsFromWork traverses the manifests in the Work to build the GroupVersionResource list.
func (c *WorkStatusController) getGVRsFromWork(work *workv1alpha1.Work) (map[schema.GroupVersionResource]bool, error) {
gvrTargets := map[schema.GroupVersionResource]bool{}
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)
if err != nil {
klog.Errorf("Failed to unmarshal workload. Error: %v.", err)
return nil, err
}
gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err)
return nil, err
}
gvrTargets[gvr] = true
}
return gvrTargets, nil
}
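As a sketch of how a manifest's GVK is mapped to the GVR that the informer will watch (mirroring the RESTMapper setup used in the tests further down; not part of the diff):
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
workload := &unstructured.Unstructured{}
_ = workload.UnmarshalJSON([]byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`))
gvr, err := restmapper.GetGroupVersionResource(m, workload.GroupVersionKind()) // expected: the core/v1 "pods" resource
_, _ = gvr, err // err is non-nil when the mapping is unknown to the RESTMapper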
// getSingleClusterManager gets the singleClusterInformerManager for the given cluster name.
// If the manager does not exist, it is created; otherwise it is fetched from the map.
func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) {
// TODO(chenxianpao): If cluster A is removed and a new cluster also named A then joins karmada,
// the cache in the informer manager should be updated.
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return nil, err
}
singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
}
return singleClusterInformerManager, nil
}
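For reference, the tests below build the informer manager this method returns roughly as in the sketch here; a real controller obtains its dynamic client through ClusterDynamicClientSetFunc rather than a fake:
stopCh := make(chan struct{})
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster("cluster", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start("cluster")
m.WaitForCacheSync("cluster")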
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, nil)
return controllerruntime.NewControllerManagedBy(mgr).
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).
WithOptions(controller.Options{

View File

@ -20,7 +20,6 @@ import (
kubernetesfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@ -29,17 +28,34 @@ import (
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/memberclusterinformer"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)
func newCluster(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster {
return &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: clusterv1alpha1.ClusterSpec{},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterType,
Status: clusterStatus,
},
},
},
}
}
func TestWorkStatusController_Reconcile(t *testing.T) {
tests := []struct {
name string
c WorkStatusController
dFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
work *workv1alpha1.Work
ns string
expectRes controllerruntime.Result
@ -69,8 +85,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")},
}).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -86,7 +105,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSet,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
@ -94,9 +112,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "work not exists",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -112,7 +133,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
@ -120,9 +140,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "work's DeletionTimestamp isn't zero",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -139,7 +162,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
@ -147,9 +169,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "work's status is not applied",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -165,7 +190,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
@ -173,9 +197,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "failed to get cluster name",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -191,7 +218,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
@ -199,9 +225,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "failed to get cluster",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -217,7 +246,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
@ -225,9 +253,12 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
{
name: "cluster is not ready",
c: WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
},
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -243,7 +274,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
},
},
dFunc: util.NewClusterDynamicClientSetForAgent,
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{Requeue: true},
existErr: true,
@ -252,8 +282,6 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
injectMemberClusterInformer(&tt.c, genericmanager.GetInstance(), tt.dFunc)
req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
@ -261,7 +289,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
},
}
if err := tt.c.Create(context.Background(), tt.work); err != nil {
if err := tt.c.Client.Create(context.Background(), tt.work); err != nil {
t.Fatalf("Failed to create cluster: %v", err)
}
@ -276,15 +304,38 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
}
}
func TestWorkStatusController_RunWorkQueue(t *testing.T) {
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
func TestWorkStatusController_getEventHandler(t *testing.T) {
opt := util.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
}
injectMemberClusterInformer(&c, genericmanager.GetInstance(), util.NewClusterDynamicClientSetForAgent)
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
worker: util.NewAsyncWorker(opt),
}
eventHandler := c.getEventHandler()
assert.NotEmpty(t, eventHandler)
}
func TestWorkStatusController_RunWorkQueue(t *testing.T) {
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
}
c.RunWorkQueue()
}
@ -493,7 +544,7 @@ func newPod(workNs, workName string, wrongLabel ...bool) *corev1.Pod {
}
func TestWorkStatusController_syncWorkStatus(t *testing.T) {
cluster := testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)
workName := "work"
workNs := "karmada-es-cluster"
workUID := "92345678-1234-5678-1234-567812345678"
@ -506,8 +557,18 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
controllerWithoutInformer bool
workWithRigntNS bool
expectedError bool
workWithDeletionTimestamp bool
wrongWorkNS bool
}{
{
name: "failed to exec NeedUpdate",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRigntNS: true,
expectedError: true,
},
{
name: "invalid key, wrong WorkNamespaceLabel in obj",
obj: newPodObj("karmada-cluster"),
@ -552,6 +613,25 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
workWithRigntNS: false,
expectedError: false,
},
{
name: "failed to getRawManifest, wrong Manifests in work",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRigntNS: true,
expectedError: true,
},
{
name: "failed to exec GetClusterName, wrong workNamespace",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRigntNS: true,
expectedError: true,
wrongWorkNS: true,
},
}
for _, tt := range tests {
@ -598,30 +678,22 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
}
}
func injectMemberClusterInformer(c *WorkStatusController,
informerManager genericmanager.MultiClusterInformerManager,
clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error),
) {
r := memberclusterinformer.NewMemberClusterInformer(c.Client, newRESTMapper(), informerManager, metav1.Duration{}, clusterDynamicClientSetFunc)
c.MemberClusterInformer = r
}
func newRESTMapper() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
return m
}
func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController {
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
RESTMapper: func() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
return m
}(),
}
informerManager := genericmanager.GetInstance()
if len(dynamicClientSets) > 0 {
clusterName := cluster.Name
dynamicClientSet := dynamicClientSets[0]
@ -635,22 +707,170 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets
serviceLister := sharedFactory.Core().V1().Services().Lister()
c.ResourceInterpreter = resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)
// Generate InformerManager
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
informerManager = m
c.InformerManager = m
}
injectMemberClusterInformer(&c, informerManager, util.NewClusterDynamicClientSetForAgent)
return c
}
func TestWorkStatusController_getSingleClusterManager(t *testing.T) {
clusterName := "cluster"
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
// Generate InformerManager
stopCh := make(chan struct{})
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
tests := []struct {
name string
rightClusterName bool
expectInformer bool
expectError bool
wrongClusterDynamicClientSetFunc bool
}{
{
name: "normal case",
rightClusterName: true,
expectInformer: true,
expectError: false,
},
{
name: "failed to build dynamic cluster client",
rightClusterName: false,
expectInformer: false,
expectError: true,
wrongClusterDynamicClientSetFunc: true,
},
{
name: "failed to get single cluster",
rightClusterName: false,
expectInformer: true,
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := newWorkStatusController(cluster)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
if tt.rightClusterName {
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
} else {
m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
}
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.InformerManager = m
if tt.wrongClusterDynamicClientSetFunc {
c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError
} else {
c.ClusterDynamicClientSetFunc = util.NewClusterDynamicClientSet
c.Client = fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(
&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://127.0.0.1",
SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"},
InsecureSkipTLSVerification: true,
},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")},
}).Build()
}
informerManager, err := c.getSingleClusterManager(cluster)
if tt.expectInformer {
assert.NotEmpty(t, informerManager)
} else {
assert.Empty(t, informerManager)
}
if tt.expectError {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
})
}
}
func TestWorkStatusController_recreateResourceIfNeeded(t *testing.T) {
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
}
workUID := "92345678-1234-5678-1234-567812345678"
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)
work := testhelper.NewWork("work", "default", workUID, raw)
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "pod1",
"namespace": "default",
"labels": map[string]interface{}{
workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster",
},
},
},
}
key, _ := generateKey(obj)
fedKey, ok := key.(keys.FederatedKey)
if !ok {
t.Fatalf("Invalid key, key: %v", key)
}
t.Run("normal case", func(t *testing.T) {
err := c.recreateResourceIfNeeded(work, fedKey)
assert.Empty(t, err)
})
t.Run("failed to UnmarshalJSON", func(t *testing.T) {
work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`)
err := c.recreateResourceIfNeeded(work, fedKey)
assert.NotEmpty(t, err)
})
}
func TestWorkStatusController_buildStatusIdentifier(t *testing.T) {
c := WorkStatusController{}
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
}
work := &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
@ -690,7 +910,7 @@ func TestWorkStatusController_buildStatusIdentifier(t *testing.T) {
})
t.Run("failed to GetManifestIndex", func(t *testing.T) {
wrongClusterObj, _ := helper.ToUnstructured(testhelper.NewClusterWithTypeAndStatus("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue))
wrongClusterObj, _ := helper.ToUnstructured(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue))
wrongClusterJson, _ := json.Marshal(wrongClusterObj)
work.Spec.Workload.Manifests = []workv1alpha1.Manifest{
{
@ -704,7 +924,14 @@ func TestWorkStatusController_buildStatusIdentifier(t *testing.T) {
}
func TestWorkStatusController_mergeStatus(t *testing.T) {
c := WorkStatusController{}
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
}
newStatus := workv1alpha1.ManifestStatus{
Health: "health",
@ -712,3 +939,61 @@ func TestWorkStatusController_mergeStatus(t *testing.T) {
actual := c.mergeStatus([]workv1alpha1.ManifestStatus{}, newStatus)
assert.Equal(t, []workv1alpha1.ManifestStatus{newStatus}, actual)
}
func TestWorkStatusController_registerInformersAndStart(t *testing.T) {
clusterName := "cluster"
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
// Generate InformerManager
stopCh := make(chan struct{})
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
c := newWorkStatusController(cluster)
opt := util.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
}
c.worker = util.NewAsyncWorker(opt)
workUID := "92345678-1234-5678-1234-567812345678"
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)
work := testhelper.NewWork("work", "default", workUID, raw)
t.Run("normal case", func(t *testing.T) {
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.InformerManager = m
err := c.registerInformersAndStart(cluster, work)
assert.Empty(t, err)
})
t.Run("failed to getSingleClusterManager", func(t *testing.T) {
c := newWorkStatusController(cluster)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.InformerManager = m
c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError
err := c.registerInformersAndStart(cluster, work)
assert.NotEmpty(t, err)
})
t.Run("failed to getGVRsFromWork", func(t *testing.T) {
work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.InformerManager = m
err := c.registerInformersAndStart(cluster, work)
assert.NotEmpty(t, err)
})
}

View File

@ -1,158 +0,0 @@
package fake
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
)
var _ resourceinterpreter.ResourceInterpreter = &FakeInterpreter{}
type (
GetReplicasFunc = func(*unstructured.Unstructured) (int32, *workv1alpha2.ReplicaRequirements, error)
ReviseReplicaFunc = func(*unstructured.Unstructured, int64) (*unstructured.Unstructured, error)
RetainFunc = func(*unstructured.Unstructured, *unstructured.Unstructured) (*unstructured.Unstructured, error)
AggregateStatusFunc = func(*unstructured.Unstructured, []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error)
GetDependenciesFunc = func(*unstructured.Unstructured) ([]configv1alpha1.DependentObjectReference, error)
ReflectStatusFunc = func(*unstructured.Unstructured) (*runtime.RawExtension, error)
InterpretHealthFunc = func(*unstructured.Unstructured) (bool, error)
)
type FakeInterpreter struct {
getReplicasFunc map[schema.GroupVersionKind]GetReplicasFunc
reviseReplicaFunc map[schema.GroupVersionKind]ReviseReplicaFunc
retainFunc map[schema.GroupVersionKind]RetainFunc
aggregateStatusFunc map[schema.GroupVersionKind]AggregateStatusFunc
getDependenciesFunc map[schema.GroupVersionKind]GetDependenciesFunc
reflectStatusFunc map[schema.GroupVersionKind]ReflectStatusFunc
interpretHealthFunc map[schema.GroupVersionKind]InterpretHealthFunc
}
func (f *FakeInterpreter) Start(ctx context.Context) (err error) {
return nil
}
func (f *FakeInterpreter) HookEnabled(objGVK schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) bool {
var exist bool
switch operationType {
case configv1alpha1.InterpreterOperationInterpretReplica:
_, exist = f.getReplicasFunc[objGVK]
case configv1alpha1.InterpreterOperationReviseReplica:
_, exist = f.reviseReplicaFunc[objGVK]
case configv1alpha1.InterpreterOperationRetain:
_, exist = f.retainFunc[objGVK]
case configv1alpha1.InterpreterOperationAggregateStatus:
_, exist = f.aggregateStatusFunc[objGVK]
case configv1alpha1.InterpreterOperationInterpretDependency:
_, exist = f.getDependenciesFunc[objGVK]
case configv1alpha1.InterpreterOperationInterpretStatus:
_, exist = f.reflectStatusFunc[objGVK]
case configv1alpha1.InterpreterOperationInterpretHealth:
_, exist = f.interpretHealthFunc[objGVK]
default:
exist = false
}
return exist
}
func (f *FakeInterpreter) GetReplicas(object *unstructured.Unstructured) (replica int32, requires *workv1alpha2.ReplicaRequirements, err error) {
return f.getReplicasFunc[object.GetObjectKind().GroupVersionKind()](object)
}
func (f *FakeInterpreter) ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error) {
return f.reviseReplicaFunc[object.GetObjectKind().GroupVersionKind()](object, replica)
}
func (f *FakeInterpreter) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return f.retainFunc[observed.GetObjectKind().GroupVersionKind()](desired, observed)
}
func (f *FakeInterpreter) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
return f.aggregateStatusFunc[object.GetObjectKind().GroupVersionKind()](object, aggregatedStatusItems)
}
func (f *FakeInterpreter) GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error) {
return f.getDependenciesFunc[object.GetObjectKind().GroupVersionKind()](object)
}
func (f *FakeInterpreter) ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, err error) {
return f.reflectStatusFunc[object.GetObjectKind().GroupVersionKind()](object)
}
func (f *FakeInterpreter) InterpretHealth(object *unstructured.Unstructured) (healthy bool, err error) {
return f.interpretHealthFunc[object.GetObjectKind().GroupVersionKind()](object)
}
func NewFakeInterpreter() *FakeInterpreter {
return &FakeInterpreter{}
}
func (f *FakeInterpreter) WithGetReplicas(objGVK schema.GroupVersionKind, iFunc GetReplicasFunc) *FakeInterpreter {
if f.getReplicasFunc == nil {
f.getReplicasFunc = make(map[schema.GroupVersionKind]GetReplicasFunc)
}
f.getReplicasFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithReviseReplica(objGVK schema.GroupVersionKind, iFunc ReviseReplicaFunc) *FakeInterpreter {
if f.reviseReplicaFunc == nil {
f.reviseReplicaFunc = make(map[schema.GroupVersionKind]ReviseReplicaFunc)
}
f.reviseReplicaFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithRetain(objGVK schema.GroupVersionKind, iFunc RetainFunc) *FakeInterpreter {
if f.retainFunc == nil {
f.retainFunc = make(map[schema.GroupVersionKind]RetainFunc)
}
f.retainFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithAggregateStatus(objGVK schema.GroupVersionKind, iFunc AggregateStatusFunc) *FakeInterpreter {
if f.aggregateStatusFunc == nil {
f.aggregateStatusFunc = make(map[schema.GroupVersionKind]AggregateStatusFunc)
}
f.aggregateStatusFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithGetDependencies(objGVK schema.GroupVersionKind, iFunc GetDependenciesFunc) *FakeInterpreter {
if f.getDependenciesFunc == nil {
f.getDependenciesFunc = make(map[schema.GroupVersionKind]GetDependenciesFunc)
}
f.getDependenciesFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithReflectStatus(objGVK schema.GroupVersionKind, iFunc ReflectStatusFunc) *FakeInterpreter {
if f.reflectStatusFunc == nil {
f.reflectStatusFunc = make(map[schema.GroupVersionKind]ReflectStatusFunc)
}
f.reflectStatusFunc[objGVK] = iFunc
return f
}
func (f *FakeInterpreter) WithInterpretHealth(objGVK schema.GroupVersionKind, iFunc InterpretHealthFunc) *FakeInterpreter {
if f.interpretHealthFunc == nil {
f.interpretHealthFunc = make(map[schema.GroupVersionKind]InterpretHealthFunc)
}
f.interpretHealthFunc[objGVK] = iFunc
return f
}

View File

@ -1,9 +1,39 @@
package fedinformer
import (
"reflect"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
// NewHandlerOnAllEvents builds a ResourceEventHandler so that the function 'fn' is called on all events (add/update/delete).
func NewHandlerOnAllEvents(fn func(runtime.Object)) cache.ResourceEventHandler {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
curObj := cur.(runtime.Object)
fn(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(runtime.Object)
if !reflect.DeepEqual(old, cur) {
fn(curObj)
}
},
DeleteFunc: func(old interface{}) {
if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
old = deleted.Obj
if old == nil {
return
}
}
oldObj := old.(runtime.Object)
fn(oldObj)
},
}
}
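A usage sketch, not part of the diff: the WorkStatusController wires this handler to its async worker and registers it on each informer it needs; worker, gvr and singleClusterInformerManager are assumed from that controller's context.
handler := fedinformer.NewHandlerOnAllEvents(worker.Enqueue) // worker is a util.AsyncWorker
singleClusterInformerManager.ForResource(gvr, handler)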
// NewHandlerOnEvents builds a ResourceEventHandler.
func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
return &cache.ResourceEventHandlerFuncs{

View File

@ -40,21 +40,21 @@ const (
// ObjectVersion retrieves the field type-prefixed value used for
// determining currency of the given cluster object.
func ObjectVersion(obj *unstructured.Unstructured) string {
generation := obj.GetGeneration()
func ObjectVersion(clusterObj *unstructured.Unstructured) string {
generation := clusterObj.GetGeneration()
if generation != 0 {
return fmt.Sprintf("%s%d", generationPrefix, generation)
}
return fmt.Sprintf("%s%s", resourceVersionPrefix, obj.GetResourceVersion())
return fmt.Sprintf("%s%s", resourceVersionPrefix, clusterObj.GetResourceVersion())
}
// +lifted:source=https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59
// ObjectNeedsUpdate determines whether the 2 objects provided cluster
// object needs to be updated according to the old object and the
// object needs to be updated according to the desired object and the
// recorded version.
func ObjectNeedsUpdate(oldObj, currentObj *unstructured.Unstructured, recordedVersion string) bool {
targetVersion := ObjectVersion(currentObj)
func ObjectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool {
targetVersion := ObjectVersion(clusterObj)
if recordedVersion != targetVersion {
return true
@ -63,7 +63,7 @@ func ObjectNeedsUpdate(oldObj, currentObj *unstructured.Unstructured, recordedVe
// If versions match and the version is sourced from the
// generation field, a further check of metadata equivalency is
// required.
return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(oldObj, currentObj)
return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(desiredObj, clusterObj)
}
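A worked sketch of the two branches above, not part of the diff; the object is hypothetical:
dep := &unstructured.Unstructured{Object: map[string]interface{}{
	"apiVersion": "apps/v1", "kind": "Deployment",
	"metadata": map[string]interface{}{"name": "nginx", "namespace": "default", "generation": int64(2)},
}}
recorded := lifted.ObjectVersion(dep) // generation-prefixed, since generation != 0
changed := dep.DeepCopy()
changed.SetGeneration(3)                             // a spec change bumps generation
_ = lifted.ObjectNeedsUpdate(dep, changed, recorded) // true: the recorded version no longer matches
// When the versions match and are generation-based, the result instead falls back to the
// metadata equivalence check lifted from kubefed (see the +lifted:source reference below).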
// +lifted:source=https://github.com/kubernetes-retired/kubefed/blob/master/pkg/controller/util/meta.go#L82-L103

View File

@ -1,156 +0,0 @@
package memberclusterinformer
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
var _ MemberClusterInformer = &memberClusterInformerImpl{}
type MemberClusterInformer interface {
// BuildResourceInformers builds informer dynamically for managed resources in member cluster.
// The created informer watches resource change and then sync to the relevant Work object.
BuildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error
// GetObjectFromCache returns object in cache by federated key.
GetObjectFromCache(fedKey keys.FederatedKey) (*unstructured.Unstructured, error)
}
func NewMemberClusterInformer(
c client.Client,
restMapper meta.RESTMapper,
informerManager genericmanager.MultiClusterInformerManager,
clusterCacheSyncTimeout metav1.Duration,
clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error),
) MemberClusterInformer {
return &memberClusterInformerImpl{
Client: c,
restMapper: restMapper,
informerManager: informerManager,
clusterDynamicClientSetFunc: clusterDynamicClientSetFunc,
clusterCacheSyncTimeout: clusterCacheSyncTimeout,
}
}
type memberClusterInformerImpl struct {
client.Client // used to get Cluster and Secret resources.
restMapper meta.RESTMapper
informerManager genericmanager.MultiClusterInformerManager
clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
clusterCacheSyncTimeout metav1.Duration
}
// BuildResourceInformers builds informer dynamically for managed resources in member cluster.
// The created informer watches resource change and then sync to the relevant Work object.
func (m *memberClusterInformerImpl) BuildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error {
err := m.registerInformersAndStart(cluster, work, handler)
if err != nil {
klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
return err
}
return nil
}
// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr
// and start it.
func (m *memberClusterInformerImpl) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work, handler cache.ResourceEventHandler) error {
singleClusterInformerManager, err := m.getSingleClusterManager(cluster)
if err != nil {
return err
}
gvrTargets, err := m.getGVRsFromWork(work)
if err != nil {
return err
}
allSynced := true
for _, gvr := range gvrTargets {
if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, handler) {
allSynced = false
singleClusterInformerManager.ForResource(gvr, handler)
}
}
if allSynced {
return nil
}
m.informerManager.Start(cluster.Name)
if err := func() error {
synced := m.informerManager.WaitForCacheSyncWithTimeout(cluster.Name, m.clusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
m.informerManager.Stop(cluster.Name)
return err
}
return nil
}
// getSingleClusterManager gets singleClusterInformerManager with clusterName.
// If manager is not exist, create it, otherwise gets it from map.
func (m *memberClusterInformerImpl) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) {
// TODO(chenxianpao): If cluster A is removed, then a new cluster that name also is A joins karmada,
// the cache in informer manager should be updated.
singleClusterInformerManager := m.informerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := m.clusterDynamicClientSetFunc(cluster.Name, m.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return nil, err
}
singleClusterInformerManager = m.informerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
}
return singleClusterInformerManager, nil
}
// getGVRsFromWork traverses the manifests in work to find groupVersionResource list.
func (m *memberClusterInformerImpl) getGVRsFromWork(work *workv1alpha1.Work) ([]schema.GroupVersionResource, error) {
gvrTargets := sets.New[schema.GroupVersionResource]()
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)
if err != nil {
klog.Errorf("Failed to unmarshal workload. Error: %v.", err)
return nil, err
}
gvr, err := restmapper.GetGroupVersionResource(m.restMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err)
return nil, err
}
gvrTargets.Insert(gvr)
}
return gvrTargets.UnsortedList(), nil
}
// GetObjectFromCache returns object in cache by federated key.
func (m *memberClusterInformerImpl) GetObjectFromCache(fedKey keys.FederatedKey) (*unstructured.Unstructured, error) {
return helper.GetObjectFromCache(m.restMapper, m.informerManager, fedKey)
}

View File

@ -1,188 +0,0 @@
package memberclusterinformer
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
testhelper "github.com/karmada-io/karmada/test/helper"
)
func Test_getSingleClusterManager(t *testing.T) {
clusterName := "cluster"
cluster := testhelper.NewClusterWithTypeAndStatus(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
// Generate InformerManager
stopCh := make(chan struct{})
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
tests := []struct {
name string
rightClusterName bool
expectInformer bool
expectError bool
wrongClusterDynamicClientSetFunc bool
}{
{
name: "normal case",
rightClusterName: true,
expectInformer: true,
expectError: false,
},
{
name: "failed to build dynamic cluster client",
rightClusterName: false,
expectInformer: false,
expectError: true,
wrongClusterDynamicClientSetFunc: true,
},
{
name: "failed to get single cluster",
rightClusterName: false,
expectInformer: true,
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := newMemberClusterInformer(cluster)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
if tt.rightClusterName {
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
} else {
m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
}
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.informerManager = m
if tt.wrongClusterDynamicClientSetFunc {
c.clusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError
} else {
c.clusterDynamicClientSetFunc = util.NewClusterDynamicClientSet
c.Client = fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(
&clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
Spec: clusterv1alpha1.ClusterSpec{
APIEndpoint: "https://127.0.0.1",
SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"},
InsecureSkipTLSVerification: true,
},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterv1alpha1.ClusterConditionReady,
Status: metav1.ConditionTrue,
},
},
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"},
Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")},
}).Build()
}
informerManager, err := c.getSingleClusterManager(cluster)
if tt.expectInformer {
assert.NotEmpty(t, informerManager)
} else {
assert.Empty(t, informerManager)
}
if tt.expectError {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
})
}
}
func Test_registerInformersAndStart(t *testing.T) {
workUID := "93162d3c-ee8e-4995-9034-05f4d5d2c2b9"
clusterName := "cluster"
cluster := testhelper.NewClusterWithTypeAndStatus(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
// Generate InformerManager
stopCh := make(chan struct{})
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
c := newMemberClusterInformer(cluster)
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)
work := testhelper.NewWork("work", "default", workUID, raw)
eventHandler := fedinformer.NewHandlerOnEvents(nil, nil, nil)
t.Run("normal case", func(t *testing.T) {
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.informerManager = m
err := c.registerInformersAndStart(cluster, work, eventHandler)
assert.Empty(t, err)
})
t.Run("failed to getSingleClusterManager", func(t *testing.T) {
c := newMemberClusterInformer(cluster)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.informerManager = m
c.clusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError
err := c.registerInformersAndStart(cluster, work, eventHandler)
assert.NotEmpty(t, err)
})
t.Run("failed to getGVRsFromWork", func(t *testing.T) {
work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`)
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
m.WaitForCacheSync(clusterName)
c.informerManager = m
err := c.registerInformersAndStart(cluster, work, eventHandler)
assert.NotEmpty(t, err)
})
}
func NewClusterDynamicClientSetForAgentWithError(clusterName string, client client.Client) (*util.DynamicClusterClient, error) {
return nil, fmt.Errorf("err")
}
func newMemberClusterInformer(cluster *clusterv1alpha1.Cluster) *memberClusterInformerImpl {
mapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
mapper.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
return &memberClusterInformerImpl{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(),
restMapper: mapper,
clusterCacheSyncTimeout: metav1.Duration{},
clusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
}

View File

@ -9,7 +9,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -18,6 +17,9 @@ import (
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
@ -27,36 +29,31 @@ type ObjectWatcher interface {
Create(clusterName string, desireObj *unstructured.Unstructured) error
Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error
Delete(clusterName string, desireObj *unstructured.Unstructured) error
NeedsUpdate(clusterName string, oldObj, currentObj *unstructured.Unstructured) bool
NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
}
// ClientSetFunc is used to generate client set of member cluster
type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error)
type versionFunc func() (objectVersion string, err error)
type versionWithLock struct {
lock sync.RWMutex
version string
}
type objectWatcherImpl struct {
Lock sync.RWMutex
RESTMapper meta.RESTMapper
KubeClientSet client.Client
VersionRecord map[string]map[string]*versionWithLock
VersionRecord map[string]map[string]string
ClusterClientSetFunc ClientSetFunc
resourceInterpreter resourceinterpreter.ResourceInterpreter
InformerManager genericmanager.MultiClusterInformerManager
}
// NewObjectWatcher returns an instance of ObjectWatcher
func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher {
return &objectWatcherImpl{
KubeClientSet: kubeClientSet,
VersionRecord: make(map[string]map[string]*versionWithLock),
VersionRecord: make(map[string]map[string]string),
RESTMapper: restMapper,
ClusterClientSetFunc: clusterClientSetFunc,
resourceInterpreter: interpreter,
InformerManager: genericmanager.GetInstance(),
}
}
@ -73,16 +70,30 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U
return err
}
clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{})
fedKey, err := keys.FederatedKeyFunc(clusterName, desireObj)
if err != nil {
klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
klog.Errorf("Failed to get FederatedKey %s, error: %v", desireObj.GetName(), err)
return err
}
_, err = helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", desireObj.GetName(), err)
return err
}
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
return err
}
// record version
return o.recordVersionWithVersionFunc(clusterObj, dynamicClusterClient.ClusterName, func() (string, error) { return lifted.ObjectVersion(clusterObj), nil })
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
// record version
o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
}
return nil
}
func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
@ -133,48 +144,22 @@ func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *un
return err
}
var errMsg string
var desireCopy, resource *unstructured.Unstructured
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
desireCopy = desireObj.DeepCopy()
if err != nil {
clusterObj, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
if err != nil {
errMsg = fmt.Sprintf("Failed to get resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
return err
}
}
desireCopy, err = o.retainClusterFields(desireCopy, clusterObj)
if err != nil {
errMsg = fmt.Sprintf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName, err)
return err
}
versionFuncWithUpdate := func() (string, error) {
resource, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireCopy, metav1.UpdateOptions{})
if err != nil {
return "", err
}
return lifted.ObjectVersion(resource), nil
}
err = o.recordVersionWithVersionFunc(desireCopy, clusterName, versionFuncWithUpdate)
if err == nil {
return nil
}
errMsg = fmt.Sprintf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
return err
})
desireObj, err = o.retainClusterFields(desireObj, clusterObj)
if err != nil {
klog.Errorf(errMsg)
klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName, err)
return err
}
resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
return err
}
klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
// record version
o.recordVersion(resource, clusterName)
return nil
}
@ -208,7 +193,8 @@ func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.U
}
klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
o.deleteVersionRecord(desireObj, dynamicClusterClient.ClusterName)
objectKey := o.genObjectKey(desireObj)
o.deleteVersionRecord(dynamicClusterClient.ClusterName, objectKey)
return nil
}
@ -217,98 +203,71 @@ func (o *objectWatcherImpl) genObjectKey(obj *unstructured.Unstructured) string
return obj.GroupVersionKind().String() + "/" + obj.GetNamespace() + "/" + obj.GetName()
}
// recordVersionWithVersionFunc will add or update the resource version record with the version returned by versionFunc
func (o *objectWatcherImpl) recordVersionWithVersionFunc(obj *unstructured.Unstructured, clusterName string, fn versionFunc) error {
objectKey := o.genObjectKey(obj)
return o.addOrUpdateVersionRecordWithVersionFunc(clusterName, objectKey, fn)
// recordVersion will add or update resource version records
func (o *objectWatcherImpl) recordVersion(clusterObj *unstructured.Unstructured, clusterName string) {
objVersion := lifted.ObjectVersion(clusterObj)
objectKey := o.genObjectKey(clusterObj)
if o.isClusterVersionRecordExist(clusterName) {
o.updateVersionRecord(clusterName, objectKey, objVersion)
} else {
o.addVersionRecord(clusterName, objectKey, objVersion)
}
}
// isClusterVersionRecordExist checks if the version record map of given member cluster exist
func (o *objectWatcherImpl) isClusterVersionRecordExist(clusterName string) bool {
o.Lock.RLock()
defer o.Lock.RUnlock()
_, exist := o.VersionRecord[clusterName]
return exist
}
// getVersionRecord will return the recorded version of given resource(if exist)
func (o *objectWatcherImpl) getVersionRecord(clusterName, resourceName string) (string, bool) {
versionLock, exist := o.getVersionWithLockRecord(clusterName, resourceName)
if !exist {
return "", false
}
versionLock.lock.RLock()
defer versionLock.lock.RUnlock()
return versionLock.version, true
}
// getVersionWithLockRecord will return the recorded versionWithLock of given resource (if it exists)
func (o *objectWatcherImpl) getVersionWithLockRecord(clusterName, resourceName string) (*versionWithLock, bool) {
o.Lock.RLock()
defer o.Lock.RUnlock()
versionLock, exist := o.VersionRecord[clusterName][resourceName]
return versionLock, exist
version, exist := o.VersionRecord[clusterName][resourceName]
return version, exist
}
// newVersionWithLockRecord will add new versionWithLock record of given resource
func (o *objectWatcherImpl) newVersionWithLockRecord(clusterName, resourceName string) *versionWithLock {
// addVersionRecord will add new version record of given resource
func (o *objectWatcherImpl) addVersionRecord(clusterName, resourceName, version string) {
o.Lock.Lock()
defer o.Lock.Unlock()
v, exist := o.VersionRecord[clusterName][resourceName]
if exist {
return v
}
v = &versionWithLock{}
if o.VersionRecord[clusterName] == nil {
o.VersionRecord[clusterName] = map[string]*versionWithLock{}
}
o.VersionRecord[clusterName][resourceName] = v
return v
o.VersionRecord[clusterName] = map[string]string{resourceName: version}
}
// addOrUpdateVersionRecordWithVersionFunc will add or update the recorded version of given resource with version returned by versionFunc
func (o *objectWatcherImpl) addOrUpdateVersionRecordWithVersionFunc(clusterName, resourceName string, fn versionFunc) error {
versionLock, exist := o.getVersionWithLockRecord(clusterName, resourceName)
if !exist {
versionLock = o.newVersionWithLockRecord(clusterName, resourceName)
}
versionLock.lock.Lock()
defer versionLock.lock.Unlock()
version, err := fn()
if err != nil {
return err
}
versionLock.version = version
return nil
// updateVersionRecord will update the recorded version of given resource
func (o *objectWatcherImpl) updateVersionRecord(clusterName, resourceName, version string) {
o.Lock.Lock()
defer o.Lock.Unlock()
o.VersionRecord[clusterName][resourceName] = version
}
// deleteVersionRecord will delete the recorded version of given resource
func (o *objectWatcherImpl) deleteVersionRecord(obj *unstructured.Unstructured, clusterName string) {
objectKey := o.genObjectKey(obj)
func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string) {
o.Lock.Lock()
defer o.Lock.Unlock()
delete(o.VersionRecord[clusterName], objectKey)
delete(o.VersionRecord[clusterName], resourceName)
}
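
As a side note, the bookkeeping restored above is a two-level map keyed by cluster name and by the "GVK/namespace/name" key produced by genObjectKey, guarded by a single RWMutex. The following is a minimal, self-contained sketch of that pattern, not Karmada code; the cluster name, object key, and version values are made up for illustration.

package main

import (
	"fmt"
	"sync"
)

type versionRecord struct {
	lock   sync.RWMutex
	record map[string]map[string]string // cluster name -> "GVK/namespace/name" -> version
}

func newVersionRecord() *versionRecord {
	return &versionRecord{record: make(map[string]map[string]string)}
}

// upsert adds or overwrites the recorded version of one object in one cluster.
func (r *versionRecord) upsert(cluster, key, version string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.record[cluster] == nil {
		r.record[cluster] = map[string]string{}
	}
	r.record[cluster][key] = version
}

// get returns the recorded version and whether a record exists.
func (r *versionRecord) get(cluster, key string) (string, bool) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	version, exist := r.record[cluster][key]
	return version, exist
}

// delete drops the record, e.g. after the object is removed from the cluster.
func (r *versionRecord) delete(cluster, key string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	delete(r.record[cluster], key)
}

func main() {
	r := newVersionRecord()
	key := "apps/v1, Kind=Deployment/default/nginx" // same shape as genObjectKey above
	r.upsert("member1", key, "rv100")
	if version, exist := r.get("member1", key); exist {
		fmt.Println("recorded version:", version)
	}
	r.delete("member1", key)
}
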
func (o *objectWatcherImpl) NeedsUpdate(clusterName string, oldObj, currentObj *unstructured.Unstructured) bool {
// get resource version, and if no recorded version, that means the object hasn't been processed yet since last restart, it should be processed now.
version, exist := o.getVersionRecord(clusterName, o.genObjectKey(oldObj))
func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) {
// get resource version
version, exist := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName())
if !exist {
return true
klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName)
return false, fmt.Errorf("failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName)
}
return lifted.ObjectNeedsUpdate(oldObj, currentObj, version)
return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version), nil
}
func (o *objectWatcherImpl) allowUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool {
// If the existing resource is managed by Karmada, then the updating is allowed.
if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) &&
util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNamespaceLabel) {
if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) {
return true
}
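
To contrast the two NeedsUpdate signatures visible in this hunk: the bool-only variant treats a missing version record as "needs update", while the (bool, error) variant reports the missing record as an error and leaves the retry decision to the caller. The sketch below is hypothetical and self-contained; all names and values are illustrative and none of it is taken from the repository.

package main

import "fmt"

// record maps an object key to its last observed version for one cluster.
type record map[string]string

// needsUpdateBool mirrors the bool-only contract: a missing record means update.
func needsUpdateBool(r record, key, currentVersion string) bool {
	recorded, exist := r[key]
	if !exist {
		return true
	}
	return recorded != currentVersion
}

// needsUpdateErr mirrors the (bool, error) contract: a missing record is an error.
func needsUpdateErr(r record, key, currentVersion string) (bool, error) {
	recorded, exist := r[key]
	if !exist {
		return false, fmt.Errorf("version record for %s does not exist", key)
	}
	return recorded != currentVersion, nil
}

func main() {
	r := record{"apps/v1, Kind=Deployment/default/nginx": "rv100"}

	fmt.Println(needsUpdateBool(r, "apps/v1, Kind=Deployment/default/nginx", "rv101")) // true: version changed
	fmt.Println(needsUpdateBool(r, "unknown/key", "rv1"))                              // true: no record yet

	if _, err := needsUpdateErr(r, "unknown/key", "rv1"); err != nil {
		fmt.Println("error:", err) // missing record surfaces as an error instead
	}
}
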

View File

@ -1,173 +0,0 @@
package objectwatcher
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
interpreterfake "github.com/karmada-io/karmada/pkg/resourceinterpreter/fake"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/names"
)
func TestObjectWatcher_NeedsUpdate(t *testing.T) {
deployGVR := appsv1.SchemeGroupVersion.WithResource("deployments")
tests := []struct {
name string
createInternal bool
updateInternal bool
expected bool
}{
{
name: "true, empty record",
createInternal: false,
updateInternal: false,
expected: true,
},
{
name: "false, update from internal",
createInternal: true,
updateInternal: true,
expected: false,
},
{
name: "true, update from client",
createInternal: true,
updateInternal: false,
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resChan := make(chan bool)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
o := newObjectWatcher(dynamicClientSet)
informer := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClientSet, 0)
_, err := informer.ForResource(deployGVR).Informer().AddEventHandler(newEventHandlerWithResultChan(o, resChan))
if err != nil {
t.Fatalf("Failed to add event handler: %v", err)
}
informer.Start(ctx.Done())
informer.WaitForCacheSync(ctx.Done())
clusterDeploy := newDeploymentObj(1, 1)
if tt.createInternal {
err = o.Create("cluster", clusterDeploy)
if err != nil {
t.Fatalf("Failed to create deploy from internal: %v", err)
}
} else {
_, err = dynamicClientSet.Resource(deployGVR).Namespace(clusterDeploy.GetNamespace()).Create(ctx, clusterDeploy, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create deploy from client: %v", err)
}
}
newDeploy := newDeploymentObj(2, 2)
if tt.updateInternal {
err = o.Update("cluster", newDeploy, clusterDeploy)
if err != nil {
t.Fatalf("Failed to update deploy from internal: %v", err)
}
} else {
_, err = dynamicClientSet.Resource(deployGVR).Namespace(newDeploy.GetNamespace()).Update(ctx, newDeploy, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update deploy from client: %v", err)
}
}
res := <-resChan
assert.Equal(t, tt.expected, res)
})
}
}
func newDeploymentObj(replicas int32, generation int64) *unstructured.Unstructured {
deployObj := map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "deployment",
"namespace": "default",
"labels": map[string]interface{}{
workv1alpha1.WorkNamespaceLabel: "karmada-es-cluster",
workv1alpha1.WorkNameLabel: "work",
},
"generation": generation,
},
"spec": map[string]interface{}{
"replicas": &replicas,
},
}
deployBytes, _ := json.Marshal(deployObj)
deployUnstructured := &unstructured.Unstructured{}
_ = deployUnstructured.UnmarshalJSON(deployBytes)
return deployUnstructured
}
func newRESTMapper() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{appsv1.SchemeGroupVersion})
m.Add(appsv1.SchemeGroupVersion.WithKind("Deployment"), meta.RESTScopeNamespace)
return m
}
func newObjectWatcher(dynamicClientSets ...dynamic.Interface) ObjectWatcher {
c := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build()
clusterDynamicClientSetFunc := util.NewClusterDynamicClientSetForAgent
if len(dynamicClientSets) > 0 {
clusterDynamicClientSetFunc = newClusterDynamicClientSetForAgent("cluster", dynamicClientSets[0])
}
return NewObjectWatcher(c, newRESTMapper(), clusterDynamicClientSetFunc, interpreterfake.NewFakeInterpreter())
}
func newClusterDynamicClientSetForAgent(clusterName string, dynamicClientSet dynamic.Interface) func(string, client.Client) (*util.DynamicClusterClient, error) {
return func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
}, nil
}
}
func newEventHandlerWithResultChan(o ObjectWatcher, resChan chan<- bool) cache.ResourceEventHandler {
return &cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
objOld := oldObj.(*unstructured.Unstructured)
objNew := newObj.(*unstructured.Unstructured)
res := false
clusterName, _ := names.GetClusterName(util.GetLabelValue(objNew.GetLabels(), workv1alpha1.WorkNamespaceLabel))
if clusterName != "" {
res = o.NeedsUpdate(clusterName, objOld, objNew)
}
resChan <- res
},
}
}

View File

@ -610,24 +610,6 @@ func NewClusterWithResource(name string, allocatable, allocating, allocated core
}
}
// NewClusterWithTypeAndStatus will build a Cluster with type and status.
func NewClusterWithTypeAndStatus(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster {
return &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: clusterv1alpha1.ClusterSpec{},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterType,
Status: clusterStatus,
},
},
},
}
}
// NewWorkload will build a workload object.
func NewWorkload(namespace, name string) *workloadv1alpha1.Workload {
podLabels := map[string]string{"app": "nginx"}