preventing the same job handling by multiple workers
This commit is contained in:
parent
07471c35e8
commit
7521589501
|
|
@ -21,6 +21,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
|
|
@ -105,6 +106,7 @@ type Controller struct {
|
|||
|
||||
sync.Mutex
|
||||
errTasks workqueue.RateLimitingInterface
|
||||
|
||||
}
|
||||
|
||||
// NewJobController create new Job Controller
|
||||
|
|
@ -130,6 +132,7 @@ func NewJobController(
|
|||
errTasks: newRateLimitingQueue(),
|
||||
recorder: recorder,
|
||||
priorityClasses: make(map[string]*v1beta1.PriorityClass),
|
||||
|
||||
}
|
||||
|
||||
cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
|
||||
|
|
@ -234,6 +237,25 @@ func (cc *Controller) processNextReq() bool {
|
|||
req := obj.(apis.Request)
|
||||
defer cc.queue.Done(req)
|
||||
|
||||
key := jobcache.JobKeyByReq(&req)
|
||||
|
||||
// prevent multi threads processing the same job simultaneously.
|
||||
cc.jobsLock.Lock()
|
||||
if cc.jobsMap[key] {
|
||||
// the job is being processed by some other thread
|
||||
cc.queue.AddRateLimited(req)
|
||||
cc.jobsLock.Unlock()
|
||||
return true
|
||||
} else {
|
||||
cc.jobsMap[key] = true
|
||||
cc.jobsLock.Unlock()
|
||||
defer func() {
|
||||
cc.jobsLock.Lock()
|
||||
delete(cc.jobsMap, key)
|
||||
cc.jobsLock.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Try to handle request <%v>", req)
|
||||
|
||||
jobInfo, err := cc.cache.Get(jobcache.JobKeyByReq(&req))
|
||||
|
|
|
|||
Loading…
Reference in New Issue