From 8e588ac06fc004e31a22f25975e9d17f7a8ae5d3 Mon Sep 17 00:00:00 2001
From: lihanbo
Date: Tue, 6 Jul 2021 09:57:21 +0800
Subject: [PATCH] resolve AlreadyExists conflict when creating resources in member clusters

Signed-off-by: lihanbo
---
 pkg/util/cluster.go                       | 14 +++++++
 pkg/util/objectwatcher/objectwatcher.go   | 49 ++++++++---------
 test/e2e/clusterpropagationpolicy_test.go | 30 +++++++-------
 test/e2e/propagationpolicy_test.go        | 17 ++++++++
 4 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/pkg/util/cluster.go b/pkg/util/cluster.go
index 0e5eb5b8f..eb673f163 100644
--- a/pkg/util/cluster.go
+++ b/pkg/util/cluster.go
@@ -28,3 +28,17 @@ func GetCluster(hostClient client.Client, clusterName string) (*v1alpha1.Cluster
 	}
 	return cluster, nil
 }
+
+// IsAPIInstallInCluster checks if the given API is installed in the member cluster.
+func IsAPIInstallInCluster(clusterStatus v1alpha1.ClusterStatus, groupVersion, name, kind string) bool {
+	for _, apiEnablement := range clusterStatus.APIEnablements {
+		if apiEnablement.GroupVersion == groupVersion {
+			for _, apiResource := range apiEnablement.Resources {
+				if apiResource.Kind == kind && apiResource.Name == name {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go
index 13f6facbb..d3b72bfab 100644
--- a/pkg/util/objectwatcher/objectwatcher.go
+++ b/pkg/util/objectwatcher/objectwatcher.go
@@ -11,7 +11,6 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -71,13 +70,24 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc
 	// users should resolve the conflict in person.
 	clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{})
 	if err != nil {
+		// The 'IsAlreadyExists' conflict may happen in the following known scenarios:
+		// - 1. In a reconcile process, the execution controller successfully applied the resource to the member cluster but failed to update the work conditions (Applied=True),
+		//      when reconciling again, the controller will try to apply (by create) the resource again.
+		// - 2. The resource already exists in the member cluster but was not created by karmada.
 		if apierrors.IsAlreadyExists(err) {
-			if err := o.resolveAlreadyExist(dynamicClusterClient, gvr, desireObj); err != nil {
-				return err
+			existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to get existing resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err)
 			}
-			return nil
+
+			// Avoid updating resources that are not managed by karmada.
+			if util.GetLabelValue(desireObj.GetLabels(), util.WorkNameLabel) != util.GetLabelValue(existObj.GetLabels(), util.WorkNameLabel) {
+				return fmt.Errorf("resource(kind=%s, %s/%s) already exists in cluster %v but is not managed by karmada", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
+			}
+
+			return o.Update(cluster, desireObj, existObj)
 		}
-		klog.Errorf("Failed to create resource %v, err is %v ", desireObj.GetName(), err)
+		klog.Errorf("Failed to create resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
 		return err
 	}
 	klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
@@ -87,31 +97,6 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc
 	return nil
 }
 
-// resolveAlreadyExist used to resolve the 'AlreadyExist' conflicts in a 'Create' process.
-// The conflict may happen in following known scenarios:
-// - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True),
-//      when reconcile again, the controller will try to apply(by create) the resource again.
-// - 2. The resource already exist in the member cluster but it's not created by karmada.
-func (o *objectWatcherImpl) resolveAlreadyExist(dynamicClusterClient *util.DynamicClusterClient, gvr schema.GroupVersionResource, desireObj *unstructured.Unstructured) error {
-	existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
-	if err != nil {
-		return fmt.Errorf("failed to get exist resource %s(%s/%s): %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
-	}
-
-	// Avoid updating resources that not managed by karmada.
-	if util.GetLabelValue(desireObj.GetLabels(), util.WorkNameLabel) != util.GetLabelValue(existObj.GetLabels(), util.WorkNameLabel) {
-		return fmt.Errorf("resource %s(%s/%s) already exist but not managed by karamda", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName())
-	}
-
-	_, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
-	if err != nil {
-		klog.Errorf("Failed to update exist resource %s(%s/%s), err is %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
-		return fmt.Errorf("failed to update exist resource %s(%s/%s), err is %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
-	}
-
-	return nil
-}
-
 func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error {
 	dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet)
 	if err != nil {
@@ -127,13 +112,13 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster
 
 	err = RetainClusterFields(desireObj, clusterObj)
 	if err != nil {
-		klog.Errorf("Failed to retain fields: %v", err)
+		klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s): %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
 		return err
 	}
 
 	resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
 	if err != nil {
-		klog.Errorf("Failed to update resource %v/%v, err is %v ", desireObj.GetNamespace(), desireObj.GetName(), err)
+		klog.Errorf("Failed to update resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
 		return err
 	}
 
diff --git a/test/e2e/clusterpropagationpolicy_test.go b/test/e2e/clusterpropagationpolicy_test.go
index 094ce623a..99b8ae6b6 100644
--- a/test/e2e/clusterpropagationpolicy_test.go
+++ b/test/e2e/clusterpropagationpolicy_test.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/klog/v2"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/test/helper"
 )
 
@@ -73,23 +74,20 @@ var _ = ginkgo.Describe("[BasicClusterPropagation] basic cluster propagation tes
 		})
 
 		ginkgo.By("check if crd present on member clusters", func() {
-			for _, cluster := range clusters {
-				clusterDynamicClient := getClusterDynamicClient(cluster.Name)
-				gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil())
-
-				klog.Infof("Waiting for crd(%s) present on cluster(%s)", crd.Name, cluster.Name)
-				err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
-					_, err = clusterDynamicClient.Resource(crdGVR).Namespace(crd.Namespace).Get(context.TODO(), crd.Name, metav1.GetOptions{})
-					if err != nil {
-						if errors.IsNotFound(err) {
-							return false, nil
-						}
-						return false, err
+			crAPIVersion := fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1")
+			err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
+				clusters, err := fetchClusters(karmadaClient)
+				if err != nil {
+					return false, err
+				}
+				for _, cluster := range clusters {
+					if !util.IsAPIInstallInCluster(cluster.Status, crAPIVersion, crdSpecNames.Plural, crdSpecNames.Kind) {
+						return false, nil
 					}
-					return true, nil
-				})
-				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-			}
+				}
+				return true, nil
+			})
+			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 		})
 
 		ginkgo.By(fmt.Sprintf("removing crd(%s)", crd.Name), func() {
diff --git a/test/e2e/propagationpolicy_test.go b/test/e2e/propagationpolicy_test.go
index e21434fd6..6fe368fd1 100644
--- a/test/e2e/propagationpolicy_test.go
+++ b/test/e2e/propagationpolicy_test.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/utils/pointer"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/test/helper"
 )
 
@@ -454,6 +455,22 @@ var _ = ginkgo.Describe("[BasicPropagation] basic propagation testing", func() {
 				_, err := dynamicClient.Resource(crdGVR).Namespace(crd.Namespace).Get(context.TODO(), crd.Name, metav1.GetOptions{})
 				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 			})
+
+			ginkgo.By("check if crd present on member clusters", func() {
+				err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
+					clusters, err := fetchClusters(karmadaClient)
+					if err != nil {
+						return false, err
+					}
+					for _, cluster := range clusters {
+						if !util.IsAPIInstallInCluster(cluster.Status, crAPIVersion, crdSpecNames.Plural, crdSpecNames.Kind) {
+							return false, nil
+						}
+					}
+					return true, nil
+				})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
 		})
 
 		ginkgo.BeforeEach(func() {
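
Note (not part of the patch): the heart of the objectwatcher change is a small decision rule. When Create hits AlreadyExists, fetch the existing object and adopt it via Update only if it carries the same karmada work-name label as the desired object; otherwise surface a conflict for the user to resolve. The standalone sketch below restates that rule in isolation. It is illustrative only: decideOnAlreadyExists is a made-up name, and the label key is an assumption standing in for util.WorkNameLabel.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// workNameLabel stands in for util.WorkNameLabel; the exact key used by
// karmada is an assumption here and only serves the illustration.
const workNameLabel = "work.karmada.io/name"

// decideOnAlreadyExists mirrors the decision the patch adds to Create():
// adopt (update) an already-existing object only when it carries the same
// work-name label as the desired object; otherwise report a conflict.
func decideOnAlreadyExists(desired, existing *unstructured.Unstructured) (bool, error) {
	if desired.GetLabels()[workNameLabel] != existing.GetLabels()[workNameLabel] {
		return false, fmt.Errorf("resource(kind=%s, %s/%s) already exists but is not managed by karmada",
			desired.GetKind(), desired.GetNamespace(), desired.GetName())
	}
	return true, nil
}

func main() {
	desired := &unstructured.Unstructured{Object: map[string]interface{}{}}
	desired.SetKind("Deployment")
	desired.SetNamespace("default")
	desired.SetName("nginx")
	desired.SetLabels(map[string]string{workNameLabel: "nginx-work"}) // illustrative value

	managed := desired.DeepCopy() // same label -> safe to take over via Update
	unmanaged := desired.DeepCopy()
	unmanaged.SetLabels(nil) // label missing -> conflict reported, nothing is overwritten

	for _, existing := range []*unstructured.Unstructured{managed, unmanaged} {
		adopt, err := decideOnAlreadyExists(desired, existing)
		fmt.Printf("adopt=%v err=%v\n", adopt, err)
	}
}

Checking the label before updating keeps the execution controller idempotent across retried reconciles (scenario 1 in the patch comment) while refusing to silently overwrite objects that were not created by karmada (scenario 2).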
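On the e2e side, the tests stop polling each member cluster's API server for the CRD and instead wait until every cluster's status reports the API as enabled. A rough standalone sketch of that lookup follows, using simplified local types in place of karmada's ClusterStatus/APIEnablement structs; the real helper is util.IsAPIInstallInCluster in pkg/util/cluster.go.

package main

import "fmt"

// Simplified stand-ins for the karmada cluster status types; the field names
// mirror what IsAPIInstallInCluster accesses in the patch.
type APIResource struct {
	Name string // plural resource name, e.g. "foos"
	Kind string // e.g. "Foo"
}

type APIEnablement struct {
	GroupVersion string // e.g. "example.karmada.io/v1alpha1"
	Resources    []APIResource
}

type ClusterStatus struct {
	APIEnablements []APIEnablement
}

// isAPIInstalled mirrors the helper's logic: the API counts as installed only
// when group/version, resource name, and kind all match an enablement entry.
func isAPIInstalled(status ClusterStatus, groupVersion, name, kind string) bool {
	for _, enablement := range status.APIEnablements {
		if enablement.GroupVersion != groupVersion {
			continue
		}
		for _, r := range enablement.Resources {
			if r.Kind == kind && r.Name == name {
				return true
			}
		}
	}
	return false
}

func main() {
	status := ClusterStatus{APIEnablements: []APIEnablement{
		{GroupVersion: "example.karmada.io/v1alpha1", Resources: []APIResource{{Name: "foos", Kind: "Foo"}}},
	}}
	fmt.Println(isAPIInstalled(status, "example.karmada.io/v1alpha1", "foos", "Foo")) // true
	fmt.Println(isAPIInstalled(status, "example.karmada.io/v1alpha1", "bars", "Bar")) // false
}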