454 lines
16 KiB
Go
454 lines
16 KiB
Go
package integration
|
|
|
|
import (
|
|
"io/ioutil"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
|
"github.com/ghodss/yaml"
|
|
"github.com/golang/glog"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
experimentParams "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service"
|
|
"github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_model"
|
|
jobparams "github.com/kubeflow/pipelines/backend/api/go_http_client/job_client/job_service"
|
|
"github.com/kubeflow/pipelines/backend/api/go_http_client/job_model"
|
|
pipelineParams "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service"
|
|
"github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_model"
|
|
uploadParams "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_upload_client/pipeline_upload_service"
|
|
runParams "github.com/kubeflow/pipelines/backend/api/go_http_client/run_client/run_service"
|
|
"github.com/kubeflow/pipelines/backend/api/go_http_client/run_model"
|
|
"github.com/kubeflow/pipelines/backend/src/common/client/api_server"
|
|
"github.com/kubeflow/pipelines/backend/src/common/util"
|
|
"github.com/kubeflow/pipelines/backend/test"
|
|
)
|
|
|
|
// Methods are organized into two types: "prepare" and "verify".
|
|
// "prepare" tests setup resources before upgrade
|
|
// "verify" tests verifies resources are expected after upgrade
|
|
type UpgradeTests struct {
|
|
suite.Suite
|
|
namespace string
|
|
experimentClient *api_server.ExperimentClient
|
|
pipelineClient *api_server.PipelineClient
|
|
pipelineUploadClient *api_server.PipelineUploadClient
|
|
runClient *api_server.RunClient
|
|
jobClient *api_server.JobClient
|
|
}
|
|
|
|
func TestUpgrade(t *testing.T) {
|
|
suite.Run(t, new(UpgradeTests))
|
|
}
|
|
|
|
func (s *UpgradeTests) TestPrepare() {
|
|
t := s.T()
|
|
|
|
test.DeleteAllJobs(s.jobClient, t)
|
|
test.DeleteAllRuns(s.runClient, t)
|
|
test.DeleteAllPipelines(s.pipelineClient, t)
|
|
test.DeleteAllExperiments(s.experimentClient, t)
|
|
|
|
s.PrepareExperiments()
|
|
s.PreparePipelines()
|
|
s.PrepareRuns()
|
|
s.PrepareJobs()
|
|
}
|
|
|
|
func (s *UpgradeTests) TestVerify() {
|
|
s.VerifyExperiments()
|
|
s.VerifyPipelines()
|
|
s.VerifyRuns()
|
|
s.VerifyJobs()
|
|
}
|
|
|
|
// Check the namespace have ML job installed and ready
|
|
func (s *UpgradeTests) SetupSuite() {
|
|
// Integration tests also run these tests to first ensure they work, so that
|
|
// when integration tests pass and upgrade tests fail, we know for sure
|
|
// upgrade process went wrong somehow.
|
|
if !(*runIntegrationTests || *runUpgradeTests) {
|
|
s.T().SkipNow()
|
|
return
|
|
}
|
|
|
|
var err error
|
|
if !*isDevMode {
|
|
err = test.WaitForReady(*namespace, *initializeTimeout)
|
|
if err != nil {
|
|
glog.Exitf("Failed to initialize test. Error: %v", err)
|
|
}
|
|
}
|
|
s.namespace = *namespace
|
|
clientConfig := test.GetClientConfig(*namespace)
|
|
s.experimentClient, err = api_server.NewExperimentClient(clientConfig, false)
|
|
if err != nil {
|
|
glog.Exitf("Failed to get experiment client. Error: %v", err)
|
|
}
|
|
s.pipelineUploadClient, err = api_server.NewPipelineUploadClient(clientConfig, false)
|
|
if err != nil {
|
|
glog.Exitf("Failed to get pipeline upload client. Error: %s", err.Error())
|
|
}
|
|
s.pipelineClient, err = api_server.NewPipelineClient(clientConfig, false)
|
|
if err != nil {
|
|
glog.Exitf("Failed to get pipeline client. Error: %s", err.Error())
|
|
}
|
|
s.runClient, err = api_server.NewRunClient(clientConfig, false)
|
|
if err != nil {
|
|
glog.Exitf("Failed to get run client. Error: %s", err.Error())
|
|
}
|
|
s.jobClient, err = api_server.NewJobClient(clientConfig, false)
|
|
if err != nil {
|
|
glog.Exitf("Failed to get job client. Error: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
func (s *UpgradeTests) TearDownSuite() {
|
|
if *runIntegrationTests {
|
|
if !*isDevMode {
|
|
t := s.T()
|
|
// Clean up after the suite to unblock other tests. (Not needed for upgrade
|
|
// tests because it needs changes in prepare tests to persist and verified
|
|
// later.)
|
|
test.DeleteAllExperiments(s.experimentClient, t)
|
|
test.DeleteAllPipelines(s.pipelineClient, t)
|
|
test.DeleteAllRuns(s.runClient, t)
|
|
test.DeleteAllJobs(s.jobClient, t)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *UpgradeTests) PrepareExperiments() {
|
|
t := s.T()
|
|
|
|
/* ---------- Create a new experiment ---------- */
|
|
experiment := &experiment_model.APIExperiment{Name: "training", Description: "my first experiment"}
|
|
_, err := s.experimentClient.Create(&experimentParams.CreateExperimentParams{
|
|
Body: experiment,
|
|
})
|
|
require.Nil(t, err)
|
|
|
|
/* ---------- Create a few more new experiment ---------- */
|
|
// This ensures they can be sorted by create time in expected order.
|
|
time.Sleep(1 * time.Second)
|
|
experiment = &experiment_model.APIExperiment{Name: "prediction", Description: "my second experiment"}
|
|
_, err = s.experimentClient.Create(&experimentParams.CreateExperimentParams{
|
|
Body: experiment,
|
|
})
|
|
require.Nil(t, err)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
experiment = &experiment_model.APIExperiment{Name: "moonshot", Description: "my third experiment"}
|
|
_, err = s.experimentClient.Create(&experimentParams.CreateExperimentParams{
|
|
Body: experiment,
|
|
})
|
|
require.Nil(t, err)
|
|
}
|
|
|
|
func (s *UpgradeTests) VerifyExperiments() {
|
|
t := s.T()
|
|
|
|
/* ---------- Verify list experiments sorted by creation time ---------- */
|
|
experiments, _, _, err := s.experimentClient.List(
|
|
&experimentParams.ListExperimentParams{SortBy: util.StringPointer("created_at")})
|
|
require.Nil(t, err)
|
|
// after upgrade, default experiment may be inserted, but the oldest 3
|
|
// experiments should be the ones created in this test
|
|
require.True(t, len(experiments) >= 3)
|
|
|
|
assert.Equal(t, "training", experiments[0].Name)
|
|
assert.Equal(t, "my first experiment", experiments[0].Description)
|
|
assert.NotEmpty(t, experiments[0].ID)
|
|
assert.NotEmpty(t, experiments[0].CreatedAt)
|
|
|
|
assert.Equal(t, "prediction", experiments[1].Name)
|
|
assert.Equal(t, "my second experiment", experiments[1].Description)
|
|
assert.NotEmpty(t, experiments[1].ID)
|
|
assert.NotEmpty(t, experiments[1].CreatedAt)
|
|
|
|
assert.Equal(t, "moonshot", experiments[2].Name)
|
|
assert.Equal(t, "my third experiment", experiments[2].Description)
|
|
assert.NotEmpty(t, experiments[2].ID)
|
|
assert.NotEmpty(t, experiments[2].CreatedAt)
|
|
}
|
|
|
|
// TODO(jingzhang36): prepare pipeline versions.
|
|
func (s *UpgradeTests) PreparePipelines() {
|
|
t := s.T()
|
|
|
|
test.DeleteAllPipelines(s.pipelineClient, t)
|
|
|
|
/* ---------- Upload pipelines YAML ---------- */
|
|
argumentYAMLPipeline, err := s.pipelineUploadClient.UploadFile("../resources/arguments-parameters.yaml", uploadParams.NewUploadPipelineParams())
|
|
require.Nil(t, err)
|
|
assert.Equal(t, "arguments-parameters.yaml", argumentYAMLPipeline.Name)
|
|
|
|
/* ---------- Import pipeline YAML by URL ---------- */
|
|
time.Sleep(1 * time.Second)
|
|
sequentialPipeline, err := s.pipelineClient.Create(&pipelineParams.CreatePipelineParams{
|
|
Body: &pipeline_model.APIPipeline{Name: "sequential", URL: &pipeline_model.APIURL{
|
|
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/sequential.yaml"}}})
|
|
require.Nil(t, err)
|
|
assert.Equal(t, "sequential", sequentialPipeline.Name)
|
|
|
|
/* ---------- Upload pipelines zip ---------- */
|
|
time.Sleep(1 * time.Second)
|
|
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
|
|
"../resources/arguments.pipeline.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
|
|
require.Nil(t, err)
|
|
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)
|
|
|
|
/* ---------- Import pipeline tarball by URL ---------- */
|
|
time.Sleep(1 * time.Second)
|
|
argumentUrlPipeline, err := s.pipelineClient.Create(&pipelineParams.CreatePipelineParams{
|
|
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
|
|
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip"}}})
|
|
require.Nil(t, err)
|
|
assert.Equal(t, "arguments.pipeline.zip", argumentUrlPipeline.Name)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
func (s *UpgradeTests) VerifyPipelines() {
|
|
t := s.T()
|
|
|
|
/* ---------- Verify list pipeline sorted by creation time ---------- */
|
|
pipelines, _, _, err := s.pipelineClient.List(
|
|
&pipelineParams.ListPipelinesParams{SortBy: util.StringPointer("created_at")})
|
|
require.Nil(t, err)
|
|
// During upgrade, default pipelines may be installed, so we only verify the
|
|
// 4 oldest pipelines here.
|
|
assert.True(t, len(pipelines) >= 4)
|
|
assert.Equal(t, "arguments-parameters.yaml", pipelines[0].Name)
|
|
assert.Equal(t, "sequential", pipelines[1].Name)
|
|
assert.Equal(t, "zip-arguments-parameters", pipelines[2].Name)
|
|
assert.Equal(t, "arguments.pipeline.zip", pipelines[3].Name)
|
|
|
|
verifyPipeline(t, pipelines[0])
|
|
|
|
/* ---------- Verify get template works ---------- */
|
|
template, err := s.pipelineClient.GetTemplate(&pipelineParams.GetTemplateParams{ID: pipelines[0].ID})
|
|
require.Nil(t, err)
|
|
expected, err := ioutil.ReadFile("../resources/arguments-parameters.yaml")
|
|
require.Nil(t, err)
|
|
var expectedWorkflow v1alpha1.Workflow
|
|
err = yaml.Unmarshal(expected, &expectedWorkflow)
|
|
assert.Equal(t, expectedWorkflow, *template)
|
|
}
|
|
|
|
func (s *UpgradeTests) PrepareRuns() {
|
|
t := s.T()
|
|
|
|
helloWorldPipeline := s.getHelloWorldPipeline(true)
|
|
helloWorldExperiment := s.getHelloWorldExperiment(true)
|
|
if helloWorldExperiment == nil {
|
|
helloWorldExperiment = s.createHelloWorldExperiment()
|
|
}
|
|
|
|
hello2 := s.getHelloWorldExperiment(true)
|
|
require.Equal(t, hello2, helloWorldExperiment)
|
|
|
|
/* ---------- Create a new hello world run by specifying pipeline ID ---------- */
|
|
createRunRequest := &runParams.CreateRunParams{Body: &run_model.APIRun{
|
|
Name: "hello world",
|
|
Description: "this is hello world",
|
|
PipelineSpec: &run_model.APIPipelineSpec{
|
|
PipelineID: helloWorldPipeline.ID,
|
|
},
|
|
ResourceReferences: []*run_model.APIResourceReference{
|
|
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: helloWorldExperiment.ID},
|
|
Name: helloWorldExperiment.Name, Relationship: run_model.APIRelationshipOWNER},
|
|
},
|
|
}}
|
|
_, _, err := s.runClient.Create(createRunRequest)
|
|
require.Nil(t, err)
|
|
}
|
|
|
|
func (s *UpgradeTests) VerifyRuns() {
|
|
t := s.T()
|
|
|
|
/* ---------- List the runs, sorted by creation time ---------- */
|
|
runs, _, _, err := s.runClient.List(
|
|
&runParams.ListRunsParams{SortBy: util.StringPointer("created_at")})
|
|
require.Nil(t, err)
|
|
require.True(t, len(runs) >= 1)
|
|
require.Equal(t, "hello world", runs[0].Name)
|
|
|
|
/* ---------- Get hello world run ---------- */
|
|
helloWorldRunDetail, _, err := s.runClient.Get(&runParams.GetRunParams{RunID: runs[0].ID})
|
|
require.Nil(t, err)
|
|
checkHelloWorldRunDetail(t, helloWorldRunDetail)
|
|
}
|
|
|
|
func (s *UpgradeTests) PrepareJobs() {
|
|
t := s.T()
|
|
|
|
pipeline := s.getHelloWorldPipeline(true)
|
|
experiment := s.getHelloWorldExperiment(true)
|
|
|
|
/* ---------- Create a new hello world job by specifying pipeline ID ---------- */
|
|
createJobRequest := &jobparams.CreateJobParams{Body: &job_model.APIJob{
|
|
Name: "hello world",
|
|
Description: "this is hello world",
|
|
PipelineSpec: &job_model.APIPipelineSpec{
|
|
PipelineID: pipeline.ID,
|
|
},
|
|
ResourceReferences: []*job_model.APIResourceReference{
|
|
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experiment.ID},
|
|
Relationship: job_model.APIRelationshipOWNER},
|
|
},
|
|
MaxConcurrency: 10,
|
|
Enabled: true,
|
|
NoCatchup: true,
|
|
}}
|
|
_, err := s.jobClient.Create(createJobRequest)
|
|
require.Nil(t, err)
|
|
}
|
|
|
|
func (s *UpgradeTests) VerifyJobs() {
|
|
t := s.T()
|
|
|
|
pipeline := s.getHelloWorldPipeline(false)
|
|
experiment := s.getHelloWorldExperiment(false)
|
|
|
|
/* ---------- Get hello world job ---------- */
|
|
jobs, _, _, err := s.jobClient.List(&jobparams.ListJobsParams{})
|
|
require.Nil(t, err)
|
|
require.Len(t, jobs, 1)
|
|
job := jobs[0]
|
|
|
|
// Check workflow manifest is not empty
|
|
assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay")
|
|
expectedJob := &job_model.APIJob{
|
|
ID: job.ID,
|
|
Name: "hello world",
|
|
Description: "this is hello world",
|
|
PipelineSpec: &job_model.APIPipelineSpec{
|
|
PipelineID: pipeline.ID,
|
|
PipelineName: "hello-world.yaml",
|
|
WorkflowManifest: job.PipelineSpec.WorkflowManifest,
|
|
},
|
|
ResourceReferences: []*job_model.APIResourceReference{
|
|
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experiment.ID},
|
|
Name: experiment.Name, Relationship: job_model.APIRelationshipOWNER,
|
|
},
|
|
{Key: &job_model.APIResourceKey{ID: pipeline.ID, Type: job_model.APIResourceTypePIPELINEVERSION},
|
|
Name: "hello-world.yaml", Relationship: job_model.APIRelationshipCREATOR,
|
|
},
|
|
},
|
|
ServiceAccount: "pipeline-runner",
|
|
MaxConcurrency: 10,
|
|
NoCatchup: true,
|
|
Enabled: true,
|
|
CreatedAt: job.CreatedAt,
|
|
UpdatedAt: job.UpdatedAt,
|
|
Status: job.Status,
|
|
Trigger: &job_model.APITrigger{},
|
|
}
|
|
|
|
sort.Sort(JobResourceReferenceSorter(job.ResourceReferences))
|
|
sort.Sort(JobResourceReferenceSorter(expectedJob.ResourceReferences))
|
|
assert.Equal(t, expectedJob, job)
|
|
}
|
|
|
|
func checkHelloWorldRunDetail(t *testing.T, runDetail *run_model.APIRunDetail) {
|
|
// Check workflow manifest is not empty
|
|
assert.Contains(t, runDetail.Run.PipelineSpec.WorkflowManifest, "whalesay")
|
|
// Check runtime workflow manifest is not empty
|
|
assert.Contains(t, runDetail.PipelineRuntime.WorkflowManifest, "whalesay")
|
|
|
|
expectedExperimentID := test.GetExperimentIDFromAPIResourceReferences(runDetail.Run.ResourceReferences)
|
|
require.NotEmpty(t, expectedExperimentID)
|
|
|
|
expectedRun := &run_model.APIRun{
|
|
ID: runDetail.Run.ID,
|
|
Name: "hello world",
|
|
Description: "this is hello world",
|
|
Status: runDetail.Run.Status,
|
|
PipelineSpec: &run_model.APIPipelineSpec{
|
|
PipelineID: runDetail.Run.PipelineSpec.PipelineID,
|
|
PipelineName: "hello-world.yaml",
|
|
WorkflowManifest: runDetail.Run.PipelineSpec.WorkflowManifest,
|
|
},
|
|
ResourceReferences: []*run_model.APIResourceReference{
|
|
{Key: &run_model.APIResourceKey{Type: run_model.APIResourceTypeEXPERIMENT, ID: expectedExperimentID},
|
|
Name: "hello world experiment", Relationship: run_model.APIRelationshipOWNER,
|
|
},
|
|
{Key: &run_model.APIResourceKey{ID: runDetail.Run.PipelineSpec.PipelineID, Type: run_model.APIResourceTypePIPELINEVERSION},
|
|
Name: "hello-world.yaml", Relationship: run_model.APIRelationshipCREATOR,
|
|
},
|
|
},
|
|
ServiceAccount: "pipeline-runner",
|
|
CreatedAt: runDetail.Run.CreatedAt,
|
|
ScheduledAt: runDetail.Run.ScheduledAt,
|
|
FinishedAt: runDetail.Run.FinishedAt,
|
|
}
|
|
sort.Sort(RunResourceReferenceSorter(expectedRun.ResourceReferences))
|
|
sort.Sort(RunResourceReferenceSorter(runDetail.Run.ResourceReferences))
|
|
assert.Equal(t, expectedRun, runDetail.Run)
|
|
}
|
|
|
|
func (s *UpgradeTests) createHelloWorldExperiment() *experiment_model.APIExperiment {
|
|
t := s.T()
|
|
|
|
experiment := &experiment_model.APIExperiment{Name: "hello world experiment"}
|
|
helloWorldExperiment, err := s.experimentClient.Create(&experimentParams.CreateExperimentParams{Body: experiment})
|
|
require.Nil(t, err)
|
|
|
|
return helloWorldExperiment
|
|
}
|
|
|
|
func (s *UpgradeTests) getHelloWorldExperiment(createIfNotExist bool) *experiment_model.APIExperiment {
|
|
t := s.T()
|
|
|
|
experiments, err := s.experimentClient.ListAll(&experimentParams.ListExperimentParams{}, 1000)
|
|
require.Nil(t, err)
|
|
var helloWorldExperiment *experiment_model.APIExperiment
|
|
for _, experiment := range experiments {
|
|
if experiment.Name == "hello world experiment" {
|
|
helloWorldExperiment = experiment
|
|
}
|
|
}
|
|
|
|
if helloWorldExperiment == nil && createIfNotExist {
|
|
return s.createHelloWorldExperiment()
|
|
}
|
|
|
|
return helloWorldExperiment
|
|
}
|
|
|
|
func (s *UpgradeTests) getHelloWorldPipeline(createIfNotExist bool) *pipeline_model.APIPipeline {
|
|
t := s.T()
|
|
|
|
pipelines, err := s.pipelineClient.ListAll(&pipelineParams.ListPipelinesParams{}, 1000)
|
|
require.Nil(t, err)
|
|
var helloWorldPipeline *pipeline_model.APIPipeline
|
|
for _, pipeline := range pipelines {
|
|
if pipeline.Name == "hello-world.yaml" {
|
|
helloWorldPipeline = pipeline
|
|
}
|
|
}
|
|
|
|
if helloWorldPipeline == nil && createIfNotExist {
|
|
return s.createHelloWorldPipeline()
|
|
}
|
|
|
|
return helloWorldPipeline
|
|
}
|
|
|
|
func (s *UpgradeTests) createHelloWorldPipeline() *pipeline_model.APIPipeline {
|
|
t := s.T()
|
|
|
|
/* ---------- Upload pipelines YAML ---------- */
|
|
uploadedPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams())
|
|
require.Nil(t, err)
|
|
|
|
helloWorldPipeline, err := s.pipelineClient.Get(&pipelineParams.GetPipelineParams{ID: uploadedPipeline.ID})
|
|
require.Nil(t, err)
|
|
|
|
return helloWorldPipeline
|
|
}
|