task level advanced scheduling policy

Signed-off-by: hwdef <hwdefcom@outlook.com>
hwdef 2021-11-15 18:25:51 +08:00
parent 8a7e95e302
commit f3e928db2f
20 changed files with 557 additions and 50 deletions

View File

@@ -115,6 +115,23 @@ spec:
items:
description: TaskSpec specifies the task specification of Job.
properties:
dependsOn:
description: Specifies the tasks that this task depends on.
properties:
iteration:
description: This field specifies that when there are multiple
dependent tasks, as long as one task becomes the specified
state, the task scheduling is triggered or all tasks must
be changed to the specified state to trigger the task
scheduling
type: string
name:
description: Indicates the name of the tasks that this task
depends on, which can depend on multiple tasks
items:
type: string
type: array
type: object
maxRetry:
description: Specifies the maximum number of retries before
marking this Task failed. Defaults to 3.
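The same `dependsOn` schema is repeated in the other generated CRD copies below. As a rough sketch of what the field expresses (the job layout, task names, and replica counts here are illustrative, not part of this commit), a client could populate it through the `volcano.sh/apis` batch/v1alpha1 types added later in this change:

```go
package main

import (
	"fmt"

	batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
)

func main() {
	// Hypothetical two-task job: the controller will not create "worker"
	// pods until "master" satisfies the dependsOn readiness check.
	spec := batch.JobSpec{
		Tasks: []batch.TaskSpec{
			{Name: "master", Replicas: 1},
			{
				Name:      "worker",
				Replicas:  2,
				DependsOn: &batch.DependsOn{Name: []string{"master"}},
			},
		},
	}
	fmt.Println(spec.Tasks[1].DependsOn.Name) // [master]
}
```

A real job would also set each task's pod template and, typically, minAvailable; only the dependency wiring is shown here.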

View File

@@ -115,6 +115,22 @@ spec:
items:
description: TaskSpec specifies the task specification of Job.
properties:
dependsOn:
description: Specifies the tasks that this task depends on.
properties:
iteration:
description: This field specifies that when there are multiple
dependent tasks, as long as one task becomes the specified
state, the task scheduling is triggered or all tasks must
be changed to the specified state to trigger the task scheduling
type: string
name:
description: Indicates the name of the tasks that this task
depends on, which can depend on multiple tasks
items:
type: string
type: array
type: object
maxRetry:
description: Specifies the maximum number of retries before marking
this Task failed. Defaults to 3.

View File

@@ -157,7 +157,7 @@ And we can generate a `tfClusterSpec` for each pod in the job, here is an exampl
// generateTFConfig generate tfClusterSpec by a given pod and job
func (tp *tensorflowPlugin) generateTFConfig(pod *v1.Pod, job *batch.Job) (tfClusterSpec, error) {
// get task index by pod
-index, err := strconv.Atoi(helpers.GetTaskIndex(pod))
+index, err := strconv.Atoi(helpers.GetPodIndexUnderTask(pod))
if err != nil {
return tfClusterSpec{}, err
}

go.mod
View File

@@ -26,7 +26,7 @@ require (
k8s.io/kubernetes v1.19.15
sigs.k8s.io/yaml v1.2.0
stathat.com/c/consistent v1.0.0
-volcano.sh/apis v0.0.0-20210923020136-eb779276d17e
+volcano.sh/apis v0.0.0-20211109104455-7e8d893a9f66
)
replace (

go.sum
View File

@@ -1143,5 +1143,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI=
-volcano.sh/apis v0.0.0-20210923020136-eb779276d17e h1:F89sQUZt3YjQ6EYyeu1Nqecruc7V2/2N8Q1CiSLlGYE=
+volcano.sh/apis v0.0.0-20211109104455-7e8d893a9f66 h1:vmCou3lcAeNSmyMXBVlfQnJYsQvhfS3mMv2FJrtACyg=
-volcano.sh/apis v0.0.0-20210923020136-eb779276d17e/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY=
+volcano.sh/apis v0.0.0-20211109104455-7e8d893a9f66/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY=

View File

@@ -113,6 +113,23 @@ spec:
items:
description: TaskSpec specifies the task specification of Job.
properties:
dependsOn:
description: Specifies the tasks that this task depends on.
properties:
iteration:
description: This field specifies that when there are multiple
dependent tasks, as long as one task becomes the specified
state, the task scheduling is triggered or all tasks must
be changed to the specified state to trigger the task
scheduling
type: string
name:
description: Indicates the name of the tasks that this task
depends on, which can depend on multiple tasks
items:
type: string
type: array
type: object
maxRetry:
description: Specifies the maximum number of retries before
marking this Task failed. Defaults to 3.

View File

@@ -113,6 +113,22 @@ spec:
items:
description: TaskSpec specifies the task specification of Job.
properties:
dependsOn:
description: Specifies the tasks that this task depends on.
properties:
iteration:
description: This field specifies that when there are multiple
dependent tasks, as long as one task becomes the specified
state, the task scheduling is triggered or all tasks must
be changed to the specified state to trigger the task scheduling
type: string
name:
description: Indicates the name of the tasks that this task
depends on, which can depend on multiple tasks
items:
type: string
type: array
type: object
maxRetry:
description: Specifies the maximum number of retries before marking
this Task failed. Defaults to 3.

View File

@@ -298,6 +298,23 @@ spec:
items:
description: TaskSpec specifies the task specification of Job.
properties:
dependsOn:
description: Specifies the tasks that this task depends on.
properties:
iteration:
description: This field specifies that when there are multiple
dependent tasks, as long as one task becomes the specified
state, the task scheduling is triggered or all tasks must
be changed to the specified state to trigger the task
scheduling
type: string
name:
description: Indicates the name of the tasks that this task
depends on, which can depend on multiple tasks
items:
type: string
type: array
type: object
maxRetry:
description: Specifies the maximum number of retries before
marking this Task failed. Defaults to 3.

File diff suppressed because one or more lines are too long

View File

@@ -37,8 +37,8 @@ const (
persistentVolumeClaimFmt = "%s-pvc-%s"
)
-// GetTaskIndex returns task Index.
-func GetTaskIndex(pod *v1.Pod) string {
+// GetPodIndexUnderTask returns task Index.
+func GetPodIndexUnderTask(pod *v1.Pod) string {
num := strings.Split(pod.Name, "-")
if len(num) >= 3 {
return num[len(num)-1]
@@ -49,8 +49,8 @@ func GetTaskIndex(pod *v1.Pod) string {
// ComparePodByIndex by pod index
func CompareTask(lv, rv *api.TaskInfo) bool {
-lStr := GetTaskIndex(lv.Pod)
-rStr := GetTaskIndex(rv.Pod)
+lStr := GetPodIndexUnderTask(lv.Pod)
+rStr := GetPodIndexUnderTask(rv.Pod)
lIndex, lErr := strconv.Atoi(lStr)
rIndex, rErr := strconv.Atoi(rStr)
if lErr != nil || rErr != nil || lIndex == rIndex {
@@ -119,3 +119,27 @@ func GenPVCName(jobName string) string {
func GetJobKeyByReq(req *apis.Request) string {
return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}
// GetTasklndexUnderJob return index of the task in the job.
func GetTasklndexUnderJob(taskName string, job *batch.Job) int {
for index, task := range job.Spec.Tasks {
if task.Name == taskName {
return index
}
}
return -1
}
// GetPodsNameUnderTask return names of all pods in the task.
func GetPodsNameUnderTask(taskName string, job *batch.Job) []string {
var res []string
for _, task := range job.Spec.Tasks {
if task.Name == taskName {
for index := 0; index < int(task.Replicas); index++ {
res = append(res, MakePodName(job.Name, taskName, index))
}
break
}
}
return res
}
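The two helpers are meant to be composed: resolve the depended-on task's index inside `job.Spec.Tasks`, then derive the pod names the controller has to poll. A minimal sketch of that usage (the `jobhelpers` import path and the fixture job are assumptions for illustration, not part of the diff):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
	jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

func main() {
	job := &batch.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "job1"},
		Spec: batch.JobSpec{
			Tasks: []batch.TaskSpec{
				{Name: "task1", Replicas: 2},
				{Name: "task2", Replicas: 1},
			},
		},
	}

	// Position of the task inside job.Spec.Tasks (-1 if absent).
	fmt.Println(jobhelpers.GetTasklndexUnderJob("task2", job)) // 1

	// Pod names the controller polls before releasing dependent tasks.
	fmt.Println(jobhelpers.GetPodsNameUnderTask("task1", job)) // [job1-task1-0 job1-task1-1]
}
```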

View File

@@ -22,6 +22,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/scheduler/api"
)
@@ -73,3 +75,250 @@ func generateTaskInfo(name string, createTime time.Time) *api.TaskInfo {
Pod: pod,
}
}
func TestGetTasklndexUnderJobFunc(t *testing.T) {
namespace := "test"
testCases := []struct {
Name string
TaskName string
Job *batch.Job
Expect int
}{
{
Name: "GetTasklndexUnderJob1",
TaskName: "task1",
Job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Spec: batch.JobSpec{
Tasks: []batch.TaskSpec{
{
Name: "task1",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "task2",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
},
},
},
Expect: 0,
},
{
Name: "GetTasklndexUnderJob2",
TaskName: "task2",
Job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Spec: batch.JobSpec{
Tasks: []batch.TaskSpec{
{
Name: "task1",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "task2",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
},
},
},
Expect: 1,
},
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
index := GetTasklndexUnderJob(testCase.TaskName, testCase.Job)
if index != testCase.Expect {
t.Errorf("GetTasklndexUnderJobFunc(%s) = %d, expect %d", testCase.TaskName, index, testCase.Expect)
}
})
}
}
func TestGetPodsNameUnderTaskFunc(t *testing.T) {
namespace := "test"
testCases := []struct {
Name string
TaskName string
Job *batch.Job
Expect []string
}{
{
Name: "GetTasklndexUnderJob1",
TaskName: "task1",
Job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Spec: batch.JobSpec{
Tasks: []batch.TaskSpec{
{
Name: "task1",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods1",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "task2",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods2",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
},
},
},
Expect: []string{"job1-task1-0", "job1-task1-1"},
},
{
Name: "GetTasklndexUnderJob2",
TaskName: "task2",
Job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Spec: batch.JobSpec{
Tasks: []batch.TaskSpec{
{
Name: "task1",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods1",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "task2",
Replicas: 2,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods2",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
},
},
},
Expect: []string{"job1-task2-0", "job1-task2-1"},
},
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
pods := GetPodsNameUnderTask(testCase.TaskName, testCase.Job)
for _, pod := range pods {
if !contains(testCase.Expect, pod) {
t.Errorf("Test case failed: %s, expect: %v, got: %v", testCase.Name, testCase.Expect, pods)
}
}
})
}
}
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

View File

@@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
@@ -313,7 +314,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
var running, pending, terminating, succeeded, failed, unknown int32
taskStatusCount := make(map[string]batch.TaskState)
-var podToCreate []*v1.Pod
+podToCreate := make(map[string][]*v1.Pod)
var podToDelete []*v1.Pod
var creationErrs []error
var deletionErrs []error
@@ -325,6 +326,8 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
*container = append(*container, err)
}
waitCreationGroup := sync.WaitGroup{}
for _, ts := range job.Spec.Tasks {
ts.Template.Name = ts.Name
tc := ts.Template.DeepCopy()
@@ -335,6 +338,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
pods = map[string]*v1.Pod{}
}
var podToCreateEachTask []*v1.Pod
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
if pod, found := pods[podName]; !found {
@@ -342,7 +346,8 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
if err := cc.pluginOnPodCreate(job, newPod); err != nil {
return err
}
-podToCreate = append(podToCreate, newPod)
+podToCreateEachTask = append(podToCreateEachTask, newPod)
waitCreationGroup.Add(1)
} else {
delete(pods, podName)
if pod.DeletionTimestamp != nil {
@@ -355,33 +360,44 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
calcPodStatus(pod, taskStatusCount)
}
}
podToCreate[ts.Name] = podToCreateEachTask
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
}
-waitCreationGroup := sync.WaitGroup{}
-waitCreationGroup.Add(len(podToCreate))
-for _, pod := range podToCreate {
-go func(pod *v1.Pod) {
-defer waitCreationGroup.Done()
-newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
-if err != nil && !apierrors.IsAlreadyExists(err) {
-// Failed to create Pod, waitCreationGroup a moment and then create it again
-// This is to ensure all podsMap under the same Job created
-// So gang-scheduling could schedule the Job successfully
-klog.Errorf("Failed to create pod %s for Job %s, err %#v",
-pod.Name, job.Name, err)
-appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
-} else {
-classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
-calcPodStatus(pod, taskStatusCount)
-klog.V(3).Infof("Created Task <%s> of Job <%s/%s>",
-pod.Name, job.Namespace, job.Name)
-}
-}(pod)
-}
+for taskName, podToCreateEachTask := range podToCreate {
+if len(podToCreateEachTask) == 0 {
+continue
+}
+go func(taskName string, podToCreateEachTask []*v1.Pod) {
+taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
+if job.Spec.Tasks[taskIndex].DependsOn != nil {
+cc.waitDependsOnTaskMeetCondition(taskName, podToCreateEachTask, job)
+}
+for _, pod := range podToCreateEachTask {
+go func(pod *v1.Pod) {
+defer waitCreationGroup.Done()
+newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+if err != nil && !apierrors.IsAlreadyExists(err) {
+// Failed to create Pod, waitCreationGroup a moment and then create it again
+// This is to ensure all podsMap under the same Job created
+// So gang-scheduling could schedule the Job successfully
+klog.Errorf("Failed to create pod %s for Job %s, err %#v",
+pod.Name, job.Name, err)
+appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
+} else {
+classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
+calcPodStatus(pod, taskStatusCount)
+klog.V(5).Infof("Created Task <%s> of Job <%s/%s>",
+pod.Name, job.Namespace, job.Name)
+}
+}(pod)
+}
+}(taskName, podToCreateEachTask)
+}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
@@ -458,6 +474,69 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
return nil
}
func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, podToCreateEachTask []*v1.Pod, job *batch.Job) {
taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
if job.Spec.Tasks[taskIndex].DependsOn != nil {
dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
wait.PollInfinite(100*time.Millisecond, func() (bool, error) {
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true, nil
}
}
return false, nil
})
} else {
for _, dependsOnTask := range dependsOn.Name {
wait.PollInfinite(100*time.Millisecond, func() (bool, error) {
if cc.isDependsOnPodsReady(dependsOnTask, job) {
return true, nil
}
return false, nil
})
}
}
}
}
func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool {
dependsOnPods := jobhelpers.GetPodsNameUnderTask(task, job)
dependsOnTaskIndex := jobhelpers.GetTasklndexUnderJob(task, job)
runningPodCount := 0
for _, podName := range dependsOnPods {
pod, err := cc.podLister.Pods(job.Namespace).Get(podName)
if err != nil {
klog.Errorf("Failed to get pod %v/%v %v", job.Namespace, podName, err)
continue
}
if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodSucceeded {
klog.V(5).Infof("Sequential state, pod %v/%v of depends on tasks is not running", pod.Namespace, pod.Name)
continue
}
allContainerReady := true
for _, containerStatus := range pod.Status.ContainerStatuses {
if !containerStatus.Ready {
allContainerReady = false
break
}
}
if allContainerReady {
runningPodCount++
}
}
dependsOnTaskMinReplicas := job.Spec.Tasks[dependsOnTaskIndex].MinAvailable
if dependsOnTaskMinReplicas != nil {
if runningPodCount < int(*dependsOnTaskMinReplicas) {
klog.V(5).Infof("In a depends on startup state, there are already %d pods running, which is less than the minimum number of runs", runningPodCount)
return false
}
}
return true
}
func (cc *jobcontroller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) { func (cc *jobcontroller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) {
// If PVC does not exist, create them for Job. // If PVC does not exist, create them for Job.
var needUpdate bool var needUpdate bool

View File

@@ -116,7 +116,7 @@ func (tp *tensorflowPlugin) OnJobUpdate(job *batch.Job) error {
}
func (tp *tensorflowPlugin) generateTFClusterSpec(pod *v1.Pod, job *batch.Job) (tfClusterSpec, error) {
-index, err := strconv.Atoi(jobhelpers.GetTaskIndex(pod))
+index, err := strconv.Atoi(jobhelpers.GetPodIndexUnderTask(pod))
if err != nil {
return tfClusterSpec{}, err
}

View File

@@ -43,7 +43,7 @@ func (ep *envPlugin) Name() string {
}
func (ep *envPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
-index := jobhelpers.GetTaskIndex(pod)
+index := jobhelpers.GetPodIndexUnderTask(pod)
// add VK_TASK_INDEX and VC_TASK_INDEX env to each container
for i := range pod.Spec.Containers {

vendor/modules.txt vendored
View File

@@ -855,7 +855,7 @@ sigs.k8s.io/yaml
# stathat.com/c/consistent v1.0.0
## explicit
stathat.com/c/consistent
-# volcano.sh/apis v0.0.0-20210923020136-eb779276d17e
+# volcano.sh/apis v0.0.0-20211109104455-7e8d893a9f66
## explicit
volcano.sh/apis/pkg/apis/batch/v1alpha1
volcano.sh/apis/pkg/apis/bus/v1alpha1

View File

@@ -74,16 +74,16 @@ type JobSpec struct {
// Running Estimate is a user running duration estimate for the job
// Default to nil
-RunningEstimate *metav1.Duration `json:"runningEstimate,omitempty" protobuf:"bytes,4,opt,name=runningEstimate"`
+RunningEstimate *metav1.Duration `json:"runningEstimate,omitempty" protobuf:"bytes,7,opt,name=runningEstimate"`
//Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty.
// +optional
-Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"`
+Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"`
// Specifies the maximum number of retries before marking this Job failed.
// Defaults to 3.
// +optional
-MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"`
+MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"`
// ttlSecondsAfterFinished limits the lifetime of a Job that has finished
// execution (either Completed or Failed). If this field is set,
@@ -92,16 +92,16 @@ type JobSpec struct {
// the Job won't be automatically deleted. If this field is set to zero,
// the Job becomes eligible to be deleted immediately after it finishes.
// +optional
-TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty" protobuf:"varint,9,opt,name=ttlSecondsAfterFinished"`
+TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty" protobuf:"varint,10,opt,name=ttlSecondsAfterFinished"`
// If specified, indicates the job's priority.
// +optional
-PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,10,opt,name=priorityClassName"`
+PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,11,opt,name=priorityClassName"`
// The minimal success pods to run for this Job
// +kubebuilder:validation:Minimum=1
// +optional
-MinSuccess *int32 `json:"minSuccess,omitempty" protobuf:"varint,11,opt,name=minSuccess"`
+MinSuccess *int32 `json:"minSuccess,omitempty" protobuf:"varint,12,opt,name=minSuccess"`
}
// VolumeSpec defines the specification of Volume, e.g. PVC.
@@ -161,12 +161,12 @@ type LifecyclePolicy struct {
// according to this code.
// Note: only one of `Event` or `ExitCode` can be specified.
// +optional
-ExitCode *int32 `json:"exitCode,omitempty" protobuf:"bytes,5,opt,name=exitCode"`
+ExitCode *int32 `json:"exitCode,omitempty" protobuf:"bytes,4,opt,name=exitCode"`
// Timeout is the grace period for controller to take actions.
// Default to nil (take action immediately).
// +optional
-Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,4,opt,name=timeout"`
+Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,5,opt,name=timeout"`
}
type NumaPolicy string
@@ -191,25 +191,29 @@ type TaskSpec struct {
// The minimal available pods to run for this Task
// Defaults to the task replicas
// +optional
-MinAvailable *int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`
+MinAvailable *int32 `json:"minAvailable,omitempty" protobuf:"bytes,3,opt,name=minAvailable"`
// Specifies the pod that will be created for this TaskSpec
// when executing a Job
// +optional
-Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
+Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,4,opt,name=template"`
// Specifies the lifecycle of task
// +optional
-Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
+Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`
// Specifies the topology policy of task
// +optional
-TopologyPolicy NumaPolicy `json:"topologyPolicy,omitempty" protobuf:"bytes,5,opt,name=topologyPolicy"`
+TopologyPolicy NumaPolicy `json:"topologyPolicy,omitempty" protobuf:"bytes,6,opt,name=topologyPolicy"`
// Specifies the maximum number of retries before marking this Task failed.
// Defaults to 3.
// +optional
-MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,5,opt,name=maxRetry"`
+MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,7,opt,name=maxRetry"`
// Specifies the tasks that this task depends on.
// +optional
DependsOn *DependsOn `json:"dependsOn,omitempty" protobuf:"bytes,8,opt,name=dependsOn"`
}
// JobPhase defines the phase of the job.
@@ -312,17 +316,17 @@ type JobStatus struct {
// The job running duration is the length of time from job running to complete.
// +optional
-RunningDuration *metav1.Duration `json:"runningDuration,omitempty" protobuf:"bytes,4,opt,name=runningDuration"`
+RunningDuration *metav1.Duration `json:"runningDuration,omitempty" protobuf:"bytes,11,opt,name=runningDuration"`
// The resources that controlled by this job, e.g. Service, ConfigMap
// +optional
-ControlledResources map[string]string `json:"controlledResources,omitempty" protobuf:"bytes,11,opt,name=controlledResources"`
+ControlledResources map[string]string `json:"controlledResources,omitempty" protobuf:"bytes,12,opt,name=controlledResources"`
// Which conditions caused the current job state.
// +optional
// +patchMergeKey=status
// +patchStrategy=merge
-Conditions []JobCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"status" protobuf:"bytes,12,rep,name=conditions"`
+Conditions []JobCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"status" protobuf:"bytes,13,rep,name=conditions"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -344,3 +348,31 @@ type JobCondition struct {
// +optional
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,2,opt,name=lastTransitionTime"`
}
// Iteration defines the phase of the iteration.
type Iteration string
const (
// Indicates that when there are multiple tasks,
// as long as one task becomes the specified state,
// the task scheduling will be triggered
IterationAny Iteration = "any"
// Indicates that when there are multiple tasks,
// all tasks must become the specified state,
// the task scheduling will be triggered
IterationAll Iteration = "all"
)
// DependsOn represents the tasks that this task depends on and their dependencies
type DependsOn struct {
// Indicates the name of the tasks that this task depends on,
// which can depend on multiple tasks
// +optional
Name []string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
// This field specifies that when there are multiple dependent tasks,
// as long as one task becomes the specified state,
// the task scheduling is triggered or
// all tasks must be changed to the specified state to trigger the task scheduling
// +optional
Iteration Iteration `json:"iteration,omitempty" protobuf:"bytes,2,opt,name=iteration"`
}
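With several entries in `Name`, `Iteration` picks the firing rule: `any` releases the dependent task as soon as one listed task passes the readiness check in `isDependsOnPodsReady`, while any other value (including unset) makes the controller wait for every listed task in turn. A hedged sketch of the `any` case, using made-up task names:

```go
package main

import (
	"fmt"

	batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
)

func main() {
	// "train" may start once either "preprocess" or "warm-cache" is ready;
	// dropping Iteration (or listing a single name) waits for all of them.
	task := batch.TaskSpec{
		Name:     "train",
		Replicas: 4,
		DependsOn: &batch.DependsOn{
			Name:      []string{"preprocess", "warm-cache"},
			Iteration: batch.IterationAny,
		},
	}
	fmt.Println(task.Name, task.DependsOn.Iteration) // train any
}
```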

View File

@@ -1,3 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
@@ -26,6 +27,27 @@ import (
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DependsOn) DeepCopyInto(out *DependsOn) {
*out = *in
if in.Name != nil {
in, out := &in.Name, &out.Name
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DependsOn.
func (in *DependsOn) DeepCopy() *DependsOn {
if in == nil {
return nil
}
out := new(DependsOn)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Job) DeepCopyInto(out *Job) {
*out = *in
@@ -281,6 +303,11 @@ func (in *TaskSpec) DeepCopyInto(out *TaskSpec) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.DependsOn != nil {
in, out := &in.DependsOn, &out.DependsOn
*out = new(DependsOn)
(*in).DeepCopyInto(*out)
}
return
}

View File

@@ -1,3 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*

View File

@@ -1,3 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*

View File

@@ -1,3 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*