mpi-operator/test/e2e/e2e_suite_test.go

262 lines
9.9 KiB
Go

// Copyright 2021 The Kubeflow Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
controllerruntime "sigs.k8s.io/controller-runtime"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)
const (
	// Names of the environment variables that configure the suite.
	envUseExistingCluster          = "USE_EXISTING_CLUSTER"
	envUseExistingOperator         = "USE_EXISTING_OPERATOR"
	envUseExistingSchedulerPlugins = "USE_EXISTING_SCHEDULER_PLUGINS"
	envUseExistingVolcanoScheduler = "USE_EXISTING_VOLCANO_SCHEDULER"
	envTestMPIOperatorImage        = "TEST_MPI_OPERATOR_IMAGE"
	envTestOpenMPIImage            = "TEST_OPENMPI_IMAGE"
	envTestIntelMPIImage           = "TEST_INTELMPI_IMAGE"
	envTestMPICHImage              = "TEST_MPICH_IMAGE"
	envTestKindImage               = "TEST_KIND_IMAGE"
	envSchedulerPluginsVersion     = "SCHEDULER_PLUGINS_VERSION"
	envVolcanoSchedulerVersion     = "VOLCANO_SCHEDULER_VERSION"

	// Values used when the corresponding environment variable is not set.
	defaultMPIOperatorImage        = "mpioperator/mpi-operator:local"
	defaultKindImage               = "kindest/node:v1.33.4"
	defaultOpenMPIImage            = "mpioperator/mpi-pi:openmpi"
	defaultIntelMPIImage           = "mpioperator/mpi-pi:intel"
	defaultMPICHImage              = "mpioperator/mpi-pi:mpich"
	defaultSchedulerPluginsVersion = "v0.32.7"
	defaultVolcanoSchedulerVersion = "v1.12.2"

	// Locations of tool binaries and manifests, relative to this package's
	// directory.
	rootPath                     = "../.."
	kubectlPath                  = rootPath + "/bin/kubectl"
	kindPath                     = rootPath + "/bin/kind"
	helmPath                     = rootPath + "/bin/helm"
	operatorManifestsPath        = rootPath + "/manifests/overlays/dev"
	schedulerPluginsManifestPath = rootPath + "/dep-manifests/scheduler-plugins/"
	// all in one yaml of volcano-development.yaml
	volcanoSchedulerManifestPath = rootPath + "/dep-manifests/volcano-scheduler/"

	// Names reused as both namespace and resource-name prefix by the
	// installers in this file.
	mpiOperator      = "mpi-operator"
	schedulerPlugins = "scheduler-plugins"

	// Polling cadence and upper bound for the readiness waits.
	waitInterval   = 500 * time.Millisecond
	foreverTimeout = 200 * time.Second
)
var (
	// Toggles that let the suite reuse infrastructure instead of creating it.
	useExistingCluster          bool
	useExistingOperator         bool
	useExistingSchedulerPlugins bool
	useExistingVolcanoScheduler bool

	// Container images and component versions used by the suite.
	mpiOperatorImage        string
	openMPIImage            string
	intelMPIImage           string
	mpichImage              string
	kindImage               string
	schedulerPluginsVersion string
	volcanoSchedulerVersion string

	// API clients shared by the specs; they are created during suite setup.
	k8sClient     kubernetes.Interface
	mpiClient     clientset.Interface
	schedClient   schedclientset.Interface
	volcanoClient volcanoclient.Interface
)
func init() {
useExistingCluster = getEnvDefault(envUseExistingCluster, "false") == "true"
useExistingOperator = getEnvDefault(envUseExistingOperator, "false") == "true"
useExistingSchedulerPlugins = getEnvDefault(envUseExistingSchedulerPlugins, "false") == "true"
useExistingVolcanoScheduler = getEnvDefault(envUseExistingVolcanoScheduler, "false") == "true"
mpiOperatorImage = getEnvDefault(envTestMPIOperatorImage, defaultMPIOperatorImage)
openMPIImage = getEnvDefault(envTestOpenMPIImage, defaultOpenMPIImage)
intelMPIImage = getEnvDefault(envTestIntelMPIImage, defaultIntelMPIImage)
mpichImage = getEnvDefault(envTestMPICHImage, defaultMPICHImage)
kindImage = getEnvDefault(envTestKindImage, defaultKindImage)
schedulerPluginsVersion = getEnvDefault(envSchedulerPluginsVersion, defaultSchedulerPluginsVersion)
volcanoSchedulerVersion = getEnvDefault(envVolcanoSchedulerVersion, defaultVolcanoSchedulerVersion)
}
// TestE2E is the `go test` entry point for the suite: it routes Gomega
// assertion failures to Ginkgo and then runs every registered spec.
func TestE2E(t *testing.T) {
	const suiteDescription = "E2e Suite"

	gomega.RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t, suiteDescription)
}
// Suite setup.
//
// Ginkgo runs the first function on parallel node 1 only and the second
// function on every node once the first has succeeded. One-time provisioning
// (kind cluster, operator install) therefore lives in the first function,
// while the package-level clients, which are per-process state, are also
// initialized in the second so that every node gets usable clients.
var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
	if !useExistingCluster {
		ginkgo.By("Creating a local cluster")
		err := bootstrapKindCluster()
		gomega.Expect(err).ToNot(gomega.HaveOccurred())
	}

	// installOperator polls through k8sClient, so node 1 needs the clients
	// before the all-nodes function runs.
	ginkgo.By("Obtaining clients")
	err := obtainClients()
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	if !useExistingOperator {
		ginkgo.By("Installing operator")
		err = installOperator()
		gomega.Expect(err).ToNot(gomega.HaveOccurred())
	}
	return nil
}, func([]byte) {
	// Runs in every test process. Without this, only node 1 would have
	// non-nil clients when the suite is executed in parallel.
	ginkgo.By("Obtaining clients")
	err := obtainClients()
	gomega.Expect(err).ToNot(gomega.HaveOccurred())
})

// obtainClients populates k8sClient, mpiClient, schedClient and volcanoClient
// from the REST config returned by controllerruntime.GetConfig. It returns the
// first error encountered, wrapped with the step that failed.
func obtainClients() error {
	restConfig, err := controllerruntime.GetConfig()
	if err != nil {
		return fmt.Errorf("loading REST config: %w", err)
	}
	if k8sClient, err = kubernetes.NewForConfig(restConfig); err != nil {
		return fmt.Errorf("creating Kubernetes client: %w", err)
	}
	if mpiClient, err = clientset.NewForConfig(restConfig); err != nil {
		return fmt.Errorf("creating MPIJob client: %w", err)
	}
	if schedClient, err = schedclientset.NewForConfig(restConfig); err != nil {
		return fmt.Errorf("creating scheduler-plugins client: %w", err)
	}
	if volcanoClient, err = volcanoclient.NewForConfig(restConfig); err != nil {
		return fmt.Errorf("creating Volcano client: %w", err)
	}
	return nil
}
// Suite teardown.
//
// Ginkgo runs the first function on every parallel node and the second on
// node 1 only, after all other nodes have finished. The cluster and operator
// are shared by all nodes, so they are removed in the second function; doing
// it in the first would let any node tear them down while others are still
// running specs.
var _ = ginkgo.SynchronizedAfterSuite(func() {
	// No per-node resources to release.
}, func() {
	if !useExistingCluster {
		// Deleting the cluster also removes the operator installed in it.
		ginkgo.By("Deleting local cluster")
		err := runCommand(kindPath, "delete", "cluster")
		gomega.Expect(err).ToNot(gomega.HaveOccurred())
	} else if !useExistingOperator {
		ginkgo.By("Uninstalling operator")
		err := runCommand(kubectlPath, "delete", "-k", operatorManifestsPath)
		gomega.Expect(err).ToNot(gomega.HaveOccurred())
	}
})
// getEnvDefault returns the value of the environment variable key, or
// defaultVal when the variable is not set. A variable that is set to the empty
// string is returned as "" rather than replaced by the default.
func getEnvDefault(key, defaultVal string) string {
	if value, isSet := os.LookupEnv(key); isSet {
		return value
	}
	return defaultVal
}
// bootstrapKindCluster creates a kind cluster from kindImage and then loads
// the operator image and the three MPI sample images into it. The returned
// error identifies which of the two steps failed.
func bootstrapKindCluster() error {
	if err := runCommand(kindPath, "create", "cluster", "--image", kindImage); err != nil {
		return fmt.Errorf("creating kind cluster: %w", err)
	}

	images := []string{mpiOperatorImage, openMPIImage, intelMPIImage, mpichImage}
	loadArgs := append([]string{"load", "docker-image"}, images...)
	if err := runCommand(kindPath, loadArgs...); err != nil {
		return fmt.Errorf("loading container images: %w", err)
	}
	return nil
}
// installOperator server-side applies the operator kustomization and then
// polls every waitInterval, for at most foreverTimeout, until the mpi-operator
// Deployment in the mpi-operator namespace reports an available replica.
func installOperator() error {
	if err := runCommand(kubectlPath, "apply", "--server-side", "-k", operatorManifestsPath); err != nil {
		return fmt.Errorf("applying operator YAMLs: %w", err)
	}

	operatorAvailable := func(ctx context.Context) (bool, error) {
		return ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
	}
	return wait.PollUntilContextTimeout(context.Background(), waitInterval, foreverTimeout, false, operatorAvailable)
}
// installSchedulerPlugins installs the scheduler-plugins Helm chart into the
// scheduler-plugins namespace, extends the scheduler's ClusterRole so it can
// read VolumeAttachments, and then polls every waitInterval, for at most
// foreverTimeout, until both the controller and the scheduler Deployments
// report an available replica. Every returned error is wrapped with the step
// that failed.
func installSchedulerPlugins() error {
	// TODO: Remove flags to overwrite images once the scheduler-plugins with the appropriate helm charts is released.
	// In the specific scheduler-plugins version such as v0.26.7, manifests are incorrect.
	// So we overwrite images.
	overwriteControllerImage := fmt.Sprintf("controller.image=registry.k8s.io/scheduler-plugins/controller:%s", schedulerPluginsVersion)
	overwriteSchedulerImage := fmt.Sprintf("scheduler.image=registry.k8s.io/scheduler-plugins/kube-scheduler:%s", schedulerPluginsVersion)
	err := runCommand(helmPath, "install", "scheduler-plugins", schedulerPluginsManifestPath, "--create-namespace", "--namespace", schedulerPlugins,
		"--set", overwriteSchedulerImage, "--set", overwriteControllerImage)
	if err != nil {
		return fmt.Errorf("installing scheduler-plugins Helm Chart: %w", err)
	}

	// The following ClusterRole patch is workaround for the https://github.com/kubernetes-sigs/scheduler-plugins/commit/2aaf10fb0f6f657f21429a864268fa1ec0a3c29a.
	// TODO: Once the new scheduler-plugins version is released, we should remove the following workaround.
	const clusterRoleName = "scheduler-plugins-scheduler"
	ctx := context.Background()
	cr, err := k8sClient.RbacV1().ClusterRoles().Get(ctx, clusterRoleName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("getting ClusterRole %q: %w", clusterRoleName, err)
	}
	cr.Rules = append(cr.Rules, rbacv1.PolicyRule{
		APIGroups: []string{storagev1.GroupName},
		Resources: []string{"volumeattachments"},
		Verbs:     []string{"get", "list", "watch"},
	})
	if _, err = k8sClient.RbacV1().ClusterRoles().Update(ctx, cr, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("updating ClusterRole %q: %w", clusterRoleName, err)
	}

	// The names do not change between polls, so build them once.
	controllerName := fmt.Sprintf("%s-controller", schedulerPlugins)
	schedulerName := fmt.Sprintf("%s-scheduler", schedulerPlugins)
	return wait.PollUntilContextTimeout(ctx, waitInterval, foreverTimeout, false, func(ctx context.Context) (bool, error) {
		if ok, err := ensureDeploymentAvailableReplicas(ctx, schedulerPlugins, controllerName); !ok || err != nil {
			return false, err
		}
		if ok, err := ensureDeploymentAvailableReplicas(ctx, schedulerPlugins, schedulerName); !ok || err != nil {
			return false, err
		}
		return true, nil
	})
}
// installVolcanoScheduler applies the Volcano manifests and then polls every
// waitInterval, for at most foreverTimeout, until the scheduler, controllers
// and admission Deployments in volcano-system each report an available
// replica.
func installVolcanoScheduler() error {
	if err := runCommand(kubectlPath, "apply", "-f", volcanoSchedulerManifestPath); err != nil {
		return fmt.Errorf("failed to install volcano scheduler : %w", err)
	}

	const namespace = "volcano-system"
	// Checked in this order on every poll; the first one that is not ready
	// (or fails to load) ends that poll iteration.
	deployments := []string{"volcano-scheduler", "volcano-controllers", "volcano-admission"}

	allAvailable := func(ctx context.Context) (bool, error) {
		for _, name := range deployments {
			available, err := ensureDeploymentAvailableReplicas(ctx, namespace, name)
			if err != nil || !available {
				return false, err
			}
		}
		return true, nil
	}
	return wait.PollUntilContextTimeout(context.Background(), waitInterval, foreverTimeout, false, allAvailable)
}
// runCommand executes name with args, forwarding the child's stdout and stderr
// to this process's own streams, and returns the error from running it, if
// any.
func runCommand(name string, args ...string) error {
	command := exec.Command(name, args...)
	command.Stdout, command.Stderr = os.Stdout, os.Stderr
	return command.Run()
}
// ensureDeploymentAvailableReplicas reports whether the Deployment
// namespace/name has a non-zero number of available replicas. A Deployment
// that does not exist yet yields (false, nil) so that pollers keep waiting;
// any other lookup error is returned to the caller.
func ensureDeploymentAvailableReplicas(ctx context.Context, namespace, name string) (bool, error) {
	deployment, err := k8sClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		return false, nil
	case err != nil:
		return false, err
	}
	return deployment.Status.AvailableReplicas != 0, nil
}