From c923705c6981c092212ad80755556345fa7a1b21 Mon Sep 17 00:00:00 2001 From: jwcesign Date: Mon, 20 Feb 2023 11:35:16 +0800 Subject: [PATCH] Refactor execution_controller.go Signed-off-by: jwcesign --- .../execution/execution_controller.go | 30 +++------- pkg/util/objectwatcher/objectwatcher.go | 57 +++++++++---------- 2 files changed, 33 insertions(+), 54 deletions(-) diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 1debef921..25f8155d1 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -195,23 +195,11 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work) continue } - applied := helper.IsResourceApplied(&work.Status) - if applied { - err = c.tryUpdateWorkload(clusterName, workload) - if err != nil { - klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) - c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to update resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err) - errs = append(errs, err) - continue - } - } else { - err = c.tryCreateWorkload(clusterName, workload) - if err != nil { - klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) - c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err) - errs = append(errs, err) - continue - } + if err = c.tryCreateOrUpdateWorkload(clusterName, workload); err != nil { + klog.Errorf("Failed to create or update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) + c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create or update resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err) + errs = append(errs, err) + continue } c.eventf(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, "Successfully applied resource(%v/%v) to cluster %s", workload.GetNamespace(), workload.GetName(), clusterName) syncSucceedNum++ @@ -237,7 +225,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work) return nil } -func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructured.Unstructured) error { +func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *unstructured.Unstructured) error { fedKey, err := keys.FederatedKeyFunc(clusterName, workload) if err != nil { klog.Errorf("Failed to get FederatedKey %s, error: %v", workload.GetName(), err) @@ -250,7 +238,7 @@ func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructure klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err) return err } - err = c.tryCreateWorkload(clusterName, workload) + err = c.ObjectWatcher.Create(clusterName, workload) if err != nil { klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err) return err @@ -266,10 +254,6 @@ func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructure return nil } -func (c *Controller) tryCreateWorkload(clusterName string, workload *unstructured.Unstructured) error { - return c.ObjectWatcher.Create(clusterName, workload) -} - // updateAppliedCondition update the Applied condition for the given Work func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error { newWorkAppliedCondition := metav1.Condition{ diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 3fa7020b5..ac3a8ee31 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -75,7 +75,7 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U klog.Errorf("Failed to get FederatedKey %s, error: %v", desireObj.GetName(), err) return err } - existObj, err := helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey) + _, err = helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey) if err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Failed to get resource %v from member cluster, err is %v ", desireObj.GetName(), err) @@ -91,37 +91,11 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName) // record version o.recordVersion(clusterObj, dynamicClusterClient.ClusterName) - } else { - // If the existing resource is managed by Karmada, then just update it. - if util.GetLabelValue(desireObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(existObj.GetLabels(), workv1alpha1.WorkNameLabel) { - return o.Update(clusterName, desireObj, existObj) - } - // update the resource if a conflict resolution instruction is existed in annotation. - err = o.resourceConflictOverwrite(clusterName, desireObj, existObj) - if err != nil { - return fmt.Errorf("failed to update exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err) - } } return nil } -// resourceConflictOverwrite update the resource if a conflict resolution instruction is existed in annotation. -// The existing resource is not managed by Karmada, then we should consult conflict resolution instruction in annotation. -func (o *objectWatcherImpl) resourceConflictOverwrite(clusterName string, desireObj *unstructured.Unstructured, existObj *unstructured.Unstructured) error { - switch util.GetAnnotationValue(desireObj.GetAnnotations(), workv1alpha2.ResourceConflictResolutionAnnotation) { - case workv1alpha2.ResourceConflictResolutionOverwrite: - klog.Infof("Overwriting the resource(kind=%s, %s/%s) as %s=%s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), - workv1alpha2.ResourceConflictResolutionAnnotation, workv1alpha2.ResourceConflictResolutionOverwrite) - return o.Update(clusterName, desireObj, existObj) - default: - // The existing resource is not managed by Karmada, and no conflict resolution found, avoid updating the existing resource by default. - return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v and the %s strategy value is empty, karmada will not manage this resource", - desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation, - ) - } -} - func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) { // Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail. desired.SetResourceVersion(observed.GetResourceVersion()) @@ -150,6 +124,11 @@ func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured. } func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error { + updateAllowed := o.allowUpdate(clusterName, desireObj, clusterObj) + if !updateAllowed { + return nil + } + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName) @@ -205,10 +184,7 @@ func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.U } err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Delete(context.TODO(), desireObj.GetName(), deleteOption) - if apierrors.IsNotFound(err) { - err = nil - } - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), clusterName, err) return err } @@ -285,3 +261,22 @@ func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterO return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version), nil } + +func (o *objectWatcherImpl) allowUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool { + // If the existing resource is managed by Karmada, then the updating is allowed. + if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) { + return true + } + + // This happens when promoting workload to the Karmada control plane + conflictResolution := util.GetAnnotationValue(desiredObj.GetAnnotations(), workv1alpha2.ResourceConflictResolutionAnnotation) + if conflictResolution == workv1alpha2.ResourceConflictResolutionOverwrite { + return true + } + + // The existing resource is not managed by Karmada, and no conflict resolution found, avoid updating the existing resource by default. + klog.Warningf("resource(kind=%s, %s/%s) already exist in cluster %v and the %s strategy value is empty, karmada will not manage this resource", + desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation, + ) + return false +}