From dac00692e7d14c1054ee9669790896e2b7152ea9 Mon Sep 17 00:00:00 2001 From: pigletfly Date: Thu, 4 Nov 2021 18:12:47 +0800 Subject: [PATCH] refactor object watcher interface Signed-off-by: pigletfly --- .../execution/execution_controller.go | 60 ++++++++--------- .../mcs/service_export_controller.go | 4 +- .../status/cluster_status_controller.go | 8 +-- .../status/workstatus_controller.go | 21 ++---- pkg/util/membercluster_client.go | 27 ++++---- pkg/util/objectwatcher/objectwatcher.go | 65 +++++++++---------- test/e2e/framework/cluster.go | 4 +- test/e2e/namespace_test.go | 2 +- 8 files changed, 87 insertions(+), 104 deletions(-) diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index c8e8aa6a6..89d054487 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -17,7 +17,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" @@ -69,11 +68,15 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques klog.Errorf("Failed to get the given member cluster %s", clusterName) return controllerruntime.Result{Requeue: true}, err } + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) + } if !work.DeletionTimestamp.IsZero() { applied := helper.IsResourceApplied(&work.Status) if applied { - err := c.tryDeleteWorkload(cluster, work) + err := c.tryDeleteWorkload(clusterName, work) if err != nil { klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) return controllerruntime.Result{Requeue: true}, err @@ -82,7 +85,7 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return c.removeFinalizer(work) } - return c.syncWork(cluster, work) + return c.syncWork(clusterName, work) } // SetupWithManager creates a controller and register to controller manager. @@ -94,20 +97,15 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { Complete(c) } -func (c *Controller) syncWork(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) { - if !util.IsClusterReady(&cluster.Status) { - klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) - return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) - } - - err := c.syncToClusters(cluster, work) +func (c *Controller) syncWork(clusterName string, work *workv1alpha1.Work) (controllerruntime.Result, error) { + err := c.syncToClusters(clusterName, work) if err != nil { - msg := fmt.Sprintf("Failed to sync work(%s) to cluster(%s): %v", work.Name, cluster.Name, err) + msg := fmt.Sprintf("Failed to sync work(%s) to cluster(%s): %v", work.Name, clusterName, err) klog.Errorf(msg) c.EventRecorder.Event(work, corev1.EventTypeWarning, workv1alpha1.EventReasonSyncWorkFailed, msg) return controllerruntime.Result{Requeue: true}, err } - msg := fmt.Sprintf("Sync work (%s) to cluster(%s) successful.", work.Name, cluster.Name) + msg := fmt.Sprintf("Sync work (%s) to cluster(%s) successful.", work.Name, clusterName) klog.V(4).Infof(msg) c.EventRecorder.Event(work, corev1.EventTypeNormal, workv1alpha1.EventReasonSyncWorkSucceed, msg) return controllerruntime.Result{}, nil @@ -115,13 +113,7 @@ func (c *Controller) syncWork(cluster *clusterv1alpha1.Cluster, work *workv1alph // tryDeleteWorkload tries to delete resource in the given member cluster. // Abort deleting when the member cluster is unready, otherwise we can't unjoin the member cluster when the member cluster is unready -func (c *Controller) tryDeleteWorkload(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error { - // Do not clean up resource in the given member cluster if the status of the given member cluster is unready - if !util.IsClusterReady(&cluster.Status) { - klog.Infof("Do not clean up resource in the given member cluster if the status of the given member cluster %s is unready", cluster.Name) - return nil - } - +func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Work) error { for _, manifest := range work.Spec.Workload.Manifests { workload := &unstructured.Unstructured{} err := workload.UnmarshalJSON(manifest.Raw) @@ -130,9 +122,9 @@ func (c *Controller) tryDeleteWorkload(cluster *clusterv1alpha1.Cluster, work *w return err } - err = c.ObjectWatcher.Delete(cluster, workload) + err = c.ObjectWatcher.Delete(clusterName, workload) if err != nil { - klog.Errorf("Failed to delete resource in the given member cluster %v, err is %v", cluster.Name, err) + klog.Errorf("Failed to delete resource in the given member cluster %v, err is %v", clusterName, err) return err } } @@ -155,7 +147,7 @@ func (c *Controller) removeFinalizer(work *workv1alpha1.Work) (controllerruntime } // syncToClusters ensures that the state of the given object is synchronized to member clusters. -func (c *Controller) syncToClusters(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error { +func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work) error { var errs []error syncSucceedNum := 0 for _, manifest := range work.Spec.Workload.Manifests { @@ -169,16 +161,16 @@ func (c *Controller) syncToClusters(cluster *clusterv1alpha1.Cluster, work *work applied := helper.IsResourceApplied(&work.Status) if applied { - err = c.tryUpdateWorkload(cluster, workload) + err = c.tryUpdateWorkload(clusterName, workload) if err != nil { - klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) errs = append(errs, err) continue } } else { - err = c.tryCreateWorkload(cluster, workload) + err = c.tryCreateWorkload(clusterName, workload) if err != nil { - klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) errs = append(errs, err) continue } @@ -206,8 +198,8 @@ func (c *Controller) syncToClusters(cluster *clusterv1alpha1.Cluster, work *work return nil } -func (c *Controller) tryUpdateWorkload(cluster *clusterv1alpha1.Cluster, workload *unstructured.Unstructured) error { - fedKey, err := keys.FederatedKeyFunc(cluster.Name, workload) +func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructured.Unstructured) error { + fedKey, err := keys.FederatedKeyFunc(clusterName, workload) if err != nil { klog.Errorf("Failed to get FederatedKey %s, error: %v", workload.GetName(), err) return err @@ -219,24 +211,24 @@ func (c *Controller) tryUpdateWorkload(cluster *clusterv1alpha1.Cluster, workloa klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err) return err } - err = c.tryCreateWorkload(cluster, workload) + err = c.tryCreateWorkload(clusterName, workload) if err != nil { - klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) return err } return nil } - err = c.ObjectWatcher.Update(cluster, workload, clusterObj) + err = c.ObjectWatcher.Update(clusterName, workload, clusterObj) if err != nil { - klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", cluster.Name, err) + klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", clusterName, err) return err } return nil } -func (c *Controller) tryCreateWorkload(cluster *clusterv1alpha1.Cluster, workload *unstructured.Unstructured) error { - return c.ObjectWatcher.Create(cluster, workload) +func (c *Controller) tryCreateWorkload(clusterName string, workload *unstructured.Unstructured) error { + return c.ObjectWatcher.Create(clusterName, workload) } // updateAppliedCondition update the Applied condition for the given Work diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index da91882c1..9f49daa63 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -45,7 +45,7 @@ type ServiceExportController struct { InformerManager informermanager.MultiClusterInformerManager WorkerNumber int // WorkerNumber is the number of worker goroutines PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. - ClusterDynamicClientSetFunc func(c *clusterv1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error) + ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) // eventHandlers holds the handlers which used to handle events reported from member clusters. // Each handler takes the cluster name as key and takes the handler function as the value, e.g. // "member1": instance of ResourceEventHandler @@ -150,7 +150,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error { singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client) + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return err diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 47caf7f94..3e2d23b3a 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -61,8 +61,8 @@ type ClusterStatusController struct { PredicateFunc predicate.Predicate InformerManager informermanager.MultiClusterInformerManager StopChan <-chan struct{} - ClusterClientSetFunc func(*clusterv1alpha1.Cluster, client.Client, *util.ClientOption) (*util.ClusterClient, error) - ClusterDynamicClientSetFunc func(c *clusterv1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error) + ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error) + ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) // ClusterClientOption holds the attributes that should be injected to a Kubernetes client. ClusterClientOption *util.ClientOption @@ -112,7 +112,7 @@ func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) { // create a ClusterClient for the given member cluster - clusterClient, err := c.ClusterClientSetFunc(cluster, c.Client, c.ClusterClientOption) + clusterClient, err := c.ClusterClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption) if err != nil { klog.Errorf("Failed to create a ClusterClient for the given member cluster: %v, err is : %v", cluster.Name, err) return controllerruntime.Result{Requeue: true}, err @@ -204,7 +204,7 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *clusterv1alpha1. func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) { singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client) + clusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return nil, err diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 8d378c195..7062232d8 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -44,7 +44,7 @@ type WorkStatusController struct { worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. ObjectWatcher objectwatcher.ObjectWatcher PredicateFunc predicate.Predicate - ClusterClientSetFunc func(c *clusterv1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error) + ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -188,20 +188,14 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error { return err } - cluster, err := util.GetCluster(c.Client, clusterName) - if err != nil { - klog.Errorf("Failed to the get given member cluster %s", clusterName) - return err - } - // compare version to determine if need to update resource - needUpdate, err := c.ObjectWatcher.NeedsUpdate(cluster, desireObj, obj) + needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desireObj, obj) if err != nil { return err } if needUpdate { - if err := c.ObjectWatcher.Update(cluster, desireObj, obj); err != nil { + if err := c.ObjectWatcher.Update(clusterName, desireObj, obj); err != nil { klog.Errorf("Update %s failed: %v", fedKey.String(), err) return err } @@ -258,12 +252,7 @@ func (c *WorkStatusController) recreateResourceIfNeeded(work *workv1alpha1.Work, manifest.GetNamespace() == workloadKey.Namespace && manifest.GetName() == workloadKey.Name { klog.Infof("recreating %s", workloadKey.String()) - cluster, err := util.GetCluster(c.Client, workloadKey.Cluster) - if err != nil { - klog.Errorf("Failed to the get given member cluster %s", workloadKey.Cluster) - return err - } - return c.ObjectWatcher.Create(cluster, manifest) + return c.ObjectWatcher.Create(workloadKey.Cluster, manifest) } } return nil @@ -432,7 +421,7 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1. // the cache in informer manager should be updated. singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterClientSetFunc(cluster, c.Client) + dynamicClusterClient, err := c.ClusterClientSetFunc(cluster.Name, c.Client) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return nil, err diff --git a/pkg/util/membercluster_client.go b/pkg/util/membercluster_client.go index 083314736..e9d1a86de 100644 --- a/pkg/util/membercluster_client.go +++ b/pkg/util/membercluster_client.go @@ -46,13 +46,13 @@ type ClientOption struct { } // NewClusterClientSet returns a ClusterClient for the given member cluster. -func NewClusterClientSet(c *clusterv1alpha1.Cluster, client client.Client, clientOption *ClientOption) (*ClusterClient, error) { - clusterConfig, err := buildClusterConfig(c, client) +func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) { + clusterConfig, err := buildClusterConfig(clusterName, client) if err != nil { return nil, err } - var clusterClientSet = ClusterClient{ClusterName: c.Name} + var clusterClientSet = ClusterClient{ClusterName: clusterName} if clusterConfig != nil { if clientOption != nil { @@ -65,13 +65,13 @@ func NewClusterClientSet(c *clusterv1alpha1.Cluster, client client.Client, clien } // NewClusterClientSetForAgent returns a ClusterClient for the given member cluster which will be used in karmada agent. -func NewClusterClientSetForAgent(c *clusterv1alpha1.Cluster, client client.Client, clientOption *ClientOption) (*ClusterClient, error) { +func NewClusterClientSetForAgent(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) { clusterConfig, err := controllerruntime.GetConfig() if err != nil { return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error()) } - var clusterClientSet = ClusterClient{ClusterName: c.Name} + var clusterClientSet = ClusterClient{ClusterName: clusterName} if clusterConfig != nil { if clientOption != nil { @@ -84,12 +84,12 @@ func NewClusterClientSetForAgent(c *clusterv1alpha1.Cluster, client client.Clien } // NewClusterDynamicClientSet returns a dynamic client for the given member cluster. -func NewClusterDynamicClientSet(c *clusterv1alpha1.Cluster, client client.Client) (*DynamicClusterClient, error) { - clusterConfig, err := buildClusterConfig(c, client) +func NewClusterDynamicClientSet(clusterName string, client client.Client) (*DynamicClusterClient, error) { + clusterConfig, err := buildClusterConfig(clusterName, client) if err != nil { return nil, err } - var clusterClientSet = DynamicClusterClient{ClusterName: c.Name} + var clusterClientSet = DynamicClusterClient{ClusterName: clusterName} if clusterConfig != nil { clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig) @@ -98,12 +98,12 @@ func NewClusterDynamicClientSet(c *clusterv1alpha1.Cluster, client client.Client } // NewClusterDynamicClientSetForAgent returns a dynamic client for the given member cluster which will be used in karmada agent. -func NewClusterDynamicClientSetForAgent(c *clusterv1alpha1.Cluster, client client.Client) (*DynamicClusterClient, error) { +func NewClusterDynamicClientSetForAgent(clusterName string, client client.Client) (*DynamicClusterClient, error) { clusterConfig, err := controllerruntime.GetConfig() if err != nil { return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error()) } - var clusterClientSet = DynamicClusterClient{ClusterName: c.Name} + var clusterClientSet = DynamicClusterClient{ClusterName: clusterName} if clusterConfig != nil { clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig) @@ -111,8 +111,11 @@ func NewClusterDynamicClientSetForAgent(c *clusterv1alpha1.Cluster, client clien return &clusterClientSet, nil } -func buildClusterConfig(cluster *clusterv1alpha1.Cluster, client client.Client) (*rest.Config, error) { - clusterName := cluster.Name +func buildClusterConfig(clusterName string, client client.Client) (*rest.Config, error) { + cluster, err := GetCluster(client, clusterName) + if err != nil { + return nil, err + } apiEndpoint := cluster.Spec.APIEndpoint if apiEndpoint == "" { return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 228a439eb..21f89a586 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -14,7 +14,6 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/restmapper" @@ -27,14 +26,14 @@ const ( // ObjectWatcher manages operations for object dispatched to member clusters. type ObjectWatcher interface { - Create(cluster *clusterv1alpha1.Cluster, desireObj *unstructured.Unstructured) error - Update(cluster *clusterv1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error - Delete(cluster *clusterv1alpha1.Cluster, desireObj *unstructured.Unstructured) error - NeedsUpdate(cluster *clusterv1alpha1.Cluster, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) + Create(clusterName string, desireObj *unstructured.Unstructured) error + Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error + Delete(clusterName string, desireObj *unstructured.Unstructured) error + NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) } // ClientSetFunc is used to generate client set of member cluster -type ClientSetFunc func(c *clusterv1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error) +type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error) type objectWatcherImpl struct { Lock sync.RWMutex @@ -54,16 +53,16 @@ func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, c } } -func (o *objectWatcherImpl) Create(cluster *clusterv1alpha1.Cluster, desireObj *unstructured.Unstructured) error { - dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet) +func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.Unstructured) error { + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) if err != nil { - klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName) return err } gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to create resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to create resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } @@ -78,68 +77,68 @@ func (o *objectWatcherImpl) Create(cluster *clusterv1alpha1.Cluster, desireObj * if apierrors.IsAlreadyExists(err) { existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{}) if err != nil { - return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) } // Avoid updating resources that not managed by karmada. if util.GetLabelValue(desireObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(existObj.GetLabels(), workv1alpha1.WorkNameLabel) { - return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v but not managed by karamda", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) + return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v but not managed by karamda", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) } - return o.Update(cluster, desireObj, existObj) + return o.Update(clusterName, desireObj, existObj) } - klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } - klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) + klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) // record version o.recordVersion(clusterObj, dynamicClusterClient.ClusterName) return nil } -func (o *objectWatcherImpl) Update(cluster *clusterv1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error { - dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet) +func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error { + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) if err != nil { - klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName) return err } gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to update resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to update resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } err = RetainClusterFields(desireObj, clusterObj) if err != nil { - klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{}) if err != nil { - klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } - klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) + klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) // record version - o.recordVersion(resource, cluster.Name) + o.recordVersion(resource, clusterName) return nil } -func (o *objectWatcherImpl) Delete(cluster *clusterv1alpha1.Cluster, desireObj *unstructured.Unstructured) error { - dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet) +func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.Unstructured) error { + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) if err != nil { - klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName) return err } gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to delete resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to delete resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) return err } @@ -148,10 +147,10 @@ func (o *objectWatcherImpl) Delete(cluster *clusterv1alpha1.Cluster, desireObj * err = nil } if err != nil { - klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), cluster.Name, err) + klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), clusterName, err) return err } - klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) + klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) objectKey := o.genObjectKey(desireObj) o.deleteVersionRecord(dynamicClusterClient.ClusterName, objectKey) @@ -214,12 +213,12 @@ func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string delete(o.VersionRecord[clusterName], resourceName) } -func (o *objectWatcherImpl) NeedsUpdate(cluster *clusterv1alpha1.Cluster, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) { +func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) { // get resource version - version, exist := o.getVersionRecord(cluster.Name, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName()) + version, exist := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName()) if !exist { - klog.Errorf("Failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), cluster.Name) - return false, fmt.Errorf("failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), cluster.Name) + klog.Errorf("Failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), clusterName) + return false, fmt.Errorf("failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), clusterName) } return objectNeedsUpdate(desiredObj, clusterObj, version), nil diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index b1ad06e57..106a32dff 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -165,11 +165,11 @@ func isClusterMeetRequirements(clusters []*clusterv1alpha1.Cluster) (bool, error func newClusterClientSet(controlPlaneClient client.Client, c *clusterv1alpha1.Cluster) (*util.ClusterClient, *util.DynamicClusterClient, error) { if c.Spec.SyncMode == clusterv1alpha1.Push { - clusterClient, err := util.NewClusterClientSet(c, controlPlaneClient, nil) + clusterClient, err := util.NewClusterClientSet(c.Name, controlPlaneClient, nil) if err != nil { return nil, nil, err } - clusterDynamicClient, err := util.NewClusterDynamicClientSet(c, controlPlaneClient) + clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient) if err != nil { return nil, nil, err } diff --git a/test/e2e/namespace_test.go b/test/e2e/namespace_test.go index 7108b42ba..a5a082965 100644 --- a/test/e2e/namespace_test.go +++ b/test/e2e/namespace_test.go @@ -194,7 +194,7 @@ var _ = ginkgo.Describe("[namespace auto-provision] namespace auto-provision tes ginkgo.By(fmt.Sprintf("waiting namespace(%s) present on cluster: %s", namespaceName, clusterName), func() { clusterJoined, err := karmadaClient.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - clusterClient, err := util.NewClusterClientSet(clusterJoined, controlPlaneClient, nil) + clusterClient, err := util.NewClusterClientSet(clusterJoined.Name, controlPlaneClient, nil) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) err = wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) { _, err = clusterClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespaceName, metav1.GetOptions{})