diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index 304eb219f..ce240044d 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -52,7 +51,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error { d.stopCh = ctx.Done() d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker("cluster-api cluster detector", time.Second, ClusterWideKeyFunc, d.Reconcile) + d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) d.discoveryCluster() diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index c1025ad0a..da91882c1 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -5,7 +5,6 @@ import ( "fmt" "reflect" "sync" - "time" discoveryv1beta1 "k8s.io/api/discovery/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -106,7 +105,7 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *ServiceExportController) RunWorkQueue() { - c.worker = util.NewAsyncWorker("service-export", time.Second, nil, c.syncServiceExportOrEndpointSlice) + c.worker = util.NewAsyncWorker("service-export", nil, c.syncServiceExportOrEndpointSlice) c.worker.Run(c.WorkerNumber, c.StopChan) } diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index af71f1860..8d378c195 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "reflect" - "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -108,7 +107,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { - c.worker = util.NewAsyncWorker("work-status", time.Second, generateKey, c.syncWorkStatus) + c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus) c.worker.Run(c.WorkerNumber, c.StopChan) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d6d1d3d6d..43d12a578 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -131,7 +131,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse if opts.EnableSchedulerEstimator { sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache() sched.schedulerEstimatorPort = opts.SchedulerEstimatorPort - sched.schedulerEstimatorWorker = util.NewAsyncWorker("scheduler-estimator", 0, nil, sched.reconcileEstimatorConnection) + sched.schedulerEstimatorWorker = util.NewAsyncWorker("scheduler-estimator", nil, sched.reconcileEstimatorConnection) schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration) estimatorclient.RegisterSchedulerEstimator(schedulerEstimator) } diff --git a/pkg/util/detector/detector.go b/pkg/util/detector/detector.go index 9c2fef768..830073ef0 100644 --- a/pkg/util/detector/detector.go +++ b/pkg/util/detector/detector.go @@ -81,9 +81,9 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.stopCh = ctx.Done() // setup policy reconcile worker - d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", 1*time.Millisecond, ClusterWideKeyFunc, d.ReconcilePropagationPolicy) + d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy) d.policyReconcileWorker.Run(1, d.stopCh) - d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", time.Microsecond, ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) + d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy) d.clusterPolicyReconcileWorker.Run(1, d.stopCh) // watch and enqueue PropagationPolicy changes. @@ -107,7 +107,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) // setup binding reconcile worker - d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", time.Microsecond, ClusterWideKeyFunc, d.ReconcileResourceBinding) + d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding) d.bindingReconcileWorker.Run(1, d.stopCh) // watch and enqueue ResourceBinding changes. @@ -130,7 +130,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler) d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker("resource detector", time.Microsecond, ClusterWideKeyFunc, d.Reconcile) + d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) d.Processor.Run(1, d.stopCh) go d.discoverResources(30 * time.Second) diff --git a/pkg/util/worker.go b/pkg/util/worker.go index 237cc5d7e..be1c4deb3 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -1,12 +1,11 @@ package util import ( - "time" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" ) const ( @@ -48,17 +47,14 @@ type asyncWorker struct { reconcileFunc ReconcileFunc // queue allowing parallel processing of resources. queue workqueue.RateLimitingInterface - // interval is the interval for process object in the queue. - interval time.Duration } // NewAsyncWorker returns a asyncWorker which can process resource periodic. -func NewAsyncWorker(name string, interval time.Duration, keyFunc KeyFunc, reconcileFunc ReconcileFunc) AsyncWorker { +func NewAsyncWorker(name string, keyFunc KeyFunc, reconcileFunc ReconcileFunc) AsyncWorker { return &asyncWorker{ keyFunc: keyFunc, reconcileFunc: reconcileFunc, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), - interval: interval, } } @@ -113,7 +109,7 @@ func (w *asyncWorker) worker() { func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) { for i := 0; i < workerNumber; i++ { - go wait.Until(w.worker, w.interval, stopChan) + go wait.Until(w.worker, 0, stopChan) } // Ensure all goroutines are cleaned up when the stop channel closes go func() {