pipelines/backend/test/v2/integration/recurring_run_api_test.go

721 lines
30 KiB
Go

// Copyright 2018-2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/eapache/go-resiliency/retrier"
"github.com/go-openapi/strfmt"
"github.com/golang/glog"
experiment_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_client/experiment_service"
params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_client/pipeline_service"
upload_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_upload_client/pipeline_upload_service"
recurring_run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_client/recurring_run_service"
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_model"
run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
api_server "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
"github.com/kubeflow/pipelines/backend/src/common/util"
test "github.com/kubeflow/pipelines/backend/test/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/types/known/structpb"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)
// Time units expressed as a number of seconds. The constants are left untyped
// so they can be used directly wherever an integer count of seconds is
// expected (for example as the first argument of time.Unix).
const (
	second = 1
	minute = second * 60
	hour   = minute * 60
)
// RecurringRunApiTestSuite is the testify suite for the recurring run API
// integration tests. All clients are (re)created by SetupTest.
type RecurringRunApiTestSuite struct {
	suite.Suite

	// namespace is the namespace the clients are created against; SetupTest
	// copies it from the package-level namespace setting.
	namespace string
	// resourceNamespace is only populated by SetupTest in Kubeflow mode and is
	// passed to the helpers that create, list and delete resources.
	resourceNamespace string

	// API clients used by the tests.
	experimentClient     *api_server.ExperimentClient
	pipelineClient       *api_server.PipelineClient
	pipelineUploadClient *api_server.PipelineUploadClient
	runClient            *api_server.RunClient
	recurringRunClient   *api_server.RecurringRunClient

	// swfClient gives direct access to ScheduledWorkflow custom resources.
	swfClient client.SwfClientInterface
}
// SetupTest prepares the suite before a test runs. It skips the test unless
// integration tests are enabled, checks (outside dev mode) that the ML
// pipeline deployment in the namespace is installed and ready, creates every
// API client for the selected deployment mode and finally calls cleanUp so
// the test starts without leftover resources. Any failure while waiting or
// while creating a client is reported through glog.Exitf.
func (s *RecurringRunApiTestSuite) SetupTest() {
	if !*runIntegrationTests {
		s.T().SkipNow()
		return
	}

	if !*isDevMode {
		if err := test.WaitForReady(*namespace, *initializeTimeout); err != nil {
			glog.Exitf("Failed to initialize test. Error: %s", err.Error())
		}
	}
	s.namespace = *namespace

	// The mode-specific constructors are wrapped in factories so that the
	// clients can be created, and their errors handled, in one place below.
	var (
		makeExperimentClient     func() (*api_server.ExperimentClient, error)
		makePipelineUploadClient func() (*api_server.PipelineUploadClient, error)
		makePipelineClient       func() (*api_server.PipelineClient, error)
		makeRunClient            func() (*api_server.RunClient, error)
		makeRecurringRunClient   func() (*api_server.RecurringRunClient, error)
	)
	if !*isKubeflowMode {
		// Standalone mode: every client is built from the same client config.
		clientConfig := test.GetClientConfig(*namespace)
		makeExperimentClient = func() (*api_server.ExperimentClient, error) {
			return api_server.NewExperimentClient(clientConfig, *isDebugMode)
		}
		makePipelineUploadClient = func() (*api_server.PipelineUploadClient, error) {
			return api_server.NewPipelineUploadClient(clientConfig, *isDebugMode)
		}
		makePipelineClient = func() (*api_server.PipelineClient, error) {
			return api_server.NewPipelineClient(clientConfig, *isDebugMode)
		}
		makeRunClient = func() (*api_server.RunClient, error) {
			return api_server.NewRunClient(clientConfig, *isDebugMode)
		}
		makeRecurringRunClient = func() (*api_server.RecurringRunClient, error) {
			return api_server.NewRecurringRunClient(clientConfig, *isDebugMode)
		}
	} else {
		// Kubeflow mode: in-cluster clients, and resources live in the
		// configured resource namespace.
		s.resourceNamespace = *resourceNamespace
		makeExperimentClient = func() (*api_server.ExperimentClient, error) {
			return api_server.NewKubeflowInClusterExperimentClient(s.namespace, *isDebugMode)
		}
		makePipelineUploadClient = func() (*api_server.PipelineUploadClient, error) {
			return api_server.NewKubeflowInClusterPipelineUploadClient(s.namespace, *isDebugMode)
		}
		makePipelineClient = func() (*api_server.PipelineClient, error) {
			return api_server.NewKubeflowInClusterPipelineClient(s.namespace, *isDebugMode)
		}
		makeRunClient = func() (*api_server.RunClient, error) {
			return api_server.NewKubeflowInClusterRunClient(s.namespace, *isDebugMode)
		}
		makeRecurringRunClient = func() (*api_server.RecurringRunClient, error) {
			return api_server.NewKubeflowInClusterRecurringRunClient(s.namespace, *isDebugMode)
		}
	}

	var err error
	if s.experimentClient, err = makeExperimentClient(); err != nil {
		glog.Exitf("Failed to get experiment client. Error: %v", err)
	}
	if s.pipelineUploadClient, err = makePipelineUploadClient(); err != nil {
		glog.Exitf("Failed to get pipeline upload client. Error: %s", err.Error())
	}
	if s.pipelineClient, err = makePipelineClient(); err != nil {
		glog.Exitf("Failed to get pipeline client. Error: %s", err.Error())
	}
	if s.runClient, err = makeRunClient(); err != nil {
		glog.Exitf("Failed to get run client. Error: %s", err.Error())
	}
	if s.recurringRunClient, err = makeRecurringRunClient(); err != nil {
		glog.Exitf("Failed to get recurringRun client. Error: %s", err.Error())
	}

	s.swfClient = client.NewScheduledWorkflowClientOrFatal(time.Second*30, util.ClientParameters{QPS: 5, Burst: 10})

	s.cleanUp()
}
// TestRecurringRunApis exercises the recurring run API end to end. It creates
// one recurring run from an uploaded pipeline version and one from an inline
// pipeline spec with runtime parameters, verifies get and list behavior
// (pagination, sorting, an unsupported sort field, filtering by experiment
// and by created_at) and finally expects exactly one run to show up in each
// of the two experiments.
func (s *RecurringRunApiTestSuite) TestRecurringRunApis() {
	t := s.T()
	// must stops the test immediately on failure. It is used for every step
	// the rest of the test depends on, so that a failed API call is reported
	// as a test failure rather than a nil-pointer or index-out-of-range panic
	// a few lines later.
	must := s.Require()

	/* ---------- Upload pipelines YAML ---------- */
	helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams())
	must.Nil(err)

	/* ---------- Upload pipeline version YAML ---------- */
	time.Sleep(1 * time.Second)
	helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
		"../resources/hello-world.yaml", &upload_params.UploadPipelineVersionParams{
			Name:       util.StringPointer("hello-world-version"),
			Pipelineid: util.StringPointer(helloWorldPipeline.PipelineID),
		})
	must.Nil(err)

	/* ---------- Create a new hello world experiment ---------- */
	experiment := test.MakeExperiment("hello world experiment", "", s.resourceNamespace)
	helloWorldExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment})
	must.Nil(err)

	/* ---------- Create a new hello world recurringRun by specifying pipeline ID ---------- */
	createRecurringRunRequest := &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{
		DisplayName:  "hello world",
		Description:  "this is hello world",
		ExperimentID: helloWorldExperiment.ExperimentID,
		PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{
			PipelineID:        helloWorldPipelineVersion.PipelineID,
			PipelineVersionID: helloWorldPipelineVersion.PipelineVersionID,
		},
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeENABLE,
	}}
	helloWorldRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest)
	must.Nil(err)
	s.checkHelloWorldRecurringRun(t, helloWorldRecurringRun, helloWorldExperiment.ExperimentID, helloWorldPipelineVersion.PipelineID, helloWorldPipelineVersion.PipelineVersionID)

	/* ---------- Get hello world recurringRun ---------- */
	helloWorldRecurringRun, err = s.recurringRunClient.Get(&recurring_run_params.GetRecurringRunParams{RecurringRunID: helloWorldRecurringRun.RecurringRunID})
	must.Nil(err)
	s.checkHelloWorldRecurringRun(t, helloWorldRecurringRun, helloWorldExperiment.ExperimentID, helloWorldPipelineVersion.PipelineID, helloWorldPipelineVersion.PipelineVersionID)

	/* ---------- Create a new argument parameter experiment ---------- */
	experiment = test.MakeExperiment("argument parameter experiment", "", s.resourceNamespace)
	argParamsExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment})
	must.Nil(err)

	/* ---------- Create a new argument parameter recurringRun by uploading workflow manifest ---------- */
	// Make sure the recurringRun is created at least 1 second later than the first one,
	// because sort by created_at has precision of 1 second.
	time.Sleep(1 * time.Second)
	argParamsBytes, err := ioutil.ReadFile("../resources/arguments-parameters.yaml")
	must.Nil(err)
	pipelineSpec := &structpb.Struct{}
	err = yaml.Unmarshal(argParamsBytes, pipelineSpec)
	must.Nil(err)
	createRecurringRunRequest = &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{
		DisplayName:  "argument parameter",
		Description:  "this is argument parameter",
		ExperimentID: argParamsExperiment.ExperimentID,
		PipelineSpec: pipelineSpec,
		RuntimeConfig: &recurring_run_model.V2beta1RuntimeConfig{
			Parameters: map[string]interface{}{
				"param1": "goodbye",
				"param2": "world",
			},
		},
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeENABLE,
	}}
	argParamsRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest)
	must.Nil(err)
	s.checkArgParamsRecurringRun(t, argParamsRecurringRun, argParamsExperiment.ExperimentID)

	/* ---------- List all the recurringRuns. Both recurringRuns should be returned ---------- */
	recurringRuns, totalSize, _, err := test.ListAllRecurringRuns(s.recurringRunClient, s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, totalSize, "Incorrect total number of recurring runs in the namespace %v", s.resourceNamespace)
	assert.Equal(t, 2, len(recurringRuns), "Incorrect total number of recurring runs in the namespace %v", s.resourceNamespace)

	/* ---------- List the recurringRuns, paginated, sort by creation time ---------- */
	recurringRuns, totalSize, nextPageToken, err := test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			PageSize: util.Int32Pointer(1),
			SortBy:   util.StringPointer("created_at"),
		},
		s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, totalSize)
	must.Len(recurringRuns, 1) // the first element is read on the next line
	assert.Equal(t, "hello world", recurringRuns[0].DisplayName)
	recurringRuns, totalSize, _, err = test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			PageSize:  util.Int32Pointer(1),
			PageToken: util.StringPointer(nextPageToken),
		},
		s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, totalSize)
	must.Len(recurringRuns, 1)
	assert.Equal(t, "argument parameter", recurringRuns[0].DisplayName)

	/* ---------- List the recurringRuns, paginated, sort by name ---------- */
	recurringRuns, totalSize, nextPageToken, err = test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			PageSize: util.Int32Pointer(1),
			SortBy:   util.StringPointer("name"),
		},
		s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, totalSize)
	must.Len(recurringRuns, 1)
	assert.Equal(t, "argument parameter", recurringRuns[0].DisplayName)
	recurringRuns, totalSize, _, err = test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			PageSize:  util.Int32Pointer(1),
			SortBy:    util.StringPointer("name"),
			PageToken: util.StringPointer(nextPageToken),
		},
		s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, totalSize)
	must.Len(recurringRuns, 1)
	assert.Equal(t, "hello world", recurringRuns[0].DisplayName)

	/* ---------- List the recurringRuns, sort by unsupported field ---------- */
	recurringRuns, _, _, err = test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			PageSize: util.Int32Pointer(2),
			SortBy:   util.StringPointer("unknown"),
		},
		s.resourceNamespace)
	assert.NotNil(t, err)
	assert.Equal(t, 0, len(recurringRuns))

	/* ---------- List recurringRuns for hello world experiment. One recurringRun should be returned ---------- */
	recurringRuns, totalSize, _, err = s.recurringRunClient.List(&recurring_run_params.ListRecurringRunsParams{
		ExperimentID: util.StringPointer(helloWorldExperiment.ExperimentID),
	})
	assert.Nil(t, err)
	assert.Equal(t, 1, totalSize)
	must.Len(recurringRuns, 1)
	assert.Equal(t, "hello world", recurringRuns[0].DisplayName)

	/* ---------- List the recurringRuns, filtered by created_at, only return the previous two recurringRuns ---------- */
	time.Sleep(5 * time.Second) // Sleep for 5 seconds to make sure the previous recurringRuns are created at a different timestamp
	filterTime := time.Now().Unix()
	time.Sleep(5 * time.Second)
	createRecurringRunRequestNew := &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{
		DisplayName:  "new hello world recurringRun",
		Description:  "this is a new hello world",
		ExperimentID: helloWorldExperiment.ExperimentID,
		PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{
			PipelineID:        helloWorldPipelineVersion.PipelineID,
			PipelineVersionID: helloWorldPipelineVersion.PipelineVersionID,
		},
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeDISABLE,
	}}
	_, err = s.recurringRunClient.Create(createRecurringRunRequestNew)
	assert.Nil(t, err)

	// Check total number of recurringRuns to be 3
	recurringRuns, totalSize, _, err = test.ListAllRecurringRuns(s.recurringRunClient, s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 3, totalSize)
	assert.Equal(t, 3, len(recurringRuns))

	// Check number of filtered recurringRuns created before filterTime to be 2
	recurringRuns, totalSize, _, err = test.ListRecurringRuns(
		s.recurringRunClient,
		&recurring_run_params.ListRecurringRunsParams{
			Filter: util.StringPointer(`{"predicates": [{"key": "created_at", "operation": "LESS_THAN", "string_value": "` + fmt.Sprint(filterTime) + `"}]}`),
		},
		s.resourceNamespace)
	assert.Nil(t, err)
	assert.Equal(t, 2, len(recurringRuns))
	assert.Equal(t, 2, totalSize)

	// The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent.
	// This could take a few seconds to finish.
	//
	// waitForSingleRun polls the runs of the given experiment with a constant
	// 5 second back-off until exactly one run is listed and check accepts it.
	// It returns the last error if that never happens.
	waitForSingleRun := func(experimentID string, check func(run *run_model.V2beta1Run) error) error {
		return retrier.New(retrier.ConstantBackoff(8, 5*time.Second), nil).Run(func() error {
			runs, totalSize, _, err := s.runClient.List(&run_params.ListRunsParams{
				ExperimentID: util.StringPointer(experimentID),
			})
			if err != nil {
				return err
			}
			if len(runs) != 1 {
				return fmt.Errorf("expected runs to be length 1, got: %v", len(runs))
			}
			if totalSize != 1 {
				return fmt.Errorf("expected total size 1, got: %v", totalSize)
			}
			return check(runs[0])
		})
	}

	/* ---------- Check run for hello world recurringRun ---------- */
	err = waitForSingleRun(helloWorldExperiment.ExperimentID, func(run *run_model.V2beta1Run) error {
		return s.checkHelloWorldRun(run, helloWorldExperiment.ExperimentID, helloWorldRecurringRun.RecurringRunID)
	})
	assert.Nil(t, err)

	/* ---------- Check run for argument parameter recurringRun ---------- */
	err = waitForSingleRun(argParamsExperiment.ExperimentID, func(run *run_model.V2beta1Run) error {
		return s.checkArgParamsRun(run, argParamsExperiment.ExperimentID, argParamsRecurringRun.RecurringRunID)
	})
	assert.Nil(t, err)
}
// TestRecurringRunApis_noCatchupOption verifies the NoCatchup option for both
// periodic and cron schedules. Four recurring runs are created, each in its
// own experiment and each with a two-minute schedule that lies in the past.
// With NoCatchup=false two runs are expected per experiment; with
// NoCatchup=true only one run is expected.
func (s *RecurringRunApiTestSuite) TestRecurringRunApis_noCatchupOption() {
	t := s.T()
	// must stops the test immediately on failure. It guards the calls whose
	// results are dereferenced afterwards, so a failed API call is reported as
	// a test failure instead of a nil-pointer panic.
	must := s.Require()

	/* ---------- Upload pipelines YAML ---------- */
	pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams())
	must.Nil(err)

	/* ---------- Upload pipeline version YAML ---------- */
	time.Sleep(1 * time.Second)
	helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion(
		"../resources/hello-world.yaml", &upload_params.UploadPipelineVersionParams{
			Name:       util.StringPointer("hello-world-version"),
			Pipelineid: util.StringPointer(pipeline.PipelineID),
		})
	must.Nil(err)

	const (
		catchupDescription   = "A recurringRun with NoCatchup=false will backfill each past interval when behind schedule."
		noCatchupDescription = "A recurringRun with NoCatchup=true only schedules the last interval when behind schedule."
	)

	// createPastRecurringRun creates an experiment with the given name and,
	// inside it, a recurring run scheduled in the past (periodic or cron) with
	// the given NoCatchup setting. It returns the ID of the new experiment.
	createPastRecurringRun := func(experimentName, displayName, description string, periodic, noCatchup bool) string {
		experiment := test.MakeExperiment(experimentName, "", s.resourceNamespace)
		createdExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment})
		must.Nil(err)
		recurringRun := recurringRunInThePastForTwoMinutes(recurringRunOptions{
			pipelineId:        helloWorldPipelineVersion.PipelineID,
			pipelineVersionId: helloWorldPipelineVersion.PipelineVersionID,
			experimentId:      createdExperiment.ExperimentID,
			periodic:          periodic,
		})
		recurringRun.DisplayName = displayName
		recurringRun.Description = description
		recurringRun.NoCatchup = noCatchup // This is the key difference.
		_, err = s.recurringRunClient.Create(&recurring_run_params.CreateRecurringRunParams{Body: recurringRun})
		assert.Nil(t, err)
		return createdExperiment.ExperimentID
	}

	/* ---------- Create a periodic recurringRun with start and end date in the past and catchup = true ---------- */
	periodicCatchupTrueExperimentID := createPastRecurringRun("periodic catchup true", "periodic-catchup-true-", catchupDescription, true, false)

	/* -------- Create another periodic recurringRun with start and end date in the past but catchup = false ------ */
	periodicCatchupFalseExperimentID := createPastRecurringRun("periodic catchup false", "periodic-catchup-false-", noCatchupDescription, true, true)

	/* ---------- Create a cron recurringRun with start and end date in the past and catchup = true ---------- */
	cronCatchupTrueExperimentID := createPastRecurringRun("cron catchup true", "cron-catchup-true-", catchupDescription, false, false)

	/* -------- Create another cron recurringRun with start and end date in the past but catchup = false ------ */
	cronCatchupFalseExperimentID := createPastRecurringRun("cron catchup false", "cron-catchup-false-", noCatchupDescription, false, true)

	// The scheduledWorkflow CRD would create the run and it is synced to the DB by persistent agent.
	// This could take a few seconds to finish.

	/* ---------- Assert number of runs when catchup = true ---------- */
	err = retrier.New(retrier.ConstantBackoff(8, 5*time.Second), nil).Run(func() error {
		_, runsWhenCatchupTrue, _, err := s.runClient.List(&run_params.ListRunsParams{
			ExperimentID: util.StringPointer(periodicCatchupTrueExperimentID),
		})
		if err != nil {
			return err
		}
		if runsWhenCatchupTrue != 2 {
			return fmt.Errorf("expected runsWhenCatchupTrue with periodic schedule to be 2, got: %v", runsWhenCatchupTrue)
		}
		_, runsWhenCatchupTrue, _, err = s.runClient.List(&run_params.ListRunsParams{
			ExperimentID: util.StringPointer(cronCatchupTrueExperimentID),
		})
		if err != nil {
			return err
		}
		if runsWhenCatchupTrue != 2 {
			return fmt.Errorf("expected runsWhenCatchupTrue with cron schedule to be 2, got: %v", runsWhenCatchupTrue)
		}
		return nil
	})
	assert.Nil(t, err)

	/* ---------- Assert number of runs when catchup = false ---------- */
	err = retrier.New(retrier.ConstantBackoff(8, 5*time.Second), nil).Run(func() error {
		_, runsWhenCatchupFalse, _, err := s.runClient.List(&run_params.ListRunsParams{
			ExperimentID: util.StringPointer(periodicCatchupFalseExperimentID),
		})
		if err != nil {
			return err
		}
		if runsWhenCatchupFalse != 1 {
			return fmt.Errorf("expected runsWhenCatchupFalse with periodic schedule to be 1, got: %v", runsWhenCatchupFalse)
		}
		_, runsWhenCatchupFalse, _, err = s.runClient.List(&run_params.ListRunsParams{
			ExperimentID: util.StringPointer(cronCatchupFalseExperimentID),
		})
		if err != nil {
			return err
		}
		if runsWhenCatchupFalse != 1 {
			return fmt.Errorf("expected runsWhenCatchupFalse with cron schedule to be 1, got: %v", runsWhenCatchupFalse)
		}
		return nil
	})
	assert.Nil(t, err)
}
// checkHelloWorldRecurringRun asserts that recurringRun matches the
// "hello world" recurring run created by TestRecurringRunApis for the given
// experiment and pipeline version. Fields assigned outside the test (ID,
// pipeline spec, namespace, timestamps, trigger and status) are copied from
// recurringRun itself and therefore not verified. A nil recurringRun is
// reported as a failure instead of being dereferenced.
func (s *RecurringRunApiTestSuite) checkHelloWorldRecurringRun(t *testing.T, recurringRun *recurring_run_model.V2beta1RecurringRun, experimentID string, pipelineId string, pipelineVersionId string) {
	// Callers pass the result of Create/Get, which may be nil when that call
	// failed; bail out before touching its fields.
	if !assert.NotNil(t, recurringRun) {
		return
	}
	expectedRecurringRun := &recurring_run_model.V2beta1RecurringRun{
		RecurringRunID: recurringRun.RecurringRunID,
		DisplayName:    "hello world",
		Description:    "this is hello world",
		ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode),
		PipelineSpec:   recurringRun.PipelineSpec,
		ExperimentID:   experimentID,
		PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{
			PipelineID:        pipelineId,
			PipelineVersionID: pipelineVersionId,
		},
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeENABLE,
		Namespace:      recurringRun.Namespace,
		CreatedAt:      recurringRun.CreatedAt,
		UpdatedAt:      recurringRun.UpdatedAt,
		Trigger:        recurringRun.Trigger,
		Status:         recurringRun.Status,
	}
	assert.Equal(t, expectedRecurringRun, recurringRun)
}
// checkArgParamsRecurringRun asserts that recurringRun matches the
// "argument parameter" recurring run created by TestRecurringRunApis for the
// given experiment, including its runtime parameters. Fields assigned outside
// the test (ID, pipeline spec, namespace, timestamps, trigger and status) are
// copied from recurringRun itself and therefore not verified. A nil
// recurringRun is reported as a failure instead of being dereferenced.
func (s *RecurringRunApiTestSuite) checkArgParamsRecurringRun(t *testing.T, recurringRun *recurring_run_model.V2beta1RecurringRun, experimentID string) {
	// Callers pass the result of Create, which may be nil when that call
	// failed; bail out before touching its fields.
	if !assert.NotNil(t, recurringRun) {
		return
	}
	expectedRecurringRun := &recurring_run_model.V2beta1RecurringRun{
		RecurringRunID: recurringRun.RecurringRunID,
		DisplayName:    "argument parameter",
		Description:    "this is argument parameter",
		ServiceAccount: test.GetDefaultPipelineRunnerServiceAccount(*isKubeflowMode),
		PipelineSpec:   recurringRun.PipelineSpec,
		RuntimeConfig: &recurring_run_model.V2beta1RuntimeConfig{
			Parameters: map[string]interface{}{
				"param1": "goodbye",
				"param2": "world",
			},
		},
		ExperimentID:   experimentID,
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeENABLE,
		Namespace:      recurringRun.Namespace,
		CreatedAt:      recurringRun.CreatedAt,
		UpdatedAt:      recurringRun.UpdatedAt,
		Trigger:        recurringRun.Trigger,
		Status:         recurringRun.Status,
	}
	assert.Equal(t, expectedRecurringRun, recurringRun)
}
// TestRecurringRunApis_SwfNotFound verifies that a recurring run can still be
// deleted through the API after its ScheduledWorkflow custom resource has
// disappeared, and that a subsequent Get reports a 404.
func (s *RecurringRunApiTestSuite) TestRecurringRunApis_SwfNotFound() {
	t := s.T()
	// must stops the test immediately on failure. It guards every result that
	// is dereferenced or indexed afterwards, so a failed API call is reported
	// as a test failure instead of a panic.
	must := s.Require()

	/* ---------- Upload pipelines YAML ---------- */
	pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams())
	must.Nil(err)
	pipelineVersions, totalSize, _, err := s.pipelineClient.ListPipelineVersions(&params.ListPipelineVersionsParams{
		PipelineID: pipeline.PipelineID,
	})
	must.Nil(err)
	assert.Equal(t, 1, totalSize)
	must.NotEmpty(pipelineVersions) // the first version is used below

	/* ---------- Create a new hello world recurringRun by specifying pipeline ID ---------- */
	experiment := test.MakeExperiment("test-swf-not-found experiment", "", s.resourceNamespace)
	swfNotFoundExperiment, err := s.experimentClient.Create(&experiment_params.CreateExperimentParams{Body: experiment})
	must.Nil(err)
	createRecurringRunRequest := &recurring_run_params.CreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{
		DisplayName:  "test-swf-not-found",
		ExperimentID: swfNotFoundExperiment.ExperimentID,
		PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{
			PipelineID:        pipelineVersions[0].PipelineID,
			PipelineVersionID: pipelineVersions[0].PipelineVersionID,
		},
		MaxConcurrency: 10,
		Mode:           recurring_run_model.RecurringRunModeDISABLE,
	}}
	recurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest)
	must.Nil(err)

	// Delete all ScheduledWorkflow custom resources to simulate the situation
	// that after reinstalling KFP with managed storage, only KFP DB is kept,
	// but all KFP custom resources are gone.
	swfNamespace := s.namespace
	if s.resourceNamespace != "" {
		swfNamespace = s.resourceNamespace
	}
	err = s.swfClient.ScheduledWorkflow(swfNamespace).DeleteCollection(context.Background(), &v1.DeleteOptions{}, v1.ListOptions{})
	assert.Nil(t, err)

	err = s.recurringRunClient.Delete(&recurring_run_params.DeleteRecurringRunParams{RecurringRunID: recurringRun.RecurringRunID})
	assert.Nil(t, err)

	/* ---------- Get recurringRun ---------- */
	_, err = s.recurringRunClient.Get(&recurring_run_params.GetRecurringRunParams{RecurringRunID: recurringRun.RecurringRunID})
	must.NotNil(err) // err.Error() is called on the next line
	assert.Contains(t, err.Error(), "status 404")
}
// checkHelloWorldRun reports the first mismatch found on a run that is
// expected to come from the hello world recurring run: its display name must
// contain "helloworld" and its experiment and recurring run IDs must equal
// the given ones. It returns nil when everything matches.
func (s *RecurringRunApiTestSuite) checkHelloWorldRun(run *run_model.V2beta1Run, experimentID string, recurringRunID string) error {
	const nameFragment = "helloworld"
	switch {
	case !strings.Contains(run.DisplayName, nameFragment):
		return fmt.Errorf("expected: %+v got: %+v", nameFragment, run.DisplayName)
	case run.ExperimentID != experimentID:
		return fmt.Errorf("expected: %+v got: %+v", experimentID, run.ExperimentID)
	case run.RecurringRunID != recurringRunID:
		return fmt.Errorf("expected: %+v got: %+v", recurringRunID, run.RecurringRunID)
	default:
		return nil
	}
}
// checkArgParamsRun reports the first mismatch found on a run that is
// expected to come from the argument parameter recurring run: its display
// name must contain "argumentparameter" and its experiment and recurring run
// IDs must equal the given ones. It returns nil when everything matches.
func (s *RecurringRunApiTestSuite) checkArgParamsRun(run *run_model.V2beta1Run, experimentID, recurringRunID string) error {
	const nameFragment = "argumentparameter"
	switch {
	case !strings.Contains(run.DisplayName, nameFragment):
		return fmt.Errorf("expected: %+v got: %+v", nameFragment, run.DisplayName)
	case run.ExperimentID != experimentID:
		return fmt.Errorf("expected: %+v got: %+v", experimentID, run.ExperimentID)
	case run.RecurringRunID != recurringRunID:
		return fmt.Errorf("expected: %+v got: %+v", recurringRunID, run.RecurringRunID)
	default:
		return nil
	}
}
func TestRecurringRunApi(t *testing.T) {
suite.Run(t, new(RecurringRunApiTestSuite))
}
// TearDownSuite removes the resources created by the suite. Nothing is
// cleaned up when integration tests are disabled or when running in dev mode.
func (s *RecurringRunApiTestSuite) TearDownSuite() {
	if !*runIntegrationTests || *isDevMode {
		return
	}
	s.cleanUp()
}
/** ======== the following are util functions ========= **/
// cleanUp deletes, in this order, all runs, recurring runs, pipelines and
// experiments through the suite's clients.
func (s *RecurringRunApiTestSuite) cleanUp() {
	t, resourceNamespace := s.T(), s.resourceNamespace

	test.DeleteAllRuns(s.runClient, resourceNamespace, t)
	test.DeleteAllRecurringRuns(s.recurringRunClient, resourceNamespace, t)
	test.DeleteAllPipelines(s.pipelineClient, t)
	test.DeleteAllExperiments(s.experimentClient, resourceNamespace, t)
}
// defaultV2beta1RecurringRun returns an enabled recurring run for the given
// pipeline version and experiment. It carries placeholder name and
// description, a max concurrency of 10, NoCatchup disabled and a periodic
// trigger with a 60 second interval whose start and end times come from
// strfmt.NewDateTime(). Callers are expected to override what they need.
func defaultV2beta1RecurringRun(pipelineId, pipelineVersionId, experimentId string) *recurring_run_model.V2beta1RecurringRun {
	versionReference := &recurring_run_model.V2beta1PipelineVersionReference{
		PipelineID:        pipelineId,
		PipelineVersionID: pipelineVersionId,
	}
	defaultTrigger := &recurring_run_model.V2beta1Trigger{
		PeriodicSchedule: &recurring_run_model.V2beta1PeriodicSchedule{
			StartTime:      strfmt.NewDateTime(),
			EndTime:        strfmt.NewDateTime(),
			IntervalSecond: 60,
		},
	}

	return &recurring_run_model.V2beta1RecurringRun{
		DisplayName:              "default-pipeline-name",
		Description:              "This is a default pipeline",
		ExperimentID:             experimentId,
		PipelineVersionReference: versionReference,
		MaxConcurrency:           10,
		NoCatchup:                false,
		Trigger:                  defaultTrigger,
		Mode:                     recurring_run_model.RecurringRunModeENABLE,
	}
}
// recurringRunOptions bundles the inputs of
// recurringRunInThePastForTwoMinutes.
type recurringRunOptions struct {
	// IDs of the pipeline, the pipeline version and the experiment the
	// recurring run refers to.
	pipelineId        string
	pipelineVersionId string
	experimentId      string
	// periodic selects a periodic schedule when true and a cron schedule
	// when false.
	periodic bool
}
// recurringRunInThePastForTwoMinutes returns the default recurring run for
// the given options with its trigger replaced by a schedule that fires once
// per minute inside a two-minute window in the past: the window starts 10
// hours after the Unix epoch and ends two minutes later. The schedule is
// periodic when options.periodic is set and a cron schedule otherwise.
func recurringRunInThePastForTwoMinutes(options recurringRunOptions) *recurring_run_model.V2beta1RecurringRun {
	windowStart := strfmt.DateTime(time.Unix(10*hour, 0))
	windowEnd := strfmt.DateTime(time.Unix(10*hour+2*minute, 0))

	trigger := &recurring_run_model.V2beta1Trigger{}
	if options.periodic {
		trigger.PeriodicSchedule = &recurring_run_model.V2beta1PeriodicSchedule{
			StartTime:      windowStart,
			EndTime:        windowEnd,
			IntervalSecond: 60, // Runs every 1 minute.
		}
	} else {
		trigger.CronSchedule = &recurring_run_model.V2beta1CronSchedule{
			StartTime: windowStart,
			EndTime:   windowEnd,
			Cron:      "0 * * * * ?", // Runs every 1 minute.
		}
	}

	pastRecurringRun := defaultV2beta1RecurringRun(options.pipelineId, options.pipelineVersionId, options.experimentId)
	pastRecurringRun.Trigger = trigger
	return pastRecurringRun
}