optimize: reuse predicate error on same task group
Signed-off-by: eric.bao <baojn1998@163.com>
parent 1399ca1c1f
commit d1231a049f
@@ -65,7 +65,7 @@ func (ji *JobInfo) SetJob(job *batch.Job) {
 func (ji *JobInfo) AddPod(pod *v1.Pod) error {
 	taskName, found := pod.Annotations[batch.TaskSpecKey]
 	if !found {
-		return fmt.Errorf("failed to taskName of Pod <%s/%s>",
+		return fmt.Errorf("failed to find taskName of Pod <%s/%s>",
 			pod.Namespace, pod.Name)
 	}

@@ -198,7 +198,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 			tasks.Len(), job.Namespace, job.Name)

 		stmt := framework.NewStatement(ssn)
-
+		ph := util.NewPredicateHelper()
 		for !tasks.Empty() {
 			task := tasks.Pop().(*api.TaskInfo)

@@ -211,7 +211,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {

 			klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(nodes), job.Namespace, job.Name)

-			predicateNodes, fitErrors := util.PredicateNodes(task, nodes, predicateFn)
+			predicateNodes, fitErrors := ph.PredicateNodes(task, nodes, predicateFn)
 			if len(predicateNodes) == 0 {
 				job.NodesFitErrors[task.UID] = fitErrors
 				break
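Note on the two allocate hunks above: the helper is constructed once, right before the task loop, and the same instance serves every task popped from the queue, so a predicate failure recorded for one replica of a task spec is reused by its siblings instead of re-running the predicate. A minimal, single-threaded sketch of that reuse pattern (the types and names below are simplified stand-ins, not the volcano API):

package main

import "fmt"

// groupErrorCache is a simplified stand-in for the helper's per-task-group cache:
// group key -> node name -> last predicate error.
type groupErrorCache map[string]map[string]error

// predicate runs fn only if no failure is cached for this group/node pair.
func (c groupErrorCache) predicate(group, node string, fn func(string) error) error {
	if err, ok := c[group][node]; ok {
		return err // a sibling task already failed here; reuse the cached error
	}
	err := fn(node)
	if err != nil {
		if c[group] == nil {
			c[group] = map[string]error{}
		}
		c[group][node] = err
	}
	return err
}

func main() {
	cache := groupErrorCache{} // one cache per scheduling pass, like ph in Execute
	calls := 0
	fn := func(node string) error {
		calls++
		return fmt.Errorf("node %s: insufficient cpu", node)
	}

	// Two replicas of the same task spec map to the same group key.
	for range []string{"worker-0", "worker-1"} {
		_ = cache.predicate("job-a/worker", "node-1", fn)
	}
	fmt.Println("predicate invocations:", calls) // 1: the second replica hit the cache
}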
@@ -79,6 +79,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 		}
 	}

+	ph := util.NewPredicateHelper()
 	// Preemption between Jobs within Queue.
 	for _, queue := range queues {
 		for {
@@ -124,7 +125,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 					}
 					// Preempt other jobs within queue
 					return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
-				}); preempted {
+				}, ph); preempted {
 					assigned = true
 				}
 			}
@@ -172,7 +173,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 				}
 				// Preempt tasks within job.
 				return preemptor.Job == task.Job
-			})
+			}, ph)
 			stmt.Commit()

 			// If no preemption, next job.
@@ -194,12 +195,13 @@ func preempt(
 	stmt *framework.Statement,
 	preemptor *api.TaskInfo,
 	filter func(*api.TaskInfo) bool,
+	predicateHelper util.PredicateHelper,
 ) (bool, error) {
 	assigned := false

 	allNodes := ssn.NodeList

-	predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
+	predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

 	nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

@@ -219,6 +219,13 @@ func (ti *TaskInfo) Clone() *TaskInfo {
 	}
 }

+func (ti *TaskInfo) GetTaskSpecKey() TaskID {
+	if ti.Pod == nil {
+		return ""
+	}
+	return getTaskID(ti.Pod)
+}
+
 // String returns the taskInfo details in a string
 func (ti TaskInfo) String() string {
 	return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v"+
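GetTaskSpecKey lets the new helper key its cache by task spec rather than by individual pod: the group key is roughly "<job ID>/<task spec name>", so every replica stamped with the same batch.TaskSpecKey annotation lands in the same bucket. A tiny sketch of the keying (plain strings stand in for the real api.JobID / api.TaskID types):

package main

import "fmt"

// taskGroupKey mirrors the "%s/%s" keying used by taskGroupID in the new helper;
// jobID and specKey are hypothetical plain-string stand-ins for the real types.
func taskGroupKey(jobID, specKey string) string {
	return fmt.Sprintf("%s/%s", jobID, specKey)
}

func main() {
	k0 := taskGroupKey("default/tf-job", "worker") // e.g. pod tf-job-worker-0
	k1 := taskGroupKey("default/tf-job", "worker") // e.g. pod tf-job-worker-1
	fmt.Println(k0 == k1)                          // true: both replicas share one error-cache bucket
}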
@@ -382,7 +389,7 @@ func (ji *JobInfo) extractPreemptable(pg *PodGroup) bool {

 // extractRevocableZone return volcano.sh/revocable-zone value for pod/podgroup
 func (ji *JobInfo) extractRevocableZone(pg *PodGroup) string {
-	// check annotaion first
+	// check annotation first
 	if len(pg.Annotations) > 0 {
 		if value, found := pg.Annotations[v1beta1.RevocableZone]; found {
 			if value != "*" {
@@ -0,0 +1,108 @@
+package util
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
+	"volcano.sh/volcano/pkg/scheduler/api"
+)
+
+type PredicateHelper interface {
+	PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors)
+}
+
+type predicateHelper struct {
+	taskPredicateErrorCache map[string]map[string]error
+}
+
+// PredicateNodes returns the specified number of nodes that fit a task
+func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
+	var errorLock sync.RWMutex
+	fe := api.NewFitErrors()
+
+	allNodes := len(nodes)
+	if allNodes == 0 {
+		return make([]*api.NodeInfo, 0), fe
+	}
+	numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))
+
+	// allocate enough space to avoid growing it
+	predicateNodes := make([]*api.NodeInfo, numNodesToFind)
+
+	numFoundNodes := int32(0)
+	processedNodes := int32(0)
+
+	taskGroupid := taskGroupID(task)
+	nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid]
+	if nodeErrorCache == nil {
+		nodeErrorCache = map[string]error{}
+	}
+
+	// create a context with cancellation
+	ctx, cancel := context.WithCancel(context.Background())
+
+	checkNode := func(index int) {
+		// Check the nodes starting from where it left off in the previous scheduling cycle,
+		// to make sure all nodes have the same chance of being examined across pods.
+		node := nodes[(lastProcessedNodeIndex+index)%allNodes]
+		atomic.AddInt32(&processedNodes, 1)
+		klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
+			task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
+
+		// Check if the task had a "predicate" failure before,
+		// and then check if the task failed the predicate on this node before.
+		if taskFailedBefore {
+			errorLock.RLock()
+			errC, ok := nodeErrorCache[node.Name]
+			errorLock.RUnlock()
+
+			if ok {
+				errorLock.Lock()
+				fe.SetNodeError(node.Name, errC)
+				errorLock.Unlock()
+				return
+			}
+		}
+
+		// TODO (k82cn): Enable eCache for performance improvement.
+		if err := fn(task, node); err != nil {
+			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
+				task.Namespace, task.Name, node.Name, err)
+			errorLock.Lock()
+			nodeErrorCache[node.Name] = err
+			ph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache
+			fe.SetNodeError(node.Name, err)
+			errorLock.Unlock()
+			return
+		}
+
+		// check if the number of found nodes is more than numNodesToFind
+		length := atomic.AddInt32(&numFoundNodes, 1)
+		if length > numNodesToFind {
+			cancel()
+			atomic.AddInt32(&numFoundNodes, -1)
+		} else {
+			predicateNodes[length-1] = node
+		}
+	}
+
+	// workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
+	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)

+	// processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
+	lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
+	predicateNodes = predicateNodes[:numFoundNodes]
+	return predicateNodes, fe
+}
+
+func taskGroupID(task *api.TaskInfo) string {
+	return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
+}
+
+func NewPredicateHelper() PredicateHelper {
+	return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
+}
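The new helper keeps the existing parallel filtering strategy: nodes are checked by 16 workers via workqueue.ParallelizeUntil, and the shared context is cancelled once numNodesToFind feasible nodes have been collected, so remaining work items are skipped. A self-contained sketch of that cancellation pattern, with an assumed toy predicate (even-indexed "nodes" fit) in place of the real one:

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const allNodes = 1000
	const numNodesToFind int32 = 10

	found := int32(0)
	feasible := make([]int, numNodesToFind)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	checkNode := func(index int) {
		// Toy predicate: even-indexed "nodes" fit, odd ones do not.
		if index%2 != 0 {
			return
		}
		n := atomic.AddInt32(&found, 1)
		if n > numNodesToFind {
			cancel() // enough feasible nodes found: stop handing out further work
			atomic.AddInt32(&found, -1)
			return
		}
		feasible[n-1] = index
	}

	// Same fan-out as the helper: 16 workers over all nodes, until cancelled.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)

	fmt.Printf("kept %d of %d nodes: %v\n", found, allNodes, feasible[:found])
}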
@@ -23,7 +23,6 @@ import (
 	"math/rand"
 	"sort"
 	"sync"
-	"sync/atomic"

 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
@@ -67,65 +66,6 @@ func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
 	return numNodes
 }

-// PredicateNodes returns the specified number of nodes that fit a task
-func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
-	//var workerLock sync.Mutex
-
-	var errorLock sync.Mutex
-	fe := api.NewFitErrors()
-
-	allNodes := len(nodes)
-	if allNodes == 0 {
-		return make([]*api.NodeInfo, 0), fe
-	}
-	numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))
-
-	//allocate enough space to avoid growing it
-	predicateNodes := make([]*api.NodeInfo, numNodesToFind)
-
-	numFoundNodes := int32(0)
-	processedNodes := int32(0)
-
-	//create a context with cancellation
-	ctx, cancel := context.WithCancel(context.Background())
-
-	checkNode := func(index int) {
-		// Check the nodes starting from where is left off in the previous scheduling cycle,
-		// to make sure all nodes have the same chance of being examined across pods.
-		node := nodes[(lastProcessedNodeIndex+index)%allNodes]
-		atomic.AddInt32(&processedNodes, 1)
-		klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
-			task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
-
-		// TODO (k82cn): Enable eCache for performance improvement.
-		if err := fn(task, node); err != nil {
-			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
-				task.Namespace, task.Name, node.Name, err)
-			errorLock.Lock()
-			fe.SetNodeError(node.Name, err)
-			errorLock.Unlock()
-			return
-		}
-
-		//check if the number of found nodes is more than the numNodesTofind
-		length := atomic.AddInt32(&numFoundNodes, 1)
-		if length > numNodesToFind {
-			cancel()
-			atomic.AddInt32(&numFoundNodes, -1)
-		} else {
-			predicateNodes[length-1] = node
-		}
-	}
-
-	//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
-	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
-
-	//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
-	lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
-	predicateNodes = predicateNodes[:numFoundNodes]
-	return predicateNodes, fe
-}
-
 // PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes
 func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo {
 	pluginNodeScoreMap := map[string]k8sframework.NodeScoreList{}