Merge pull request #1069 from Garrybest/pr_queue

fix slow enqueue of async worker
This commit is contained in:
karmada-bot 2021-12-06 10:26:13 +08:00 committed by GitHub
commit 85920e7ef4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 25 additions and 25 deletions

View File

@ -82,7 +82,7 @@ func (d *ClusterDetector) OnAdd(obj interface{}) {
if !ok { if !ok {
return return
} }
d.Processor.EnqueueRateLimited(runtimeObj) d.Processor.Enqueue(runtimeObj)
} }
// OnUpdate handles object update event and push the object to queue. // OnUpdate handles object update event and push the object to queue.

View File

@ -221,7 +221,7 @@ func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return return
} }
c.worker.AddRateLimited(key) c.worker.Add(key)
} }
} }
@ -234,7 +234,7 @@ func (c *ServiceExportController) genHandlerUpdateFunc(clusterName string) func(
klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
return return
} }
c.worker.AddRateLimited(key) c.worker.Add(key)
} }
} }
} }
@ -254,7 +254,7 @@ func (c *ServiceExportController) genHandlerDeleteFunc(clusterName string) func(
klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind()) klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind())
return return
} }
c.worker.AddRateLimited(key) c.worker.Add(key)
} }
} }

View File

@ -106,7 +106,7 @@ func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.C
// getEventHandler return callback function that knows how to handle events from the member cluster. // getEventHandler return callback function that knows how to handle events from the member cluster.
func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
if c.eventHandler == nil { if c.eventHandler == nil {
c.eventHandler = informermanager.NewHandlerOnAllEvents(c.worker.EnqueueRateLimited) c.eventHandler = informermanager.NewHandlerOnAllEvents(c.worker.Enqueue)
} }
return c.eventHandler return c.eventHandler
} }

View File

@ -314,7 +314,7 @@ func (d *ResourceDetector) OnAdd(obj interface{}) {
if !ok { if !ok {
return return
} }
d.Processor.EnqueueRateLimited(runtimeObj) d.Processor.Enqueue(runtimeObj)
} }
// OnUpdate handles object update event and push the object to queue. // OnUpdate handles object update event and push the object to queue.
@ -730,7 +730,7 @@ func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
} }
klog.V(2).Infof("Create PropagationPolicy(%s)", key) klog.V(2).Infof("Create PropagationPolicy(%s)", key)
d.policyReconcileWorker.AddRateLimited(key) d.policyReconcileWorker.Add(key)
} }
// OnPropagationPolicyUpdate handles object update event and push the object to queue. // OnPropagationPolicyUpdate handles object update event and push the object to queue.
@ -746,7 +746,7 @@ func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
} }
klog.V(2).Infof("Delete PropagationPolicy(%s)", key) klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
d.policyReconcileWorker.AddRateLimited(key) d.policyReconcileWorker.Add(key)
} }
// ReconcilePropagationPolicy handles PropagationPolicy resource changes. // ReconcilePropagationPolicy handles PropagationPolicy resource changes.
@ -788,7 +788,7 @@ func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
} }
klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key) klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.AddRateLimited(key) d.clusterPolicyReconcileWorker.Add(key)
} }
// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. // OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
@ -804,7 +804,7 @@ func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
} }
klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key) klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
d.clusterPolicyReconcileWorker.AddRateLimited(key) d.clusterPolicyReconcileWorker.Add(key)
} }
// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes. // ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
@ -937,7 +937,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreation(policy *policyv1alpha
for _, key := range matchedKeys { for _, key := range matchedKeys {
d.RemoveWaiting(key) d.RemoveWaiting(key)
d.Processor.AddRateLimited(key) d.Processor.Add(key)
} }
return nil return nil
@ -961,7 +961,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreation(policy *policy
for _, key := range matchedKeys { for _, key := range matchedKeys {
d.RemoveWaiting(key) d.RemoveWaiting(key)
d.Processor.AddRateLimited(key) d.Processor.Add(key)
} }
return nil return nil
@ -974,7 +974,7 @@ func (d *ResourceDetector) OnResourceBindingAdd(obj interface{}) {
return return
} }
d.bindingReconcileWorker.AddRateLimited(key) d.bindingReconcileWorker.Add(key)
} }
// OnResourceBindingUpdate handles object update event and push the object to queue. // OnResourceBindingUpdate handles object update event and push the object to queue.

View File

@ -563,7 +563,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
return return
} }
s.queue.AddRateLimited(key) s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
} }
@ -576,7 +576,7 @@ func (s *Scheduler) addCluster(obj interface{}) {
klog.V(3).Infof("Add event for cluster %s", cluster.Name) klog.V(3).Infof("Add event for cluster %s", cluster.Name)
if s.enableSchedulerEstimator { if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.AddRateLimited(cluster.Name) s.schedulerEstimatorWorker.Add(cluster.Name)
} }
} }
@ -589,7 +589,7 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
klog.V(3).Infof("Update event for cluster %s", newCluster.Name) klog.V(3).Infof("Update event for cluster %s", newCluster.Name)
if s.enableSchedulerEstimator { if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.AddRateLimited(newCluster.Name) s.schedulerEstimatorWorker.Add(newCluster.Name)
} }
// Check if cluster becomes failure // Check if cluster becomes failure
@ -623,7 +623,7 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
klog.V(3).Infof("Delete event for cluster %s", cluster.Name) klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
if s.enableSchedulerEstimator { if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.AddRateLimited(cluster.Name) s.schedulerEstimatorWorker.Add(cluster.Name)
} }
} }

View File

@ -18,10 +18,10 @@ const (
// AsyncWorker is a worker to process resources periodic with a rateLimitingQueue. // AsyncWorker is a worker to process resources periodic with a rateLimitingQueue.
type AsyncWorker interface { type AsyncWorker interface {
// AddRateLimited adds item to queue. // Add adds item to queue.
AddRateLimited(item interface{}) Add(item interface{})
// EnqueueRateLimited generates the key for objects then adds the key as an item to queue. // Enqueue generates the key for objects then adds the key as an item to queue.
EnqueueRateLimited(obj runtime.Object) Enqueue(obj runtime.Object)
Run(workerNumber int, stopChan <-chan struct{}) Run(workerNumber int, stopChan <-chan struct{})
} }
@ -57,7 +57,7 @@ func NewAsyncWorker(name string, keyFunc KeyFunc, reconcileFunc ReconcileFunc) A
} }
} }
func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) { func (w *asyncWorker) Enqueue(obj runtime.Object) {
key, err := w.keyFunc(obj) key, err := w.keyFunc(obj)
if err != nil { if err != nil {
klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind()) klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind())
@ -68,16 +68,16 @@ func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) {
return return
} }
w.AddRateLimited(key) w.Add(key)
} }
func (w *asyncWorker) AddRateLimited(item interface{}) { func (w *asyncWorker) Add(item interface{}) {
if item == nil { if item == nil {
klog.Warningf("Ignore nil item from queue") klog.Warningf("Ignore nil item from queue")
return return
} }
w.queue.AddRateLimited(item) w.queue.Add(item)
} }
func (w *asyncWorker) handleError(err error, key interface{}) { func (w *asyncWorker) handleError(err error, key interface{}) {