Refactor execution_controller.go
Signed-off-by: jwcesign <jiangwei115@huawei.com>
This commit is contained in:
parent
e523977cb0
commit
c923705c69
|
@ -195,23 +195,11 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
applied := helper.IsResourceApplied(&work.Status)
|
if err = c.tryCreateOrUpdateWorkload(clusterName, workload); err != nil {
|
||||||
if applied {
|
klog.Errorf("Failed to create or update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err)
|
||||||
err = c.tryUpdateWorkload(clusterName, workload)
|
c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create or update resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err)
|
||||||
if err != nil {
|
errs = append(errs, err)
|
||||||
klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err)
|
continue
|
||||||
c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to update resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err)
|
|
||||||
errs = append(errs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = c.tryCreateWorkload(clusterName, workload)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err)
|
|
||||||
c.eventf(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create resource(%s) in member cluster(%s): %v", klog.KObj(workload), clusterName, err)
|
|
||||||
errs = append(errs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
c.eventf(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, "Successfully applied resource(%v/%v) to cluster %s", workload.GetNamespace(), workload.GetName(), clusterName)
|
c.eventf(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, "Successfully applied resource(%v/%v) to cluster %s", workload.GetNamespace(), workload.GetName(), clusterName)
|
||||||
syncSucceedNum++
|
syncSucceedNum++
|
||||||
|
@ -237,7 +225,7 @@ func (c *Controller) syncToClusters(clusterName string, work *workv1alpha1.Work)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructured.Unstructured) error {
|
func (c *Controller) tryCreateOrUpdateWorkload(clusterName string, workload *unstructured.Unstructured) error {
|
||||||
fedKey, err := keys.FederatedKeyFunc(clusterName, workload)
|
fedKey, err := keys.FederatedKeyFunc(clusterName, workload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get FederatedKey %s, error: %v", workload.GetName(), err)
|
klog.Errorf("Failed to get FederatedKey %s, error: %v", workload.GetName(), err)
|
||||||
|
@ -250,7 +238,7 @@ func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructure
|
||||||
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
|
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = c.tryCreateWorkload(clusterName, workload)
|
err = c.ObjectWatcher.Create(clusterName, workload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err)
|
klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), clusterName, err)
|
||||||
return err
|
return err
|
||||||
|
@ -266,10 +254,6 @@ func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructure
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) tryCreateWorkload(clusterName string, workload *unstructured.Unstructured) error {
|
|
||||||
return c.ObjectWatcher.Create(clusterName, workload)
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateAppliedCondition update the Applied condition for the given Work
|
// updateAppliedCondition update the Applied condition for the given Work
|
||||||
func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
|
func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
|
||||||
newWorkAppliedCondition := metav1.Condition{
|
newWorkAppliedCondition := metav1.Condition{
|
||||||
|
|
|
@ -75,7 +75,7 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U
|
||||||
klog.Errorf("Failed to get FederatedKey %s, error: %v", desireObj.GetName(), err)
|
klog.Errorf("Failed to get FederatedKey %s, error: %v", desireObj.GetName(), err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
existObj, err := helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey)
|
_, err = helper.GetObjectFromCache(o.RESTMapper, o.InformerManager, fedKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !apierrors.IsNotFound(err) {
|
if !apierrors.IsNotFound(err) {
|
||||||
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", desireObj.GetName(), err)
|
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", desireObj.GetName(), err)
|
||||||
|
@ -91,37 +91,11 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U
|
||||||
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
|
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
|
||||||
// record version
|
// record version
|
||||||
o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
|
o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
|
||||||
} else {
|
|
||||||
// If the existing resource is managed by Karmada, then just update it.
|
|
||||||
if util.GetLabelValue(desireObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(existObj.GetLabels(), workv1alpha1.WorkNameLabel) {
|
|
||||||
return o.Update(clusterName, desireObj, existObj)
|
|
||||||
}
|
|
||||||
// update the resource if a conflict resolution instruction is existed in annotation.
|
|
||||||
err = o.resourceConflictOverwrite(clusterName, desireObj, existObj)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// resourceConflictOverwrite update the resource if a conflict resolution instruction is existed in annotation.
|
|
||||||
// The existing resource is not managed by Karmada, then we should consult conflict resolution instruction in annotation.
|
|
||||||
func (o *objectWatcherImpl) resourceConflictOverwrite(clusterName string, desireObj *unstructured.Unstructured, existObj *unstructured.Unstructured) error {
|
|
||||||
switch util.GetAnnotationValue(desireObj.GetAnnotations(), workv1alpha2.ResourceConflictResolutionAnnotation) {
|
|
||||||
case workv1alpha2.ResourceConflictResolutionOverwrite:
|
|
||||||
klog.Infof("Overwriting the resource(kind=%s, %s/%s) as %s=%s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(),
|
|
||||||
workv1alpha2.ResourceConflictResolutionAnnotation, workv1alpha2.ResourceConflictResolutionOverwrite)
|
|
||||||
return o.Update(clusterName, desireObj, existObj)
|
|
||||||
default:
|
|
||||||
// The existing resource is not managed by Karmada, and no conflict resolution found, avoid updating the existing resource by default.
|
|
||||||
return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v and the %s strategy value is empty, karmada will not manage this resource",
|
|
||||||
desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
|
func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
|
||||||
// Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail.
|
// Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail.
|
||||||
desired.SetResourceVersion(observed.GetResourceVersion())
|
desired.SetResourceVersion(observed.GetResourceVersion())
|
||||||
|
@ -150,6 +124,11 @@ func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
|
func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
|
||||||
|
updateAllowed := o.allowUpdate(clusterName, desireObj, clusterObj)
|
||||||
|
if !updateAllowed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
|
dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
|
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
|
||||||
|
@ -205,10 +184,7 @@ func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.U
|
||||||
}
|
}
|
||||||
|
|
||||||
err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Delete(context.TODO(), desireObj.GetName(), deleteOption)
|
err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Delete(context.TODO(), desireObj.GetName(), deleteOption)
|
||||||
if apierrors.IsNotFound(err) {
|
if err != nil && !apierrors.IsNotFound(err) {
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), clusterName, err)
|
klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), clusterName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -285,3 +261,22 @@ func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterO
|
||||||
|
|
||||||
return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version), nil
|
return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *objectWatcherImpl) allowUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool {
|
||||||
|
// If the existing resource is managed by Karmada, then the updating is allowed.
|
||||||
|
if util.GetLabelValue(desiredObj.GetLabels(), workv1alpha1.WorkNameLabel) == util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// This happens when promoting workload to the Karmada control plane
|
||||||
|
conflictResolution := util.GetAnnotationValue(desiredObj.GetAnnotations(), workv1alpha2.ResourceConflictResolutionAnnotation)
|
||||||
|
if conflictResolution == workv1alpha2.ResourceConflictResolutionOverwrite {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// The existing resource is not managed by Karmada, and no conflict resolution found, avoid updating the existing resource by default.
|
||||||
|
klog.Warningf("resource(kind=%s, %s/%s) already exist in cluster %v and the %s strategy value is empty, karmada will not manage this resource",
|
||||||
|
desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation,
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue