diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index ba80b23e5..08b9df6cd 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -184,6 +184,7 @@ func init() { controllers["clusterStatus"] = startClusterStatusController controllers["hpa"] = startHpaController controllers["binding"] = startBindingController + controllers["bindingStatus"] = startBindingStatusController controllers["execution"] = startExecutionController controllers["workStatus"] = startWorkStatusController controllers["namespace"] = startNamespaceController @@ -339,6 +340,36 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e return true, nil } +func startBindingStatusController(ctx controllerscontext.Context) (enabled bool, err error) { + rbStatusController := &status.RBStatusController{ + Client: ctx.Mgr.GetClient(), + DynamicClient: ctx.DynamicClientSet, + InformerManager: ctx.ControlPlaneInformerManager, + ResourceInterpreter: ctx.ResourceInterpreter, + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.RBStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + } + if err := rbStatusController.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + + crbStatusController := &status.CRBStatusController{ + Client: ctx.Mgr.GetClient(), + DynamicClient: ctx.DynamicClientSet, + InformerManager: ctx.ControlPlaneInformerManager, + ResourceInterpreter: ctx.ResourceInterpreter, + EventRecorder: ctx.Mgr.GetEventRecorderFor(status.CRBStatusControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + } + if err := crbStatusController.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + + return true, nil +} + func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) { executionController := &execution.Controller{ Client: ctx.Mgr.GetClient(), diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index e6c6e734a..d322146c9 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -3,17 +3,13 @@ package binding import ( "context" "fmt" - "reflect" "time" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -22,12 +18,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/metrics" @@ -37,7 +32,6 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/overridemanager" - "github.com/karmada-io/karmada/pkg/util/restmapper" ) // ControllerName is the controller name that will be used when reporting events. @@ -103,7 +97,7 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi return controllerruntime.Result{Requeue: true}, err } - workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + workload, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) if err != nil { if apierrors.IsNotFound(err) { // It might happen when the resource template has been removed but the garbage collector hasn't removed @@ -115,7 +109,6 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi binding.GetNamespace(), binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } - var errs []error start := time.Now() err = ensureWork(c.Client, c.ResourceInterpreter, workload, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped) metrics.ObserveSyncWorkLatency(binding.ObjectMeta, err, start) @@ -124,27 +117,13 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi binding.GetNamespace(), binding.GetName(), err) c.EventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error()) c.EventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error()) - errs = append(errs, err) - } else { - msg := fmt.Sprintf("Sync work of resourceBinding(%s/%s) successful.", binding.Namespace, binding.Name) - klog.V(4).Infof(msg) - c.EventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) - c.EventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) - } - if err = helper.AggregateResourceBindingWorkStatus(c.Client, binding, workload, c.EventRecorder); err != nil { - klog.Errorf("Failed to aggregate workStatuses to resourceBinding(%s/%s). Error: %v.", - binding.GetNamespace(), binding.GetName(), err) - errs = append(errs, err) - } - if len(errs) > 0 { - return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs) - } - - err = c.updateResourceStatus(binding) - if err != nil { return controllerruntime.Result{Requeue: true}, err } + msg := fmt.Sprintf("Sync work of resourceBinding(%s/%s) successful.", binding.Namespace, binding.Name) + klog.V(4).Infof(msg) + c.EventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) + c.EventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) return controllerruntime.Result{}, nil } @@ -168,71 +147,13 @@ func (c *ResourceBindingController) removeOrphanWorks(binding *workv1alpha2.Reso return nil } -// updateResourceStatus will try to calculate the summary status and update to original object -// that the ResourceBinding refer to. -func (c *ResourceBindingController) updateResourceStatus(binding *workv1alpha2.ResourceBinding) error { - resource := binding.Spec.Resource - gvr, err := restmapper.GetGroupVersionResource( - c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind), - ) - if err != nil { - klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err) - return err - } - obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource) - if err != nil { - klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - - if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { - return nil - } - newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) - if err != nil { - klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - if reflect.DeepEqual(obj, newObj) { - klog.V(3).Infof("Ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) - return nil - } - - if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - klog.V(3).Infof("Update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name) - return nil -} - // SetupWithManager creates a controller and register to controller manager. func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { - workFn := handler.MapFunc( - func(a client.Object) []reconcile.Request { - var requests []reconcile.Request - annotations := a.GetAnnotations() - crNamespace, namespaceExist := annotations[workv1alpha2.ResourceBindingNamespaceAnnotationKey] - crName, nameExist := annotations[workv1alpha2.ResourceBindingNameAnnotationKey] - if namespaceExist && nameExist { - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: crNamespace, - Name: crName, - }, - }) - } - - return requests - }) - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha2.ResourceBinding{}). - Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). + WithEventFilter(predicate.GenerationChangedPredicate{}). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). - WithOptions(controller.Options{ - RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions), - }). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). Complete(c) } @@ -270,7 +191,7 @@ func (c *ResourceBindingController) newOverridePolicyFunc() handler.MapFunc { continue } - workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + workload, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) if err != nil { klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", binding.Namespace, binding.Name, err) return nil diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go index 29ca17581..5ccfc06d2 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller.go @@ -3,17 +3,13 @@ package binding import ( "context" "fmt" - "reflect" "time" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -22,12 +18,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/metrics" @@ -37,7 +32,6 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/overridemanager" - "github.com/karmada-io/karmada/pkg/util/restmapper" ) // ClusterResourceBindingControllerName is the controller name that will be used when reporting events. @@ -103,7 +97,7 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu return controllerruntime.Result{Requeue: true}, err } - workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + workload, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) if err != nil { if apierrors.IsNotFound(err) { // It might happen when the resource template has been removed but the garbage collector hasn't removed @@ -114,7 +108,7 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.GetName(), err) return controllerruntime.Result{Requeue: true}, err } - var errs []error + start := time.Now() err = ensureWork(c.Client, c.ResourceInterpreter, workload, c.OverrideManager, binding, apiextensionsv1.ClusterScoped) metrics.ObserveSyncWorkLatency(binding.ObjectMeta, err, start) @@ -122,28 +116,13 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu klog.Errorf("Failed to transform clusterResourceBinding(%s) to works. Error: %v.", binding.GetName(), err) c.EventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error()) c.EventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonSyncWorkFailed, err.Error()) - errs = append(errs, err) - } else { - msg := fmt.Sprintf("Sync work of clusterResourceBinding(%s) successful.", binding.GetName()) - klog.V(4).Infof(msg) - c.EventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) - c.EventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) - } - - err = helper.AggregateClusterResourceBindingWorkStatus(c.Client, binding, workload, c.EventRecorder) - if err != nil { - klog.Errorf("Failed to aggregate workStatuses to clusterResourceBinding(%s). Error: %v.", binding.GetName(), err) - errs = append(errs, err) - } - if len(errs) > 0 { - return controllerruntime.Result{Requeue: true}, errors.NewAggregate(errs) - } - - err = c.updateResourceStatus(binding) - if err != nil { return controllerruntime.Result{Requeue: true}, err } + msg := fmt.Sprintf("Sync work of clusterResourceBinding(%s) successful.", binding.GetName()) + klog.V(4).Infof(msg) + c.EventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) + c.EventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, msg) return controllerruntime.Result{}, nil } @@ -165,68 +144,12 @@ func (c *ClusterResourceBindingController) removeOrphanWorks(binding *workv1alph return nil } -// updateResourceStatus will try to calculate the summary status and update to original object -// that the ResourceBinding refer to. -func (c *ClusterResourceBindingController) updateResourceStatus(binding *workv1alpha2.ClusterResourceBinding) error { - resource := binding.Spec.Resource - gvr, err := restmapper.GetGroupVersionResource( - c.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind), - ) - if err != nil { - klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err) - return err - } - obj, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, resource) - if err != nil { - klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - - if !c.ResourceInterpreter.HookEnabled(obj.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { - return nil - } - newObj, err := c.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) - if err != nil { - klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - if reflect.DeepEqual(obj, newObj) { - klog.V(3).Infof("Ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) - return nil - } - - if _, err = c.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) - return err - } - klog.V(3).Infof("Update resource status successfully for resource(%s/%s/%s)", resource.Kind, resource.Namespace, resource.Name) - return nil -} - // SetupWithManager creates a controller and register to controller manager. func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { - workFn := handler.MapFunc( - func(a client.Object) []reconcile.Request { - var requests []reconcile.Request - annotations := a.GetAnnotations() - crbName, nameExist := annotations[workv1alpha2.ClusterResourceBindingAnnotationKey] - if nameExist { - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: crbName, - }, - }) - } - - return requests - }) - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha2.ClusterResourceBinding{}). - Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). - WithOptions(controller.Options{ - RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions), - }). + WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). Complete(c) } @@ -255,7 +178,7 @@ func (c *ClusterResourceBindingController) newOverridePolicyFunc() handler.MapFu continue } - workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + workload, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) if err != nil { klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.Name, err) return nil diff --git a/pkg/controllers/binding/common.go b/pkg/controllers/binding/common.go index a13f5e7a8..cc6b7aa92 100644 --- a/pkg/controllers/binding/common.go +++ b/pkg/controllers/binding/common.go @@ -1,16 +1,11 @@ package binding import ( - "reflect" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" @@ -22,37 +17,6 @@ import ( "github.com/karmada-io/karmada/pkg/util/overridemanager" ) -var workPredicateFn = builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return false - }, - UpdateFunc: func(e event.UpdateEvent) bool { - var statusesOld, statusesNew workv1alpha1.WorkStatus - - switch oldWork := e.ObjectOld.(type) { - case *workv1alpha1.Work: - statusesOld = oldWork.Status - default: - return false - } - - switch newWork := e.ObjectNew.(type) { - case *workv1alpha1.Work: - statusesNew = newWork.Status - default: - return false - } - - return !reflect.DeepEqual(statusesOld, statusesNew) - }, - DeleteFunc: func(event.DeleteEvent) bool { - return true - }, - GenericFunc: func(event.GenericEvent) bool { - return false - }, -}) - // ensureWork ensure Work to be created or updated. func ensureWork( c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured, diff --git a/pkg/controllers/status/common.go b/pkg/controllers/status/common.go new file mode 100644 index 000000000..067de868f --- /dev/null +++ b/pkg/controllers/status/common.go @@ -0,0 +1,83 @@ +package status + +import ( + "context" + "reflect" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/resourceinterpreter" + "github.com/karmada-io/karmada/pkg/util/restmapper" +) + +var workPredicateFn = builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return false }, + UpdateFunc: func(e event.UpdateEvent) bool { + var oldStatus, newStatus workv1alpha1.WorkStatus + + switch oldWork := e.ObjectOld.(type) { + case *workv1alpha1.Work: + oldStatus = oldWork.Status + default: + return false + } + + switch newWork := e.ObjectNew.(type) { + case *workv1alpha1.Work: + newStatus = newWork.Status + default: + return false + } + + return !reflect.DeepEqual(oldStatus, newStatus) + }, + DeleteFunc: func(event.DeleteEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return false }, +}) + +// updateResourceStatus will try to calculate the summary status and +// update to original object that the ResourceBinding refer to. +func updateResourceStatus( + dynamicClient dynamic.Interface, + restMapper meta.RESTMapper, + interpreter resourceinterpreter.ResourceInterpreter, + resource *unstructured.Unstructured, + bindingStatus workv1alpha2.ResourceBindingStatus, +) error { + gvr, err := restmapper.GetGroupVersionResource(restMapper, schema.FromAPIVersionAndKind(resource.GetAPIVersion(), resource.GetKind())) + if err != nil { + klog.Errorf("Failed to get GVR from GVK(%s/%s), Error: %v", resource.GetAPIVersion(), resource.GetKind(), err) + return err + } + + if !interpreter.HookEnabled(resource.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) { + return nil + } + newObj, err := interpreter.AggregateStatus(resource, bindingStatus.AggregatedStatus) + if err != nil { + klog.Errorf("Failed to aggregate status for resource(%s/%s/%s, Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err) + return err + } + if reflect.DeepEqual(resource, newObj) { + klog.V(3).Infof("Ignore update resource(%s/%s/%s) status as up to date.", gvr, resource.GetNamespace(), resource.GetName()) + return nil + } + + if _, err = dynamicClient.Resource(gvr).Namespace(resource.GetNamespace()).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { + klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err) + return err + } + klog.V(3).Infof("Update resource(%s/%s/%s) status successfully.", gvr, resource.GetNamespace(), resource.GetName()) + return nil +} diff --git a/pkg/controllers/status/crb_status_controller.go b/pkg/controllers/status/crb_status_controller.go new file mode 100644 index 000000000..85922d383 --- /dev/null +++ b/pkg/controllers/status/crb_status_controller.go @@ -0,0 +1,121 @@ +package status + +import ( + "context" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/resourceinterpreter" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/helper" +) + +// CRBStatusControllerName is the controller name that will be used when reporting events. +const CRBStatusControllerName = "cluster-resource-binding-status-controller" + +// CRBStatusController is to sync status of ClusterResourceBinding +// and aggregate status to the resource template. +type CRBStatusController struct { + client.Client // used to operate ClusterResourceBinding resources. + DynamicClient dynamic.Interface // used to fetch arbitrary resources from api server. + InformerManager genericmanager.SingleClusterInformerManager // used to fetch arbitrary resources from cache. + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + ResourceInterpreter resourceinterpreter.ResourceInterpreter + RateLimiterOptions ratelimiterflag.Options +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *CRBStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling ClusterResourceBinding %s.", req.NamespacedName.String()) + + binding := &workv1alpha2.ClusterResourceBinding{} + if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil { + // The rb no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + + return controllerruntime.Result{Requeue: true}, err + } + + // The crb is being deleted, in which case we stop processing. + if !binding.DeletionTimestamp.IsZero() { + return controllerruntime.Result{}, nil + } + + err := c.syncBindingStatus(binding) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *CRBStatusController) SetupWithManager(mgr controllerruntime.Manager) error { + workMapFunc := handler.MapFunc( + func(workObj client.Object) []reconcile.Request { + var requests []reconcile.Request + + annotations := workObj.GetAnnotations() + name, nameExist := annotations[workv1alpha2.ClusterResourceBindingAnnotationKey] + if nameExist { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: name, + }, + }) + } + + return requests + }) + + return controllerruntime.NewControllerManagedBy(mgr).Named("clusterResourceBinding_status_controller"). + Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workMapFunc), workPredicateFn). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). + Complete(c) +} + +func (c *CRBStatusController) syncBindingStatus(binding *workv1alpha2.ClusterResourceBinding) error { + resource, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + if err != nil { + if apierrors.IsNotFound(err) { + // It might happen when the resource template has been removed but the garbage collector hasn't removed + // the ResourceBinding which dependent on resource template. + // So, just return without retry(requeue) would save unnecessary loop. + return nil + } + klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v", + binding.GetName(), err) + return err + } + + err = helper.AggregateClusterResourceBindingWorkStatus(c.Client, binding, resource, c.EventRecorder) + if err != nil { + klog.Errorf("Failed to aggregate workStatues to clusterResourceBinding(%s), Error: %v", + binding.Name, err) + return err + } + + err = updateResourceStatus(c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, resource, binding.Status) + if err != nil { + return err + } + return nil +} diff --git a/pkg/controllers/status/rb_status_controller.go b/pkg/controllers/status/rb_status_controller.go new file mode 100644 index 000000000..4070bc8ba --- /dev/null +++ b/pkg/controllers/status/rb_status_controller.go @@ -0,0 +1,123 @@ +package status + +import ( + "context" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/resourceinterpreter" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/helper" +) + +// RBStatusControllerName is the controller name that will be used when reporting events. +const RBStatusControllerName = "resource-binding-status-controller" + +// RBStatusController is to sync status of ResourceBinding +// and aggregate status to the resource template. +type RBStatusController struct { + client.Client // used to operate ResourceBinding resources. + DynamicClient dynamic.Interface // used to fetch arbitrary resources from api server. + InformerManager genericmanager.SingleClusterInformerManager // used to fetch arbitrary resources from cache. + ResourceInterpreter resourceinterpreter.ResourceInterpreter + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + RateLimiterOptions ratelimiterflag.Options +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *RBStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling ResourceBinding %s.", req.NamespacedName.String()) + + binding := &workv1alpha2.ResourceBinding{} + if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil { + // The rb no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + + return controllerruntime.Result{Requeue: true}, err + } + + // The rb is being deleted, in which case we stop processing. + if !binding.DeletionTimestamp.IsZero() { + return controllerruntime.Result{}, nil + } + + err := c.syncBindingStatus(binding) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *RBStatusController) SetupWithManager(mgr controllerruntime.Manager) error { + workMapFunc := handler.MapFunc( + func(workObj client.Object) []reconcile.Request { + var requests []reconcile.Request + + annotations := workObj.GetAnnotations() + namespace, nsExist := annotations[workv1alpha2.ResourceBindingNamespaceAnnotationKey] + name, nameExist := annotations[workv1alpha2.ResourceBindingNameAnnotationKey] + if nsExist && nameExist { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: name, + }, + }) + } + + return requests + }) + + return controllerruntime.NewControllerManagedBy(mgr).Named("resourceBinding_status_controller"). + Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workMapFunc), workPredicateFn). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). + Complete(c) +} + +func (c *RBStatusController) syncBindingStatus(binding *workv1alpha2.ResourceBinding) error { + resourceTemplate, err := helper.FetchResourceTemplate(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource) + if err != nil { + if apierrors.IsNotFound(err) { + // It might happen when the resource template has been removed but the garbage collector hasn't removed + // the ResourceBinding which dependent on resource template. + // So, just return without retry(requeue) would save unnecessary loop. + return nil + } + klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", + binding.GetNamespace(), binding.GetName(), err) + return err + } + + err = helper.AggregateResourceBindingWorkStatus(c.Client, binding, resourceTemplate, c.EventRecorder) + if err != nil { + klog.Errorf("Failed to aggregate workStatues to resourceBinding(%s/%s), Error: %v", + binding.Namespace, binding.Name, err) + return err + } + + err = updateResourceStatus(c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, resourceTemplate, binding.Status) + if err != nil { + return err + } + return nil +} diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/work_status_controller.go similarity index 100% rename from pkg/controllers/status/workstatus_controller.go rename to pkg/controllers/status/work_status_controller.go diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 5eeb9f794..09d1831e7 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -326,7 +326,7 @@ func (d *DependenciesDistributor) ReconcileResourceBinding(key util.QueueKey) er return d.handleResourceBindingDeletion(ckey) } - workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, bindingObject.Spec.Resource) + workload, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, bindingObject.Spec.Resource) if err != nil { klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", bindingObject.Namespace, bindingObject.Name, err) return err @@ -394,7 +394,7 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding * Name: dependent.Name, } - rawObject, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, resource) + rawObject, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, resource) if err != nil { // do nothing if resource template not exist. if apierrors.IsNotFound(err) { diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 1ec920ccf..d6d26d554 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -1078,7 +1078,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy // CleanupLabels removes labels from object referencing by objRef. func (d *ResourceDetector) CleanupLabels(objRef workv1alpha2.ObjectReference, labels ...string) error { - workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef) + workload, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef) if err != nil { // do nothing if resource template not exist, it might has been removed. if apierrors.IsNotFound(err) { diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index e6a140ce8..417733b91 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -215,45 +215,46 @@ func RemoveOrphanWorks(c client.Client, works []workv1alpha1.Work) error { return errors.NewAggregate(errs) } -// FetchWorkload fetches the kubernetes resource to be propagated. -func FetchWorkload(dynamicClient dynamic.Interface, informerManager genericmanager.SingleClusterInformerManager, - restMapper meta.RESTMapper, resource workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) { - dynamicResource, err := restmapper.GetGroupVersionResource(restMapper, - schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind)) +// FetchResourceTemplate fetches the resource template to be propagated. +func FetchResourceTemplate( + dynamicClient dynamic.Interface, + informerManager genericmanager.SingleClusterInformerManager, + restMapper meta.RESTMapper, + resource workv1alpha2.ObjectReference, +) (*unstructured.Unstructured, error) { + gvr, err := restmapper.GetGroupVersionResource(restMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind)) if err != nil { - klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, - resource.Kind, err) + klog.Errorf("Failed to get GVR from GVK(%s/%s), Error: %v", resource.APIVersion, resource.Kind, err) return nil, err } - var workload runtime.Object + var object runtime.Object if len(resource.Namespace) == 0 { // cluster-scoped resource - workload, err = informerManager.Lister(dynamicResource).Get(resource.Name) + object, err = informerManager.Lister(gvr).Get(resource.Name) } else { - workload, err = informerManager.Lister(dynamicResource).ByNamespace(resource.Namespace).Get(resource.Name) + object, err = informerManager.Lister(gvr).ByNamespace(resource.Namespace).Get(resource.Name) } if err != nil { // fall back to call api server in case the cache has not been synchronized yet - klog.Warningf("Failed to get workload from cache, kind: %s, namespace: %s, name: %s. Error: %v. Fall back to call api server", + klog.Warningf("Failed to get resource template (%s/%s/%s) from cache, Error: %v. Fall back to call api server.", resource.Kind, resource.Namespace, resource.Name, err) - workload, err = dynamicClient.Resource(dynamicResource).Namespace(resource.Namespace).Get(context.TODO(), - resource.Name, metav1.GetOptions{}) + object, err = dynamicClient.Resource(gvr).Namespace(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{}) if err != nil { - klog.Errorf("Failed to get workload from api server, kind: %s, namespace: %s, name: %s. Error: %v", + klog.Errorf("Failed to get resource template (%s/%s/%s) from api server, Error: %v", resource.Kind, resource.Namespace, resource.Name, err) return nil, err } } - unstructuredWorkLoad, err := ToUnstructured(workload) + unstructuredObj, err := ToUnstructured(object) if err != nil { - klog.Errorf("Failed to transform object(%s/%s): %v", resource.Namespace, resource.Name, err) + klog.Errorf("Failed to transform object(%s/%s), Error: %v", resource.Namespace, resource.Name, err) return nil, err } - return unstructuredWorkLoad, nil + return unstructuredObj, nil } // GetClusterResourceBindings returns a ClusterResourceBindingList by labels. diff --git a/pkg/util/helper/binding_test.go b/pkg/util/helper/binding_test.go index 1f0eeb48d..496da02af 100644 --- a/pkg/util/helper/binding_test.go +++ b/pkg/util/helper/binding_test.go @@ -852,9 +852,9 @@ func TestFetchWorkload(t *testing.T) { t.Run(tt.name, func(t *testing.T) { stopCh := make(chan struct{}) mgr := tt.args.informerManager(stopCh) - got, err := FetchWorkload(tt.args.dynamicClient, mgr, tt.args.restMapper, tt.args.resource) + got, err := FetchResourceTemplate(tt.args.dynamicClient, mgr, tt.args.restMapper, tt.args.resource) if (err != nil) != tt.wantErr { - t.Errorf("FetchWorkload() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("FetchResourceTemplate() error = %v, wantErr %v", err, tt.wantErr) return } if got != nil { @@ -863,7 +863,7 @@ func TestFetchWorkload(t *testing.T) { delete(got.Object, "status") } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("FetchWorkload() got = %v, want %v", got, tt.want) + t.Errorf("FetchResourceTemplate() got = %v, want %v", got, tt.want) } }) } diff --git a/pkg/util/helper/work.go b/pkg/util/helper/work.go index dde7d64f1..617fd2a58 100644 --- a/pkg/util/helper/work.go +++ b/pkg/util/helper/work.go @@ -52,6 +52,9 @@ func CreateOrUpdateWork(client client.Client, workMeta metav1.ObjectMeta, resour var operationResult controllerutil.OperationResult err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), client, runtimeObject, func() error { + if !runtimeObject.DeletionTimestamp.IsZero() { + return fmt.Errorf("work %s/%s is being deleted", runtimeObject.GetNamespace(), runtimeObject.GetName()) + } runtimeObject.Spec = work.Spec runtimeObject.Labels = work.Labels runtimeObject.Annotations = work.Annotations diff --git a/pkg/util/helper/workstatus.go b/pkg/util/helper/workstatus.go index b0d7fa83e..9be147af0 100644 --- a/pkg/util/helper/workstatus.go +++ b/pkg/util/helper/workstatus.go @@ -39,13 +39,20 @@ const ( // AggregateResourceBindingWorkStatus will collect all work statuses with current ResourceBinding objects, // then aggregate status info to current ResourceBinding status. -func AggregateResourceBindingWorkStatus(c client.Client, binding *workv1alpha2.ResourceBinding, workload *unstructured.Unstructured, eventRecorder record.EventRecorder) error { +func AggregateResourceBindingWorkStatus( + c client.Client, + binding *workv1alpha2.ResourceBinding, + resourceTemplate *unstructured.Unstructured, + eventRecorder record.EventRecorder, +) error { + binding.GetName() + workList, err := GetWorksByBindingNamespaceName(c, binding.Namespace, binding.Name) if err != nil { return err } - aggregatedStatuses, err := assembleWorkStatus(workList.Items, workload) + aggregatedStatuses, err := assembleWorkStatus(workList.Items, resourceTemplate) if err != nil { return err } @@ -82,25 +89,30 @@ func AggregateResourceBindingWorkStatus(c client.Client, binding *workv1alpha2.R }) if err != nil { eventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) - eventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) + eventRecorder.Event(resourceTemplate, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) return err } msg := fmt.Sprintf("Update resourceBinding(%s/%s) with AggregatedStatus successfully.", binding.Namespace, binding.Name) eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) - eventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) return nil } // AggregateClusterResourceBindingWorkStatus will collect all work statuses with current ClusterResourceBinding objects, // then aggregate status info to current ClusterResourceBinding status. -func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1alpha2.ClusterResourceBinding, workload *unstructured.Unstructured, eventRecorder record.EventRecorder) error { +func AggregateClusterResourceBindingWorkStatus( + c client.Client, + binding *workv1alpha2.ClusterResourceBinding, + resourceTemplate *unstructured.Unstructured, + eventRecorder record.EventRecorder, +) error { workList, err := GetWorksByBindingNamespaceName(c, "", binding.Name) if err != nil { return err } - aggregatedStatuses, err := assembleWorkStatus(workList.Items, workload) + aggregatedStatuses, err := assembleWorkStatus(workList.Items, resourceTemplate) if err != nil { return err } @@ -136,13 +148,13 @@ func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1a }) if err != nil { eventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) - eventRecorder.Event(workload, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) + eventRecorder.Event(resourceTemplate, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) return err } msg := fmt.Sprintf("Update clusterResourceBinding(%s) with AggregatedStatus successfully.", binding.Name) eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) - eventRecorder.Event(workload, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) return nil }