// Copyright 2021 The Kubeflow Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e

import (
"context"
"fmt"
"io"
"time"

"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/ptr"
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"github.com/kubeflow/mpi-operator/test/util"
)
var _ = ginkgo.Describe("MPIJob", func() {
var (
namespace string
mpiJob *kubeflow.MPIJob
)
ginkgo.BeforeEach(func() {
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "e2e-",
},
}
var err error
ns, err = k8sClient.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
namespace = ns.Name
})
ginkgo.AfterEach(func() {
if namespace != "" {
err := k8sClient.CoreV1().Namespaces().Delete(context.Background(), namespace, metav1.DeleteOptions{})
if !errors.IsNotFound(err) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())
}
}
})
ginkgo.BeforeEach(func() {
mpiJob = &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "pi",
Namespace: namespace,
},
Spec: kubeflow.MPIJobSpec{
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
kubeflow.MPIReplicaTypeLauncher: {
RestartPolicy: kubeflow.RestartPolicyOnFailure,
},
kubeflow.MPIReplicaTypeWorker: {
Replicas: ptr.To[int32](2),
},
},
},
}
})
ginkgo.Context("with OpenMPI implementation", func() {
ginkgo.BeforeEach(func() {
createMPIJobWithOpenMPI(mpiJob)
})
ginkgo.When("has malformed command", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.RunPolicy.BackoffLimit = ptr.To[int32](1)
})
ginkgo.It("should fail", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobFailed)
})
})
ginkgo.When("running as root", func() {
ginkgo.BeforeEach(func() {
launcherContainer := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0]
launcherContainer.Command = append(launcherContainer.Command, "--allow-run-as-root")
})
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
ginkgo.When("suspended on creation", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
})
ginkgo.It("should not create pods when suspended and succeed when resumed", func() {
ctx := context.Background()
mpiJob := createJob(ctx, mpiJob)
ginkgo.By("verifying there are no pods (neither launcher nor pods) running for the suspended MPIJob")
pods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(pods.Items).To(gomega.HaveLen(0))
mpiJob = resumeJob(ctx, mpiJob)
mpiJob = waitForCompletion(ctx, mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
ginkgo.When("running with host network", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.HostNetwork = true
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.HostNetwork = true
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].Args = []string{"/home/mpiuser/pi"}
// The test cluster has only one node.
// More than one pod cannot use the same host port for sshd.
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = ptr.To[int32](1)
})
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})
ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
}
workerContainer := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers[0]
workerContainer.SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"NET_BIND_SERVICE"},
},
}
workerContainer.Command = []string{"/usr/sbin/sshd"}
workerContainer.Args = []string{"-De", "-f", "/home/mpiuser/.sshd_config"}
})
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
ginkgo.It("should not be updated when managed externaly, only created", func() {
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
ctx := context.Background()
mpiJob = createJob(ctx, mpiJob)
time.Sleep(util.SleepDurationControllerSyncDelay)
mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).To(gomega.BeNil())
// The job should be created, but its status should not be updated, neither on creation nor afterwards.
condition := getJobCondition(mpiJob, kubeflow.JobCreated)
gomega.Expect(condition).To(gomega.BeNil())
condition = getJobCondition(mpiJob, kubeflow.JobSucceeded)
gomega.Expect(condition).To(gomega.BeNil())
launcherJob, err := getLauncherJob(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(launcherJob).To(gomega.BeNil())
launcherPods, err := getLauncherPods(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(len(launcherPods.Items)).To(gomega.Equal(0))
workerPods, err := getWorkerPods(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(len(workerPods.Items)).To(gomega.Equal(0))
secret, err := getSecretsForJob(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(secret).To(gomega.BeNil())
})
ginkgo.It("should succeed when explicitly managed by mpi-operator", func() {
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.KubeflowJobController)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})
ginkgo.Context("with Intel Implementation", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationIntel
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: intelMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"mpirun",
"-n",
"2",
"/home/mpiuser/pi",
},
},
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: intelMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"/usr/sbin/sshd",
"-De",
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt(2222),
},
},
InitialDelaySeconds: 3,
},
},
}
})
ginkgo.When("running as root", func() {
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
}
workerContainer := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers[0]
workerContainer.SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
}
workerContainer.Args = append(workerContainer.Args, "-f", "/home/mpiuser/.sshd_config")
})
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})
ginkgo.Context("with MPICH Implementation", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationMPICH
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: mpichImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"mpirun",
"-n",
"2",
"/home/mpiuser/pi",
},
},
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: mpichImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{}, // uses entrypoint.
Args: []string{
"/usr/sbin/sshd",
"-De",
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt(2222),
},
},
InitialDelaySeconds: 3,
},
},
}
})
ginkgo.When("running as root", func() {
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
}
workerContainer := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers[0]
workerContainer.SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To[int64](1000),
}
workerContainer.Args = append(workerContainer.Args, "-f", "/home/mpiuser/.sshd_config")
})
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})
ginkgo.Context("with scheduler-plugins", func() {
const enableGangSchedulingFlag = "--gang-scheduling=scheduler-plugins-scheduler"
var (
ctx = context.Background()
unschedulableResources = &corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100000"), // unschedulable
corev1.ResourceMemory: resource.MustParse("100000Gi"), // unschedulable
}
)
ginkgo.BeforeEach(func() {
// Set up the scheduler-plugins.
setUpSchedulerPlugins()
// Set up the mpi-operator so that scheduler-plugins is used as the gang-scheduler.
setupMPIOperator(ctx, mpiJob, enableGangSchedulingFlag, unschedulableResources)
})
ginkgo.AfterEach(func() {
operator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
oldOperator := operator.DeepCopy()
for i, arg := range operator.Spec.Template.Spec.Containers[0].Args {
if arg == enableGangSchedulingFlag {
operator.Spec.Template.Spec.Containers[0].Args = append(
operator.Spec.Template.Spec.Containers[0].Args[:i], operator.Spec.Template.Spec.Containers[0].Args[i+1:]...)
break
}
}
if diff := cmp.Diff(oldOperator, operator); len(diff) != 0 {
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, operator, metav1.UpdateOptions{})
gomega.Expect(err).Should(gomega.Succeed())
gomega.Eventually(func() bool {
ok, err := ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
gomega.Expect(err).Should(gomega.Succeed())
return ok
}, foreverTimeout, waitInterval).Should(gomega.BeTrue())
}
// Clean up the scheduler-plugins.
cleanUpSchedulerPlugins()
})
ginkgo.It("should create pending pods", func() {
ginkgo.By("Creating MPIJob")
mpiJob := createJob(ctx, mpiJob)
var jobCondition *kubeflow.JobCondition
gomega.Eventually(func() *kubeflow.JobCondition {
updatedMPIJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
jobCondition = getJobCondition(updatedMPIJob, kubeflow.JobCreated)
return jobCondition
}, foreverTimeout, waitInterval).ShouldNot(gomega.BeNil())
gomega.Expect(jobCondition.Status).To(gomega.Equal(corev1.ConditionTrue))
ginkgo.By("Waiting for Pods to created")
var pods *corev1.PodList
gomega.Eventually(func() error {
var err error
pods, err = k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.FormatLabels(map[string]string{
schedv1alpha1.PodGroupLabel: mpiJob.Name,
}),
})
return err
}, foreverTimeout, waitInterval).Should(gomega.BeNil())
for _, pod := range pods.Items {
gomega.Expect(pod.Status.Phase).Should(gomega.Equal(corev1.PodPending))
}
pg, err := schedClient.SchedulingV1alpha1().PodGroups(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(pg.Spec.MinResources.Cpu().String()).Should(gomega.BeComparableTo(unschedulableResources.Cpu().String()))
gomega.Expect(pg.Spec.MinResources.Memory().String()).Should(gomega.BeComparableTo(unschedulableResources.Memory().String()))
ginkgo.By("Updating MPIJob with schedulable schedulingPolicies")
gomega.Eventually(func() error {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
updatedJob.Spec.RunPolicy.SchedulingPolicy.MinResources = nil
_, err = mpiClient.KubeflowV2beta1().MPIJobs(updatedJob.Namespace).Update(ctx, updatedJob, metav1.UpdateOptions{})
return err
}, foreverTimeout, waitInterval).Should(gomega.BeNil())
ginkgo.By("Waiting for MPIJob to running")
gomega.Eventually(func() corev1.ConditionStatus {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
cond := getJobCondition(updatedJob, kubeflow.JobRunning)
if cond == nil {
return corev1.ConditionFalse
}
return cond.Status
}, foreverTimeout, waitInterval).Should(gomega.Equal(corev1.ConditionTrue))
})
})
// volcano e2e tests
ginkgo.Context("with volcano-scheduler", func() {
const enableGangSchedulingFlag = "--gang-scheduling=volcano"
var (
ctx = context.Background()
unschedulableResources = &corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100000"), // unschedulable
corev1.ResourceMemory: resource.MustParse("100000Gi"), // unschedulable
}
)
ginkgo.BeforeEach(func() {
// Set up the volcano-scheduler.
setupVolcanoScheduler()
// Set up the mpi-operator so that volcano is used as the gang-scheduler.
setupMPIOperator(ctx, mpiJob, enableGangSchedulingFlag, unschedulableResources)
})
ginkgo.AfterEach(func() {
operator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
oldOperator := operator.DeepCopy()
// disable gang-scheduler in operator
for i, arg := range operator.Spec.Template.Spec.Containers[0].Args {
if arg == enableGangSchedulingFlag {
operator.Spec.Template.Spec.Containers[0].Args = append(
operator.Spec.Template.Spec.Containers[0].Args[:i], operator.Spec.Template.Spec.Containers[0].Args[i+1:]...)
break
}
}
if diff := cmp.Diff(oldOperator, operator); len(diff) != 0 {
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, operator, metav1.UpdateOptions{})
gomega.Expect(err).Should(gomega.Succeed())
gomega.Eventually(func() bool {
ok, err := ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
gomega.Expect(err).Should(gomega.Succeed())
return ok
}, foreverTimeout, waitInterval).Should(gomega.BeTrue())
}
// Clean up the volcano.
cleanUpVolcanoScheduler()
})
ginkgo.It("should create pending pods", func() {
ginkgo.By("Creating MPIJob")
mpiJob := createJob(ctx, mpiJob)
var jobCondition *kubeflow.JobCondition
gomega.Eventually(func() *kubeflow.JobCondition {
updatedMPIJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
jobCondition = getJobCondition(updatedMPIJob, kubeflow.JobCreated)
return jobCondition
}, foreverTimeout, waitInterval).ShouldNot(gomega.BeNil())
gomega.Expect(jobCondition.Status).To(gomega.Equal(corev1.ConditionTrue))
ginkgo.By("Waiting for Pods to created")
var pods *corev1.PodList
gomega.Eventually(func() error {
var err error
pods, err = k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.FormatLabels(map[string]string{
kubeflow.JobNameLabel: mpiJob.Name,
}),
})
return err
}, foreverTimeout, waitInterval).Should(gomega.BeNil())
for _, pod := range pods.Items {
gomega.Expect(pod.Status.Phase).Should(gomega.Equal(corev1.PodPending))
}
pg, err := volcanoClient.SchedulingV1beta1().PodGroups(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(pg.Spec.MinResources.Cpu().String()).Should(gomega.BeComparableTo(unschedulableResources.Cpu().String()))
gomega.Expect(pg.Spec.MinResources.Memory().String()).Should(gomega.BeComparableTo(unschedulableResources.Memory().String()))
ginkgo.By("Updating MPIJob with schedulable schedulingPolicies")
gomega.Eventually(func() error {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
updatedJob.Spec.RunPolicy.SchedulingPolicy.MinResources = nil
_, err = mpiClient.KubeflowV2beta1().MPIJobs(updatedJob.Namespace).Update(ctx, updatedJob, metav1.UpdateOptions{})
return err
}, foreverTimeout, waitInterval).Should(gomega.BeNil())
ginkgo.By("Waiting for MPIJob to running")
gomega.Eventually(func() corev1.ConditionStatus {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
cond := getJobCondition(updatedJob, kubeflow.JobRunning)
if cond == nil {
return corev1.ConditionFalse
}
return cond.Status
}, foreverTimeout, waitInterval).Should(gomega.Equal(corev1.ConditionTrue))
})
})
})
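
// resumeJob clears the Suspend flag on the MPIJob and updates it in the cluster.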
func resumeJob(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)
ginkgo.By("Resuming MPIJob")
mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Update(ctx, mpiJob, metav1.UpdateOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
return mpiJob
}
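
// createJobAndWaitForCompletion creates the MPIJob and blocks until it reports a completion time.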
func createJobAndWaitForCompletion(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
ctx := context.Background()
mpiJob = createJob(ctx, mpiJob)
return waitForCompletion(ctx, mpiJob)
}
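
// createJob submits the MPIJob to the API server and returns the created object.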
func createJob(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
ginkgo.By("Creating MPIJob")
mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Create(ctx, mpiJob, metav1.CreateOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
return mpiJob
}
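
// waitForCompletion polls the MPIJob until its status has a completion time,
// dumping pod logs if the wait fails.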
func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
var err error
ginkgo.By("Waiting for MPIJob to finish")
err = wait.PollUntilContextTimeout(ctx, waitInterval, foreverTimeout, false, func(ctx context.Context) (bool, error) {
updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
mpiJob = updatedJob
return mpiJob.Status.CompletionTime != nil, nil
})
if err != nil {
// Dump pod logs to aid debugging, but keep the original wait error.
if debugErr := debugJob(ctx, mpiJob); debugErr != nil {
fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to debug job: %v\n", debugErr)
}
}
gomega.Expect(err).ToNot(gomega.HaveOccurred())
return mpiJob
}
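
// getLauncherPods lists the Pods that the operator labeled as the job's launcher.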
func getLauncherPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
selector := metav1.LabelSelector{
MatchLabels: map[string]string{
kubeflow.OperatorNameLabel: kubeflow.OperatorName,
kubeflow.JobNameLabel: mpiJob.Name,
kubeflow.JobRoleLabel: "launcher",
},
}
launcherPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&selector),
})
if err != nil {
return &corev1.PodList{}, fmt.Errorf("getting launcher Pods: %w", err)
}
return launcherPods, nil
}
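
// getWorkerPods lists the Pods that the operator labeled as the job's workers.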
func getWorkerPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
selector := metav1.LabelSelector{
MatchLabels: map[string]string{
kubeflow.OperatorNameLabel: kubeflow.OperatorName,
kubeflow.JobNameLabel: mpiJob.Name,
kubeflow.JobRoleLabel: "worker",
},
}
workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&selector),
})
if err != nil {
return &corev1.PodList{}, fmt.Errorf("getting worker Pods: %w", err)
}
return workerPods, nil
}
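
// getSecretsForJob returns the Secret owned by the MPIJob, or nil if none exists.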
func getSecretsForJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.Secret, error) {
result, err := k8sClient.CoreV1().Secrets(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, obj := range result.Items {
if metav1.IsControlledBy(&obj, mpiJob) {
return &obj, nil
}
}
return nil, nil
}
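
// debugJob writes the logs of the most recently created launcher Pod and of
// all worker Pods to the Ginkgo output.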
func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
launcherPods, err := getLauncherPods(ctx, mpiJob)
if err != nil {
return err
}
if len(launcherPods.Items) == 0 {
return fmt.Errorf("no launcher Pods found")
}
lastPod := launcherPods.Items[0]
for _, p := range launcherPods.Items[1:] {
if p.CreationTimestamp.After(lastPod.CreationTimestamp.Time) {
lastPod = p
}
}
err = podLogs(ctx, &lastPod)
if err != nil {
return fmt.Errorf("obtaining launcher logs: %w", err)
}
workerPods, err := getWorkerPods(ctx, mpiJob)
if err != nil {
return fmt.Errorf("getting worker Pods: %w", err)
}
for _, p := range workerPods.Items {
err = podLogs(ctx, &p)
if err != nil {
return fmt.Errorf("obtaining worker logs: %w", err)
}
}
return nil
}
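
// podLogs streams the logs of the given Pod to the Ginkgo output.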
func podLogs(ctx context.Context, p *corev1.Pod) (err error) {
req := k8sClient.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &corev1.PodLogOptions{})
stream, err := req.Stream(ctx)
if err != nil {
return fmt.Errorf("reading logs: %w", err)
}
defer func() {
// Close the stream, but do not mask an earlier error.
if cerr := stream.Close(); cerr != nil && err == nil {
err = cerr
}
}()
if _, err = fmt.Fprintf(ginkgo.GinkgoWriter, "== BEGIN %s pod logs ==\n", p.Name); err != nil {
return err
}
if _, err = io.Copy(ginkgo.GinkgoWriter, stream); err != nil {
return fmt.Errorf("writing logs: %w", err)
}
_, err = fmt.Fprintf(ginkgo.GinkgoWriter, "\n== END %s pod logs ==\n", p.Name)
return err
}
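
// expectConditionToBeTrue asserts that the MPIJob has the given condition type with status True.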
func expectConditionToBeTrue(mpiJob *kubeflow.MPIJob, condType kubeflow.JobConditionType) {
condition := getJobCondition(mpiJob, condType)
gomega.Expect(condition).ToNot(gomega.BeNil())
gomega.Expect(condition.Status).To(gomega.Equal(corev1.ConditionTrue))
}
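
// getJobCondition returns the MPIJob condition of the given type, or nil if it is not present.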
func getJobCondition(mpiJob *kubeflow.MPIJob, condType kubeflow.JobConditionType) *kubeflow.JobCondition {
for _, cond := range mpiJob.Status.Conditions {
if cond.Type == condType {
return &cond
}
}
return nil
}
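
// getLauncherJob returns the batch Job owned by the MPIJob, or nil if none exists.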
func getLauncherJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
result, err := k8sClient.BatchV1().Jobs(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, j := range result.Items {
if metav1.IsControlledBy(&j, mpiJob) {
return &j, nil
}
}
return nil, nil
}
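
// createMPIJobWithOpenMPI fills in launcher and worker containers that compute pi with OpenMPI.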
func createMPIJobWithOpenMPI(mpiJob *kubeflow.MPIJob) {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
{
Name: "launcher",
Image: openMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
Command: []string{"mpirun"},
Args: []string{
"-n",
"2",
"/home/mpiuser/pi",
},
},
}
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers = []corev1.Container{
{
Name: "worker",
Image: openMPIImage,
ImagePullPolicy: corev1.PullIfNotPresent, // use locally built image.
},
}
}
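
// setUpSchedulerPlugins installs scheduler-plugins unless the suite is configured to use an existing installation.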
func setUpSchedulerPlugins() {
if !useExistingSchedulerPlugins {
ginkgo.By("Installing scheduler-plugins")
err := installSchedulerPlugins()
gomega.Expect(err).Should(gomega.Succeed())
}
}
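
// cleanUpSchedulerPlugins uninstalls the scheduler-plugins Helm release and deletes its namespace.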
func cleanUpSchedulerPlugins() {
if !useExistingSchedulerPlugins {
ginkgo.By("Uninstalling scheduler-plugins")
err := runCommand(helmPath, "uninstall", schedulerPlugins, "--namespace", schedulerPlugins)
gomega.Expect(err).Should(gomega.Succeed())
err = runCommand(kubectlPath, "delete", "namespace", schedulerPlugins)
gomega.Expect(err).Should(gomega.Succeed())
}
}
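
// setupVolcanoScheduler installs volcano unless the suite is configured to use an existing installation.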
func setupVolcanoScheduler() {
if !useExistingVolcanoScheduler {
ginkgo.By("Installing volcano-scheduler")
err := installVolcanoScheduler()
gomega.Expect(err).Should(gomega.Succeed())
}
}
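
// cleanUpVolcanoScheduler deletes the volcano scheduler manifests.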
func cleanUpVolcanoScheduler() {
if !useExistingVolcanoScheduler {
ginkgo.By("Uninstalling volcano-scheduler")
err := runCommand(kubectlPath, "delete", "-f", volcanoSchedulerManifestPath)
gomega.Expect(err).Should(gomega.Succeed())
}
}
// setupMPIOperator scales the mpi-operator Deployment down to zero and back up so that the gang-scheduler flag takes effect.
func setupMPIOperator(ctx context.Context, mpiJob *kubeflow.MPIJob, enableGangSchedulingFlag string, unschedulableResources *corev1.ResourceList) {
ginkgo.By("Scale-In the deployment to 0")
operator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
operator.Spec.Replicas = ptr.To[int32](0)
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, operator, metav1.UpdateOptions{})
gomega.Expect(err).Should(gomega.Succeed())
gomega.Eventually(func() bool {
isNotZero, err := ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
gomega.Expect(err).Should(gomega.Succeed())
return isNotZero
}, foreverTimeout, waitInterval).Should(gomega.BeFalse())
ginkgo.By("Update the replicas and args")
gomega.Eventually(func() error {
updatedOperator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
updatedOperator.Spec.Template.Spec.Containers[0].Args = append(updatedOperator.Spec.Template.Spec.Containers[0].Args, enableGangSchedulingFlag)
updatedOperator.Spec.Replicas = ptr.To[int32](1)
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, updatedOperator, metav1.UpdateOptions{})
return err
}, foreverTimeout, waitInterval).Should(gomega.BeNil())
ginkgo.By("Should be replicas is 1")
gomega.Eventually(func() bool {
isNotZero, err := ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
gomega.Expect(err).Should(gomega.Succeed())
return isNotZero
}, foreverTimeout, waitInterval).Should(gomega.BeTrue())
createMPIJobWithOpenMPI(mpiJob)
mpiJob.Spec.RunPolicy.SchedulingPolicy = &kubeflow.SchedulingPolicy{MinResources: unschedulableResources}
}
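
// ensureDeploymentAvailableReplicas is defined elsewhere in this package. For
// reference, a minimal sketch of what such a helper might look like (a
// hypothetical illustration; the real helper may differ):
//
//	func ensureDeploymentAvailableReplicas(ctx context.Context, namespace, name string) (bool, error) {
//		deploy, err := k8sClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
//		if err != nil {
//			return false, err
//		}
//		// Report whether the Deployment has at least one available replica.
//		return deploy.Status.AvailableReplicas != 0, nil
//	}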