Fix resource binding get reconciled multiple times

Signed-off-by: pigletfly <wangbing.adam@gmail.com>
This commit is contained in:
pigletfly 2022-03-03 18:38:42 +08:00
parent f3b1142599
commit d8470c2c52
2 changed files with 48 additions and 116 deletions

View File

@ -3,11 +3,14 @@ package binding
import (
"context"
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
@ -21,6 +24,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@ -30,6 +34,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
// ControllerName is the controller name that will be used when reporting events.
@ -145,9 +150,52 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi
return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs)
}
err = c.updateResourceStatus(binding)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
// updateResourceStatus will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
func (c *ResourceBindingController) updateResourceStatus(binding *workv1alpha2.ResourceBinding) error {
resource := binding.Spec.Resource
gvr, err := restmapper.GetGroupVersionResource(
c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
)
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
return err
}
obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource)
if err != nil {
klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus)
if err != nil {
klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if reflect.DeepEqual(obj, newObj) {
klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name)
return nil
}
if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
klog.V(3).Infof("update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name)
return nil
}
// SetupWithManager creates a controller and register to controller manager.
func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
workFn := handler.MapFunc(

View File

@ -3,7 +3,6 @@ package detector
import (
"context"
"fmt"
"reflect"
"sync"
"time"
@ -66,11 +65,6 @@ type ResourceDetector struct {
clusterPolicyReconcileWorker util.AsyncWorker
clusterPropagationPolicyLister cache.GenericLister
// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
// a reconcile function to consume the items in queue.
bindingReconcileWorker util.AsyncWorker
resourceBindingLister cache.GenericLister
RESTMapper meta.RESTMapper
// waitingObjects tracks of objects which haven't be propagated yet as lack of appropriate policies.
@ -130,39 +124,12 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)
// setup binding reconcile worker
bindingWorkerOptions := util.Options{
Name: "resourceBinding reconciler",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.ReconcileResourceBinding,
}
d.bindingReconcileWorker = util.NewAsyncWorker(bindingWorkerOptions)
d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh)
// watch and enqueue ResourceBinding changes.
resourceBindingGVR := schema.GroupVersionResource{
Group: workv1alpha2.GroupVersion.Group,
Version: workv1alpha2.GroupVersion.Version,
Resource: "resourcebindings",
}
bindingHandler := informermanager.NewHandlerOnEvents(d.OnResourceBindingAdd, d.OnResourceBindingUpdate, nil)
d.InformerManager.ForResource(resourceBindingGVR, bindingHandler)
d.resourceBindingLister = d.InformerManager.Lister(resourceBindingGVR)
// watch and enqueue ClusterResourceBinding changes.
clusterResourceBindingGVR := schema.GroupVersionResource{
Group: workv1alpha2.GroupVersion.Group,
Version: workv1alpha2.GroupVersion.Version,
Resource: "clusterresourcebindings",
}
detectorWorkerOptions := util.Options{
Name: "resource detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
RatelimiterOptions: d.RatelimiterOptions,
}
clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil)
d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler)
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
@ -1049,89 +1016,6 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreation(policy *policy
return nil
}
// OnResourceBindingAdd handles object add event.
func (d *ResourceDetector) OnResourceBindingAdd(obj interface{}) {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return
}
d.bindingReconcileWorker.Add(key)
}
// OnResourceBindingUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnResourceBindingUpdate(_, newObj interface{}) {
d.OnResourceBindingAdd(newObj)
}
// ReconcileResourceBinding handles ResourceBinding object changes.
// For each ResourceBinding changes, we will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
ckey, ok := key.(keys.ClusterWideKey)
if !ok { // should not happen
klog.Error("Found invalid key when reconciling resource binding.")
return fmt.Errorf("invalid key")
}
unstructuredObj, err := d.resourceBindingLister.Get(ckey.NamespaceKey())
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
binding, err := helper.ConvertToResourceBinding(unstructuredObj.(*unstructured.Unstructured))
if err != nil {
klog.Errorf("Failed to convert ResourceBinding(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
return err
}
klog.Infof("Reconciling resource binding(%s/%s)", binding.Namespace, binding.Name)
resource := binding.Spec.Resource
gvr, err := restmapper.GetGroupVersionResource(
d.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
)
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
return err
}
obj, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, resource)
if err != nil {
klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if !d.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := d.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus)
if err != nil {
klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if reflect.DeepEqual(obj, newObj) {
klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name)
return nil
}
if _, err = d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
return nil
}
// OnClusterResourceBindingAdd handles object add event.
func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) {
}
// OnClusterResourceBindingUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnClusterResourceBindingUpdate(oldObj, newObj interface{}) {
d.OnClusterResourceBindingAdd(newObj)
}
// CleanupLabels removes labels from object referencing by objRef.
func (d *ResourceDetector) CleanupLabels(objRef workv1alpha2.ObjectReference, labels ...string) error {
workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef)