// Copyright 2018-2023 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package integration import ( "encoding/json" "fmt" "io/ioutil" "strings" "testing" "time" "github.com/golang/glog" experiment_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_client/experiment_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_model" params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_client/pipeline_service" pipeline_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_client/pipeline_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_model" upload_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_client/pipeline_upload_service" recurring_run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_client/recurring_run_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_model" runParams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model" api_server "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2" "github.com/kubeflow/pipelines/backend/src/common/util" test "github.com/kubeflow/pipelines/backend/test/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "sigs.k8s.io/yaml" ) // Methods are organized into two types: "prepare" and "verify". // "prepare" tests setup resources before upgrade // "verify" tests verifies resources are expected after upgrade type UpgradeTests struct { suite.Suite namespace string resourceNamespace string experimentClient *api_server.ExperimentClient pipelineClient *api_server.PipelineClient pipelineUploadClient *api_server.PipelineUploadClient runClient *api_server.RunClient recurringRunClient *api_server.RecurringRunClient } func TestUpgrade(t *testing.T) { suite.Run(t, new(UpgradeTests)) } func (s *UpgradeTests) TestPrepare() { t := s.T() test.DeleteAllRecurringRuns(s.recurringRunClient, s.resourceNamespace, t) test.DeleteAllRuns(s.runClient, s.resourceNamespace, t) test.DeleteAllPipelines(s.pipelineClient, t) test.DeleteAllExperiments(s.experimentClient, s.resourceNamespace, t) s.PrepareExperiments() s.PreparePipelines() s.PrepareRuns() s.PrepareRecurringRuns() } func (s *UpgradeTests) TestVerify() { s.VerifyExperiments() s.VerifyPipelines() s.VerifyRuns() s.VerifyRecurringRuns() s.VerifyCreatingRunsAndRecurringRuns() } // Check the namespace have ML job installed and ready func (s *UpgradeTests) SetupSuite() { // Integration tests also run these tests to first ensure they work, so that // when integration tests pass and upgrade tests fail, we know for sure // upgrade process went wrong somehow. if !(*runIntegrationTests || *runUpgradeTests) { s.T().SkipNow() return } if !*isDevMode { err := test.WaitForReady(*namespace, *initializeTimeout) if err != nil { glog.Exitf("Failed to initialize test. Error: %v", err) } } s.namespace = *namespace var newExperimentClient func() (*api_server.ExperimentClient, error) var newPipelineUploadClient func() (*api_server.PipelineUploadClient, error) var newPipelineClient func() (*api_server.PipelineClient, error) var newRunClient func() (*api_server.RunClient, error) var newRecurringRunClient func() (*api_server.RecurringRunClient, error) if *isKubeflowMode { s.resourceNamespace = *resourceNamespace newExperimentClient = func() (*api_server.ExperimentClient, error) { return api_server.NewKubeflowInClusterExperimentClient(s.namespace, *isDebugMode) } newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) { return api_server.NewKubeflowInClusterPipelineUploadClient(s.namespace, *isDebugMode) } newPipelineClient = func() (*api_server.PipelineClient, error) { return api_server.NewKubeflowInClusterPipelineClient(s.namespace, *isDebugMode) } newRunClient = func() (*api_server.RunClient, error) { return api_server.NewKubeflowInClusterRunClient(s.namespace, *isDebugMode) } newRecurringRunClient = func() (*api_server.RecurringRunClient, error) { return api_server.NewKubeflowInClusterRecurringRunClient(s.namespace, *isDebugMode) } } else { clientConfig := test.GetClientConfig(*namespace) newExperimentClient = func() (*api_server.ExperimentClient, error) { return api_server.NewExperimentClient(clientConfig, *isDebugMode) } newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) { return api_server.NewPipelineUploadClient(clientConfig, *isDebugMode) } newPipelineClient = func() (*api_server.PipelineClient, error) { return api_server.NewPipelineClient(clientConfig, *isDebugMode) } newRunClient = func() (*api_server.RunClient, error) { return api_server.NewRunClient(clientConfig, *isDebugMode) } newRecurringRunClient = func() (*api_server.RecurringRunClient, error) { return api_server.NewRecurringRunClient(clientConfig, *isDebugMode) } } var err error s.experimentClient, err = newExperimentClient() if err != nil { glog.Exitf("Failed to get experiment client. Error: %v", err) } s.pipelineUploadClient, err = newPipelineUploadClient() if err != nil { glog.Exitf("Failed to get pipeline upload client. Error: %s", err.Error()) } s.pipelineClient, err = newPipelineClient() if err != nil { glog.Exitf("Failed to get pipeline client. Error: %s", err.Error()) } s.runClient, err = newRunClient() if err != nil { glog.Exitf("Failed to get run client. Error: %s", err.Error()) } s.recurringRunClient, err = newRecurringRunClient() if err != nil { glog.Exitf("Failed to get job client. Error: %s", err.Error()) } } func (s *UpgradeTests) TearDownSuite() { if *runIntegrationTests { if !*isDevMode { t := s.T() // Clean up after the suite to unblock other tests. (Not needed for upgrade // tests because it needs changes in prepare tests to persist and verified // later.) test.DeleteAllRecurringRuns(s.recurringRunClient, s.resourceNamespace, t) test.DeleteAllRuns(s.runClient, s.resourceNamespace, t) test.DeleteAllPipelines(s.pipelineClient, t) test.DeleteAllExperiments(s.experimentClient, s.resourceNamespace, t) } } } func (s *UpgradeTests) PrepareExperiments() { t := s.T() /* ---------- Create a new experiment ---------- */ experiment := test.MakeExperiment("training", "my first experiment", s.resourceNamespace) _, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{ Body: experiment, }) require.Nil(t, err) /* ---------- Create a few more new experiment ---------- */ // This ensures they can be sorted by create time in expected order. time.Sleep(1 * time.Second) experiment = test.MakeExperiment("prediction", "my second experiment", s.resourceNamespace) _, err = s.experimentClient.Create(&experiment_params.CreateExperimentParams{ Body: experiment, }) require.Nil(t, err) time.Sleep(1 * time.Second) experiment = test.MakeExperiment("moonshot", "my third experiment", s.resourceNamespace) _, err = s.experimentClient.Create(&experiment_params.CreateExperimentParams{ Body: experiment, }) require.Nil(t, err) } func (s *UpgradeTests) VerifyExperiments() { t := s.T() /* ---------- Verify list experiments sorted by creation time ---------- */ // This should have the hello-world experiment in addition to the old experiments. experiments, _, _, err := test.ListExperiment( s.experimentClient, &experiment_params.ListExperimentsParams{SortBy: util.StringPointer("created_at")}, "", ) require.Nil(t, err) allExperiments := make([]string, len(experiments)) for i, exp := range experiments { allExperiments[i] = fmt.Sprintf("%v: %v/%v", i, exp.Namespace, exp.DisplayName) } fmt.Printf("All experiments: %v", allExperiments) assert.Equal(t, 5, len(experiments)) // Default experiment is no longer deletable assert.Equal(t, "Default", experiments[0].DisplayName) assert.Contains(t, experiments[0].Description, "All runs created without specifying an experiment will be grouped here") assert.NotEmpty(t, experiments[0].ExperimentID) assert.NotEmpty(t, experiments[0].CreatedAt) assert.Equal(t, "training", experiments[1].DisplayName) assert.Equal(t, "my first experiment", experiments[1].Description) assert.NotEmpty(t, experiments[1].ExperimentID) assert.NotEmpty(t, experiments[1].CreatedAt) assert.Equal(t, "prediction", experiments[2].DisplayName) assert.Equal(t, "my second experiment", experiments[2].Description) assert.NotEmpty(t, experiments[2].ExperimentID) assert.NotEmpty(t, experiments[2].CreatedAt) assert.Equal(t, "moonshot", experiments[3].DisplayName) assert.Equal(t, "my third experiment", experiments[3].Description) assert.NotEmpty(t, experiments[3].ExperimentID) assert.NotEmpty(t, experiments[3].CreatedAt) assert.Equal(t, "hello world experiment", experiments[4].DisplayName) assert.Equal(t, "", experiments[4].Description) assert.NotEmpty(t, experiments[4].ExperimentID) assert.NotEmpty(t, experiments[4].CreatedAt) } func (s *UpgradeTests) PreparePipelines() { t := s.T() test.DeleteAllPipelines(s.pipelineClient, t) /* ---------- Upload pipelines YAML ---------- */ argumentYAMLPipeline, err := s.pipelineUploadClient.UploadFile("../resources/arguments-parameters.yaml", upload_params.NewUploadPipelineParams()) require.Nil(t, err) assert.Equal(t, "arguments-parameters.yaml", argumentYAMLPipeline.DisplayName) /* ---------- Import pipeline YAML by URL ---------- */ time.Sleep(1 * time.Second) sequentialPipeline, err := s.pipelineClient.Create(&pipeline_params.CreatePipelineParams{ Body: &pipeline_model.V2beta1Pipeline{DisplayName: "sequential"}, }) require.Nil(t, err) assert.Equal(t, "sequential", sequentialPipeline.DisplayName) sequentialPipelineVersion, err := s.pipelineClient.CreatePipelineVersion(¶ms.CreatePipelineVersionParams{ PipelineID: sequentialPipeline.PipelineID, Body: &pipeline_model.V2beta1PipelineVersion{ DisplayName: "sequential", PackageURL: &pipeline_model.V2beta1URL{ PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/sequential.yaml", }, PipelineID: sequentialPipeline.PipelineID, }, }) require.Nil(t, err) assert.Equal(t, "sequential", sequentialPipelineVersion.DisplayName) /* ---------- Upload pipelines zip ---------- */ time.Sleep(1 * time.Second) argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile( "../resources/arguments.pipeline.zip", &upload_params.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")}) require.Nil(t, err) assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.DisplayName) /* ---------- Import pipeline tarball by URL ---------- */ time.Sleep(1 * time.Second) argumentUrlPipeline, err := s.pipelineClient.Create(&pipeline_params.CreatePipelineParams{ Body: &pipeline_model.V2beta1Pipeline{DisplayName: "arguments.pipeline.zip"}, }) require.Nil(t, err) assert.Equal(t, "arguments.pipeline.zip", argumentUrlPipeline.DisplayName) argumentUrlPipelineVersion, err := s.pipelineClient.CreatePipelineVersion(¶ms.CreatePipelineVersionParams{ PipelineID: argumentUrlPipeline.PipelineID, Body: &pipeline_model.V2beta1PipelineVersion{ DisplayName: "arguments", PackageURL: &pipeline_model.V2beta1URL{ PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/v2/arguments.pipeline.zip", }, PipelineID: argumentUrlPipeline.PipelineID, }, }) require.Nil(t, err) assert.Equal(t, "arguments", argumentUrlPipelineVersion.DisplayName) time.Sleep(1 * time.Second) } func (s *UpgradeTests) VerifyPipelines() { t := s.T() /* ---------- Verify list pipeline sorted by creation time ---------- */ pipelines, _, _, err := s.pipelineClient.List( &pipeline_params.ListPipelinesParams{SortBy: util.StringPointer("created_at")}) require.Nil(t, err) // During upgrade, default pipelines may be installed, so we only verify the // 4 oldest pipelines here. assert.True(t, len(pipelines) >= 4) assert.Equal(t, "arguments-parameters.yaml", pipelines[0].DisplayName) assert.Equal(t, "sequential", pipelines[1].DisplayName) assert.Equal(t, "zip-arguments-parameters", pipelines[2].DisplayName) assert.Equal(t, "arguments.pipeline.zip", pipelines[3].DisplayName) /* ---------- Verify pipeline spec ---------- */ pipelineVersions, totalSize, _, err := s.pipelineClient.ListPipelineVersions( ¶ms.ListPipelineVersionsParams{ PipelineID: pipelines[0].PipelineID, }) require.Nil(t, err) assert.Equal(t, totalSize, 1) pipelineVersion, err := s.pipelineClient.GetPipelineVersion(¶ms.GetPipelineVersionParams{PipelineID: pipelines[0].PipelineID, PipelineVersionID: pipelineVersions[0].PipelineVersionID}) require.Nil(t, err) bytes, err := ioutil.ReadFile("../resources/arguments-parameters.yaml") expected_bytes, err := yaml.YAMLToJSON(bytes) require.Nil(t, err) actual_bytes, err := json.Marshal(pipelineVersion.PipelineSpec) require.Nil(t, err) // Override pipeline name, then compare assert.Equal(t, string(expected_bytes), strings.Replace(string(actual_bytes), "pipeline/arguments-parameters.yaml", "whalesay", 1)) } func (s *UpgradeTests) PrepareRuns() { t := s.T() helloWorldPipeline := s.getHelloWorldPipeline(true) helloWorldExperiment := s.getHelloWorldExperiment(true) if helloWorldExperiment == nil { helloWorldExperiment = s.createHelloWorldExperiment() } hello2 := s.getHelloWorldExperiment(true) require.Equal(t, hello2, helloWorldExperiment) /* ---------- Create a new hello world run by specifying pipeline ID ---------- */ createRunRequest := &runParams.CreateRunParams{Body: &run_model.V2beta1Run{ DisplayName: "hello world", Description: "this is hello world", ExperimentID: helloWorldExperiment.ExperimentID, PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{ PipelineID: helloWorldPipeline.PipelineID, PipelineVersionID: helloWorldPipeline.PipelineVersionID, }, }} _, err := s.runClient.Create(createRunRequest) require.Nil(t, err) } func (s *UpgradeTests) VerifyRuns() { t := s.T() /* ---------- List the runs, sorted by creation time ---------- */ runs, _, _, err := test.ListRuns( s.runClient, &runParams.ListRunsParams{SortBy: util.StringPointer("created_at")}, s.resourceNamespace) require.Nil(t, err) assert.True(t, len(runs) >= 1) assert.Equal(t, "hello world", runs[0].DisplayName) /* ---------- Get hello world run ---------- */ helloWorldRunDetail, err := s.runClient.Get(&runParams.GetRunParams{RunID: runs[0].RunID}) require.Nil(t, err) assert.Equal(t, "hello world", helloWorldRunDetail.DisplayName) assert.Equal(t, "this is hello world", helloWorldRunDetail.Description) } func (s *UpgradeTests) PrepareRecurringRuns() { t := s.T() pipeline := s.getHelloWorldPipeline(true) experiment := s.getHelloWorldExperiment(true) /* ---------- Create a new hello world job by specifying pipeline ID ---------- */ createRecurringRunRequest := &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{ DisplayName: "hello world", Description: "this is hello world", PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{ PipelineID: pipeline.PipelineID, PipelineVersionID: pipeline.PipelineVersionID, }, ExperimentID: experiment.ExperimentID, MaxConcurrency: 10, Mode: recurring_run_model.RecurringRunModeENABLE, NoCatchup: true, }} _, err := s.recurringRunClient.Create(createRecurringRunRequest) require.Nil(t, err) } func (s *UpgradeTests) VerifyRecurringRuns() { t := s.T() pipeline := s.getHelloWorldPipeline(false) experiment := s.getHelloWorldExperiment(false) /* ---------- Get hello world recurring run ---------- */ recurringRuns, _, _, err := test.ListAllRecurringRuns(s.recurringRunClient, s.resourceNamespace) require.Nil(t, err) require.Len(t, recurringRuns, 1) recurringRun := recurringRuns[0] expectedRecurringRun := &recurring_run_model.V2beta1RecurringRun{ RecurringRunID: recurringRun.RecurringRunID, DisplayName: "hello world", Description: "this is hello world", ExperimentID: experiment.ExperimentID, PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{ PipelineID: pipeline.PipelineID, PipelineVersionID: pipeline.PipelineVersionID, }, ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode), MaxConcurrency: 10, NoCatchup: true, Mode: recurring_run_model.RecurringRunModeENABLE, Namespace: recurringRun.Namespace, CreatedAt: recurringRun.CreatedAt, UpdatedAt: recurringRun.UpdatedAt, Trigger: recurringRun.Trigger, Status: recurringRun.Status, } assert.Equal(t, expectedRecurringRun, recurringRun) } func (s *UpgradeTests) VerifyCreatingRunsAndRecurringRuns() { t := s.T() /* ---------- Get the oldest pipeline and the newest experiment ---------- */ pipelines, _, _, err := s.pipelineClient.List( &pipeline_params.ListPipelinesParams{SortBy: util.StringPointer("created_at")}) require.Nil(t, err) assert.Equal(t, "arguments-parameters.yaml", pipelines[0].DisplayName) pipelineVersions, totalSize, _, err := s.pipelineClient.ListPipelineVersions(¶ms.ListPipelineVersionsParams{ PipelineID: pipelines[0].PipelineID, }) require.Nil(t, err) assert.Equal(t, 1, totalSize) experiments, _, _, err := test.ListExperiment( s.experimentClient, &experiment_params.ListExperimentsParams{SortBy: util.StringPointer("created_at")}, "", ) require.Nil(t, err) assert.Equal(t, "Default", experiments[0].DisplayName) assert.Equal(t, "training", experiments[1].DisplayName) assert.Equal(t, "hello world experiment", experiments[4].DisplayName) /* ---------- Create a new run based on the oldest pipeline and its default pipeline version ---------- */ createRunRequest := &runParams.CreateRunParams{Body: &run_model.V2beta1Run{ DisplayName: "argument parameter from pipeline", Description: "a run from an old pipeline", // This run should belong to the newest experiment (created after the upgrade) ExperimentID: experiments[4].ExperimentID, RuntimeConfig: &run_model.V2beta1RuntimeConfig{ Parameters: map[string]interface{}{ "param1": "goodbye", "param2": "world", }, }, PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{ PipelineID: pipelineVersions[0].PipelineID, PipelineVersionID: pipelineVersions[0].PipelineVersionID, }, }} runFromPipeline, err := s.runClient.Create(createRunRequest) assert.Nil(t, err) assert.Equal(t, experiments[4].ExperimentID, runFromPipeline.ExperimentID) /* ---------- Create a new recurring run based on the second oldest pipeline version and belonging to the second oldest experiment ---------- */ pipelineVersions, totalSize, _, err = s.pipelineClient.ListPipelineVersions(¶ms.ListPipelineVersionsParams{ PipelineID: pipelines[1].PipelineID, }) require.Nil(t, err) assert.Equal(t, 1, totalSize) createRecurringRunRequest := &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{ DisplayName: "sequential job from pipeline version", Description: "a recurring run from an old pipeline version", ExperimentID: experiments[1].ExperimentID, PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{ PipelineID: pipelineVersions[0].PipelineID, PipelineVersionID: pipelineVersions[0].PipelineVersionID, }, RuntimeConfig: &recurring_run_model.V2beta1RuntimeConfig{ Parameters: map[string]interface{}{ "url": "gs://ml-pipeline-playground/shakespeare1.txt", }, }, MaxConcurrency: 10, Mode: recurring_run_model.RecurringRunModeENABLE, }} createdRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest) assert.Nil(t, err) assert.Equal(t, experiments[1].ExperimentID, createdRecurringRun.ExperimentID) } func (s *UpgradeTests) createHelloWorldExperiment() *experiment_model.V2beta1Experiment { t := s.T() experiment := test.MakeExperiment("hello world experiment", "", s.resourceNamespace) helloWorldExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment}) require.Nil(t, err) return helloWorldExperiment } func (s *UpgradeTests) getHelloWorldExperiment(createIfNotExist bool) *experiment_model.V2beta1Experiment { t := s.T() experiments, _, _, err := test.ListExperiment( s.experimentClient, &experiment_params.ListExperimentsParams{ PageSize: util.Int32Pointer(1000), }, s.resourceNamespace) require.Nil(t, err) var helloWorldExperiment *experiment_model.V2beta1Experiment for _, experiment := range experiments { if experiment.DisplayName == "hello world experiment" { helloWorldExperiment = experiment } } if helloWorldExperiment == nil && createIfNotExist { return s.createHelloWorldExperiment() } return helloWorldExperiment } func (s *UpgradeTests) getHelloWorldPipeline(createIfNotExist bool) *pipeline_model.V2beta1PipelineVersion { t := s.T() pipelines, err := s.pipelineClient.ListAll(&pipeline_params.ListPipelinesParams{}, 1000) require.Nil(t, err) var helloWorldPipeline *pipeline_model.V2beta1Pipeline for _, pipeline := range pipelines { if pipeline.DisplayName == "hello-world.yaml" { helloWorldPipeline = pipeline } } if helloWorldPipeline == nil && createIfNotExist { return s.createHelloWorldPipeline() } pipelineVersions, totalSize, _, err := s.pipelineClient.ListPipelineVersions(¶ms.ListPipelineVersionsParams{ PipelineID: helloWorldPipeline.PipelineID, }) require.Nil(t, err) require.Equal(t, 1, totalSize) return pipelineVersions[0] } func (s *UpgradeTests) createHelloWorldPipeline() *pipeline_model.V2beta1PipelineVersion { t := s.T() /* ---------- Upload pipelines YAML ---------- */ uploadedPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams()) require.Nil(t, err) pipelineVersions, totalSize, _, err := s.pipelineClient.ListPipelineVersions(¶ms.ListPipelineVersionsParams{ PipelineID: uploadedPipeline.PipelineID, }) require.Nil(t, err) require.Equal(t, 1, totalSize) return pipelineVersions[0] }