From 0d0e7c333e6eb2b715e910dcd418d8793093f08e Mon Sep 17 00:00:00 2001 From: changzhen Date: Fri, 9 Jul 2021 16:06:45 +0800 Subject: [PATCH] suppress reported work from propagating in agent Signed-off-by: changzhen --- cmd/agent/app/agent.go | 9 ++--- pkg/util/helper/predicate.go | 77 +++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 03041f265..19a41bdc7 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -12,7 +12,6 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/karmada-io/karmada/cmd/agent/app/options" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -94,7 +93,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop Client: mgr.GetClient(), KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), - PredicateFunc: helper.NewClusterPredicateByAgent(opts.ClusterName), + PredicateFunc: helper.NewClusterPredicateOnAgent(opts.ClusterName), InformerManager: informermanager.GetInstance(), StopChan: stopChan, ClusterClientSetFunc: util.NewClusterClientSetForAgent, @@ -113,7 +112,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName), RESTMapper: mgr.GetRESTMapper(), ObjectWatcher: objectWatcher, - PredicateFunc: predicate.Funcs{}, + PredicateFunc: helper.NewExecutionPredicateOnAgent(), ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, } if err := executionController.SetupWithManager(mgr); err != nil { @@ -128,7 +127,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop StopChan: stopChan, WorkerNumber: 1, ObjectWatcher: objectWatcher, - PredicateFunc: predicate.Funcs{}, + PredicateFunc: helper.NewExecutionPredicateOnAgent(), ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, } workStatusController.RunWorkQueue() @@ -143,7 +142,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop InformerManager: informermanager.GetInstance(), StopChan: stopChan, WorkerNumber: 1, - PredicateFunc: helper.NewPredicateForServiceExportControllerByAgent(opts.ClusterName), + PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(opts.ClusterName), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, } serviceExportController.RunWorkQueue() diff --git a/pkg/util/helper/predicate.go b/pkg/util/helper/predicate.go index 5818baeca..35fb71e60 100644 --- a/pkg/util/helper/predicate.go +++ b/pkg/util/helper/predicate.go @@ -99,10 +99,16 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi return false }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - clusterName, err := names.GetClusterName(updateEvent.ObjectOld.GetNamespace()) + obj := updateEvent.ObjectNew.(*workv1alpha1.Work) + + if util.GetLabelValue(obj.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed { + klog.V(5).Infof("Ignored Work(%s/%s) update event as propagation instruction is suppressed.", obj.Namespace, obj.Name) + return false + } + + clusterName, err := names.GetClusterName(obj.GetNamespace()) if err != nil { - klog.Errorf("Failed to get member cluster name for work %s/%s", updateEvent.ObjectOld.GetNamespace(), - updateEvent.ObjectOld.GetName()) + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName()) return false } @@ -122,8 +128,8 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi } } -// NewClusterPredicateByAgent generates an event filter function with Cluster for karmada-agent. -func NewClusterPredicateByAgent(clusterName string) predicate.Funcs { +// NewClusterPredicateOnAgent generates an event filter function with Cluster for karmada-agent. +func NewClusterPredicateOnAgent(clusterName string) predicate.Funcs { return predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { return createEvent.Object.GetName() == clusterName @@ -140,17 +146,23 @@ func NewClusterPredicateByAgent(clusterName string) predicate.Funcs { } } -// NewPredicateForServiceExportControllerByAgent generates an event filter function for ServiceExport controller running by karmada-agent. -func NewPredicateForServiceExportControllerByAgent(curClusterName string) predicate.Funcs { +// NewPredicateForServiceExportControllerOnAgent generates an event filter function for ServiceExport controller running by karmada-agent. +func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predicate.Funcs { return predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { return false }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - clusterName, err := names.GetClusterName(updateEvent.ObjectOld.GetNamespace()) + obj := updateEvent.ObjectNew.(*workv1alpha1.Work) + + if util.GetLabelValue(obj.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed { + klog.V(5).Infof("Ignored Work(%s/%s) update event as propagation instruction is suppressed.", obj.Namespace, obj.Name) + return false + } + + clusterName, err := names.GetClusterName(obj.GetNamespace()) if err != nil { - klog.Errorf("Failed to get member cluster name for work %s/%s", updateEvent.ObjectOld.GetNamespace(), - updateEvent.ObjectOld.GetName()) + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.GetNamespace(), obj.GetName()) return false } return clusterName == curClusterName @@ -163,3 +175,48 @@ func NewPredicateForServiceExportControllerByAgent(curClusterName string) predic }, } } + +// NewExecutionPredicateOnAgent generates the event filter function to skip events that the controllers are uninterested. +// Used by controllers: +// - execution controller working in agent +// - work status controller working in agent +func NewExecutionPredicateOnAgent() predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + obj := createEvent.Object.(*workv1alpha1.Work) + + // Ignore the object that has been suppressed. + if util.GetLabelValue(obj.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed { + klog.V(5).Infof("Ignored Work(%s/%s) create event as propagation instruction is suppressed.", obj.Namespace, obj.Name) + return false + } + + return true + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + obj := updateEvent.ObjectNew.(*workv1alpha1.Work) + + // Ignore the object that has been suppressed. + if util.GetLabelValue(obj.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed { + klog.V(5).Infof("Ignored Work(%s/%s) update event as propagation instruction is suppressed.", obj.Namespace, obj.Name) + return false + } + + return true + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + obj := deleteEvent.Object.(*workv1alpha1.Work) + + // Ignore the object that has been suppressed. + if util.GetLabelValue(obj.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed { + klog.V(5).Infof("Ignored Work(%s/%s) delete event as propagation instruction is suppressed.", obj.Namespace, obj.Name) + return false + } + + return true + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } +}