Optimize AsyncWorker make it could be re-use by other components (#174)

Signed-off-by: RainbowMango <renhongcai@huawei.com>
This commit is contained in:
Hongcai Ren 2021-02-25 20:27:07 +08:00 committed by GitHub
parent 5c5aacdf51
commit e7298f6f14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 36 deletions

View File

@ -90,23 +90,26 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
c.worker = util.NewAsyncWorker(c.syncWorkStatus, "work-status", time.Second)
c.worker = util.NewAsyncWorker("work-status", time.Second, util.GenerateKey, c.syncWorkStatus)
c.worker.Run(c.WorkerNumber, c.StopChan)
}
// syncWorkStatus will find work by label in workload, then update resource status to work status.
// label example: "karmada.io/created-by: karmada-es-member-cluster-1.default-deployment-nginx"
func (c *WorkStatusController) syncWorkStatus(key string) error {
obj, err := c.getObjectFromCache(key)
func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
// we know the key is a string.
// TODO(RainbowMango): change key type from string to struct making better readability.
keyStr := key.(string)
obj, err := c.getObjectFromCache(keyStr)
if err != nil {
if errors.IsNotFound(err) {
return c.handleDeleteEvent(key)
return c.handleDeleteEvent(keyStr)
}
return err
}
if errors.IsNotFound(err) {
return c.handleDeleteEvent(key)
return c.handleDeleteEvent(keyStr)
}
if obj == nil {
// Ignore the object which not managed by current karmada.

View File

@ -33,12 +33,25 @@ type AsyncWorker interface {
Run(workerNumber int, stopChan <-chan struct{})
}
// ReconcileHandler is a callback function for process resources.
type ReconcileHandler func(key string) error
// QueueKey is the item key that stores in queue.
// The key could be arbitrary types.
//
// In some cases, people would like store different resources in a same queue, the traditional full-qualified key,
// such as '<namespace>/<name>', can't distinguish which resource the key belongs to, the key might carry more information
// of a resource, such as GVK(Group Version Kind), in that cases people need to use self-defined key, e.g. a struct.
type QueueKey interface{}
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (QueueKey, error)
// ReconcileFunc knows how to consume items(key) from the queue.
type ReconcileFunc func(key QueueKey) error
type asyncWorker struct {
// reconcile is callback function to process object in the queue.
reconcile ReconcileHandler
// keyFunc is the function that make keys for API objects.
keyFunc KeyFunc
// reconcileFunc is the function that process keys from the queue.
reconcileFunc ReconcileFunc
// queue allowing parallel processing of resources.
queue workqueue.RateLimitingInterface
// interval is the interval for process object in the queue.
@ -46,11 +59,12 @@ type asyncWorker struct {
}
// NewAsyncWorker returns a asyncWorker which can process resource periodic.
func NewAsyncWorker(reconcile ReconcileHandler, name string, interval time.Duration) AsyncWorker {
func NewAsyncWorker(name string, interval time.Duration, keyFunc KeyFunc, reconcileFunc ReconcileFunc) AsyncWorker {
return &asyncWorker{
reconcile: reconcile,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
interval: interval,
keyFunc: keyFunc,
reconcileFunc: reconcileFunc,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
interval: interval,
}
}
@ -71,7 +85,7 @@ func (w *ClusterWorkload) GetListerKey() string {
}
// GenerateKey generates a key from obj, the key contains cluster, GVK, namespace and name.
func GenerateKey(obj runtime.Object) (string, error) {
func GenerateKey(obj interface{}) (QueueKey, error) {
resource := obj.(*unstructured.Unstructured)
gvk := schema.FromAPIVersionAndKind(resource.GetAPIVersion(), resource.GetKind())
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@ -83,8 +97,9 @@ func GenerateKey(obj runtime.Object) (string, error) {
if err != nil {
return "", err
}
// it happens when the obj not managed by Karmada.
if cluster == "" {
return "", nil
return nil, nil
}
return cluster + "/" + gvk.Group + "/" + gvk.Version + "/" + gvk.Kind + "/" + key, nil
}
@ -138,22 +153,14 @@ func SplitMetaKey(key string) (ClusterWorkload, error) {
return clusterWorkload, nil
}
func (w *asyncWorker) processKey(obj runtime.Object) string {
key, err := GenerateKey(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %#v: %v.", obj, err)
return ""
}
if key == "" {
klog.V(2).Infof("The key is empty, object is not created by karmada.")
return ""
}
return key
}
func (w *asyncWorker) EnqueueRateLimited(obj runtime.Object) {
key := w.processKey(obj)
if key == "" {
key, err := w.keyFunc(obj)
if err != nil {
klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind())
return
}
// it happens when the obj not managed by Karmada.
if key == nil {
return
}
w.queue.AddRateLimited(key)
@ -165,11 +172,6 @@ func (w *asyncWorker) handleError(err error, key interface{}) {
return
}
_, keyErr := SplitMetaKey(key.(string))
if keyErr != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key)
}
if w.queue.NumRequeues(key) < maxRetries {
w.queue.AddRateLimited(key)
return
@ -186,7 +188,7 @@ func (w *asyncWorker) worker() {
}
defer w.queue.Done(key)
err := w.reconcile(key.(string))
err := w.reconcileFunc(key.(string))
w.handleError(err, key)
}