resolve AlreadyExists conflict when create resource in member cluster

Signed-off-by: lihanbo <lihanbo2@huawei.com>
This commit is contained in:
lihanbo 2021-07-06 09:57:21 +08:00
parent ea47422461
commit 8e588ac06f
4 changed files with 62 additions and 48 deletions

View File

@ -28,3 +28,17 @@ func GetCluster(hostClient client.Client, clusterName string) (*v1alpha1.Cluster
}
return cluster, nil
}
// IsAPIInstallInCluster checks if the given api is installed in member cluster
func IsAPIInstallInCluster(clusterStatus v1alpha1.ClusterStatus, groupVersion, name, kind string) bool {
for _, apiEnablement := range clusterStatus.APIEnablements {
if apiEnablement.GroupVersion == groupVersion {
for _, apiResource := range apiEnablement.Resources {
if apiResource.Kind == kind && apiResource.Name == name {
return true
}
}
}
}
return false
}

View File

@ -11,7 +11,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -71,13 +70,24 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc
// users should resolve the conflict in person.
clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{})
if err != nil {
// The 'IsAlreadyExists' conflict may happen in following known scenarios:
// - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True),
// when reconcile again, the controller will try to apply(by create) the resource again.
// - 2. The resource already exist in the member cluster but it's not created by karmada.
if apierrors.IsAlreadyExists(err) {
if err := o.resolveAlreadyExist(dynamicClusterClient, gvr, desireObj); err != nil {
return err
existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err)
}
return nil
// Avoid updating resources that not managed by karmada.
if util.GetLabelValue(desireObj.GetLabels(), util.WorkNameLabel) != util.GetLabelValue(existObj.GetLabels(), util.WorkNameLabel) {
return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v but not managed by karamda", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
}
return o.Update(cluster, desireObj, existObj)
}
klog.Errorf("Failed to create resource %v, err is %v ", desireObj.GetName(), err)
klog.Errorf("Failed to create resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
@ -87,31 +97,6 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc
return nil
}
// resolveAlreadyExist used to resolve the 'AlreadyExist' conflicts in a 'Create' process.
// The conflict may happen in following known scenarios:
// - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True),
// when reconcile again, the controller will try to apply(by create) the resource again.
// - 2. The resource already exist in the member cluster but it's not created by karmada.
func (o *objectWatcherImpl) resolveAlreadyExist(dynamicClusterClient *util.DynamicClusterClient, gvr schema.GroupVersionResource, desireObj *unstructured.Unstructured) error {
existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get exist resource %s(%s/%s): %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
}
// Avoid updating resources that not managed by karmada.
if util.GetLabelValue(desireObj.GetLabels(), util.WorkNameLabel) != util.GetLabelValue(existObj.GetLabels(), util.WorkNameLabel) {
return fmt.Errorf("resource %s(%s/%s) already exist but not managed by karamda", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName())
}
_, err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update exist resource %s(%s/%s), err is %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
return fmt.Errorf("failed to update exist resource %s(%s/%s), err is %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
}
return nil
}
func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error {
dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet)
if err != nil {
@ -127,13 +112,13 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster
err = RetainClusterFields(desireObj, clusterObj)
if err != nil {
klog.Errorf("Failed to retain fields: %v", err)
klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) : %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}
resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update resource %v/%v, err is %v ", desireObj.GetNamespace(), desireObj.GetName(), err)
klog.Errorf("Failed to update resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}

View File

@ -17,6 +17,7 @@ import (
"k8s.io/klog/v2"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/test/helper"
)
@ -73,23 +74,20 @@ var _ = ginkgo.Describe("[BasicClusterPropagation] basic cluster propagation tes
})
ginkgo.By("check if crd present on member clusters", func() {
for _, cluster := range clusters {
clusterDynamicClient := getClusterDynamicClient(cluster.Name)
gomega.Expect(clusterDynamicClient).ShouldNot(gomega.BeNil())
klog.Infof("Waiting for crd(%s) present on cluster(%s)", crd.Name, cluster.Name)
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
_, err = clusterDynamicClient.Resource(crdGVR).Namespace(crd.Namespace).Get(context.TODO(), crd.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
crAPIVersion := fmt.Sprintf("%s/%s", crd.Spec.Group, "v1alpha1")
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
clusters, err := fetchClusters(karmadaClient)
if err != nil {
return false, err
}
for _, cluster := range clusters {
if !util.IsAPIInstallInCluster(cluster.Status, crAPIVersion, crdSpecNames.Plural, crdSpecNames.Kind) {
return false, nil
}
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
}
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
ginkgo.By(fmt.Sprintf("removing crd(%s)", crd.Name), func() {

View File

@ -20,6 +20,7 @@ import (
"k8s.io/utils/pointer"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/test/helper"
)
@ -454,6 +455,22 @@ var _ = ginkgo.Describe("[BasicPropagation] basic propagation testing", func() {
_, err := dynamicClient.Resource(crdGVR).Namespace(crd.Namespace).Get(context.TODO(), crd.Name, metav1.GetOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
ginkgo.By("check if crd present on member clusters", func() {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (done bool, err error) {
clusters, err := fetchClusters(karmadaClient)
if err != nil {
return false, err
}
for _, cluster := range clusters {
if !util.IsAPIInstallInCluster(cluster.Status, crAPIVersion, crdSpecNames.Plural, crdSpecNames.Kind) {
return false, nil
}
}
return true, nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
})
ginkgo.BeforeEach(func() {