diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 71030e679..e3bd7fb96 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -90,23 +90,26 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { - c.worker = util.NewAsyncWorker(c.syncWorkStatus, "work-status", time.Second) + c.worker = util.NewAsyncWorker("work-status", time.Second, util.GenerateKey, c.syncWorkStatus) c.worker.Run(c.WorkerNumber, c.StopChan) } // syncWorkStatus will find work by label in workload, then update resource status to work status. // label example: "karmada.io/created-by: karmada-es-member-cluster-1.default-deployment-nginx" -func (c *WorkStatusController) syncWorkStatus(key string) error { - obj, err := c.getObjectFromCache(key) +func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error { + // we know the key is a string. + // TODO(RainbowMango): change key type from string to struct making better readability. + keyStr := key.(string) + obj, err := c.getObjectFromCache(keyStr) if err != nil { if errors.IsNotFound(err) { - return c.handleDeleteEvent(key) + return c.handleDeleteEvent(keyStr) } return err } if errors.IsNotFound(err) { - return c.handleDeleteEvent(key) + return c.handleDeleteEvent(keyStr) } if obj == nil { // Ignore the object which not managed by current karmada. diff --git a/pkg/util/worker.go b/pkg/util/worker.go index bafa23346..e640377ac 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -33,12 +33,25 @@ type AsyncWorker interface { Run(workerNumber int, stopChan <-chan struct{}) } -// ReconcileHandler is a callback function for process resources. -type ReconcileHandler func(key string) error +// QueueKey is the item key that stores in queue. +// The key could be arbitrary types. +// +// In some cases, people would like store different resources in a same queue, the traditional full-qualified key, +// such as '/', can't distinguish which resource the key belongs to, the key might carry more information +// of a resource, such as GVK(Group Version Kind), in that cases people need to use self-defined key, e.g. a struct. +type QueueKey interface{} + +// KeyFunc knows how to make a key from an object. Implementations should be deterministic. +type KeyFunc func(obj interface{}) (QueueKey, error) + +// ReconcileFunc knows how to consume items(key) from the queue. +type ReconcileFunc func(key QueueKey) error type asyncWorker struct { - // reconcile is callback function to process object in the queue. - reconcile ReconcileHandler + // keyFunc is the function that make keys for API objects. + keyFunc KeyFunc + // reconcileFunc is the function that process keys from the queue. + reconcileFunc ReconcileFunc // queue allowing parallel processing of resources. queue workqueue.RateLimitingInterface // interval is the interval for process object in the queue. @@ -46,11 +59,12 @@ type asyncWorker struct { } // NewAsyncWorker returns a asyncWorker which can process resource periodic. -func NewAsyncWorker(reconcile ReconcileHandler, name string, interval time.Duration) AsyncWorker { +func NewAsyncWorker(name string, interval time.Duration, keyFunc KeyFunc, reconcileFunc ReconcileFunc) AsyncWorker { return &asyncWorker{ - reconcile: reconcile, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), - interval: interval, + keyFunc: keyFunc, + reconcileFunc: reconcileFunc, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + interval: interval, } } @@ -71,7 +85,7 @@ func (w *ClusterWorkload) GetListerKey() string { } // GenerateKey generates a key from obj, the key contains cluster, GVK, namespace and name. -func GenerateKey(obj runtime.Object) (string, error) { +func GenerateKey(obj interface{}) (QueueKey, error) { resource := obj.(*unstructured.Unstructured) gvk := schema.FromAPIVersionAndKind(resource.GetAPIVersion(), resource.GetKind()) key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -83,8 +97,9 @@ func GenerateKey(obj runtime.Object) (string, error) { if err != nil { return "", err } + // it happens when the obj not managed by Karmada. if cluster == "" { - return "", nil + return nil, nil } return cluster + "/" + gvk.Group + "/" + gvk.Version + "/" + gvk.Kind + "/" + key, nil } @@ -138,22 +153,14 @@ func SplitMetaKey(key string) (ClusterWorkload, error) { return clusterWorkload, nil } -func (w *asyncWorker) processKey(obj runtime.Object) string { - key, err := GenerateKey(obj) - if err != nil { - klog.Errorf("Couldn't get key for object %#v: %v.", obj, err) - return "" - } - if key == "" { - klog.V(2).Infof("The key is empty, object is not created by karmada.") - return "" - } - return key -} - func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) { - key := w.processKey(obj) - if key == "" { + key, err := w.keyFunc(obj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind()) + return + } + // it happens when the obj not managed by Karmada. + if key == nil { return } w.queue.AddRateLimited(key) @@ -165,11 +172,6 @@ func (w *asyncWorker) handleError(err error, key interface{}) { return } - _, keyErr := SplitMetaKey(key.(string)) - if keyErr != nil { - klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key) - } - if w.queue.NumRequeues(key) < maxRetries { w.queue.AddRateLimited(key) return @@ -186,7 +188,7 @@ func (w *asyncWorker) worker() { } defer w.queue.Done(key) - err := w.reconcile(key.(string)) + err := w.reconcileFunc(key.(string)) w.handleError(err, key) }