Added Job error handling.

Signed-off-by: Klaus Ma <mada3@huawei.com>
Klaus Ma 2019-01-19 08:31:25 +08:00 committed by Da K. Ma
parent 59b0a5c9f3
commit 26b05f511d
21 changed files with 818 additions and 427 deletions

View File

@@ -17,7 +17,7 @@ cli:
generate-code:
go build -o ${BIN_DIR}/deepcopy-gen ./cmd/deepcopy-gen/
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/batch/v1alpha1/ -O zz_generated.deepcopy
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/core/v1alpha1/ -O zz_generated.deepcopy
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/bus/v1alpha1/ -O zz_generated.deepcopy
e2e-test:
./hack/run-e2e.sh

View File

@@ -30,29 +30,13 @@ spec:
input:
description: The volume mount for input of Job
properties:
claim:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- mountPath
type: object
@@ -63,29 +47,13 @@ spec:
output:
description: The volume mount for output of Job
properties:
claim:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- mountPath
type: object

View File

@@ -4,11 +4,14 @@ metadata:
name: test-job
spec:
minAvailable: 3
policies:
- event: PodEvicted
action: RestartJob
input:
mountPath: "/myinput"
output:
mountPath: "/myoutput"
claim:
volumeClaim:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "my-storage-class"
resources:

View File

@@ -64,10 +64,12 @@ type JobSpec struct {
// VolumeSpec defines the specification of Volume, e.g. PVC
type VolumeSpec struct {
v1.VolumeMount `json:",inline"`
// Path within the container at which the volume should be mounted. Must
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`
// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"`
}
// Event represent the phase of Job, e.g. pod-failed.
@@ -75,7 +77,7 @@ type Event string
const (
// AnyEvent means any event
AllEvents Event = "*"
AnyEvent Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
@@ -165,8 +167,8 @@ const (
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Terminating JobPhase = "Terminating"
// Teriminated is the phase that the job is finished unexpected, e.g. events
Teriminated JobPhase = "Terminated"
// Terminated is the phase that the job is finished unexpectedly, e.g. by events
Terminated JobPhase = "Terminated"
)
// JobState contains details for the current state of the job.
@@ -189,25 +191,29 @@ type JobStatus struct {
// Current state of Job.
State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
// The minimal available pods to run for this Job
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`
// The number of pending pods.
// +optional
Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"`
Pending int32 `json:"pending,omitempty" protobuf:"bytes,3,opt,name=pending"`
// The number of running pods.
// +optional
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
Running int32 `json:"running,omitempty" protobuf:"bytes,4,opt,name=running"`
// The number of pods which reached phase Succeeded.
// +optional
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"`
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,5,opt,name=succeeded"`
// The number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"`
Failed int32 `json:"failed,omitempty" protobuf:"bytes,6,opt,name=failed"`
// The minimal available pods to run for this Job
// The number of pods which reached phase Terminating.
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,6,opt,name=minAvailable"`
Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@@ -1,7 +1,7 @@
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -208,7 +208,6 @@ func (in *TaskSpec) DeepCopy() *TaskSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) {
*out = *in
in.VolumeMount.DeepCopyInto(&out.VolumeMount)
if in.VolumeClaim != nil {
in, out := &in.VolumeClaim, &out.VolumeClaim
*out = new(corev1.PersistentVolumeClaimSpec)

View File

@@ -1,7 +1,7 @@
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -21,22 +21,10 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"
vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
v1corev1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
@@ -47,6 +35,15 @@ import (
vkbatchlister "hpw.cloud/volcano/pkg/client/listers/batch/v1alpha1"
vkcorelister "hpw.cloud/volcano/pkg/client/listers/bus/v1alpha1"
"hpw.cloud/volcano/pkg/controllers/job/state"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// Controller the Job Controller type
@@ -173,15 +170,8 @@ func NewJobController(config *rest.Config) *Controller {
cc.pgSynced = cc.pgInformer.Informer().HasSynced
// Register actions
actionFns := map[vkbatchv1.Action]state.ActionFn{
vkbatchv1.ResumeJobAction: cc.resumeJob,
vkbatchv1.SyncJobAction: cc.syncJob,
vkbatchv1.AbortJobAction: cc.abortJob,
vkbatchv1.TerminateJobAction: cc.terminateJob,
vkbatchv1.RestartJobAction: cc.restartJob,
vkbatchv1.RestartTaskAction: cc.syncJob,
}
state.RegisterActions(actionFns)
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
return cc
}
@@ -203,52 +193,54 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
glog.Infof("JobController is running ...... ")
}
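// Request is the unit of work on the event queue: it names the Job (and
// optionally the Pod) concerned, the triggering event, and an optional
// explicit action issued via a Command.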
type Request struct {
Namespace string
JobName string
PodName string
Event vkbatchv1.Event
Action vkbatchv1.Action
Reason string
Message string
}
func (cc *Controller) worker() {
obj := cache.Pop(cc.eventQueue)
if obj == nil {
glog.Errorf("Fail to pop item from eventQueue")
}
req := obj.(*state.Request)
if req.Target != nil {
if job, err := cc.jobLister.Jobs(req.Namespace).Get(req.Target.Name); err != nil {
req.Job = job
}
}
if req.Job == nil {
if req.Pod == nil {
glog.Errorf("Empty data for request %v", req)
return
}
jobs, err := cc.jobLister.List(labels.Everything())
if err != nil {
glog.Errorf("Failed to list Jobs for Pod %v/%v", req.Pod.Namespace, req.Pod.Name)
}
// TODO(k82cn): select by UID instead of loop
ctl := helpers.GetController(req.Pod)
for _, j := range jobs {
if j.UID == ctl {
req.Job = j
break
}
}
}
if req.Job == nil {
glog.Errorf("No Job for request %v from pod %v/%v",
req.Event, req.Pod.Namespace, req.Pod.Name)
return
}
st := state.NewState(req)
if err := st.Execute(); err != nil {
glog.Errorf("Failed to handle Job %s/%s: %v",
req.Job.Namespace, req.Job.Name, err)
req := obj.(*Request)
job, err := cc.jobLister.Jobs(req.Namespace).Get(req.JobName)
if err != nil {
return
}
st := state.NewState(job)
if st == nil {
glog.Errorf("Invalid state <%s> of Job <%v/%v>",
job.Status.State, job.Namespace, job.Name)
return
}
action := req.Action
if len(action) == 0 {
pod, err := cc.podLister.Pods(req.Namespace).Get(req.PodName)
if err != nil {
pod = nil
}
action = applyPolicies(req.Event, job, pod)
}
if err := st.Execute(action, req.Reason, req.Message); err != nil {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
job.Namespace, job.Name, err)
// If any error, requeue it.
// TODO(k82cn): replace with RateLimitQueue
cc.eventQueue.Add(req)
if e := cc.eventQueue.Add(req); e != nil {
glog.Errorf("Failed to reqeueue request <%v>", req)
}
}
}

View File

@@ -28,61 +28,125 @@ import (
kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
"hpw.cloud/volcano/pkg/controllers/job/state"
)
func (cc *Controller) resumeJob(req *state.Request) error {
switch req.Reason {
}
return nil
}
func (cc *Controller) abortJob(req *state.Request) error {
switch req.Reason {
}
return nil
}
func (cc *Controller) terminateJob(req *state.Request) error {
switch req.Reason {
}
return nil
}
func (cc *Controller) restartJob(req *state.Request) error {
switch req.Reason {
}
return nil
}
func (cc *Controller) syncJob(req *state.Request) error {
j := req.Job
job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name)
func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error {
job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name)
if err != nil {
if apierrors.IsNotFound(err) {
glog.V(3).Infof("Job has been deleted: %v", j.Name)
glog.V(3).Infof("Job has been deleted: %v", job.Name)
return nil
}
return err
}
podsMap, err := getPodsForJob(cc.podLister, job)
if err != nil {
return err
}
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}
podsMap, err := getPodsForJob(cc.podLister, job)
if err != nil {
return err
}
var pending, running, terminating, succeeded, failed int32
var errs []error
var total int
for _, pods := range podsMap {
for _, pod := range pods {
total++
switch pod.Status.Phase {
case v1.PodRunning:
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil {
running++
errs = append(errs, err)
continue
}
terminating++
case v1.PodPending:
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil {
pending++
errs = append(errs, err)
continue
}
terminating++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
}
}
if len(errs) != 0 {
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
}
job.Status = vkv1.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
Terminating: terminating,
MinAvailable: int32(job.Spec.MinAvailable),
}
if nextState != nil {
job.Status.State = nextState(job.Status)
}
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if err := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if err := cc.kubeClients.CoreV1().Services(job.Namespace).Delete(job.Name, nil); err != nil {
glog.Errorf("Failed to delete Service of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
// NOTE(k82cn): DO NOT delete input/output until job is deleted.
return nil
}
func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error {
job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}
podsMap, err := getPodsForJob(cc.podLister, job)
if err != nil {
return err
}
glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name)
// TODO(k82cn): add WebHook to validate job.
@@ -90,14 +154,155 @@ func (cc *Controller) syncJob(req *state.Request) error {
glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err)
}
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if err := cc.createPodGroupIfNotExist(job); err != nil {
return err
}
if err := cc.createJobIOIfNotExist(job); err != nil {
return err
}
if err := cc.createServiceIfNotExist(job); err != nil {
return err
}
var podToCreate []*v1.Pod
var podToDelete []*v1.Pod
var running, pending, terminating, succeeded, failed int32
for _, ts := range job.Spec.Tasks {
name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
name = vkv1.DefaultTaskSpec
}
pods, found := podsMap[name]
if !found {
pods = map[string]*v1.Pod{}
}
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(TaskNameFmt, job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, &ts.Template, i)
podToCreate = append(podToCreate, newPod)
} else {
switch pod.Status.Phase {
case v1.PodPending:
if pod.DeletionTimestamp != nil {
terminating++
} else {
pending++
}
case v1.PodRunning:
if pod.DeletionTimestamp != nil {
terminating++
} else {
running++
}
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
var creationErrs []error
waitCreationGroup := sync.WaitGroup{}
waitCreationGroup.Add(len(podToCreate))
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
// Failed to create Pod; wait a moment and then create it again.
// This is to ensure all pods under the same Job are created,
// so gang-scheduling can schedule the Job successfully.
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
} else {
pending++
glog.V(3).Infof("Created Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete unnecessary pods.
var deletionErrs []error
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil {
// Failed to delete Pod; record the error so the Job is requeued
// and the deletion is retried.
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
deletionErrs = append(deletionErrs, err)
} else {
glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
terminating++
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
}
job.Status = vkv1.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
Terminating: terminating,
MinAvailable: int32(job.Spec.MinAvailable),
}
if nextState != nil {
job.Status.State = nextState(job.Status)
}
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return err
}
func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error {
// If Service does not exist, create one for Job.
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
pg := &kbv1.PodGroup{
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
@@ -105,19 +310,27 @@ func (cc *Controller) syncJob(req *state.Request) error {
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: kbv1.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
vkv1.JobNameKey: job.Name,
vkv1.JobNamespaceKey: job.Namespace,
},
},
}
if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
return nil
}
func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
// If input/output PVC does not exist, create them for Job.
inputPVC := fmt.Sprintf("%s-input", job.Name)
outputPVC := fmt.Sprintf("%s-output", job.Name)
@@ -151,14 +364,13 @@ func (cc *Controller) syncJob(req *state.Request) error {
}
}
}
if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
//return err
}
pvc := &v1.PersistentVolumeClaim{
@@ -182,16 +394,18 @@ func (cc *Controller) syncJob(req *state.Request) error {
}
}
}
return nil
}
// If Service does not exist, create one for Job.
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
svc := &v1.Service{
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
@@ -199,132 +413,18 @@ func (cc *Controller) syncJob(req *state.Request) error {
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
vkapi.JobNameKey: job.Name,
vkapi.JobNamespaceKey: job.Namespace,
},
Spec: kbv1.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
},
}
if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
var podToCreate []*v1.Pod
var podToDelete []*v1.Pod
var running, pending, succeeded, failed int32
for _, ts := range job.Spec.Tasks {
name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
name = vkapi.DefaultTaskSpec
}
pods, found := podsMap[name]
if !found {
pods = map[string]*v1.Pod{}
}
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, &ts.Template, i)
podToCreate = append(podToCreate, newPod)
} else {
switch pod.Status.Phase {
case v1.PodPending:
pending++
case v1.PodRunning:
running++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
var creationErrs []error
waitCreationGroup := sync.WaitGroup{}
waitCreationGroup.Add(len(podToCreate))
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
// Failed to create Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
// So gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
} else {
glog.V(3).Infof("Created Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete unnecessary pods.
var deletionErrs []error
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
if err != nil {
// Failed to create Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
// So gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
deletionErrs = append(deletionErrs, err)
} else {
glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
}
job.Status = vkapi.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
MinAvailable: int32(job.Spec.MinAvailable),
}
// TODO(k82cn): replaced it with `UpdateStatus` or `Patch`
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return err
return nil
}
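Worth calling out from the syncJob diff above: a Pending/Running pod that carries a DeletionTimestamp now counts as Terminating, which is what feeds the new JobStatus.Terminating field. Below is a self-contained sketch of just that classification rule; the classify helper is illustrative only, not part of the commit.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// classify mirrors the counting loop in syncJob: a Pending/Running pod
// that is being deleted counts as Terminating, not Pending/Running.
func classify(pods []*v1.Pod) (pending, running, terminating, succeeded, failed int32) {
	for _, pod := range pods {
		switch pod.Status.Phase {
		case v1.PodPending:
			if pod.DeletionTimestamp != nil {
				terminating++
			} else {
				pending++
			}
		case v1.PodRunning:
			if pod.DeletionTimestamp != nil {
				terminating++
			} else {
				running++
			}
		case v1.PodSucceeded:
			succeeded++
		case v1.PodFailed:
			failed++
		}
	}
	return
}

func main() {
	now := metav1.Now()
	pods := []*v1.Pod{
		{Status: v1.PodStatus{Phase: v1.PodRunning}},
		{
			ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now},
			Status:     v1.PodStatus{Phase: v1.PodRunning},
		},
	}
	p, r, t, s, f := classify(pods)
	fmt.Println(p, r, t, s, f) // 0 1 1 0 0
}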

View File

@@ -0,0 +1,21 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
const (
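// TaskNameFmt is the naming format of task pods: <job name>-<task name>-<index>.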
TaskNameFmt = "%s-%s-%d"
)
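For illustration, a minimal standalone use of the format (the job and task names below are made up):

package main

import "fmt"

const TaskNameFmt = "%s-%s-%d" // copy of the constant above

func main() {
	// Pod name for replica 2 of task "worker" in job "test-job".
	fmt.Printf(TaskNameFmt+"\n", "test-job", "worker", 2)
	// Output: test-job-worker-2
}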

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017 The Vulcan Authors.
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -24,71 +24,76 @@ import (
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
vkbatch "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
vkcore "hpw.cloud/volcano/pkg/apis/bus/v1alpha1"
"hpw.cloud/volcano/pkg/controllers/job/state"
vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
vkbusv1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1"
)
func (cc *Controller) addCommand(obj interface{}) {
cmd, ok := obj.(*vkcore.Command)
cmd, ok := obj.(*vkbusv1.Command)
if !ok {
glog.Errorf("obj is not Command")
return
}
cc.enqueue(&state.Request{
Event: vkbatch.CommandIssuedEvent,
Action: vkbatch.Action(cmd.Action),
cc.eventQueue.Add(&Request{
Namespace: cmd.Namespace,
Target: cmd.TargetObject,
JobName: cmd.TargetObject.Name,
Event: vkbatchv1.CommandIssuedEvent,
Action: vkbatchv1.Action(cmd.Action),
})
}
func (cc *Controller) addJob(obj interface{}) {
job, ok := obj.(*vkbatch.Job)
job, ok := obj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("obj is not Job")
return
}
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Job: job,
cc.eventQueue.Add(&Request{
Namespace: job.Namespace,
JobName: job.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
func (cc *Controller) updateJob(oldObj, newObj interface{}) {
newJob, ok := newObj.(*vkbatch.Job)
newJob, ok := newObj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("newObj is not Job")
return
}
oldJob, ok := oldObj.(*vkbatch.Job)
oldJob, ok := oldObj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("oldObj is not Job")
return
}
if !reflect.DeepEqual(oldJob.Spec, newJob.Spec) {
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Job: newJob,
cc.eventQueue.Add(&Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
}
func (cc *Controller) deleteJob(obj interface{}) {
job, ok := obj.(*vkbatch.Job)
job, ok := obj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("obj is not Job")
return
}
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Job: job,
cc.eventQueue.Add(&Request{
Namespace: job.Namespace,
JobName: job.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
@@ -99,9 +104,17 @@ func (cc *Controller) addPod(obj interface{}) {
return
}
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Pod: pod,
jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
if !found {
return
}
cc.eventQueue.Add(&Request{
Namespace: pod.Namespace,
JobName: jobName,
PodName: pod.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
@@ -112,9 +125,17 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
return
}
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Pod: pod,
jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
if !found {
return
}
cc.eventQueue.Add(&Request{
Namespace: pod.Namespace,
JobName: jobName,
PodName: pod.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
@@ -135,15 +156,16 @@ func (cc *Controller) deletePod(obj interface{}) {
return
}
cc.enqueue(&state.Request{
Event: vkbatch.OutOfSyncEvent,
Pod: pod,
jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
if !found {
return
}
cc.eventQueue.Add(&Request{
Namespace: pod.Namespace,
JobName: jobName,
PodName: pod.Name,
Event: vkbatchv1.OutOfSyncEvent,
})
}
func (cc *Controller) enqueue(obj interface{}) {
err := cc.eventQueue.Add(obj)
if err != nil {
glog.Errorf("Fail to enqueue Job to update queue, err %v", err)
}
}

View File

@@ -18,7 +18,6 @@ package job
import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
@@ -30,7 +29,6 @@ import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
"hpw.cloud/volcano/pkg/controllers/job/state"
)
func validate(job *vkv1.Job) error {
@@ -48,22 +46,12 @@ }
}
func eventKey(obj interface{}) (string, error) {
req := obj.(*state.Request)
if req.Pod == nil && req.Job == nil {
return "", fmt.Errorf("empty data for request")
req, ok := obj.(*Request)
if !ok {
return "", fmt.Errorf("failed to convert %v to *Request", obj)
}
if req.Job != nil {
return fmt.Sprintf("%s/%s", req.Job.Namespace, req.Job.Name), nil
}
name, found := req.Pod.Annotations[vkv1.JobNameKey]
if !found {
return "", fmt.Errorf("failed to find job of pod <%s/%s>",
req.Pod.Namespace, req.Pod.Name)
}
return fmt.Sprintf("%s/%s", req.Pod.Namespace, name), nil
return fmt.Sprintf("%s/%s", req.Namespace, req.JobName), nil
}
func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
@@ -71,7 +59,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix),
Name: fmt.Sprintf(TaskNameFmt, job.Name, template.Name, ix),
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
@@ -101,8 +89,10 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
}
for i, c := range pod.Spec.Containers {
vm := job.Spec.Output.VolumeMount
vm.Name = fmt.Sprintf("%s-output", job.Name)
vm := v1.VolumeMount{
MountPath: job.Spec.Output.MountPath,
Name: fmt.Sprintf("%s-output", job.Name),
}
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}
@@ -126,9 +116,13 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
}
for i, c := range pod.Spec.Containers {
vm := job.Spec.Input.VolumeMount
vm.Name = fmt.Sprintf("%s-input", job.Name)
vm := v1.VolumeMount{
MountPath: job.Spec.Input.MountPath,
Name: fmt.Sprintf("%s-input", job.Name),
}
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}
@@ -194,3 +188,33 @@ func getPodsForJob(podLister corelisters.PodLister, job *vkv1.Job) (map[string]m
return pods, nil
}
func applyPolicies(event vkv1.Event, job *vkv1.Job, pod *v1.Pod) vkv1.Action {
// Overwrite Job level policies
if pod != nil {
// Parse task level policies
if taskName, found := pod.Annotations[vkv1.TaskSpecKey]; found {
for _, task := range job.Spec.Tasks {
if task.Name == taskName {
for _, policy := range task.Policies {
if policy.Event == event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}
}
}
} else {
glog.Errorf("Failed to find taskSpecKey in Pod <%s/%s>",
pod.Namespace, pod.Name)
}
}
// Parse Job level policies
for _, policy := range job.Spec.Policies {
if policy.Event == event || policy.Event == vkv1.AnyEvent {
return policy.Action
}
}
return vkv1.SyncJobAction
}
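The resolution order above — task-level policies first, then job-level policies, then the SyncJob default, with "*" (AnyEvent) matching every event — can be exercised in isolation. A simplified, self-contained re-implementation follows; the local Event/Action/LifecyclePolicy stand-ins and the resolve helper are sketch-only substitutes for the real vkv1 types.

package main

import "fmt"

type (
	Event  string
	Action string
)

const (
	AnyEvent         Event  = "*"
	PodEvictedEvent  Event  = "PodEvicted"
	SyncJobAction    Action = "SyncJob"
	RestartJobAction Action = "RestartJob"
)

type LifecyclePolicy struct {
	Event  Event
	Action Action
}

// resolve mirrors applyPolicies: task-level policies override job-level
// ones, AnyEvent matches everything, and SyncJob is the fallback.
func resolve(event Event, taskPolicies, jobPolicies []LifecyclePolicy) Action {
	for _, p := range taskPolicies {
		if p.Event == event || p.Event == AnyEvent {
			return p.Action
		}
	}
	for _, p := range jobPolicies {
		if p.Event == event || p.Event == AnyEvent {
			return p.Action
		}
	}
	return SyncJobAction
}

func main() {
	jobPolicies := []LifecyclePolicy{{Event: PodEvictedEvent, Action: RestartJobAction}}
	fmt.Println(resolve(PodEvictedEvent, nil, jobPolicies))    // RestartJob
	fmt.Println(resolve(Event("PodFailed"), nil, jobPolicies)) // SyncJob
}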

View File

@@ -20,19 +20,21 @@ import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type restartingState struct {
request *Request
policies map[vkv1.Event]vkv1.Action
type abortedState struct {
job *vkv1.Job
}
func (ps *restartingState) Execute() error {
action := ps.policies[ps.request.Event]
func (as *abortedState) Execute(action vkv1.Action, reason string, msg string) error {
switch action {
case vkv1.RestartJobAction, vkv1.RestartTaskAction:
// Already in Restarting phase, just sync it
return actionFns[vkv1.SyncJobAction](ps.request)
case vkv1.ResumeJobAction:
return SyncJob(as.job, func(status vkv1.JobStatus) vkv1.JobState {
return vkv1.JobState{
Phase: vkv1.Restarting,
Reason: reason,
Message: msg,
}
})
default:
fn := actionFns[action]
return fn(ps.request)
return KillJob(as.job, nil)
}
}

View File

@@ -0,0 +1,56 @@
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type abortingState struct {
job *vkv1.Job
}
func (ps *abortingState) Execute(action vkv1.Action, reason string, msg string) error {
switch action {
case vkv1.ResumeJobAction:
// Resume the Job: sync it back into the Restarting phase
return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
return vkv1.JobState{
Phase: vkv1.Restarting,
Reason: reason,
Message: msg,
}
})
default:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
// If any "alive" pods, still in Aborting phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return vkv1.JobState{
Phase: vkv1.Aborting,
Reason: reason,
Message: msg,
}
}
return vkv1.JobState{
Phase: vkv1.Aborted,
Reason: reason,
Message: msg,
}
})
}
}

View File

@@ -17,54 +17,42 @@ limitations under the License.
package state
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type Request struct {
Event vkv1.Event
Action vkv1.Action
type NextStateFn func(status vkv1.JobStatus) vkv1.JobState
type ActionFn func(job *vkv1.Job, fn NextStateFn) error
Namespace string
Target *metav1.OwnerReference
Job *vkv1.Job
Pod *v1.Pod
PodGroup *kbv1.PodGroup
Reason string
Message string
}
type ActionFn func(req *Request) error
var actionFns = map[vkv1.Action]ActionFn{}
func RegisterActions(afs map[vkv1.Action]ActionFn) {
actionFns = afs
}
var (
// SyncJob will create or delete Pods according to Job's spec.
SyncJob ActionFn
// KillJob kill all Pods of Job.
KillJob ActionFn
)
type State interface {
Execute() error
// Execute executes the actions based on current state.
Execute(act vkv1.Action, reason string, msg string) error
}
func NewState(req *Request) State {
policies := parsePolicies(req)
switch req.Job.Status.State.Phase {
func NewState(job *vkv1.Job) State {
switch job.Status.State.Phase {
case vkv1.Pending:
return &pendingState{job: job}
case vkv1.Running:
return &runningState{job: job}
case vkv1.Restarting:
return &restartingState{
request: req,
policies: policies,
}
default:
return &baseState{
request: req,
policies: policies,
}
return &restartingState{job: job}
case vkv1.Terminated, vkv1.Completed:
return &finishedState{job: job}
case vkv1.Terminating:
return &terminatingState{job: job}
case vkv1.Aborting:
return &abortingState{job: job}
case vkv1.Aborted:
return &abortedState{job: job}
}
// It's pending by default.
return &pendingState{job: job}
}
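The design change running through these state files: the old map of registered action functions is gone, the controller now injects SyncJob/KillJob as package-level function variables, and each state supplies a NextStateFn that maps the resulting JobStatus to the next JobState. A toy, self-contained model of that wiring is sketched below; every type in it is a simplified stand-in, not the real vkv1 API.

package main

import "fmt"

type (
	JobPhase  string
	JobStatus struct{ Pending, Running, Terminating int32 }
	JobState  struct{ Phase JobPhase }

	// NextStateFn maps the status after an action to the next state.
	NextStateFn func(JobStatus) JobState
	// ActionFn is the shape of the injected SyncJob/KillJob actions.
	ActionFn func(status JobStatus, next NextStateFn) error
)

// KillJob is injected by the "controller" in main, mimicking
// `state.KillJob = cc.killJob` in NewJobController.
var KillJob ActionFn

type terminatingState struct{ status JobStatus }

func (ts *terminatingState) Execute() error {
	return KillJob(ts.status, func(s JobStatus) JobState {
		// Stay in Terminating while any pods are alive.
		if s.Terminating != 0 || s.Pending != 0 || s.Running != 0 {
			return JobState{Phase: "Terminating"}
		}
		return JobState{Phase: "Terminated"}
	})
}

func main() {
	KillJob = func(status JobStatus, next NextStateFn) error {
		// Pretend every pod was killed, then compute the next state.
		status = JobStatus{}
		fmt.Println("next phase:", next(status).Phase)
		return nil
	}
	st := &terminatingState{status: JobStatus{Running: 2}}
	_ = st.Execute() // prints: next phase: Terminated
}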

View File

@@ -17,26 +17,15 @@ limitations under the License.
package state
import (
"github.com/golang/glog"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type baseState struct {
request *Request
policies map[vkv1.Event]vkv1.Action
type finishedState struct {
job *vkv1.Job
}
func (ps *baseState) Execute() error {
action := ps.policies[ps.request.Event]
glog.V(3).Infof("The action for event <%s> is <%s>",
ps.request.Event, action)
switch action {
case vkv1.RestartJobAction:
fn := actionFns[action]
return fn(ps.request)
default:
fn := actionFns[action]
return fn(ps.request)
}
func (ps *finishedState) Execute(action vkv1.Action, reason string, msg string) error {
// In finished state, e.g. Completed, always kill the whole job.
return KillJob(ps.job, nil)
}

View File

@@ -0,0 +1,57 @@
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type pendingState struct {
job *vkv1.Job
}
func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Restarting
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
default:
return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
total := totalTasks(ps.job)
phase := vkv1.Pending
if total == status.Running {
phase = vkv1.Running
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
}
}

View File

@@ -0,0 +1,45 @@
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type restartingState struct {
job *vkv1.Job
}
func (ps *restartingState) Execute(action vkv1.Action, reason string, msg string) error {
return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Restarting
if status.Terminating == 0 {
if status.Pending == 0 && status.Running != 0 {
phase = vkv1.Running
} else {
phase = vkv1.Pending
}
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
}

View File

@@ -0,0 +1,82 @@
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type runningState struct {
job *vkv1.Job
}
func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Running
if status.Terminating != 0 {
phase = vkv1.Restarting
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
case vkv1.AbortJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Running
if status.Terminating != 0 {
phase = vkv1.Aborting
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
case vkv1.TerminateJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Running
if status.Terminating != 0 {
phase = vkv1.Terminating
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
default:
return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Running
if status.Succeeded+status.Failed == totalTasks(ps.job) {
phase = vkv1.Completed
}
return vkv1.JobState{
Phase: phase,
Reason: reason,
Message: msg,
}
})
}
}

View File

@@ -0,0 +1,44 @@
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
type terminatingState struct {
job *vkv1.Job
}
func (ps *terminatingState) Execute(action vkv1.Action, reason string, msg string) error {
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
// If any "alive" pods, still in Terminating phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return vkv1.JobState{
Phase: vkv1.Terminating,
Reason: reason,
Message: msg,
}
}
return vkv1.JobState{
Phase: vkv1.Terminated,
Reason: reason,
Message: msg,
}
})
}

View File

@@ -20,23 +20,12 @@ import (
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)
func parsePolicies(req *Request) map[vkv1.Event]vkv1.Action {
actions := map[vkv1.Event]vkv1.Action{}
func totalTasks(job *vkv1.Job) int32 {
var rep int32
// Set Job level policies
for _, policy := range req.Job.Spec.Policies {
actions[policy.Event] = policy.Action
for _, task := range job.Spec.Tasks {
rep += task.Replicas
}
// TODO(k82cn): set task level polices
// Set default action
actions[vkv1.OutOfSyncEvent] = vkv1.SyncJobAction
// Set command action
if len(req.Action) != 0 {
actions[req.Event] = req.Action
}
return actions
return rep
}

View File

@@ -344,10 +344,14 @@ func jobUnschedulable(ctx *context, job *vkv1.Job, time time.Time) wait.Conditio
// TODO(k82cn): check Job's Condition instead of PodGroup's event.
return func() (bool, error) {
pg, err := ctx.kbclient.SchedulingV1alpha1().PodGroups(job.Namespace).Get(job.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
if err != nil {
return false, nil
}
events, err := ctx.kubeclient.CoreV1().Events(pg.Namespace).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
if err != nil {
return false, nil
}
for _, event := range events.Items {
target := event.InvolvedObject