Replaced FIFO by workqueue.

Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
This commit is contained in:
Da K. Ma 2019-02-11 14:24:23 +08:00
parent 7588ed66f6
commit 16512bcd39
1 changed files with 47 additions and 94 deletions

View File

@ -26,6 +26,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
@ -35,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -79,8 +81,8 @@ type SchedulerCache struct {
Nodes map[string]*kbapi.NodeInfo
Queues map[kbapi.QueueID]*kbapi.QueueInfo
errTasks *cache.FIFO
deletedJobs *cache.FIFO
errTasks workqueue.RateLimitingInterface
deletedJobs workqueue.RateLimitingInterface
namespaceAsQueue bool
}
@ -164,41 +166,13 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
return dvb.volumeBinder.Binder.BindPodVolumes(task.Pod)
}
// taskKey is the key function handed to cache.NewFIFO for the errTasks queue.
// It keys a task by its UID.
//
// An error is returned when obj is nil or is not a *kbapi.TaskInfo.
func taskKey(obj interface{}) (string, error) {
	if obj == nil {
		return "", fmt.Errorf("the object is nil")
	}

	switch info := obj.(type) {
	case *kbapi.TaskInfo:
		return string(info.UID), nil
	default:
		return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
	}
}
// jobKey is the key function handed to cache.NewFIFO for the deletedJobs
// queue. It keys a job by its UID.
//
// An error is returned when obj is nil or is not a *kbapi.JobInfo.
func jobKey(obj interface{}) (string, error) {
	if obj == nil {
		return "", fmt.Errorf("the object is nil")
	}

	job, ok := obj.(*kbapi.JobInfo)
	if !ok {
		// Report the type this function actually expects; the message used to
		// say TaskInfo, which pointed readers at the wrong queue.
		return "", fmt.Errorf("failed to convert %v to JobInfo", obj)
	}

	return string(job.UID), nil
}
func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool) *SchedulerCache {
sc := &SchedulerCache{
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
errTasks: cache.NewFIFO(taskKey),
deletedJobs: cache.NewFIFO(jobKey),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
namespaceAsQueue: nsAsQueue,
@ -324,10 +298,10 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
}
// Re-sync error tasks.
go sc.resync()
go wait.Until(sc.processResyncTask, 0, stopCh)
// Cleanup jobs.
go sc.cleanupJobs()
go wait.Until(sc.processCleanupJob, 0, stopCh)
}
func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
@ -388,7 +362,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
}
// Add new task to node.
node.UpdateTask(task)
if err := node.UpdateTask(task); err != nil {
return err
}
p := task.Pod
@ -430,7 +406,9 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
task.NodeName = hostname
// Add task to the node.
node.AddTask(task)
if err := node.AddTask(task); err != nil {
return err
}
p := task.Pod
@ -476,77 +454,52 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string)
// deleteJob logs the deletion attempt at verbosity 3 and enqueues the job on
// sc.deletedJobs so that processCleanupJob can remove it later.
// NOTE(review): two enqueue paths appear in this span: a 5-second
// time.AfterFunc that calls AddIfNotPresent (FIFO API) and a direct
// AddRateLimited (workqueue API). They look like the old and new variants of
// the same step; confirm that only one of them is live.
func (sc *SchedulerCache) deleteJob(job *kbapi.JobInfo) {
glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name)
time.AfterFunc(5*time.Second, func() {
sc.deletedJobs.AddIfNotPresent(job)
})
sc.deletedJobs.AddRateLimited(job)
}
// processCleanupJob takes one entry from sc.deletedJobs and, while holding
// sc.Mutex, deletes the job from sc.Jobs when kbapi.JobTerminated reports it
// as terminated; otherwise it passes the job back to sc.deleteJob to retry.
// NOTE(review): this span interleaves two variants of the function: one that
// uses Pop with a callback and returns an error (driven by the cleanupJobs
// loop), and one that uses Get and returns nothing. Read them as alternatives,
// not as a single function body.
// NOTE(review): the Get-based variant never calls sc.deletedJobs.Done(obj) or
// Forget(obj). The client-go workqueue documentation expects Done for every
// item returned by Get, and an item that is still marked as processing is not
// handed out again when re-added. Confirm whether the retry branch can ever
// be served.
func (sc *SchedulerCache) processCleanupJob() error {
_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
job, ok := obj.(*kbapi.JobInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}
func (sc *SchedulerCache) processCleanupJob() {
obj, shutdown := sc.deletedJobs.Get()
if shutdown {
return
}
func() {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
job, found := obj.(*kbapi.JobInfo)
if !found {
glog.Errorf("Failed to convert <%v> to *JobInfo", obj)
return
}
if kbapi.JobTerminated(job) {
delete(sc.Jobs, job.UID)
glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
}
}()
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
return nil
})
return err
}
// cleanupJobs loops without an exit condition, calling processCleanupJob and
// logging any error it returns.
func (sc *SchedulerCache) cleanupJobs() {
for {
err := sc.processCleanupJob()
if err != nil {
glog.Errorf("Failed to process job clean up: %v", err)
}
if kbapi.JobTerminated(job) {
delete(sc.Jobs, job.UID)
glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
}
}
// resyncTask enqueues the task on sc.errTasks so that processResyncTask can
// sync it again later.
// NOTE(review): two enqueue paths appear in this span: AddIfNotPresent with
// error logging (FIFO API) and AddRateLimited (workqueue API). They look like
// the old and new variants of the same step; confirm that only one of them is
// live.
func (sc *SchedulerCache) resyncTask(task *kbapi.TaskInfo) {
if err := sc.errTasks.AddIfNotPresent(task); err != nil {
glog.Errorf("Failed to re-sync tasks <%v/%v>: %v",
task.Namespace, task.Name, err)
}
sc.errTasks.AddRateLimited(task)
}
// resync loops without an exit condition, calling processResyncTask and
// logging any error it returns.
// NOTE(review): this span interleaves the resync loop with two variants of
// processResyncTask (a Pop-with-callback one that returns an error, and a
// Get-based one that returns nothing). Read them as alternatives, not as a
// single function body.
func (sc *SchedulerCache) resync() {
for {
err := sc.processResyncTask()
if err != nil {
glog.Errorf("Failed to process resync: %v", err)
}
// processResyncTask takes one entry from sc.errTasks and passes it to
// sc.syncTask. On failure the Get-based variant logs and re-enqueues the task
// through sc.resyncTask, while the Pop-based variant logs and returns the error.
// NOTE(review): the Get-based variant never calls sc.errTasks.Done(obj) or
// Forget(obj). The client-go workqueue documentation expects Done for every
// item returned by Get. Confirm whether a re-enqueued task can be handed out
// again.
// NOTE(review): the conversion-failure message names *v1.Pod although the
// type assertion targets *kbapi.TaskInfo.
func (sc *SchedulerCache) processResyncTask() {
obj, shutdown := sc.errTasks.Get()
if shutdown {
return
}
task, ok := obj.(*kbapi.TaskInfo)
if !ok {
glog.Errorf("failed to convert %v to *v1.Pod", obj)
return
}
}
func (sc *SchedulerCache) processResyncTask() error {
_, err := sc.errTasks.Pop(func(obj interface{}) error {
task, ok := obj.(*kbapi.TaskInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}
if err := sc.syncTask(task); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name)
return err
}
return nil
})
return err
if err := sc.syncTask(task); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name)
sc.resyncTask(task)
}
}
func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {