// Copyright 2018-2023 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package test import ( "fmt" "net/http" "os" "testing" "time" "github.com/cenkalti/backoff" experiment_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_client/experiment_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/experiment_model" pipeline_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_client/pipeline_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/pipeline_model" recurring_run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_client/recurring_run_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/recurring_run_model" run_params "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model" api_server "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" ) func WaitForReady(namespace string, initializeTimeout time.Duration) error { operation := func() error { response, err := http.Get(fmt.Sprintf("http://ml-pipeline.%s.svc.cluster.local:8888/apis/v2beta1/healthz", namespace)) if err != nil { return err } // If we get a 503 service unavailable, it's a non-retriable error. if response.StatusCode == 503 { return backoff.Permanent(errors.Wrapf( err, "Waiting for ml pipeline API server failed with non retriable error.")) } return nil } b := backoff.NewExponentialBackOff() b.MaxElapsedTime = initializeTimeout err := backoff.Retry(operation, b) return errors.Wrapf(err, "Waiting for ml pipeline API server failed after all attempts.") } func GetClientConfig(namespace string) clientcmd.ClientConfig { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig overrides := clientcmd.ConfigOverrides{Context: clientcmdapi.Context{Namespace: namespace}} return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) } func GetDefaultPipelineRunnerServiceAccount(isKubeflowMode bool) string { if isKubeflowMode { return "default-editor" } else { return "pipeline-runner" } } func ListAllExperiment(client *api_server.ExperimentClient, namespace string) ([]*experiment_model.V2beta1Experiment, int, string, error) { return ListExperiment(client, &experiment_params.ListExperimentsParams{}, namespace) } func ListExperiment(client *api_server.ExperimentClient, parameters *experiment_params.ListExperimentsParams, namespace string) ([]*experiment_model.V2beta1Experiment, int, string, error) { if namespace != "" { parameters.Namespace = &namespace } return client.List(parameters) } func DeleteAllExperiments(client *api_server.ExperimentClient, namespace string, t *testing.T) { experiments, _, _, err := ListAllExperiment(client, namespace) assert.Nil(t, err) for _, e := range experiments { if e.DisplayName != "Default" { assert.Nil(t, client.Delete(&experiment_params.DeleteExperimentParams{ExperimentID: e.ExperimentID})) } } } func MakeExperiment(name string, description string, namespace string) *experiment_model.V2beta1Experiment { experiment := &experiment_model.V2beta1Experiment{ DisplayName: name, Description: description, } if namespace != "" { experiment.Namespace = namespace } return experiment } func ListRuns(client *api_server.RunClient, parameters *run_params.ListRunsParams, namespace string) ([]*run_model.V2beta1Run, int, string, error) { if namespace != "" { parameters.Namespace = &namespace } return client.List(parameters) } func ListAllRuns(client *api_server.RunClient, namespace string) ([]*run_model.V2beta1Run, int, string, error) { parameters := &run_params.ListRunsParams{} return ListRuns(client, parameters, namespace) } func DeleteAllRuns(client *api_server.RunClient, namespace string, t *testing.T) { runs, _, _, err := ListAllRuns(client, namespace) assert.Nil(t, err) for _, r := range runs { assert.Nil(t, client.Delete(&run_params.DeleteRunParams{RunID: r.RunID})) } } func ListRecurringRuns(client *api_server.RecurringRunClient, parameters *recurring_run_params.ListRecurringRunsParams, namespace string) ([]*recurring_run_model.V2beta1RecurringRun, int, string, error) { if namespace != "" { parameters.Namespace = &namespace } return client.List(parameters) } func ListAllRecurringRuns(client *api_server.RecurringRunClient, namespace string) ([]*recurring_run_model.V2beta1RecurringRun, int, string, error) { return ListRecurringRuns(client, &recurring_run_params.ListRecurringRunsParams{}, namespace) } func DeleteAllRecurringRuns(client *api_server.RecurringRunClient, namespace string, t *testing.T) { recurringRuns, _, _, err := ListAllRecurringRuns(client, namespace) assert.Nil(t, err) for _, r := range recurringRuns { assert.Nil(t, client.Delete(&recurring_run_params.DeleteRecurringRunParams{RecurringRunID: r.RecurringRunID})) } } func ListPipelineVersions(client *api_server.PipelineClient, pipelineId string) ( []*pipeline_model.V2beta1PipelineVersion, int, string, error, ) { parameters := &pipeline_params.ListPipelineVersionsParams{PipelineID: pipelineId} return client.ListPipelineVersions(parameters) } func ListPipelines(client *api_server.PipelineClient) ( []*pipeline_model.V2beta1Pipeline, int, string, error, ) { parameters := &pipeline_params.ListPipelinesParams{} return client.List(parameters) } func DeleteAllPipelineVersions(client *api_server.PipelineClient, t *testing.T, pipelineId string) { pipelineVersions, _, _, err := ListPipelineVersions(client, pipelineId) assert.Nil(t, err) for _, pv := range pipelineVersions { assert.Nil(t, client.DeletePipelineVersion(&pipeline_params.DeletePipelineVersionParams{PipelineID: pipelineId, PipelineVersionID: pv.PipelineVersionID})) } } func DeleteAllPipelines(client *api_server.PipelineClient, t *testing.T) { pipelines, _, _, err := ListPipelines(client) assert.Nil(t, err) deletedPipelines := make(map[string]bool) for _, p := range pipelines { deletedPipelines[p.PipelineID] = false } for pId, isRemoved := range deletedPipelines { if !isRemoved { DeleteAllPipelineVersions(client, t, pId) deletedPipelines[pId] = true } assert.Nil(t, client.Delete(&pipeline_params.DeletePipelineParams{PipelineID: pId})) } for _, isRemoved := range deletedPipelines { assert.True(t, isRemoved) } }