pipelines/backend/test/v2/integration/run_api_test.go

445 lines
17 KiB
Go

// Copyright 2018-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"encoding/json"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/golang/glog"
experiment_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_client/experiment_service"
upload_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_client/pipeline_upload_service"
run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
api_server "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
"github.com/kubeflow/pipelines/backend/src/common/util"
test "github.com/kubeflow/pipelines/backend/test/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/types/known/structpb"
"sigs.k8s.io/yaml"
)
type RunApiTestSuite struct {
suite.Suite
namespace string
resourceNamespace string
experimentClient *api_server.ExperimentClient
pipelineClient *api_server.PipelineClient
pipelineUploadClient *api_server.PipelineUploadClient
runClient *api_server.RunClient
}
// Check the namespace have ML pipeline installed and ready
func (s *RunApiTestSuite) SetupTest() {
if !*runIntegrationTests {
s.T().SkipNow()
return
}
if !*isDevMode {
err := test.WaitForReady(*namespace, *initializeTimeout)
if err != nil {
glog.Exitf("Failed to initialize test. Error: %s", err.Error())
}
}
s.namespace = *namespace
var newExperimentClient func() (*api_server.ExperimentClient, error)
var newPipelineUploadClient func() (*api_server.PipelineUploadClient, error)
var newPipelineClient func() (*api_server.PipelineClient, error)
var newRunClient func() (*api_server.RunClient, error)
if *isKubeflowMode {
s.resourceNamespace = *resourceNamespace
newExperimentClient = func() (*api_server.ExperimentClient, error) {
return api_server.NewKubeflowInClusterExperimentClient(s.namespace, *isDebugMode)
}
newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) {
return api_server.NewKubeflowInClusterPipelineUploadClient(s.namespace, *isDebugMode)
}
newPipelineClient = func() (*api_server.PipelineClient, error) {
return api_server.NewKubeflowInClusterPipelineClient(s.namespace, *isDebugMode)
}
newRunClient = func() (*api_server.RunClient, error) {
return api_server.NewKubeflowInClusterRunClient(s.namespace, *isDebugMode)
}
} else {
clientConfig := test.GetClientConfig(*namespace)
newExperimentClient = func() (*api_server.ExperimentClient, error) {
return api_server.NewExperimentClient(clientConfig, *isDebugMode)
}
newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) {
return api_server.NewPipelineUploadClient(clientConfig, *isDebugMode)
}
newPipelineClient = func() (*api_server.PipelineClient, error) {
return api_server.NewPipelineClient(clientConfig, *isDebugMode)
}
newRunClient = func() (*api_server.RunClient, error) {
return api_server.NewRunClient(clientConfig, *isDebugMode)
}
}
var err error
s.experimentClient, err = newExperimentClient()
if err != nil {
glog.Exitf("Failed to get experiment client. Error: %v", err)
}
s.pipelineUploadClient, err = newPipelineUploadClient()
if err != nil {
glog.Exitf("Failed to get pipeline upload client. Error: %s", err.Error())
}
s.pipelineClient, err = newPipelineClient()
if err != nil {
glog.Exitf("Failed to get pipeline client. Error: %s", err.Error())
}
s.runClient, err = newRunClient()
if err != nil {
glog.Exitf("Failed to get run client. Error: %s", err.Error())
}
s.cleanUp()
}
func (s *RunApiTestSuite) TestRunApis() {
t := s.T()
/* ---------- Upload pipelines YAML ---------- */
helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams())
assert.Nil(t, err)
/* ---------- Upload a pipeline version YAML under helloWorldPipeline ---------- */
time.Sleep(1 * time.Second)
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
"../resources/hello-world.yaml", &upload_params.UploadPipelineVersionParams{
Name: util.StringPointer("hello-world-version"),
Pipelineid: util.StringPointer(helloWorldPipeline.PipelineID),
})
assert.Nil(t, err)
/* ---------- Create a new hello world experiment ---------- */
experiment := test.MakeExperiment("hello world experiment", "", s.resourceNamespace)
helloWorldExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment})
assert.Nil(t, err)
/* ---------- Create a new hello world run by specifying pipeline version ID ---------- */
createRunRequest := &run_params.CreateRunParams{Body: &run_model.V2beta1Run{
DisplayName: "hello world",
Description: "this is hello world",
ExperimentID: helloWorldExperiment.ExperimentID,
PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{
PipelineID: helloWorldPipelineVersion.PipelineID,
PipelineVersionID: helloWorldPipelineVersion.PipelineVersionID,
},
}}
helloWorldRunDetail, err := s.runClient.Create(createRunRequest)
assert.Nil(t, err)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ExperimentID, helloWorldPipelineVersion.PipelineID, helloWorldPipelineVersion.PipelineVersionID)
/* ---------- Get hello world run ---------- */
helloWorldRunDetail, err = s.runClient.Get(&run_params.GetRunParams{RunID: helloWorldRunDetail.RunID})
assert.Nil(t, err)
s.checkHelloWorldRunDetail(t, helloWorldRunDetail, helloWorldExperiment.ExperimentID, helloWorldPipelineVersion.PipelineID, helloWorldPipelineVersion.PipelineVersionID)
/* ---------- Create a new argument parameter experiment ---------- */
createExperimentRequest := &experiment_params.CreateExperimentParams{
Body: test.MakeExperiment("argument parameter experiment", "", s.resourceNamespace),
}
argParamsExperiment, err := s.experimentClient.Create(createExperimentRequest)
assert.Nil(t, err)
/* ---------- Create a new argument parameter run by uploading workflow manifest ---------- */
argParamsBytes, err := ioutil.ReadFile("../resources/arguments-parameters.yaml")
assert.Nil(t, err)
pipeline_spec := &structpb.Struct{}
err = yaml.Unmarshal(argParamsBytes, pipeline_spec)
assert.Nil(t, err)
createRunRequest = &run_params.CreateRunParams{Body: &run_model.V2beta1Run{
DisplayName: "argument parameter",
Description: "this is argument parameter",
PipelineSpec: pipeline_spec,
RuntimeConfig: &run_model.V2beta1RuntimeConfig{
Parameters: map[string]interface{}{
"param1": "goodbye",
"param2": "world",
},
},
ExperimentID: argParamsExperiment.ExperimentID,
}}
argParamsRunDetail, err := s.runClient.Create(createRunRequest)
assert.Nil(t, err)
s.checkArgParamsRunDetail(t, argParamsRunDetail, argParamsExperiment.ExperimentID)
/* ---------- List all the runs. Both runs should be returned ---------- */
runs, totalSize, _, err := test.ListAllRuns(s.runClient, s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 2, len(runs))
assert.Equal(t, 2, totalSize)
/* ---------- List the runs, paginated, sorted by creation time ---------- */
runs, totalSize, nextPageToken, err := test.ListRuns(
s.runClient,
&run_params.ListRunsParams{
PageSize: util.Int32Pointer(1),
SortBy: util.StringPointer("created_at"),
},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 2, totalSize)
/* TODO(issues/1762): fix the following flaky assertion. */
/* assert.Equal(t, "hello world", runs[0].Name) */
runs, totalSize, _, err = test.ListRuns(
s.runClient,
&run_params.ListRunsParams{
PageSize: util.Int32Pointer(1),
PageToken: util.StringPointer(nextPageToken),
},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 2, totalSize)
/* TODO(issues/1762): fix the following flaky assertion. */
/* assert.Equal(t, "argument parameter", runs[0].Name) */
/* ---------- List the runs, paginated, sort by name ---------- */
runs, totalSize, nextPageToken, err = test.ListRuns(
s.runClient,
&run_params.ListRunsParams{
PageSize: util.Int32Pointer(1),
SortBy: util.StringPointer("name"),
},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 2, totalSize)
assert.Equal(t, "argument parameter", runs[0].DisplayName)
runs, totalSize, _, err = test.ListRuns(
s.runClient,
&run_params.ListRunsParams{
PageSize: util.Int32Pointer(1),
SortBy: util.StringPointer("name"),
PageToken: util.StringPointer(nextPageToken),
},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 2, totalSize)
assert.Equal(t, "hello world", runs[0].DisplayName)
/* ---------- List the runs, sort by unsupported field ---------- */
_, _, _, err = test.ListRuns(
s.runClient,
&run_params.ListRunsParams{PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("unknownfield")},
s.resourceNamespace)
assert.NotNil(t, err)
/* ---------- List runs for hello world experiment. One run should be returned ---------- */
runs, totalSize, _, err = s.runClient.List(&run_params.ListRunsParams{
ExperimentID: util.StringPointer(helloWorldExperiment.ExperimentID),
})
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 1, totalSize)
assert.Equal(t, "hello world", runs[0].DisplayName)
/* ---------- List the runs, filtered by created_at, only return the previous two runs ---------- */
time.Sleep(5 * time.Second) // Sleep for 5 seconds to make sure the previous runs are created at a different timestamp
filterTime := time.Now().Unix()
time.Sleep(5 * time.Second)
// Create a new run
createRunRequest.Body.DisplayName = "argument parameter 2"
_, err = s.runClient.Create(createRunRequest)
assert.Nil(t, err)
// Check total number of runs is 3
runs, totalSize, _, err = test.ListAllRuns(s.runClient, s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 3, len(runs))
assert.Equal(t, 3, totalSize)
// Check number of filtered runs created before filterTime to be 2
runs, totalSize, _, err = test.ListRuns(
s.runClient,
&run_params.ListRunsParams{
Filter: util.StringPointer(`{"predicates": [{"key": "created_at", "operation": "LESS_THAN", "string_value": "` + fmt.Sprint(filterTime) + `"}]}`),
},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 2, len(runs))
assert.Equal(t, 2, totalSize)
/* ---------- Archive a run ------------*/
err = s.runClient.Archive(&run_params.ArchiveRunParams{
RunID: helloWorldRunDetail.RunID,
})
assert.Nil(t, err)
/* ---------- List runs for hello world experiment. The same run should still be returned, but should be archived ---------- */
runs, totalSize, _, err = s.runClient.List(&run_params.ListRunsParams{
ExperimentID: util.StringPointer(helloWorldExperiment.ExperimentID),
})
assert.Nil(t, err)
assert.Equal(t, 1, len(runs))
assert.Equal(t, 1, totalSize)
assert.Equal(t, "hello world", runs[0].DisplayName)
assert.Equal(t, run_model.V2beta1RunStorageStateARCHIVED, runs[0].StorageState)
/* ---------- Upload long-running pipeline YAML ---------- */
longRunningPipeline, err := s.pipelineUploadClient.UploadFile("../resources/long-running.yaml", upload_params.NewUploadPipelineParamsWithTimeout(350))
assert.Nil(t, err)
/* ---------- Upload a long-running pipeline version YAML under longRunningPipeline ---------- */
time.Sleep(1 * time.Second)
longRunningPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion("../resources/long-running.yaml", &upload_params.UploadPipelineVersionParams{
Name: util.StringPointer("long-running-version"),
Pipelineid: util.StringPointer(longRunningPipeline.PipelineID),
})
assert.Nil(t, err)
/* ---------- Create a new long-running run by specifying pipeline ID ---------- */
createLongRunningRunRequest := &run_params.CreateRunParams{Body: &run_model.V2beta1Run{
DisplayName: "long running",
Description: "this pipeline will run long enough for us to manually terminate it before it finishes",
ExperimentID: helloWorldExperiment.ExperimentID,
PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{
PipelineID: longRunningPipelineVersion.PipelineID,
PipelineVersionID: longRunningPipelineVersion.PipelineVersionID,
},
}}
longRunningRun, err := s.runClient.Create(createLongRunningRunRequest)
assert.Nil(t, err)
/* ---------- Terminate the long-running run ------------*/
err = s.runClient.Terminate(&run_params.TerminateRunParams{
RunID: longRunningRun.RunID,
})
assert.Nil(t, err)
/* ---------- Get long-running run ---------- */
longRunningRun, err = s.runClient.Get(&run_params.GetRunParams{RunID: longRunningRun.RunID})
assert.Nil(t, err)
s.checkTerminatedRunDetail(t, longRunningRun, helloWorldExperiment.ExperimentID, longRunningPipelineVersion.PipelineID, longRunningPipelineVersion.PipelineVersionID)
}
func (s *RunApiTestSuite) checkTerminatedRunDetail(t *testing.T, run *run_model.V2beta1Run, experimentId string, pipelineId string, pipelineVersionId string) {
expectedRun := &run_model.V2beta1Run{
RunID: run.RunID,
DisplayName: "long running",
Description: "this pipeline will run long enough for us to manually terminate it before it finishes",
State: run_model.V2beta1RuntimeStateCANCELING,
StateHistory: run.StateHistory,
StorageState: run.StorageState,
ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode),
PipelineSpec: run.PipelineSpec,
ExperimentID: experimentId,
PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{
PipelineID: pipelineId,
PipelineVersionID: pipelineVersionId,
},
CreatedAt: run.CreatedAt,
ScheduledAt: run.ScheduledAt,
FinishedAt: run.FinishedAt,
}
assert.Equal(t, expectedRun, run)
}
func (s *RunApiTestSuite) checkHelloWorldRunDetail(t *testing.T, run *run_model.V2beta1Run, experimentId string, pipelineId string, pipelineVersionId string) {
expectedRun := &run_model.V2beta1Run{
RunID: run.RunID,
DisplayName: "hello world",
Description: "this is hello world",
State: run.State,
StateHistory: run.StateHistory,
StorageState: run.StorageState,
ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode),
PipelineSpec: run.PipelineSpec,
ExperimentID: experimentId,
PipelineVersionReference: &run_model.V2beta1PipelineVersionReference{
PipelineID: pipelineId,
PipelineVersionID: pipelineVersionId,
},
CreatedAt: run.CreatedAt,
ScheduledAt: run.ScheduledAt,
FinishedAt: run.FinishedAt,
}
assert.Equal(t, expectedRun, run)
}
func (s *RunApiTestSuite) checkArgParamsRunDetail(t *testing.T, run *run_model.V2beta1Run, experimentId string) {
// Compare the pipeline spec first.
argParamsBytes, err := ioutil.ReadFile("../resources/arguments-parameters.yaml")
assert.Nil(t, err)
// pipeline_spec := &structpb.Struct{}
// err = yaml.Unmarshal(argParamsBytes, pipeline_spec)
// assert.Nil(t, err)
expected_bytes, err := yaml.YAMLToJSON(argParamsBytes)
assert.Nil(t, err)
actual_bytes, err := json.Marshal(run.PipelineSpec)
assert.Nil(t, err)
assert.Equal(t, string(expected_bytes), string(actual_bytes))
expectedRun := &run_model.V2beta1Run{
RunID: run.RunID,
DisplayName: "argument parameter",
Description: "this is argument parameter",
State: run.State,
StateHistory: run.StateHistory,
StorageState: run.StorageState,
ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode),
PipelineSpec: run.PipelineSpec,
RuntimeConfig: &run_model.V2beta1RuntimeConfig{
Parameters: map[string]interface{}{
"param1": "goodbye",
"param2": "world",
},
},
ExperimentID: experimentId,
CreatedAt: run.CreatedAt,
ScheduledAt: run.ScheduledAt,
FinishedAt: run.FinishedAt,
}
assert.Equal(t, expectedRun, run)
}
func TestRunApi(t *testing.T) {
suite.Run(t, new(RunApiTestSuite))
}
func (s *RunApiTestSuite) TearDownSuite() {
if *runIntegrationTests {
if !*isDevMode {
s.cleanUp()
}
}
}
func (s *RunApiTestSuite) cleanUp() {
/* ---------- Clean up ---------- */
test.DeleteAllRuns(s.runClient, s.resourceNamespace, s.T())
test.DeleteAllPipelines(s.pipelineClient, s.T())
test.DeleteAllExperiments(s.experimentClient, s.resourceNamespace, s.T())
}