Add Travis img.

Signed-off-by: Klaus Ma <mada3@huawei.com>
Klaus Ma 2019-01-14 14:20:43 +08:00 committed by Da K. Ma
parent fdc7770a24
commit d3ef54ebcf
8 changed files with 355 additions and 356 deletions

View File

@@ -1,3 +1,5 @@
# Volcano
![travis](https://travis-ci.com/k82cn/volcano.svg?token=Qogbzw9M3Q6qBwA56otg&branch=master)
A Kubernetes-based system for high-performance workloads.

View File

@@ -67,7 +67,7 @@ type VolumeSpec struct {
v1.VolumeMount `json:",inline"`
// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
}
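Since the claim field now carries only a `PersistentVolumeClaimSpec`, the controller (rather than the user) has to wrap it into a full PVC object before creating it. A minimal sketch of that wrapping, assuming a hypothetical helper name and claim-naming scheme that are not part of this commit:

```go
package job

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// pvcForVolume is a hypothetical helper (not defined in this commit) showing
// how a controller could turn the embedded PersistentVolumeClaimSpec into a
// concrete PVC; the name pattern used here is an assumption.
func pvcForVolume(jobName, namespace string, claim *corev1.PersistentVolumeClaimSpec, index int) *corev1.PersistentVolumeClaim {
	return &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-volume-%d", jobName, index),
			Namespace: namespace,
		},
		Spec: *claim, // the spec taken directly from VolumeSpec.VolumeClaim
	}
}
```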
// Event represents an event of a Job, e.g. pod-failed.
@@ -75,11 +75,11 @@ type Event string
const (
// AllEvents means all events
AllEvents Event = "*"
AllEvents Event = "*"
// PodFailedEvent is triggered if a Pod failed
PodFailedEvent Event = "PodFailed"
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if a Pod was evicted
PodEvictedEvent Event = "PodEvicted"
PodEvictedEvent Event = "PodEvicted"
// JobUnschedulableEvent is triggered if part of the Pods can not be scheduled
// when gang-scheduling is enabled
JobUnschedulableEvent Event = "Unschedulable"
@@ -91,12 +91,12 @@ type Action string
const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pods of the Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job-level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job will be terminated
// and can not be resumed: all Pods of the Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
@@ -143,13 +143,15 @@ type JobPhase string
const (
// Pending is the phase that job is pending in the queue, waiting for scheduling decision
Pending JobPhase = "Pending"
Pending JobPhase = "Pending"
// Aborted is the phase that job is aborted by user or error handling
Aborted JobPhase = "Aborted"
Aborted JobPhase = "Aborted"
// Running is the phase that minimal available tasks of Job are running
Running JobPhase = "Running"
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarting
Restarting JobPhase = "Restarting"
// Completed is the phase that all tasks of Job are completed successfully
Completed JobPhase = "Completed"
Completed JobPhase = "Completed"
// Teriminated is the phase that the job finished unexpectedly, e.g. due to events
Teriminated JobPhase = "Terminated"
)
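For readers skimming the diff, the constants fall into three groups: Events describe what happened to a Job or its Pods, Actions describe how the controller may react, and JobPhases describe the Job's lifecycle state. A small illustrative sketch of an event-to-action mapping, assuming pairings that this commit itself does not define:

```go
package main

import "fmt"

// Event and Action mirror the string types introduced in this hunk; the
// default pairings below are illustrative assumptions, not behaviour defined
// by this commit.
type Event string
type Action string

const (
	PodFailedEvent        Event = "PodFailed"
	PodEvictedEvent       Event = "PodEvicted"
	JobUnschedulableEvent Event = "Unschedulable"

	RestartTaskAction Action = "RestartTask"
	RestartJobAction  Action = "RestartJob"
	AbortJobAction    Action = "AbortJob"
)

// defaultAction sketches one plausible mapping: task-level events fall back to
// the documented default (RestartTask), while the job-level Unschedulable
// event needs a job-level reaction.
func defaultAction(e Event) Action {
	switch e {
	case JobUnschedulableEvent:
		return RestartJobAction
	case PodFailedEvent, PodEvictedEvent:
		return RestartTaskAction
	default:
		return AbortJobAction
	}
}

func main() {
	fmt.Println(defaultAction(PodFailedEvent)) // RestartTask
}
```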

View File

@@ -219,7 +219,7 @@ func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) {
in.VolumeMount.DeepCopyInto(&out.VolumeMount)
if in.VolumeClaim != nil {
in, out := &in.VolumeClaim, &out.VolumeClaim
*out = new(corev1.PersistentVolumeClaim)
*out = new(corev1.PersistentVolumeClaimSpec)
(*in).DeepCopyInto(*out)
}
return

View File

@@ -17,16 +17,12 @@ limitations under the License.
package job
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@@ -36,7 +32,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
@@ -181,242 +176,3 @@ func (cc *Controller) worker() {
cc.eventQueue.Add(job)
}
}
func (cc *Controller) syncJob(j *vkapi.Job) error {
job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name)
if err != nil {
if apierrors.IsNotFound(err) {
glog.V(3).Infof("Job has been deleted: %v", j.Name)
return nil
}
return err
}
pods, err := cc.getPodsForJob(job)
if err != nil {
return err
}
return cc.manageJob(job, pods)
}
func (cc *Controller) getPodsForJob(job *vkapi.Job) (map[string]map[string]*v1.Pod, error) {
pods := map[string]map[string]*v1.Pod{}
// TODO (k82cn): optimize by cache and index of owner; add 'ControlledBy' extended interface.
ps, err := cc.podListr.Pods(job.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
for _, pod := range ps {
if !metav1.IsControlledBy(pod, job) {
continue
}
if len(pod.Annotations) == 0 {
glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name)
continue
}
tsName, found := pod.Annotations[vkapi.TaskSpecKey]
if found {
// Hash by TaskSpec.Template.Name
if _, exist := pods[tsName]; !exist {
pods[tsName] = make(map[string]*v1.Pod)
}
pods[tsName][pod.Name] = pod
}
}
return pods, nil
}
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v1.Pod) error {
var err error
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}
glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name)
// TODO(k82cn): add WebHook to validate job.
if err := validate(job); err != nil {
glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err)
}
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: kbv1.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
},
}
if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
vkapi.JobNameKey: job.Name,
vkapi.JobNamespaceKey: job.Namespace,
},
},
}
if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
var podToCreate []*v1.Pod
var podToDelete []*v1.Pod
var running, pending, succeeded, failed int32
for _, ts := range job.Spec.Tasks {
name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
name = vkapi.DefaultTaskSpec
}
pods, found := podsMap[name]
if !found {
pods = map[string]*v1.Pod{}
}
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, &ts.Template, i)
podToCreate = append(podToCreate, newPod)
} else {
switch pod.Status.Phase {
case v1.PodPending:
pending++
case v1.PodRunning:
running++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
var creationErrs []error
waitCreationGroup := sync.WaitGroup{}
waitCreationGroup.Add(len(podToCreate))
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same Job are created
// So gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
} else {
glog.V(3).Infof("Created Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete unnecessary pods.
var deletionErrs []error
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
if err != nil {
// Failed to delete Pod, wait a moment and then delete it again
// This is to ensure unnecessary pods of the Job are eventually cleaned up
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
deletionErrs = append(deletionErrs, err)
} else {
glog.V(3).Infof("Deleted Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
}
job.Status = vkapi.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
MinAvailable: int32(job.Spec.MinAvailable),
}
// TODO(k82cn): replace it with `UpdateStatus` or `Patch`
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return err
}

View File

@@ -0,0 +1,273 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
)
func (cc *Controller) syncJob(j *vkapi.Job) error {
job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name)
if err != nil {
if apierrors.IsNotFound(err) {
glog.V(3).Infof("Job has been deleted: %v", j.Name)
return nil
}
return err
}
pods, err := cc.getPodsForJob(job)
if err != nil {
return err
}
return cc.manageJob(job, pods)
}
func (cc *Controller) getPodsForJob(job *vkapi.Job) (map[string]map[string]*v1.Pod, error) {
pods := map[string]map[string]*v1.Pod{}
// TODO (k82cn): optimize by cache and index of owner; add 'ControlledBy' extended interface.
ps, err := cc.podListr.Pods(job.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
for _, pod := range ps {
if !metav1.IsControlledBy(pod, job) {
continue
}
if len(pod.Annotations) == 0 {
glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name)
continue
}
tsName, found := pod.Annotations[vkapi.TaskSpecKey]
if found {
// Hash by TaskSpec.Template.Name
if _, exist := pods[tsName]; !exist {
pods[tsName] = make(map[string]*v1.Pod)
}
pods[tsName][pod.Name] = pod
}
}
return pods, nil
}
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v1.Pod) error {
var err error
if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}
glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name)
// TODO(k82cn): add WebHook to validate job.
if err := validate(job); err != nil {
glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err)
}
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: kbv1.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
},
}
if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
vkapi.JobNameKey: job.Name,
vkapi.JobNamespaceKey: job.Namespace,
},
},
}
if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
var podToCreate []*v1.Pod
var podToDelete []*v1.Pod
var running, pending, succeeded, failed int32
for _, ts := range job.Spec.Tasks {
name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
name = vkapi.DefaultTaskSpec
}
pods, found := podsMap[name]
if !found {
pods = map[string]*v1.Pod{}
}
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, &ts.Template, i)
podToCreate = append(podToCreate, newPod)
} else {
switch pod.Status.Phase {
case v1.PodPending:
pending++
case v1.PodRunning:
running++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
var creationErrs []error
waitCreationGroup := sync.WaitGroup{}
waitCreationGroup.Add(len(podToCreate))
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same Job are created
// So gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
} else {
glog.V(3).Infof("Created Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete unnecessary pods.
var deletionErrs []error
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
if err != nil {
// Failed to delete Pod, wait a moment and then delete it again
// This is to ensure unnecessary pods of the Job are eventually cleaned up
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
deletionErrs = append(deletionErrs, err)
} else {
glog.V(3).Infof("Deleted Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
}
job.Status = vkapi.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
MinAvailable: int32(job.Spec.MinAvailable),
}
// TODO(k82cn): replace it with `UpdateStatus` or `Patch`
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return err
}

View File

@@ -21,7 +21,7 @@ import (
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"

View File

@@ -19,10 +19,17 @@ package job
import (
"fmt"
vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
)
func validate(job *vkapi.Job) error {
func validate(job *vkv1.Job) error {
tsNames := map[string]string{}
for _, ts := range job.Spec.Tasks {
@@ -35,3 +42,59 @@ func validate(job *vkapi.Job) error {
return nil
}
func eventKey(obj interface{}) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return string(accessor.GetUID()), nil
}
func createJobPod(job *vkv1.Job, template *corev1.PodTemplateSpec, ix int) *corev1.Pod {
templateCopy := template.DeepCopy()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix),
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Labels: templateCopy.Labels,
Annotations: templateCopy.Annotations,
},
Spec: templateCopy.Spec,
}
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
tsKey := templateCopy.Name
if len(tsKey) == 0 {
tsKey = vkv1.DefaultTaskSpec
}
pod.Annotations[vkv1.TaskSpecKey] = tsKey
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
pod.Annotations[kbapi.GroupNameAnnotationKey] = job.Name
if len(pod.Labels) == 0 {
pod.Labels = make(map[string]string)
}
// Set pod labels for Service.
pod.Labels[vkv1.JobNameKey] = job.Name
pod.Labels[vkv1.JobNamespaceKey] = job.Namespace
// We fill in the schedulerName in the pod definition with the one specified in the Job template
if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" {
pod.Spec.SchedulerName = job.Spec.SchedulerName
}
return pod
}
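Taken together with manageJob in the controller file above, the naming and labelling here is what ties a Job to its pods and its headless Service: pod names follow the `<job>-<task>-<index>` pattern, the TaskSpecKey annotation lets getPodsForJob bucket pods per task, and the job-name/namespace labels match the Service selector. A tiny sketch of the resulting names for a hypothetical job (the job and task names are made up):

```go
package main

import "fmt"

func main() {
	// Hypothetical Job "mnist" with a task template named "ps" and 2 replicas;
	// the <job>-<task>-<index> pattern mirrors createJobPod and manageJob above.
	job, task, replicas := "mnist", "ps", 2
	for i := 0; i < replicas; i++ {
		fmt.Printf("pod: %s-%s-%d\n", job, task, i) // mnist-ps-0, mnist-ps-1
	}
	// The headless Service created in manageJob shares the Job's name and
	// selects these pods via the job-name/namespace labels set in createJobPod.
	fmt.Println("service:", job)
}
```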

View File

@@ -1,97 +0,0 @@
/*
Copyright 2018 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
)
// filterPods returns the number of pods in the given phase.
func filterPods(pods []*corev1.Pod, phase corev1.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {
result++
}
}
return result
}
func eventKey(obj interface{}) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return string(accessor.GetUID()), nil
}
func createJobPod(job *vkv1.Job, template *corev1.PodTemplateSpec, ix int) *corev1.Pod {
templateCopy := template.DeepCopy()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix),
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Labels: templateCopy.Labels,
Annotations: templateCopy.Annotations,
},
Spec: templateCopy.Spec,
}
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
tsKey := templateCopy.Name
if len(tsKey) == 0 {
tsKey = vkv1.DefaultTaskSpec
}
pod.Annotations[vkv1.TaskSpecKey] = tsKey
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
pod.Annotations[kbapi.GroupNameAnnotationKey] = job.Name
if len(pod.Labels) == 0 {
pod.Labels = make(map[string]string)
}
// Set pod labels for Service.
pod.Labels[vkv1.JobNameKey] = job.Name
pod.Labels[vkv1.JobNamespaceKey] = job.Namespace
// We fill in the schedulerName in the pod definition with the one specified in the Job template
if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" {
pod.Spec.SchedulerName = job.Spec.SchedulerName
}
return pod
}