diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index f89a16f22..91256d6f1 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -82,7 +82,7 @@ func (d *ClusterDetector) OnAdd(obj interface{}) { if !ok { return } - d.Processor.EnqueueRateLimited(runtimeObj) + d.Processor.Enqueue(runtimeObj) } // OnUpdate handles object update event and push the object to queue. diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 331723bf0..9a2e7e45f 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -221,7 +221,7 @@ func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) return } - c.worker.AddRateLimited(key) + c.worker.Add(key) } } @@ -234,7 +234,7 @@ func (c *ServiceExportController) genHandlerUpdateFunc(clusterName string) func( klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) return } - c.worker.AddRateLimited(key) + c.worker.Add(key) } } } @@ -254,7 +254,7 @@ func (c *ServiceExportController) genHandlerDeleteFunc(clusterName string) func( klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind()) return } - c.worker.AddRateLimited(key) + c.worker.Add(key) } } diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index b3cef8f6c..539253f79 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -106,7 +106,7 @@ func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.C // getEventHandler return callback function that knows how to handle events from the member cluster. func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { if c.eventHandler == nil { - c.eventHandler = informermanager.NewHandlerOnAllEvents(c.worker.EnqueueRateLimited) + c.eventHandler = informermanager.NewHandlerOnAllEvents(c.worker.Enqueue) } return c.eventHandler } diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 3af2e63aa..6fa816d6b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -314,7 +314,7 @@ func (d *ResourceDetector) OnAdd(obj interface{}) { if !ok { return } - d.Processor.EnqueueRateLimited(runtimeObj) + d.Processor.Enqueue(runtimeObj) } // OnUpdate handles object update event and push the object to queue. @@ -730,7 +730,7 @@ func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { } klog.V(2).Infof("Create PropagationPolicy(%s)", key) - d.policyReconcileWorker.AddRateLimited(key) + d.policyReconcileWorker.Add(key) } // OnPropagationPolicyUpdate handles object update event and push the object to queue. @@ -746,7 +746,7 @@ func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) { } klog.V(2).Infof("Delete PropagationPolicy(%s)", key) - d.policyReconcileWorker.AddRateLimited(key) + d.policyReconcileWorker.Add(key) } // ReconcilePropagationPolicy handles PropagationPolicy resource changes. @@ -788,7 +788,7 @@ func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { } klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.AddRateLimited(key) + d.clusterPolicyReconcileWorker.Add(key) } // OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. @@ -804,7 +804,7 @@ func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) { } klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) - d.clusterPolicyReconcileWorker.AddRateLimited(key) + d.clusterPolicyReconcileWorker.Add(key) } // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. @@ -937,7 +937,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreation(policy *policyv1alpha for _, key := range matchedKeys { d.RemoveWaiting(key) - d.Processor.AddRateLimited(key) + d.Processor.Add(key) } return nil @@ -961,7 +961,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreation(policy *policy for _, key := range matchedKeys { d.RemoveWaiting(key) - d.Processor.AddRateLimited(key) + d.Processor.Add(key) } return nil @@ -974,7 +974,7 @@ func (d *ResourceDetector) OnResourceBindingAdd(obj interface{}) { return } - d.bindingReconcileWorker.AddRateLimited(key) + d.bindingReconcileWorker.Add(key) } // OnResourceBindingUpdate handles object update event and push the object to queue. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 84fab054d..9de423870 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -563,7 +563,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) { return } - s.queue.AddRateLimited(key) + s.queue.Add(key) metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) } @@ -576,7 +576,7 @@ func (s *Scheduler) addCluster(obj interface{}) { klog.V(3).Infof("Add event for cluster %s", cluster.Name) if s.enableSchedulerEstimator { - s.schedulerEstimatorWorker.AddRateLimited(cluster.Name) + s.schedulerEstimatorWorker.Add(cluster.Name) } } @@ -589,7 +589,7 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) { klog.V(3).Infof("Update event for cluster %s", newCluster.Name) if s.enableSchedulerEstimator { - s.schedulerEstimatorWorker.AddRateLimited(newCluster.Name) + s.schedulerEstimatorWorker.Add(newCluster.Name) } // Check if cluster becomes failure @@ -623,7 +623,7 @@ func (s *Scheduler) deleteCluster(obj interface{}) { klog.V(3).Infof("Delete event for cluster %s", cluster.Name) if s.enableSchedulerEstimator { - s.schedulerEstimatorWorker.AddRateLimited(cluster.Name) + s.schedulerEstimatorWorker.Add(cluster.Name) } } diff --git a/pkg/util/worker.go b/pkg/util/worker.go index 89cc1b2d3..8233979da 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -18,10 +18,10 @@ const ( // AsyncWorker is a worker to process resources periodic with a rateLimitingQueue. type AsyncWorker interface { - // AddRateLimited adds item to queue. - AddRateLimited(item interface{}) - // EnqueueRateLimited generates the key for objects then adds the key as an item to queue. - EnqueueRateLimited(obj runtime.Object) + // Add adds item to queue. + Add(item interface{}) + // Enqueue generates the key for objects then adds the key as an item to queue. + Enqueue(obj runtime.Object) Run(workerNumber int, stopChan <-chan struct{}) } @@ -57,7 +57,7 @@ func NewAsyncWorker(name string, keyFunc KeyFunc, reconcileFunc ReconcileFunc) A } } -func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) { +func (w *asyncWorker) Enqueue(obj runtime.Object) { key, err := w.keyFunc(obj) if err != nil { klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind()) @@ -68,16 +68,16 @@ func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) { return } - w.AddRateLimited(key) + w.Add(key) } -func (w *asyncWorker) AddRateLimited(item interface{}) { +func (w *asyncWorker) Add(item interface{}) { if item == nil { klog.Warningf("Ignore nil item from queue") return } - w.queue.AddRateLimited(item) + w.queue.Add(item) } func (w *asyncWorker) handleError(err error, key interface{}) {