From 7fdb1b91dab6bffc5ec0d8b4e0aa549cb7e4d183 Mon Sep 17 00:00:00 2001 From: gkcalat <35157096+gkcalat@users.noreply.github.com> Date: Tue, 7 Mar 2023 12:14:11 -0800 Subject: [PATCH] fix(backend): Allow runs and recurring runs without creator pipeline and pipeline version (#8926) * Deprecate experiment_id in Run API v2 * Allow runs and recurring runs without pipeline version id * Adjust the logic for retrieving existing pipeline name * Remove pipeline parsing logic and tests * Simplify the logic in fetching a template --- .../apiserver/resource/resource_manager.go | 311 +++--------------- .../resource/resource_manager_test.go | 110 ++++--- .../src/apiserver/server/job_server_test.go | 71 +--- .../src/apiserver/server/run_server_test.go | 48 +-- 4 files changed, 136 insertions(+), 404 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 583f054956..914bce462f 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -387,55 +387,13 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model run.ExperimentId = expId run.Namespace = expNs - // Fetch pipeline version based on pipeline spec (do not create anything yet) - var wfTemplate *template.Template - pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(run.PipelineSpec, run.DisplayName, run.Description, run.Namespace) + // Create a template based on the manifest of an existing pipeline version or used-provided manifest. + // Update the run.PipelineSpec if an existing pipeline version is used. + tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&run.PipelineSpec) if err != nil { - return nil, util.Wrapf(err, "Failed to create a run. Specify a valid pipeline spec") - } - manifest := run.PipelineSpec.PipelineSpecManifest - if manifest == "" { - manifest = run.PipelineSpec.WorkflowSpecManifest - } - // If pipeline version does not exist, create it - if pipelineVersion == nil { - pipelineVersion, wfTemplate, err = r.createPipelineFromSpecIfNoExisting(manifest, run.Namespace, run.DisplayName, run.Description) - if err != nil { - return nil, util.Wrap(err, "Failed to create a run due to error fetching pipeline version from pipeline spec") - } - } - run.PipelineSpec.PipelineId = pipelineVersion.PipelineId - run.PipelineSpec.PipelineVersionId = pipelineVersion.UUID - run.PipelineSpec.PipelineName = pipelineVersion.Name - if manifest == "" { - manifest = pipelineVersion.PipelineSpec - } - // Get manifest from either of the two places: - // (1) raw manifest in pipeline_spec - // (2) pipeline version in resource_references - // And the latter takes priority over the former when the manifest is from pipeline_spec.pipeline_id - // workflow/pipeline manifest and pipeline id/version will not exist at the same time, guaranteed by the validation phase - var tmpl template.Template - if wfTemplate == nil { - tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion) - if err != nil { - return nil, util.Wrap(err, "Failed to create a run with an empty pipeline spec manifest") - } - // If manifest is empty in the existing pipeline version (KFP 2.0.0-alpha.6 and prior to that) - if manifest == "" { - manifest = string(tempBytes) - } - // Prevent creating runs with inconsistent manifests - if string(tempBytes) != manifest { - return nil, util.NewInvalidInputError("Failed to create a run due to mismatch in the provided manifest and pipeline version. You need to create a new parent pipeline version. Or submit a run with an empty pipeline spec manifest (or matching one with the parent pipeline version)") - } - tmpl, err = template.New(tempBytes) - if err != nil { - return nil, util.Wrap(err, "Failed to create a run with an invalid pipeline spec manifest") - } - } else { - tmpl = *wfTemplate + return nil, util.NewInternalServerError(err, "Failed to create a run due to error fetching manifest") } + // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). // Proposed flow: // 1. Create an entry and assign creation timestamp and uuid. @@ -849,150 +807,40 @@ func (r *ResourceManager) GetJob(id string) (*model.Job, error) { // 1. Pipeline version with the given pipeline version id // 2. The latest pipeline version with given pipeline id // 3. Repeats 1 and 2 for pipeline version id and pipeline id parsed from the pipeline name -func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec model.PipelineSpec, displayName string, description string, namespace string) (*model.PipelineVersion, error) { +func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec model.PipelineSpec) (*model.PipelineVersion, error) { // Fetch or create a pipeline version - var pipelineVersion *model.PipelineVersion if pipelineSpec.PipelineVersionId != "" { - pv, err := r.GetPipelineVersion(pipelineSpec.PipelineVersionId) + pipelineVersion, err := r.GetPipelineVersion(pipelineSpec.PipelineVersionId) if err != nil { return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline version %v", pipelineSpec.PipelineVersionId) } - pipelineVersion = pv + return pipelineVersion, nil } else if pipelineSpec.PipelineId != "" { - pv, err := r.GetLatestPipelineVersion(pipelineSpec.PipelineId) + pipelineVersion, err := r.GetLatestPipelineVersion(pipelineSpec.PipelineId) if err != nil { return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v", pipelineSpec.PipelineId) } - pipelineVersion = pv + return pipelineVersion, nil } else if pipelineSpec.PipelineName != "" { resourceNames := common.ParseResourceIdsFromFullName(pipelineSpec.PipelineName) if resourceNames["PipelineVersionId"] == "" && resourceNames["PipelineId"] == "" { return nil, util.Wrapf(util.NewInvalidInputError("Pipeline spec source is missing"), "Failed to fetch a pipeline version and its manifest due to an empty pipeline spec source: %v", pipelineSpec.PipelineName) } if resourceNames["PipelineVersionId"] != "" { - pv, err := r.GetPipelineVersion(resourceNames["PipelineVersionId"]) + pipelineVersion, err := r.GetPipelineVersion(resourceNames["PipelineVersionId"]) if err != nil { return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v. Check if pipeline version %v exists", pipelineSpec.PipelineName, resourceNames["PipelineVersionId"]) } - pipelineVersion = pv + return pipelineVersion, nil } else { - pv, err := r.GetLatestPipelineVersion(resourceNames["PipelineId"]) + pipelineVersion, err := r.GetLatestPipelineVersion(resourceNames["PipelineId"]) if err != nil { return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v. Check if pipeline %v exists", pipelineSpec.PipelineName, resourceNames["PipelineId"]) } - pipelineVersion = pv - } - } else { - return nil, nil - } - return pipelineVersion, nil -} - -// Creates a pipeline and pipeline version with the following priority if does not exists. -// Returns a pipeline version and a workflow template. -// 1. Uses an existing pipeline version with the same name, namespace, and manifest (checks the last 10 pipeline version) -// 2. Creates a new pipeline version under an existing pipeline with the same name, namespace -// 3. Creates a new pipeline and a new pipeline version -func (r ResourceManager) createPipelineFromSpecIfNoExisting(manifest string, namespace string, displayName string, description string) (*model.PipelineVersion, *template.Template, error) { - // Read manifest and extract name and IDs - tmpl, err := template.New([]byte(manifest)) - if err != nil { - return nil, nil, err - } - wfName := tmpl.V2PipelineName() - resourceNames := common.ParseResourceIdsFromFullName(wfName) - pipelineVersionId := resourceNames["PipelineVersionId"] - pipelineId := resourceNames["PipelineId"] - pipelineName := "" - if pipelineId == "" && pipelineVersionId == "" { - pipelineName = wfName - } - if pipelineName == "" && displayName != "" { - pipelineName = displayName - } - - fetchingError := "" - // Quickly return if a pipeline version exists - if pipelineVersionId != "" { - pv, err := r.GetPipelineVersion(pipelineVersionId) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline version with id %v: %v", fetchingError, pipelineVersionId, err.Error()) - } else { - return pv, &tmpl, nil + return pipelineVersion, nil } } - // Try fetching an existing pipeline by ID - var existingPipeline *model.Pipeline - var newPipeline *model.Pipeline - if pipelineId != "" { - existingPipeline, err = r.GetPipeline(pipelineId) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with id %v: %v", fetchingError, pipelineId, err.Error()) - } - } - // Try fetching an existing pipeline by name and namespace - if pipelineName != "" { - existingPipeline, err = r.GetPipelineByNameAndNamespace(pipelineName, namespace) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with name %v and namespace %v: %v", fetchingError, pipelineName, namespace, err.Error()) - if pipelineName != displayName && displayName != "" { - existingPipeline, err = r.GetPipelineByNameAndNamespace(displayName, namespace) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with name %v and namespace %v: %v", fetchingError, displayName, namespace, err.Error()) - } - } - } - } - // Create a new pipeline if not found - if existingPipeline == nil { - newPipeline = &model.Pipeline{ - Name: pipelineName, - Description: description, - Namespace: namespace, - } - newPipeline, err = r.CreatePipeline(newPipeline) - if err != nil { - return nil, nil, util.Wrap(util.Wrap(err, fetchingError), "Failed to fetch a pipeline version and its manifest due to error creating a new pipeline") - } - } - // Try fetching existing pipeline versions - var pipelineVersion *model.PipelineVersion - if existingPipeline != nil { - opts, err := list.NewOptions(&model.PipelineVersion{}, 10, "created_at DESC", nil) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to prepare pipeline version listing request: %v", fetchingError, err.Error()) - } - existingVersions, _, _, err := r.ListPipelineVersions(existingPipeline.UUID, opts) - if err != nil { - fetchingError = fmt.Sprintf("%v: Failed to list pipeline versions for pipeline %v: %v", fetchingError, existingPipeline.UUID, err.Error()) - } - for _, version := range existingVersions { - if version.PipelineSpec == manifest { - pipelineVersion = version - break - } - } - } - // Create a new pipeline version - if pipelineVersion == nil { - pId := "" - if existingPipeline != nil { - pId = existingPipeline.UUID - } else { - pId = newPipeline.UUID - } - pipelineVersion = &model.PipelineVersion{ - PipelineId: pId, - Name: fmt.Sprintf("%v-%v", pipelineName, r.time.Now().Unix()), - PipelineSpec: manifest, - Description: description, - } - pipelineVersion, err = r.CreatePipelineVersion(pipelineVersion) - if err != nil { - return nil, nil, util.Wrap(util.Wrap(err, fetchingError), "Failed to fetch a pipeline version and its manifest due to error creating a new pipeline version") - } - } - return pipelineVersion, &tmpl, nil + return nil, nil } // Checks if experiment exists and whether it belongs to the specified namespace. @@ -1047,56 +895,13 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model job.ExperimentId = expId job.Namespace = expNs - // Fetch pipeline version based on pipeline spec (do not create anything yet) - var wfTemplate *template.Template - pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(job.PipelineSpec, job.DisplayName, job.Description, job.Namespace) + // Create a template based on the manifest of an existing pipeline version or used-provided manifest. + // Update the job.PipelineSpec if an existing pipeline version is used. + tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec) if err != nil { - return nil, util.Wrapf(err, "Failed to create a recurring run. Specify a valid pipeline spec") - } - manifest := job.PipelineSpec.PipelineSpecManifest - if manifest == "" { - manifest = job.PipelineSpec.WorkflowSpecManifest - } - // If pipeline version does not exist, create it - if pipelineVersion == nil { - pipelineVersion, wfTemplate, err = r.createPipelineFromSpecIfNoExisting(manifest, job.Namespace, job.DisplayName, job.Description) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run due to error fetching pipeline version from pipeline spec") - } - } - job.PipelineSpec.PipelineId = pipelineVersion.PipelineId - job.PipelineSpec.PipelineVersionId = pipelineVersion.UUID - job.PipelineSpec.PipelineName = pipelineVersion.Name - if manifest == "" { - manifest = pipelineVersion.PipelineSpec - } - // Get manifest from either of the two places: - // (1) raw manifest in pipeline_spec - // (2) pipeline version in resource_references - // And the latter takes priority over the former when the manifest is from pipeline_spec.pipeline_id - // workflow/pipeline manifest and pipeline id/version will not exist at the same time, guaranteed by the validation phase - var tmpl template.Template - // This should only happen when creating a run from an existing pipeline or pipeline version - if wfTemplate == nil { - tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run with an empty pipeline spec manifest") - } - // If manifest is empty in the existing pipeline version (KFP 2.0.0-alpha.6 and prior to that) - if manifest == "" { - manifest = string(tempBytes) - } - // Prevent creating runs with inconsistent manifests - if string(tempBytes) != manifest { - return nil, util.NewInvalidInputError("Failed to create a recurring run due to mismatch in the provided manifest and pipeline version. You need to create a new parent pipeline version. Or submit a run with an empty pipeline spec manifest (or matching one with the parent pipeline version)") - } - tmpl, err = template.New(tempBytes) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run with an invalid pipeline spec manifest") - } - } else { - tmpl = *wfTemplate + return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") } + // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). // Convert modelJob into scheduledWorkflow. scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) @@ -1415,53 +1220,41 @@ func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWor return r.jobStore.UpdateJob(swf) } -// Fetches PipelineSpec's manifest as []byte array. -// It attempts to fetch PipelineSpec manifest in the following order: -// 1. Directly read from PipelineSpec's PipelineSpecManifest field. -// 2. Directly read from PipelineSpec's WorkflowSpecManifest field. -// 3. Fetch pipeline spec manifest from the pipeline version for PipelineSpec's PipelineVersionId field. -// 4. Fetch pipeline spec manifest from the latest pipeline version for PipelineSpec's PipelineId field. -func (r *ResourceManager) fetchTemplateFromPipelineSpec(p *model.PipelineSpec) ([]byte, error) { - if p == nil { - return nil, util.NewInvalidInputError("Failed to read pipeline spec manifest from nil") - } - if len(p.PipelineSpecManifest) != 0 { - return []byte(p.PipelineSpecManifest), nil - } - if len(p.WorkflowSpecManifest) != 0 { - return []byte(p.WorkflowSpecManifest), nil - } - var errPv, errP error - if p.PipelineVersionId != "" { - pv, errPv1 := r.GetPipelineVersion(p.PipelineVersionId) - if errPv1 == nil { - bytes, _, errPv2 := r.fetchTemplateFromPipelineVersion(pv) - if errPv2 == nil { - return bytes, nil - } else { - errPv = errPv2 - } - } else { - errPv = errPv1 +// Returns a workflow template based on the manifest in the following priority: +// 1. Pipeline spec manifest from an existing pipeline version, +// 2. Pipeline spec manifest or workflow spec manifest provided by a user. +// If an existing pipeline version is found, the referenced pipeline and pipeline version are updated. +func (r *ResourceManager) fetchTemplateFromPipelineSpec(pipelineSpec *model.PipelineSpec) (template.Template, string, error) { + manifest := "" + pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(*pipelineSpec) + if err != nil { + return nil, "", util.Wrapf(err, "Failed to fetch a template due to error retrieving pipeline version") + } else if pipelineVersion != nil { + // Update references to the existing pipeline version + pipelineSpec.PipelineId = pipelineVersion.PipelineId + pipelineSpec.PipelineVersionId = pipelineVersion.UUID + pipelineSpec.PipelineName = pipelineVersion.Name + // Fetch the template from PipelineSpec field or the corresponding YAML file + tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion) + if err != nil { + return nil, "", util.Wrapf(err, "Failed to fetch a template due invalid manifest in pipeline version %v", pipelineSpec.PipelineVersionId) + } + manifest = string(tempBytes) + } else { + // Read the provided manifest and fail if it is empty + manifest = pipelineSpec.PipelineSpecManifest + if manifest == "" { + manifest = pipelineSpec.WorkflowSpecManifest + } + if manifest == "" { + return nil, "", util.NewInvalidInputError("Failed to fetch a template with an empty pipeline spec manifest") } } - if p.PipelineId != "" { - pv, errP1 := r.GetLatestPipelineVersion(p.PipelineId) - if errP1 == nil { - bytes, _, errP2 := r.fetchTemplateFromPipelineVersion(pv) - if errP2 == nil { - return bytes, nil - } else { - errP = errP2 - } - } else { - errP = errP1 - } + tmpl, err := template.New([]byte(manifest)) + if err != nil { + return nil, "", util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest") } - return nil, util.Wrap( - util.Wrapf(errPv, "Failed to read pipeline spec for pipeline version id %v", p.PipelineVersionId), - util.Wrapf(errP, "Failed to read pipeline spec for pipeline id %v", p.PipelineId).Error(), - ) + return tmpl, manifest, nil } // Fetches PipelineSpec as []byte array and a new URI of PipelineSpec. diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 7e54995e4c..e19b4c376c 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -1573,19 +1573,16 @@ func TestCreateRun_ThroughWorkflowSpecV2(t *testing.T) { Namespace: runDetail.Namespace, StorageState: model.StorageStateAvailable, PipelineSpec: model.PipelineSpec{ - PipelineId: "123e4567-e89b-12d3-a456-426655440000", - PipelineVersionId: "123e4567-e89b-12d3-a456-426655440000", - PipelineName: "hello-world-3", PipelineSpecManifest: v2SpecHelloWorld, }, RunDetails: model.RunDetails{ - CreatedAtInSec: 5, - ScheduledAtInSec: 5, + CreatedAtInSec: 2, + ScheduledAtInSec: 2, Conditions: "Pending", State: model.RuntimeStatePending, StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStatePending, }, }, @@ -1624,20 +1621,17 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) { ServiceAccount: "pipeline-runner", StorageState: model.StorageStateAvailable, PipelineSpec: model.PipelineSpec{ - PipelineId: "123e4567-e89b-12d3-a456-426655440000", - PipelineVersionId: "123e4567-e89b-12d3-a456-426655440000", - PipelineName: "run1-3", WorkflowSpecManifest: testWorkflow.ToStringForStore(), Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", }, RunDetails: model.RunDetails{ - CreatedAtInSec: 5, - ScheduledAtInSec: 5, + CreatedAtInSec: 2, + ScheduledAtInSec: 2, Conditions: "Pending", State: "PENDING", StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStatePending, }, }, @@ -1678,12 +1672,12 @@ func TestCreateRun_ThroughWorkflowSpecWithPatch(t *testing.T) { ServiceAccount: "pipeline-runner", StorageState: model.StorageStateAvailable, RunDetails: model.RunDetails{ - CreatedAtInSec: 5, - ScheduledAtInSec: 5, + CreatedAtInSec: 2, + ScheduledAtInSec: 2, Conditions: "Pending", StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStatePending, }, }, @@ -1694,9 +1688,7 @@ func TestCreateRun_ThroughWorkflowSpecWithPatch(t *testing.T) { Parameters: "[{\"name\":\"param1\",\"value\":\"{{kfp-default-bucket}}\"}]", }, } - expectedRunDetail.PipelineSpec.PipelineId = runDetail.PipelineSpec.PipelineId expectedRunDetail.PipelineSpec.PipelineName = runDetail.PipelineSpec.PipelineName - expectedRunDetail.PipelineSpec.PipelineVersionId = runDetail.PipelineSpec.PipelineVersionId expectedRunDetail = expectedRunDetail.ToV2().ToV1() assert.Equal(t, expectedRunDetail.ToV1(), runDetail.ToV1(), "The CreateRun return has unexpected value") assert.Equal(t, 1, store.ExecClientFake.GetWorkflowCount(), "Workflow CRD is not created") @@ -1727,14 +1719,12 @@ func TestCreateRun_ThroughWorkflowSpecSameManifest(t *testing.T) { }, ) assert.Nil(t, err) - assert.NotEmpty(t, newRun.PipelineVersionId) - assert.NotEmpty(t, newRun.PipelineId) assert.Equal(t, "run1", newRun.DisplayName) - assert.Equal(t, runDetail.PipelineId, newRun.PipelineId) - assert.Equal(t, runDetail.PipelineVersionId, newRun.PipelineVersionId) + assert.Empty(t, newRun.PipelineId) + assert.Empty(t, newRun.PipelineVersionId) assert.NotEqual(t, runDetail.WorkflowRuntimeManifest, newRun.WorkflowRuntimeManifest) assert.Equal(t, runDetail.WorkflowSpecManifest, newRun.WorkflowSpecManifest) - assert.Equal(t, runDetail.PipelineSpecManifest, newRun.PipelineSpecManifest) + assert.Empty(t, newRun.PipelineSpecManifest) manager.uuid = util.NewFakeUUIDGeneratorOrFatal(DefaultFakePipelineIdThree, nil) pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(DefaultFakePipelineIdThree, nil)) @@ -1750,14 +1740,11 @@ func TestCreateRun_ThroughWorkflowSpecSameManifest(t *testing.T) { }, ) assert.Nil(t, err) - assert.NotEmpty(t, newRun2.PipelineVersionId) - assert.NotEmpty(t, newRun2.PipelineId) assert.Equal(t, "run1", newRun2.DisplayName) - assert.Equal(t, newRun.PipelineId, newRun2.PipelineId) - assert.NotEqual(t, newRun.PipelineVersionId, newRun2.PipelineVersionId) - assert.NotEqual(t, newRun.WorkflowRuntimeManifest, newRun2.WorkflowRuntimeManifest) - assert.NotEqual(t, newRun.WorkflowSpecManifest, newRun2.WorkflowSpecManifest) - assert.NotEqual(t, newRun.PipelineSpecManifest, newRun2.PipelineSpecManifest) + assert.Empty(t, newRun2.PipelineId) + assert.Empty(t, newRun2.PipelineVersionId) + assert.Empty(t, newRun2.WorkflowRuntimeManifest) + assert.Equal(t, v2SpecHelloWorld, newRun2.WorkflowSpecManifest) assert.Equal(t, v2SpecHelloWorld, newRun2.PipelineSpecManifest) } @@ -1976,7 +1963,7 @@ func TestCreateRun_EmptyPipelineSpec(t *testing.T) { } _, err := manager.CreateRun(context.Background(), apiRun) assert.NotNil(t, err) - assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid") + assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest") } func TestCreateRun_InvalidWorkflowSpec(t *testing.T) { @@ -2308,16 +2295,14 @@ func TestCreateJob_ThroughWorkflowSpec(t *testing.T) { ServiceAccount: "pipeline-runner", ExperimentId: DefaultFakeUUID, Enabled: true, - CreatedAtInSec: 5, - UpdatedAtInSec: 5, + CreatedAtInSec: 2, + UpdatedAtInSec: 2, Conditions: "STATUS_UNSPECIFIED", PipelineSpec: model.PipelineSpec{ WorkflowSpecManifest: testWorkflow.ToStringForStore(), }, } - expectedJob.PipelineSpec.PipelineId = job.PipelineSpec.PipelineId expectedJob.PipelineSpec.PipelineName = job.PipelineSpec.PipelineName - expectedJob.PipelineSpec.PipelineVersionId = job.PipelineSpec.PipelineVersionId assert.Equal(t, expectedJob.ToV1(), job.ToV1()) } @@ -2332,8 +2317,8 @@ func TestCreateJob_ThroughWorkflowSpecV2(t *testing.T) { ServiceAccount: "pipeline-runner", Enabled: true, ExperimentId: DefaultFakeUUID, - CreatedAtInSec: 5, - UpdatedAtInSec: 5, + CreatedAtInSec: 2, + UpdatedAtInSec: 2, Conditions: "STATUS_UNSPECIFIED", PipelineSpec: model.PipelineSpec{ PipelineSpecManifest: v2SpecHelloWorld, @@ -2343,9 +2328,7 @@ func TestCreateJob_ThroughWorkflowSpecV2(t *testing.T) { }, }, } - expectedJob.PipelineSpec.PipelineId = job.PipelineSpec.PipelineId expectedJob.PipelineSpec.PipelineName = job.PipelineSpec.PipelineName - expectedJob.PipelineSpec.PipelineVersionId = job.PipelineSpec.PipelineVersionId assert.Equal(t, expectedJob.ToV1(), job.ToV1()) fetchedJob, err := manager.GetJob(job.UUID) assert.Nil(t, err) @@ -2531,7 +2514,7 @@ func TestCreateJob_EmptyPipelineSpec(t *testing.T) { } _, err := manager.CreateJob(context.Background(), job) assert.NotNil(t, err) - assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid") + assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest") } func TestCreateJob_InvalidWorkflowSpec(t *testing.T) { @@ -2619,8 +2602,8 @@ func TestEnableJob(t *testing.T) { Namespace: "ns1", ServiceAccount: "pipeline-runner", Enabled: false, - CreatedAtInSec: 5, - UpdatedAtInSec: 6, + CreatedAtInSec: 2, + UpdatedAtInSec: 3, Conditions: "STATUS_UNSPECIFIED", ExperimentId: DefaultFakeUUID, PipelineSpec: model.PipelineSpec{ @@ -2773,16 +2756,16 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { ServiceAccount: "pipeline-runner", StorageState: model.StorageStateAvailable, RunDetails: model.RunDetails{ - CreatedAtInSec: 5, - ScheduledAtInSec: 5, + CreatedAtInSec: 2, + ScheduledAtInSec: 2, Conditions: "Running", StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStatePending, }, { - UpdateTimeInSec: 7, + UpdateTimeInSec: 4, State: model.RuntimeStateRunning, }, }, @@ -2792,8 +2775,6 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", }, } - expectedRun.PipelineSpec.PipelineId = run.PipelineSpec.PipelineId - expectedRun.PipelineSpec.PipelineVersionId = run.PipelineSpec.PipelineVersionId expectedRun.PipelineSpec.PipelineName = run.PipelineSpec.PipelineName expectedRun.RunDetails.WorkflowRuntimeManifest = run.RunDetails.WorkflowRuntimeManifest assert.Equal(t, expectedRun.ToV1(), run.ToV1()) @@ -2849,7 +2830,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_Success(t *testing.T State: model.RuntimeStateUnspecified, StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStateUnspecified, }, }, @@ -2920,7 +2901,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success State: model.RuntimeStatePending, StateHistory: []*model.RuntimeStatus{ { - UpdateTimeInSec: 6, + UpdateTimeInSec: 3, State: model.RuntimeStatePending, }, }, @@ -3093,12 +3074,10 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) { WorkflowSpecManifest: testWorkflow.ToStringForStore(), Parameters: "[]", PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest, - PipelineId: actualJob.PipelineSpec.PipelineId, PipelineName: actualJob.PipelineSpec.PipelineName, - PipelineVersionId: actualJob.PipelineSpec.PipelineVersionId, }, - CreatedAtInSec: 5, - UpdatedAtInSec: 6, + CreatedAtInSec: 2, + UpdatedAtInSec: 3, } expectedJob.Conditions = "STATUS_UNSPECIFIED" assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) @@ -3991,3 +3970,28 @@ root: schemaVersion: 2.0.0 sdkVersion: kfp-1.6.5 ` + +var v2SpecHelloWorldMutated = ` +components: + comp-hello-world: + executorLabel: exec-hello-world +deploymentSpec: + executors: + exec-hello-world: + container: + image: python:3.7 +pipelineInfo: + name: pipelines/p1/versions/v1 +root: + dag: + tasks: + hello-world: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world + taskInfo: + name: hello-world +schemaVersion: 2.0.0 +sdkVersion: kfp-1.6.5 +` diff --git a/backend/src/apiserver/server/job_server_test.go b/backend/src/apiserver/server/job_server_test.go index cd6dd463a2..196b905764 100644 --- a/backend/src/apiserver/server/job_server_test.go +++ b/backend/src/apiserver/server/job_server_test.go @@ -103,7 +103,7 @@ var ( ) func TestCreateJob_WrongInput(t *testing.T) { - clients, manager, experiment, pipelineVersion := initWithExperimentAndPipelineVersion(t) + clients, manager, experiment, _ := initWithExperimentAndPipelineVersion(t) defer clients.Close() server := NewJobServer(manager, &JobServerOptions{CollectMetrics: false}) tests := []struct { @@ -151,37 +151,7 @@ func TestCreateJob_WrongInput(t *testing.T) { }}}, ResourceReferences: validReference, }, - "InvalidInputError: unknown template format: pipeline spec is invalid", - }, - { - "duplicate pipeline spec", - &apiv1beta1.Job{ - Name: "job1", - Enabled: true, - MaxConcurrency: 1, - Trigger: &apiv1beta1.Trigger{ - Trigger: &apiv1beta1.Trigger_CronSchedule{CronSchedule: &apiv1beta1.CronSchedule{ - StartTime: ×tamp.Timestamp{Seconds: 1}, - Cron: "1 * * * *", - }}}, - PipelineSpec: &apiv1beta1.PipelineSpec{ - WorkflowManifest: v2SpecHelloWorld, - }, - ResourceReferences: []*api.ResourceReference{ - { - Key: &apiv1beta1.ResourceKey{ - Type: apiv1beta1.ResourceType_EXPERIMENT, - Id: DefaultFakeUUID, - }, - Relationship: apiv1beta1.Relationship_OWNER, - }, - { - Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: pipelineVersion.UUID}, - Relationship: apiv1beta1.Relationship_CREATOR, - }, - }, - }, - "Failed to create a recurring run due to mismatch in the provided manifest and pipeline version", + "Failed to fetch a template with an empty pipeline spec manifest", }, { "invalid pipeline spec", @@ -374,13 +344,6 @@ func TestCreateJob_NoResRefs(t *testing.T) { }, Relationship: apiv1beta1.Relationship_OWNER, }, - { - Key: &apiv1beta1.ResourceKey{ - Type: apiv1beta1.ResourceType_PIPELINE_VERSION, - Id: DefaultFakeIdTwo, - }, - Relationship: apiv1beta1.Relationship_CREATOR, - }, } expectedJob := &apiv1beta1.Job{ Id: DefaultFakeIdOne, @@ -396,8 +359,6 @@ func TestCreateJob_NoResRefs(t *testing.T) { ServiceAccount: "pipeline-runner", Status: "STATUS_UNSPECIFIED", PipelineSpec: &apiv1beta1.PipelineSpec{ - PipelineId: DefaultFakeIdTwo, - PipelineName: "job1-7", WorkflowManifest: testWorkflow.ToStringForStore(), Parameters: []*apiv1beta1.Parameter{{Name: "param1", Value: "world"}}, }, @@ -674,15 +635,11 @@ func TestListJobs_Multiuser(t *testing.T) { assert.Nil(t, err) var expectedJobs []*apiv1beta1.Job - commonExpectedJob.PipelineSpec.PipelineId = "123e4567-e89b-12d3-a456-426655440000" - commonExpectedJob.PipelineSpec.PipelineName = "job1-3" - commonExpectedJob.CreatedAt = ×tamp.Timestamp{Seconds: 5} - commonExpectedJob.UpdatedAt = ×tamp.Timestamp{Seconds: 5} + commonExpectedJob.CreatedAt = ×tamp.Timestamp{Seconds: 2} + commonExpectedJob.UpdatedAt = ×tamp.Timestamp{Seconds: 2} commonExpectedJob.ResourceReferences = []*apiv1beta1.ResourceReference{ {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_NAMESPACE, Id: "ns1"}, Relationship: apiv1beta1.Relationship_OWNER}, {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_EXPERIMENT, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_OWNER}, - {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_CREATOR}, - {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_CREATOR}, } expectedJobs = append(expectedJobs, commonExpectedJob) expectedJobsEmpty := []*apiv1beta1.Job{} @@ -940,10 +897,10 @@ func TestCreateRecurringRun(t *testing.T) { Cron: "1 * * * *", }}, }, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - UpdatedAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + UpdatedAt: ×tamp.Timestamp{Seconds: 2}, Status: apiv2beta1.RecurringRun_ENABLED, - PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: recurringRun.GetPipelineVersionId()}, + PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct}, RuntimeConfig: &apiv2beta1.RuntimeConfig{ PipelineRoot: "model-pipeline-root", Parameters: make(map[string]*structpb.Value), @@ -994,10 +951,10 @@ func TestGetRecurringRun(t *testing.T) { Cron: "1 * * * *", }}, }, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - UpdatedAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + UpdatedAt: ×tamp.Timestamp{Seconds: 2}, Status: apiv2beta1.RecurringRun_ENABLED, - PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: createdRecurringRun.GetPipelineVersionId()}, + PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct}, RuntimeConfig: &apiv2beta1.RuntimeConfig{ PipelineRoot: "model-pipeline-root", Parameters: make(map[string]*structpb.Value), @@ -1035,7 +992,7 @@ func TestListRecurringRuns(t *testing.T) { ExperimentId: "123e4567-e89b-12d3-a456-426655440000", } - createdRecurringRun, err := server.CreateRecurringRun(nil, &apiv2beta1.CreateRecurringRunRequest{RecurringRun: apiRecurringRun}) + _, err := server.CreateRecurringRun(nil, &apiv2beta1.CreateRecurringRunRequest{RecurringRun: apiRecurringRun}) assert.Nil(t, err) expectedRecurringRun := &apiv2beta1.RecurringRun{ @@ -1051,9 +1008,9 @@ func TestListRecurringRuns(t *testing.T) { Cron: "1 * * * *", }}, }, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - UpdatedAt: ×tamp.Timestamp{Seconds: 5}, - PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: createdRecurringRun.GetPipelineVersionId()}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + UpdatedAt: ×tamp.Timestamp{Seconds: 2}, + PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct}, RuntimeConfig: &apiv2beta1.RuntimeConfig{ PipelineRoot: "model-pipeline-root", Parameters: make(map[string]*structpb.Value), diff --git a/backend/src/apiserver/server/run_server_test.go b/backend/src/apiserver/server/run_server_test.go index f47706d7af..c1595b15b9 100644 --- a/backend/src/apiserver/server/run_server_test.go +++ b/backend/src/apiserver/server/run_server_test.go @@ -123,7 +123,7 @@ func TestCreateRunV1_no_pipeline_source(t *testing.T) { runDetail, err := server.CreateRunV1(nil, &apiv1beta1.CreateRunRequest{Run: run}) assert.NotNil(t, err) assert.Nil(t, runDetail) - assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid") + assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest") } func TestCreateRunV1_invalid_spec(t *testing.T) { @@ -216,24 +216,6 @@ func TestCreateRunV1_pipelineversion(t *testing.T) { assert.Equal(t, exp.UUID, runDetail.Run.ResourceReferences[0].Key.Id) } -func TestCreateRunV1_MismatchedManifests(t *testing.T) { - clients, manager, _, _ := initWithExperimentAndPipelineVersion(t) - defer clients.Close() - server := NewRunServer(manager, &RunServerOptions{CollectMetrics: false}) - run := &apiv1beta1.Run{ - Name: "run1", - ResourceReferences: validReferencesOfExperimentAndPipelineVersion, - PipelineSpec: &apiv1beta1.PipelineSpec{ - WorkflowManifest: testWorkflow2.ToStringForStore(), - Parameters: []*apiv1beta1.Parameter{{Name: "param1", Value: "world"}}, - }, - } - runDetail, err := server.CreateRunV1(nil, &apiv1beta1.CreateRunRequest{Run: run}) - assert.NotNil(t, err) - assert.Nil(t, runDetail) - assert.Contains(t, err.Error(), "Invalid input error: Failed to create a run due to mismatch in the provided manifest and pipeline version") -} - func TestCreateRunV1_Manifest_and_pipeline_version(t *testing.T) { clients, manager, exp, _ := initWithExperimentAndPipelineVersion(t) defer clients.Close() @@ -646,8 +628,8 @@ func TestCreateRun(t *testing.T) { DisplayName: "run1", ServiceAccount: "pipeline-runner", StorageState: apiv2beta1.Run_AVAILABLE, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - ScheduledAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + ScheduledAt: ×tamp.Timestamp{Seconds: 2}, FinishedAt: ×tamp.Timestamp{}, PipelineSource: &apiv2beta1.Run_PipelineSpec{ PipelineSpec: run.GetPipelineSpec(), @@ -659,7 +641,7 @@ func TestCreateRun(t *testing.T) { State: apiv2beta1.RuntimeState_PENDING, StateHistory: []*apiv2beta1.RuntimeStatus{ { - UpdateTime: ×tamp.Timestamp{Seconds: 6}, + UpdateTime: ×tamp.Timestamp{Seconds: 3}, State: apiv2beta1.RuntimeState_PENDING, }, }, @@ -745,8 +727,8 @@ func TestGetRun(t *testing.T) { DisplayName: "run1", ServiceAccount: "pipeline-runner", StorageState: apiv2beta1.Run_AVAILABLE, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - ScheduledAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + ScheduledAt: ×tamp.Timestamp{Seconds: 2}, FinishedAt: ×tamp.Timestamp{}, PipelineSource: &apiv2beta1.Run_PipelineSpec{ PipelineSpec: returnedRun.GetPipelineSpec(), @@ -758,7 +740,7 @@ func TestGetRun(t *testing.T) { State: apiv2beta1.RuntimeState_PENDING, StateHistory: []*apiv2beta1.RuntimeStatus{ { - UpdateTime: ×tamp.Timestamp{Seconds: 6}, + UpdateTime: ×tamp.Timestamp{Seconds: 3}, State: apiv2beta1.RuntimeState_PENDING, }, }, @@ -900,8 +882,8 @@ func TestListRunsV1_Multiuser(t *testing.T) { Name: "run1", ServiceAccount: "pipeline-runner", StorageState: apiv1beta1.Run_STORAGESTATE_AVAILABLE, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - ScheduledAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + ScheduledAt: ×tamp.Timestamp{Seconds: 2}, FinishedAt: ×tamp.Timestamp{}, Status: "Pending", PipelineSpec: &apiv1beta1.PipelineSpec{ @@ -919,10 +901,6 @@ func TestListRunsV1_Multiuser(t *testing.T) { Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_NAMESPACE, Id: experiment.Namespace}, Relationship: apiv1beta1.Relationship_OWNER, }, - { - Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: createdRun.Run.PipelineSpec.GetPipelineId()}, - Relationship: apiv1beta1.Relationship_CREATOR, - }, }, }} expectedRunsEmpty := []*apiv1beta1.Run{} @@ -978,7 +956,7 @@ func TestListRunsV1_Multiuser(t *testing.T) { expectedRunsEmpty, }, { - "Inalid - invalid filter type", + "Invalid - invalid filter type", &apiv1beta1.ListRunsRequest{ ResourceReferenceKey: &apiv1beta1.ResourceKey{ Type: apiv1beta1.ResourceType_UNKNOWN_RESOURCE_TYPE, @@ -1036,8 +1014,8 @@ func TestListRuns(t *testing.T) { DisplayName: "run1", ServiceAccount: "pipeline-runner", StorageState: apiv2beta1.Run_AVAILABLE, - CreatedAt: ×tamp.Timestamp{Seconds: 5}, - ScheduledAt: ×tamp.Timestamp{Seconds: 5}, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + ScheduledAt: ×tamp.Timestamp{Seconds: 2}, FinishedAt: ×tamp.Timestamp{}, PipelineSource: &apiv2beta1.Run_PipelineSpec{ PipelineSpec: createdRun.GetPipelineSpec(), @@ -1049,7 +1027,7 @@ func TestListRuns(t *testing.T) { State: apiv2beta1.RuntimeState_PENDING, StateHistory: []*apiv2beta1.RuntimeStatus{ { - UpdateTime: ×tamp.Timestamp{Seconds: 6}, + UpdateTime: ×tamp.Timestamp{Seconds: 3}, State: apiv2beta1.RuntimeState_PENDING, }, },