diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 988e1d6fa..8f688a83b 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -26,6 +26,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" infov1 "k8s.io/client-go/informers/core/v1" policyv1 "k8s.io/client-go/informers/policy/v1beta1" @@ -35,6 +36,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -79,8 +81,8 @@ type SchedulerCache struct { Nodes map[string]*kbapi.NodeInfo Queues map[kbapi.QueueID]*kbapi.QueueInfo - errTasks *cache.FIFO - deletedJobs *cache.FIFO + errTasks workqueue.RateLimitingInterface + deletedJobs workqueue.RateLimitingInterface namespaceAsQueue bool } @@ -164,41 +166,13 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { return dvb.volumeBinder.Binder.BindPodVolumes(task.Pod) } -func taskKey(obj interface{}) (string, error) { - if obj == nil { - return "", fmt.Errorf("the object is nil") - } - - task, ok := obj.(*kbapi.TaskInfo) - - if !ok { - return "", fmt.Errorf("failed to convert %v to TaskInfo", obj) - } - - return string(task.UID), nil -} - -func jobKey(obj interface{}) (string, error) { - if obj == nil { - return "", fmt.Errorf("the object is nil") - } - - job, ok := obj.(*kbapi.JobInfo) - - if !ok { - return "", fmt.Errorf("failed to convert %v to TaskInfo", obj) - } - - return string(job.UID), nil -} - func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool) *SchedulerCache { sc := &SchedulerCache{ Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), Nodes: make(map[string]*kbapi.NodeInfo), Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), - errTasks: cache.NewFIFO(taskKey), - deletedJobs: cache.NewFIFO(jobKey), + errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), kubeclient: kubernetes.NewForConfigOrDie(config), kbclient: kbver.NewForConfigOrDie(config), namespaceAsQueue: nsAsQueue, @@ -324,10 +298,10 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { } // Re-sync error tasks. - go sc.resync() + go wait.Until(sc.processResyncTask, 0, stopCh) // Cleanup jobs. - go sc.cleanupJobs() + go wait.Until(sc.processCleanupJob, 0, stopCh) } func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { @@ -388,7 +362,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { } // Add new task to node. - node.UpdateTask(task) + if err := node.UpdateTask(task); err != nil { + return err + } p := task.Pod @@ -430,7 +406,9 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error task.NodeName = hostname // Add task to the node. - node.AddTask(task) + if err := node.AddTask(task); err != nil { + return err + } p := task.Pod @@ -476,77 +454,52 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) func (sc *SchedulerCache) deleteJob(job *kbapi.JobInfo) { glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name) - time.AfterFunc(5*time.Second, func() { - sc.deletedJobs.AddIfNotPresent(job) - }) + sc.deletedJobs.AddRateLimited(job) } -func (sc *SchedulerCache) processCleanupJob() error { - _, err := sc.deletedJobs.Pop(func(obj interface{}) error { - job, ok := obj.(*kbapi.JobInfo) - if !ok { - return fmt.Errorf("failed to convert %v to *v1.Pod", obj) - } +func (sc *SchedulerCache) processCleanupJob() { + obj, shutdown := sc.deletedJobs.Get() + if shutdown { + return + } - func() { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() + job, found := obj.(*kbapi.JobInfo) + if !found { + glog.Errorf("Failed to convert <%v> to *JobInfo", obj) + return + } - if kbapi.JobTerminated(job) { - delete(sc.Jobs, job.UID) - glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) - } else { - // Retry - sc.deleteJob(job) - } - }() + sc.Mutex.Lock() + defer sc.Mutex.Unlock() - return nil - }) - - return err -} - -func (sc *SchedulerCache) cleanupJobs() { - for { - err := sc.processCleanupJob() - if err != nil { - glog.Errorf("Failed to process job clean up: %v", err) - } + if kbapi.JobTerminated(job) { + delete(sc.Jobs, job.UID) + glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) + } else { + // Retry + sc.deleteJob(job) } } func (sc *SchedulerCache) resyncTask(task *kbapi.TaskInfo) { - if err := sc.errTasks.AddIfNotPresent(task); err != nil { - glog.Errorf("Failed to re-sync tasks <%v/%v>: %v", - task.Namespace, task.Name, err) - } + sc.errTasks.AddRateLimited(task) } -func (sc *SchedulerCache) resync() { - for { - err := sc.processResyncTask() - if err != nil { - glog.Errorf("Failed to process resync: %v", err) - } +func (sc *SchedulerCache) processResyncTask() { + obj, shutdown := sc.errTasks.Get() + if shutdown { + return + } + task, ok := obj.(*kbapi.TaskInfo) + if !ok { + glog.Errorf("failed to convert %v to *v1.Pod", obj) + return } -} -func (sc *SchedulerCache) processResyncTask() error { - _, err := sc.errTasks.Pop(func(obj interface{}) error { - task, ok := obj.(*kbapi.TaskInfo) - if !ok { - return fmt.Errorf("failed to convert %v to *v1.Pod", obj) - } - - if err := sc.syncTask(task); err != nil { - glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name) - return err - } - return nil - }) - - return err + if err := sc.syncTask(task); err != nil { + glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name) + sc.resyncTask(task) + } } func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {