Merge pull request #1906 from justadogistaken/optimize/reuse-predicate-error-on-same-task

optimize: reuse predicate error on same task group
Volcano Bot 2021-12-18 14:31:28 +08:00 committed by GitHub
commit 5121f14b2f
6 changed files with 124 additions and 67 deletions
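
The whole change turns on one observation: within a single scheduling pass, tasks of the same task group (same job, same task spec) are interchangeable as far as predicates are concerned, so a node that rejected one of them will reject its siblings too. The new PredicateHelper caches those rejections per task group and per node. The following is a minimal standalone sketch of that cache; the names are simplified for illustration and are not the project's actual types (the real implementation is the new util file further down).

```go
package main

import "fmt"

// errorCache is a simplified stand-in for the cache the new PredicateHelper
// keeps: task group ID -> node name -> the predicate error seen there.
type errorCache struct {
	byGroup map[string]map[string]error
}

func newErrorCache() *errorCache {
	return &errorCache{byGroup: map[string]map[string]error{}}
}

// lookup returns a previously recorded failure for this group on this node.
func (c *errorCache) lookup(groupID, node string) (error, bool) {
	nodeErrs, ok := c.byGroup[groupID]
	if !ok {
		return nil, false
	}
	err, ok := nodeErrs[node]
	return err, ok
}

// store records a failure so later sibling tasks can reuse it without
// re-running the predicate on that node.
func (c *errorCache) store(groupID, node string, err error) {
	if c.byGroup[groupID] == nil {
		c.byGroup[groupID] = map[string]error{}
	}
	c.byGroup[groupID][node] = err
}

func main() {
	cache := newErrorCache()
	cache.store("default/job-1/worker", "node-1", fmt.Errorf("insufficient cpu"))

	// A second task of the same task spec skips the predicate on node-1.
	if err, ok := cache.lookup("default/job-1/worker", "node-1"); ok {
		fmt.Println("reused cached predicate error:", err)
	}
}
```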

View File

@@ -65,7 +65,7 @@ func (ji *JobInfo) SetJob(job *batch.Job) {
func (ji *JobInfo) AddPod(pod *v1.Pod) error {
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
return fmt.Errorf("failed to taskName of Pod <%s/%s>",
return fmt.Errorf("failed to find taskName of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
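
This hunk only adds the missing word to the error message, but the annotation it reads is the same one the optimization later keys on: batch.TaskSpecKey identifies which task spec a pod belongs to, and together with the job it defines a task group. Below is a small self-contained sketch of the lookup; the annotation value used here is an assumption for the example, not taken from the repository.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// taskSpecKey stands in for batch.TaskSpecKey; the literal value here is an
// assumption for the example.
const taskSpecKey = "volcano.sh/task-spec"

// taskNameOf resolves a pod's task name from its annotation, using the
// corrected error message from the hunk above when the annotation is absent.
func taskNameOf(pod *v1.Pod) (string, error) {
	name, found := pod.Annotations[taskSpecKey]
	if !found {
		return "", fmt.Errorf("failed to find taskName of Pod <%s/%s>",
			pod.Namespace, pod.Name)
	}
	return name, nil
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace:   "default",
		Name:        "job-1-worker-0",
		Annotations: map[string]string{taskSpecKey: "worker"},
	}}
	fmt.Println(taskNameOf(pod))
}
```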

View File

@@ -198,7 +198,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
tasks.Len(), job.Namespace, job.Name)
stmt := framework.NewStatement(ssn)
ph := util.NewPredicateHelper()
for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)
@@ -211,7 +211,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(nodes), job.Namespace, job.Name)
predicateNodes, fitErrors := util.PredicateNodes(task, nodes, predicateFn)
predicateNodes, fitErrors := ph.PredicateNodes(task, nodes, predicateFn)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
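
With these two hunks, the allocate action builds one PredicateHelper at the start of Execute and calls its PredicateNodes method for every task it pops, so all sibling tasks in a pass share the same predicate-error cache. A hedged sketch of that calling pattern follows; allocateTasks is a hypothetical wrapper, not a function in the repository.

```go
package sketch

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// allocateTasks illustrates the per-pass scoping of the cache: one helper,
// many tasks. Nodes that failed the predicate for an earlier sibling are
// rejected from the cache instead of being re-evaluated.
func allocateTasks(tasks []*api.TaskInfo, nodes []*api.NodeInfo, predicateFn api.PredicateFn) {
	ph := util.NewPredicateHelper() // one cache for the whole scheduling pass

	for _, task := range tasks {
		predicateNodes, fitErrors := ph.PredicateNodes(task, nodes, predicateFn)
		if len(predicateNodes) == 0 {
			_ = fitErrors // the real action records these on job.NodesFitErrors
			continue
		}
		// ... node scoring and binding happen here in the real action ...
	}
}
```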

View File

@@ -79,6 +79,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
}
ph := util.NewPredicateHelper()
// Preemption between Jobs within Queue.
for _, queue := range queues {
for {
@@ -124,7 +125,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
// Preempt other jobs within queue
return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
}); preempted {
}, ph); preempted {
assigned = true
}
}
@@ -172,7 +173,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
// Preempt tasks within job.
return preemptor.Job == task.Job
})
}, ph)
stmt.Commit()
// If no preemption, next job.
@@ -194,12 +195,13 @@ func preempt(
stmt *framework.Statement,
preemptor *api.TaskInfo,
filter func(*api.TaskInfo) bool,
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false
allNodes := ssn.NodeList
predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
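
In the preempt action the helper is likewise created once in Execute and then threaded into the unexported preempt function as a new parameter, so both preemption phases (between jobs in a queue and between tasks within a job) reuse one cache. A minimal sketch of that parameter threading; tryPreempt is a hypothetical stand-in for the real function.

```go
package sketch

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// tryPreempt receives the PredicateHelper from its caller instead of building
// its own, so predicate failures observed in one preemption attempt are
// visible to every later attempt in the same pass.
func tryPreempt(preemptor *api.TaskInfo, nodes []*api.NodeInfo,
	predicateFn api.PredicateFn, ph util.PredicateHelper) bool {
	predicateNodes, _ := ph.PredicateNodes(preemptor, nodes, predicateFn)
	// The real function goes on to score these nodes and pick victims; here
	// we only report whether any candidate node survived the predicates.
	return len(predicateNodes) > 0
}
```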

View File

@@ -219,6 +219,13 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}
}
func (ti *TaskInfo) GetTaskSpecKey() TaskID {
if ti.Pod == nil {
return ""
}
return getTaskID(ti.Pod)
}
// String returns the taskInfo details in a string
func (ti TaskInfo) String() string {
return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v"+
@@ -382,7 +389,7 @@ func (ji *JobInfo) extractPreemptable(pg *PodGroup) bool {
// extractRevocableZone return volcano.sh/revocable-zone value for pod/podgroup
func (ji *JobInfo) extractRevocableZone(pg *PodGroup) string {
// check annotaion first
// check annotation first
if len(pg.Annotations) > 0 {
if value, found := pg.Annotations[v1beta1.RevocableZone]; found {
if value != "*" {
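
GetTaskSpecKey supplies the other half of the cache key: taskGroupID in the new util file below joins the job ID with the pod's task-spec key, so all replicas of one task spec map to the same group while different task specs of the same job stay separate. A standalone illustration with made-up values:

```go
package main

import "fmt"

// taskGroupID mirrors the key construction shown in the new util file:
// job ID plus task-spec key. The job and task names here are invented.
func taskGroupID(jobID, taskSpec string) string {
	return fmt.Sprintf("%s/%s", jobID, taskSpec)
}

func main() {
	a := taskGroupID("default/tf-job", "worker") // pod tf-job-worker-0
	b := taskGroupID("default/tf-job", "worker") // pod tf-job-worker-1
	c := taskGroupID("default/tf-job", "ps")     // pod tf-job-ps-0

	fmt.Println(a == b) // true: same group, predicate errors are shared
	fmt.Println(a == c) // false: different task spec, separate cache entry
}
```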

View File

@@ -0,0 +1,108 @@
package util
import (
"context"
"fmt"
"sync"
"sync/atomic"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/api"
)
type PredicateHelper interface {
PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors)
}
type predicateHelper struct {
taskPredicateErrorCache map[string]map[string]error
}
// PredicateNodes returns the specified number of nodes that fit a task
func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
var errorLock sync.RWMutex
fe := api.NewFitErrors()
allNodes := len(nodes)
if allNodes == 0 {
return make([]*api.NodeInfo, 0), fe
}
numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))
//allocate enough space to avoid growing it
predicateNodes := make([]*api.NodeInfo, numNodesToFind)
numFoundNodes := int32(0)
processedNodes := int32(0)
taskGroupid := taskGroupID(task)
nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid]
if nodeErrorCache == nil {
nodeErrorCache = map[string]error{}
}
//create a context with cancellation
ctx, cancel := context.WithCancel(context.Background())
checkNode := func(index int) {
// Check the nodes starting from where is left off in the previous scheduling cycle,
// to make sure all nodes have the same chance of being examined across pods.
node := nodes[(lastProcessedNodeIndex+index)%allNodes]
atomic.AddInt32(&processedNodes, 1)
klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
// Check if the task had "predicate" failure before.
// And then check if the task failed to predict on this node before.
if taskFailedBefore {
errorLock.RLock()
errC, ok := nodeErrorCache[node.Name]
errorLock.RUnlock()
if ok {
errorLock.Lock()
fe.SetNodeError(node.Name, errC)
errorLock.Unlock()
return
}
}
// TODO (k82cn): Enable eCache for performance improvement.
if err := fn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
errorLock.Lock()
nodeErrorCache[node.Name] = err
ph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache
fe.SetNodeError(node.Name, err)
errorLock.Unlock()
return
}
//check if the number of found nodes is more than the numNodesTofind
length := atomic.AddInt32(&numFoundNodes, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&numFoundNodes, -1)
} else {
predicateNodes[length-1] = node
}
}
//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
predicateNodes = predicateNodes[:numFoundNodes]
return predicateNodes, fe
}
func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
}
func NewPredicateHelper() PredicateHelper {
return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
}
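
Beyond the new cache lookup, the helper keeps the existing sampling strategy: checkNode runs on 16 workers via workqueue.ParallelizeUntil, and once numNodesToFind feasible nodes have been collected the shared context is cancelled so the remaining work is skipped. A simplified, self-contained sketch of just that cancel-when-enough pattern, with the predicate stubbed out:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := make([]string, 100)
	for i := range nodes {
		nodes[i] = fmt.Sprintf("node-%d", i)
	}

	const numNodesToFind int32 = 10
	feasible := make([]string, numNodesToFind)
	var numFound int32

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	checkNode := func(index int) {
		// The real helper runs the predicate (or returns a cached error)
		// here; this stub treats every node as feasible.
		n := atomic.AddInt32(&numFound, 1)
		if n > numNodesToFind {
			// Enough nodes collected: cancel so no further pieces start.
			cancel()
			atomic.AddInt32(&numFound, -1)
			return
		}
		feasible[n-1] = nodes[index]
	}

	workqueue.ParallelizeUntil(ctx, 16, len(nodes), checkNode)
	fmt.Printf("kept %d of %d nodes\n", atomic.LoadInt32(&numFound), len(nodes))
}
```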

View File

@@ -23,7 +23,6 @@ import (
"math/rand"
"sort"
"sync"
"sync/atomic"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
@@ -67,65 +66,6 @@ func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
return numNodes
}
// PredicateNodes returns the specified number of nodes that fit a task
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
//var workerLock sync.Mutex
var errorLock sync.Mutex
fe := api.NewFitErrors()
allNodes := len(nodes)
if allNodes == 0 {
return make([]*api.NodeInfo, 0), fe
}
numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))
//allocate enough space to avoid growing it
predicateNodes := make([]*api.NodeInfo, numNodesToFind)
numFoundNodes := int32(0)
processedNodes := int32(0)
//create a context with cancellation
ctx, cancel := context.WithCancel(context.Background())
checkNode := func(index int) {
// Check the nodes starting from where is left off in the previous scheduling cycle,
// to make sure all nodes have the same chance of being examined across pods.
node := nodes[(lastProcessedNodeIndex+index)%allNodes]
atomic.AddInt32(&processedNodes, 1)
klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
// TODO (k82cn): Enable eCache for performance improvement.
if err := fn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
errorLock.Lock()
fe.SetNodeError(node.Name, err)
errorLock.Unlock()
return
}
//check if the number of found nodes is more than the numNodesTofind
length := atomic.AddInt32(&numFoundNodes, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&numFoundNodes, -1)
} else {
predicateNodes[length-1] = node
}
}
//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
predicateNodes = predicateNodes[:numFoundNodes]
return predicateNodes, fe
}
// PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes
func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo {
pluginNodeScoreMap := map[string]k8sframework.NodeScoreList{}
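
Since the package-level util.PredicateNodes is removed above, any remaining caller has to construct a PredicateHelper and call its method instead; creating one helper per scheduling pass is what makes the cache effective. A short hypothetical migration sketch; filterNodes is not a function in the repository.

```go
package sketch

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// filterNodes shows the before/after shape of a call site.
func filterNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) []*api.NodeInfo {
	// Before: predicateNodes, fitErrors := util.PredicateNodes(task, nodes, fn)
	ph := util.NewPredicateHelper()
	predicateNodes, _ := ph.PredicateNodes(task, nodes, fn)
	return predicateNodes
}
```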