// Copyright 2021 The Kubeflow Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package e2e import ( "context" "fmt" "os" "os/exec" "testing" "time" "github.com/onsi/ginkgo" "github.com/onsi/gomega" rbacv1 "k8s.io/api/rbac/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" controllerruntime "sigs.k8s.io/controller-runtime" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" ) const ( envUseExistingCluster = "USE_EXISTING_CLUSTER" envUseExistingOperator = "USE_EXISTING_OPERATOR" envTestMPIOperatorImage = "TEST_MPI_OPERATOR_IMAGE" envTestOpenMPIImage = "TEST_OPENMPI_IMAGE" envTestIntelMPIImage = "TEST_INTELMPI_IMAGE" envTestMPICHImage = "TEST_MPICH_IMAGE" envTestKindImage = "TEST_KIND_IMAGE" envSchedulerPluginsVersion = "SCHEDULER_PLUGINS_VERSION" envVolcanoSchedulerVersion = "VOLCANO_SCHEDULER_VERSION" defaultMPIOperatorImage = "mpioperator/mpi-operator:local" defaultKindImage = "kindest/node:v1.33.4" defaultOpenMPIImage = "mpioperator/mpi-pi:openmpi" defaultIntelMPIImage = "mpioperator/mpi-pi:intel" defaultMPICHImage = "mpioperator/mpi-pi:mpich" rootPath = "../.." kubectlPath = rootPath + "/bin/kubectl" kindPath = rootPath + "/bin/kind" helmPath = rootPath + "/bin/helm" operatorManifestsPath = rootPath + "/manifests/overlays/dev" schedulerPluginsManifestPath = rootPath + "/dep-manifests/scheduler-plugins/" volcanoSchedulerManifestPath = rootPath + "/dep-manifests/volcano-scheduler/" // all in one yaml of volcano-development.yaml envUseExistingSchedulerPlugins = "USE_EXISTING_SCHEDULER_PLUGINS" envUseExistingVolcanoScheduler = "USE_EXISTING_VOLCANO_SCHEDULER" defaultSchedulerPluginsVersion = "v0.32.7" defaultVolcanoSchedulerVersion = "v1.12.2" mpiOperator = "mpi-operator" schedulerPlugins = "scheduler-plugins" waitInterval = 500 * time.Millisecond foreverTimeout = 200 * time.Second ) var ( useExistingCluster bool useExistingOperator bool useExistingSchedulerPlugins bool useExistingVolcanoScheduler bool mpiOperatorImage string openMPIImage string intelMPIImage string mpichImage string kindImage string schedulerPluginsVersion string volcanoSchedulerVersion string k8sClient kubernetes.Interface mpiClient clientset.Interface schedClient schedclientset.Interface volcanoClient volcanoclient.Interface ) func init() { useExistingCluster = getEnvDefault(envUseExistingCluster, "false") == "true" useExistingOperator = getEnvDefault(envUseExistingOperator, "false") == "true" useExistingSchedulerPlugins = getEnvDefault(envUseExistingSchedulerPlugins, "false") == "true" useExistingVolcanoScheduler = getEnvDefault(envUseExistingVolcanoScheduler, "false") == "true" mpiOperatorImage = getEnvDefault(envTestMPIOperatorImage, defaultMPIOperatorImage) openMPIImage = getEnvDefault(envTestOpenMPIImage, defaultOpenMPIImage) intelMPIImage = getEnvDefault(envTestIntelMPIImage, defaultIntelMPIImage) mpichImage = getEnvDefault(envTestMPICHImage, defaultMPICHImage) kindImage = getEnvDefault(envTestKindImage, defaultKindImage) schedulerPluginsVersion = getEnvDefault(envSchedulerPluginsVersion, defaultSchedulerPluginsVersion) volcanoSchedulerVersion = getEnvDefault(envVolcanoSchedulerVersion, defaultVolcanoSchedulerVersion) } func TestE2E(t *testing.T) { gomega.RegisterFailHandler(ginkgo.Fail) ginkgo.RunSpecs(t, "E2e Suite") } var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { if !useExistingCluster { ginkgo.By("Creating a local cluster") err := bootstrapKindCluster() gomega.Expect(err).ToNot(gomega.HaveOccurred()) } ginkgo.By("Obtaining clients") restConfig, err := controllerruntime.GetConfig() gomega.Expect(err).ToNot(gomega.HaveOccurred()) k8sClient, err = kubernetes.NewForConfig(restConfig) gomega.Expect(err).ToNot(gomega.HaveOccurred()) mpiClient, err = clientset.NewForConfig(restConfig) gomega.Expect(err).ToNot(gomega.HaveOccurred()) schedClient, err = schedclientset.NewForConfig(restConfig) gomega.Expect(err).ToNot(gomega.HaveOccurred()) volcanoClient, err = volcanoclient.NewForConfig(restConfig) gomega.Expect(err).NotTo(gomega.HaveOccurred()) if !useExistingOperator { ginkgo.By("Installing operator") err = installOperator() gomega.Expect(err).ToNot(gomega.HaveOccurred()) } return nil }, func([]byte) {}) var _ = ginkgo.SynchronizedAfterSuite(func() { if !useExistingCluster { ginkgo.By("Deleting local cluster") err := runCommand(kindPath, "delete", "cluster") gomega.Expect(err).ToNot(gomega.HaveOccurred()) } else if !useExistingOperator { ginkgo.By("Uninstalling operator") err := runCommand(kubectlPath, "delete", "-k", operatorManifestsPath) gomega.Expect(err).ToNot(gomega.HaveOccurred()) } }, func() {}) func getEnvDefault(key, defaultVal string) string { v, ok := os.LookupEnv(key) if ok { return v } return defaultVal } func bootstrapKindCluster() error { err := runCommand(kindPath, "create", "cluster", "--image", kindImage) if err != nil { return fmt.Errorf("creating kind cluster: %w", err) } err = runCommand(kindPath, "load", "docker-image", mpiOperatorImage, openMPIImage, intelMPIImage, mpichImage) if err != nil { return fmt.Errorf("loading container images: %w", err) } return nil } func installOperator() error { err := runCommand(kubectlPath, "apply", "--server-side", "-k", operatorManifestsPath) if err != nil { return fmt.Errorf("applying operator YAMLs: %w", err) } return wait.PollUntilContextTimeout(context.Background(), waitInterval, foreverTimeout, false, func(ctx context.Context) (bool, error) { return ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator) }) } func installSchedulerPlugins() error { // TODO: Remove flags to overwrite images once the scheduler-plugins with the appropriate helm charts is released. // In the specific scheudler-plugins version such as v0.26.7, manifests are incorrect. // So we overwrite images. overwriteControllerImage := fmt.Sprintf("controller.image=registry.k8s.io/scheduler-plugins/controller:%s", schedulerPluginsVersion) overwriteSchedulerImage := fmt.Sprintf("scheduler.image=registry.k8s.io/scheduler-plugins/kube-scheduler:%s", schedulerPluginsVersion) err := runCommand(helmPath, "install", "scheduler-plugins", schedulerPluginsManifestPath, "--create-namespace", "--namespace", schedulerPlugins, "--set", overwriteSchedulerImage, "--set", overwriteControllerImage) if err != nil { return fmt.Errorf("installing scheduler-plugins Helm Chart: %w", err) } // The following ClusterRole patch is workaround for the https://github.com/kubernetes-sigs/scheduler-plugins/commit/2aaf10fb0f6f657f21429a864268fa1ec0a3c29a. // TODO: Once the new scheduler-plugins version is released, we should remove the following workaround. cr, err := k8sClient.RbacV1().ClusterRoles().Get(context.Background(), "scheduler-plugins-scheduler", metav1.GetOptions{}) if err != nil { return err } cr.Rules = append(cr.Rules, rbacv1.PolicyRule{ APIGroups: []string{storagev1.GroupName}, Resources: []string{"volumeattachments"}, Verbs: []string{"get", "list", "watch"}, }) if _, err = k8sClient.RbacV1().ClusterRoles().Update(context.Background(), cr, metav1.UpdateOptions{}); err != nil { return err } return wait.PollUntilContextTimeout(context.Background(), waitInterval, foreverTimeout, false, func(ctx context.Context) (bool, error) { controllerName := fmt.Sprintf("%s-controller", schedulerPlugins) if ok, err := ensureDeploymentAvailableReplicas(ctx, schedulerPlugins, controllerName); !ok || err != nil { return false, err } schedulerName := fmt.Sprintf("%s-scheduler", schedulerPlugins) if ok, err := ensureDeploymentAvailableReplicas(ctx, schedulerPlugins, schedulerName); !ok || err != nil { return false, err } return true, nil }) } func installVolcanoScheduler() error { err := runCommand(kubectlPath, "apply", "-f", volcanoSchedulerManifestPath) if err != nil { return fmt.Errorf("failed to install volcano scheduler : %w", err) } volcanoNamespace := "volcano-system" return wait.PollUntilContextTimeout(context.Background(), waitInterval, foreverTimeout, false, func(ctx context.Context) (bool, error) { if ok, err := ensureDeploymentAvailableReplicas(ctx, volcanoNamespace, "volcano-scheduler"); !ok || err != nil { return false, err } if ok, err := ensureDeploymentAvailableReplicas(ctx, volcanoNamespace, "volcano-controllers"); !ok || err != nil { return false, err } if ok, err := ensureDeploymentAvailableReplicas(ctx, volcanoNamespace, "volcano-admission"); !ok || err != nil { return false, err } return true, nil }) } func runCommand(name string, args ...string) error { cmd := exec.Command(name, args...) cmd.Stderr = os.Stderr cmd.Stdout = os.Stdout return cmd.Run() } func ensureDeploymentAvailableReplicas(ctx context.Context, namespace, name string) (bool, error) { deployment, err := k8sClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) if errors.IsNotFound(err) { return false, nil } if err != nil { return false, err } return deployment.Status.AvailableReplicas != 0, nil }