diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index 0a5442c55..2c4c1e89c 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -2,17 +2,26 @@ package binding import ( "context" + "reflect" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/overridemanager" ) @@ -53,7 +62,7 @@ func (c *ResourceBindingController) Reconcile(req controllerruntime.Request) (co isReady := helper.IsBindingReady(binding.Spec.Clusters) if !isReady { - klog.Infof("ResourceBinding %s/%s is not ready to sync", binding.GetNamespace(), binding.GetName()) + klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName()) return controllerruntime.Result{}, nil } @@ -65,28 +74,35 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha1.ResourceBi clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters) works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped) if err != nil { - klog.Errorf("Failed to find orphan works by resourceBinding %s/%s. Error: %v.", + klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.", binding.GetNamespace(), binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } err = helper.RemoveOrphanWorks(c.Client, works) if err != nil { - klog.Errorf("Failed to remove orphan works by resourceBinding %s/%s. Error: %v.", + klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.", binding.GetNamespace(), binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource) if err != nil { - klog.Errorf("Failed to fetch workload for resourceBinding %s/%s. Error: %v.", + klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", binding.GetNamespace(), binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } err = helper.EnsureWork(c.Client, workload, clusterNames, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped) if err != nil { - klog.Errorf("Failed to transform resourceBinding %s/%s to works. Error: %v.", + klog.Errorf("Failed to transform resourceBinding(%s/%s) to works. Error: %v.", + binding.GetNamespace(), binding.GetName(), err) + return controllerruntime.Result{Requeue: true}, err + } + + err = helper.AggregateResourceBindingWorkStatus(c.Client, binding, workload) + if err != nil { + klog.Errorf("Failed to aggregate workStatuses to resourceBinding(%s/%s). Error: %v.", binding.GetNamespace(), binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } @@ -96,5 +112,52 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha1.ResourceBi // SetupWithManager creates a controller and register to controller manager. func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.ResourceBinding{}).Complete(c) + workFn := handler.ToRequestsFunc( + func(a handler.MapObject) []reconcile.Request { + var requests []reconcile.Request + + labels := a.Meta.GetLabels() + namespacesName := types.NamespacedName{ + Namespace: labels[util.ResourceBindingNamespaceLabel], + Name: labels[util.ResourceBindingNameLabel], + } + + requests = append(requests, reconcile.Request{NamespacedName: namespacesName}) + return requests + }) + + predicateFn := builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { + var statusesOld, statusesNew []workv1alpha1.ManifestStatus + + switch oldWork := e.ObjectOld.(type) { + case *workv1alpha1.Work: + statusesOld = oldWork.Status.ManifestStatuses + default: + return false + } + + switch newWork := e.ObjectNew.(type) { + case *workv1alpha1.Work: + statusesNew = newWork.Status.ManifestStatuses + default: + return false + } + + return !reflect.DeepEqual(statusesOld, statusesNew) + }, + DeleteFunc: func(event.DeleteEvent) bool { + return false + }, + GenericFunc: func(event.GenericEvent) bool { + return false + }, + }) + + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.ResourceBinding{}). + Watches(&source.Kind{Type: &workv1alpha1.Work{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: workFn}, predicateFn). + Complete(c) } diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index a52588060..9e63811d2 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -22,6 +22,7 @@ import ( workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" @@ -246,7 +247,7 @@ func (c *WorkStatusController) reflectStatus(work *workv1alpha1.Work, clusterObj } func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) (*workv1alpha1.ResourceIdentifier, error) { - ordinal, err := c.getManifestIndex(work.Spec.Workload.Manifests, clusterObj) + ordinal, err := helper.GetManifestIndex(work.Spec.Workload.Manifests, clusterObj) if err != nil { return nil, err } @@ -289,24 +290,6 @@ func (c *WorkStatusController) mergeStatus(statuses []workv1alpha1.ManifestStatu return []workv1alpha1.ManifestStatus{newStatus} } -func (c *WorkStatusController) getManifestIndex(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (int, error) { - for index, rawManifest := range manifests { - manifest := &unstructured.Unstructured{} - if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { - return -1, err - } - - if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() && - manifest.GetKind() == clusterObj.GetKind() && - manifest.GetNamespace() == clusterObj.GetNamespace() && - manifest.GetName() == clusterObj.GetName() { - return index, nil - } - } - - return -1, fmt.Errorf("no such manifest exist") -} - func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) { for _, rawManifest := range manifests { manifest := &unstructured.Unstructured{} diff --git a/pkg/util/helper/workstatus.go b/pkg/util/helper/workstatus.go new file mode 100644 index 000000000..45c3a591c --- /dev/null +++ b/pkg/util/helper/workstatus.go @@ -0,0 +1,129 @@ +package helper + +import ( + "context" + "fmt" + "reflect" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// AggregateResourceBindingWorkStatus will collect all work statuses with current ResourceBinding objects, +// then aggregate status info to current ResourceBinding status. +func AggregateResourceBindingWorkStatus(c client.Client, binding *workv1alpha1.ResourceBinding, workload *unstructured.Unstructured) error { + aggregatedStatuses, err := assembleWorkStatus(c, binding.Namespace, binding.Name, workload, apiextensionsv1.NamespaceScoped) + if err != nil { + return err + } + + if reflect.DeepEqual(binding.Status.AggregatedStatus, aggregatedStatuses) { + klog.Infof("New aggregatedStatuses are equal with old resourceBinding(%s/%s) AggregatedStatus, no update required.", + binding.Namespace, binding.Name) + return nil + } + + binding.Status.AggregatedStatus = aggregatedStatuses + err = c.Status().Update(context.TODO(), binding) + if err != nil { + klog.Errorf("Failed update resourceBinding(%s/%s). Error: %v.", binding.Namespace, binding.Name, err) + return err + } + klog.Infof("Update resourceBinding(%s/%s) with AggregatedStatus successfully.", binding.Namespace, binding.Name) + + return nil +} + +func assembleWorkStatus(c client.Client, bindingNamespace, bindingName string, workload *unstructured.Unstructured, + scope apiextensionsv1.ResourceScope) ([]workv1alpha1.AggregatedStatusItem, error) { + + workList := &workv1alpha1.WorkList{} + switch scope { + case apiextensionsv1.NamespaceScoped: + selector := labels.SelectorFromSet(labels.Set{ + util.ResourceBindingNamespaceLabel: bindingNamespace, + util.ResourceBindingNameLabel: bindingName, + }) + + if err := c.List(context.TODO(), workList, &client.ListOptions{LabelSelector: selector}); err != nil { + return nil, err + } + } + + statuses := make([]workv1alpha1.AggregatedStatusItem, 0) + for _, work := range workList.Items { + identifierIndex, err := GetManifestIndex(work.Spec.Workload.Manifests, workload) + if err != nil { + klog.Errorf("Failed to get manifestIndex of workload in work.Spec.Workload.Manifests. Error: %v.", err) + return nil, err + } + + for _, manifestStatus := range work.Status.ManifestStatuses { + equal, err := equalIdentifier(&manifestStatus.Identifier, identifierIndex, workload) + if err != nil { + return nil, err + } + if equal { + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get clusterName from work namespace %s. Error: %v.", work.Namespace, err) + return nil, err + } + + aggregatedStatus := workv1alpha1.AggregatedStatusItem{ + ClusterName: clusterName, + Status: manifestStatus.Status, + } + statuses = append(statuses, aggregatedStatus) + break + } + } + } + + return statuses, nil +} + +// GetManifestIndex get the index of clusterObj in manifest list, if not exist return -1. +func GetManifestIndex(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (int, error) { + for index, rawManifest := range manifests { + manifest := &unstructured.Unstructured{} + if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { + return -1, err + } + + if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() && + manifest.GetKind() == clusterObj.GetKind() && + manifest.GetNamespace() == clusterObj.GetNamespace() && + manifest.GetName() == clusterObj.GetName() { + return index, nil + } + } + + return -1, fmt.Errorf("no such manifest exist") +} + +func equalIdentifier(targetIdentifier *workv1alpha1.ResourceIdentifier, ordinal int, workload *unstructured.Unstructured) (bool, error) { + groupVersion, err := schema.ParseGroupVersion(workload.GetAPIVersion()) + if err != nil { + return false, err + } + + if targetIdentifier.Ordinal == ordinal && + targetIdentifier.Group == groupVersion.Group && + targetIdentifier.Version == groupVersion.Version && + targetIdentifier.Kind == workload.GetKind() && + targetIdentifier.Namespace == workload.GetNamespace() && + targetIdentifier.Name == workload.GetName() { + return true, nil + } + + return false, nil +}