diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index 135ee5326..8cc61d9bf 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -3,11 +3,14 @@ package binding import ( "context" "fmt" + "reflect" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/dynamic" @@ -21,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" @@ -30,6 +34,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/overridemanager" "github.com/karmada-io/karmada/pkg/util/ratelimiter" + "github.com/karmada-io/karmada/pkg/util/restmapper" ) // ControllerName is the controller name that will be used when reporting events. @@ -145,9 +150,52 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs) } + err = c.updateResourceStatus(binding) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil } +// updateResourceStatus will try to calculate the summary status and update to original object +// that the ResourceBinding refer to. +func (c *ResourceBindingController) updateResourceStatus(binding *workv1alpha2.ResourceBinding) error { + resource := binding.Spec.Resource + gvr, err := restmapper.GetGroupVersionResource( + c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind), + ) + if err != nil { + klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err) + return err + } + obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource) + if err != nil { + klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + + if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { + return nil + } + newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) + if err != nil { + klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + if reflect.DeepEqual(obj, newObj) { + klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) + return nil + } + + if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { + klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + klog.V(3).Infof("update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name) + return nil +} + // SetupWithManager creates a controller and register to controller manager. func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { workFn := handler.MapFunc( diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index ac8469b05..b0566fda8 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -3,7 +3,6 @@ package detector import ( "context" "fmt" - "reflect" "sync" "time" @@ -66,11 +65,6 @@ type ResourceDetector struct { clusterPolicyReconcileWorker util.AsyncWorker clusterPropagationPolicyLister cache.GenericLister - // bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and - // a reconcile function to consume the items in queue. - bindingReconcileWorker util.AsyncWorker - resourceBindingLister cache.GenericLister - RESTMapper meta.RESTMapper // waitingObjects tracks of objects which haven't be propagated yet as lack of appropriate policies. @@ -130,39 +124,12 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) - // setup binding reconcile worker - bindingWorkerOptions := util.Options{ - Name: "resourceBinding reconciler", - KeyFunc: ClusterWideKeyFunc, - ReconcileFunc: d.ReconcileResourceBinding, - } - d.bindingReconcileWorker = util.NewAsyncWorker(bindingWorkerOptions) - d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh) - - // watch and enqueue ResourceBinding changes. - resourceBindingGVR := schema.GroupVersionResource{ - Group: workv1alpha2.GroupVersion.Group, - Version: workv1alpha2.GroupVersion.Version, - Resource: "resourcebindings", - } - bindingHandler := informermanager.NewHandlerOnEvents(d.OnResourceBindingAdd, d.OnResourceBindingUpdate, nil) - d.InformerManager.ForResource(resourceBindingGVR, bindingHandler) - d.resourceBindingLister = d.InformerManager.Lister(resourceBindingGVR) - - // watch and enqueue ClusterResourceBinding changes. - clusterResourceBindingGVR := schema.GroupVersionResource{ - Group: workv1alpha2.GroupVersion.Group, - Version: workv1alpha2.GroupVersion.Version, - Resource: "clusterresourcebindings", - } detectorWorkerOptions := util.Options{ Name: "resource detector", KeyFunc: ClusterWideKeyFunc, ReconcileFunc: d.Reconcile, RatelimiterOptions: d.RatelimiterOptions, } - clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil) - d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker(detectorWorkerOptions) @@ -1049,89 +1016,6 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreation(policy *policy return nil } -// OnResourceBindingAdd handles object add event. -func (d *ResourceDetector) OnResourceBindingAdd(obj interface{}) { - key, err := ClusterWideKeyFunc(obj) - if err != nil { - return - } - - d.bindingReconcileWorker.Add(key) -} - -// OnResourceBindingUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnResourceBindingUpdate(_, newObj interface{}) { - d.OnResourceBindingAdd(newObj) -} - -// ReconcileResourceBinding handles ResourceBinding object changes. -// For each ResourceBinding changes, we will try to calculate the summary status and update to original object -// that the ResourceBinding refer to. -func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { - ckey, ok := key.(keys.ClusterWideKey) - if !ok { // should not happen - klog.Error("Found invalid key when reconciling resource binding.") - return fmt.Errorf("invalid key") - } - - unstructuredObj, err := d.resourceBindingLister.Get(ckey.NamespaceKey()) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - binding, err := helper.ConvertToResourceBinding(unstructuredObj.(*unstructured.Unstructured)) - if err != nil { - klog.Errorf("Failed to convert ResourceBinding(%s) from unstructured object: %v", ckey.NamespaceKey(), err) - return err - } - - klog.Infof("Reconciling resource binding(%s/%s)", binding.Namespace, binding.Name) - resource := binding.Spec.Resource - gvr, err := restmapper.GetGroupVersionResource( - d.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind), - ) - if err != nil { - klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err) - return err - } - obj, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, resource) - if err != nil { - klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - - if !d.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { - return nil - } - newObj, err := d.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) - if err != nil { - klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - if reflect.DeepEqual(obj, newObj) { - klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) - return nil - } - - if _, err = d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - return nil -} - -// OnClusterResourceBindingAdd handles object add event. -func (d *ResourceDetector) OnClusterResourceBindingAdd(obj interface{}) { -} - -// OnClusterResourceBindingUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnClusterResourceBindingUpdate(oldObj, newObj interface{}) { - d.OnClusterResourceBindingAdd(newObj) -} - // CleanupLabels removes labels from object referencing by objRef. func (d *ResourceDetector) CleanupLabels(objRef workv1alpha2.ObjectReference, labels ...string) error { workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef)