fix(backend): Allow runs and recurring runs without creator pipeline and pipeline version (#8926)

* Deprecate experiment_id in Run API v2

* Allow runs and recurring runs without pipeline version id

* Adjust the logic for retrieving existing pipeline name

* Remove pipeline parsing logic and tests

* Simplify the logic in fetching a template
This commit is contained in:
gkcalat 2023-03-07 12:14:11 -08:00 committed by GitHub
parent 2c4b128ece
commit 7fdb1b91da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 404 deletions

View File

@ -387,55 +387,13 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
run.ExperimentId = expId
run.Namespace = expNs
// Fetch pipeline version based on pipeline spec (do not create anything yet)
var wfTemplate *template.Template
pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(run.PipelineSpec, run.DisplayName, run.Description, run.Namespace)
// Create a template based on the manifest of an existing pipeline version or used-provided manifest.
// Update the run.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&run.PipelineSpec)
if err != nil {
return nil, util.Wrapf(err, "Failed to create a run. Specify a valid pipeline spec")
}
manifest := run.PipelineSpec.PipelineSpecManifest
if manifest == "" {
manifest = run.PipelineSpec.WorkflowSpecManifest
}
// If pipeline version does not exist, create it
if pipelineVersion == nil {
pipelineVersion, wfTemplate, err = r.createPipelineFromSpecIfNoExisting(manifest, run.Namespace, run.DisplayName, run.Description)
if err != nil {
return nil, util.Wrap(err, "Failed to create a run due to error fetching pipeline version from pipeline spec")
}
}
run.PipelineSpec.PipelineId = pipelineVersion.PipelineId
run.PipelineSpec.PipelineVersionId = pipelineVersion.UUID
run.PipelineSpec.PipelineName = pipelineVersion.Name
if manifest == "" {
manifest = pipelineVersion.PipelineSpec
}
// Get manifest from either of the two places:
// (1) raw manifest in pipeline_spec
// (2) pipeline version in resource_references
// And the latter takes priority over the former when the manifest is from pipeline_spec.pipeline_id
// workflow/pipeline manifest and pipeline id/version will not exist at the same time, guaranteed by the validation phase
var tmpl template.Template
if wfTemplate == nil {
tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion)
if err != nil {
return nil, util.Wrap(err, "Failed to create a run with an empty pipeline spec manifest")
}
// If manifest is empty in the existing pipeline version (KFP 2.0.0-alpha.6 and prior to that)
if manifest == "" {
manifest = string(tempBytes)
}
// Prevent creating runs with inconsistent manifests
if string(tempBytes) != manifest {
return nil, util.NewInvalidInputError("Failed to create a run due to mismatch in the provided manifest and pipeline version. You need to create a new parent pipeline version. Or submit a run with an empty pipeline spec manifest (or matching one with the parent pipeline version)")
}
tmpl, err = template.New(tempBytes)
if err != nil {
return nil, util.Wrap(err, "Failed to create a run with an invalid pipeline spec manifest")
}
} else {
tmpl = *wfTemplate
return nil, util.NewInternalServerError(err, "Failed to create a run due to error fetching manifest")
}
// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Proposed flow:
// 1. Create an entry and assign creation timestamp and uuid.
@ -849,150 +807,40 @@ func (r *ResourceManager) GetJob(id string) (*model.Job, error) {
// 1. Pipeline version with the given pipeline version id
// 2. The latest pipeline version with given pipeline id
// 3. Repeats 1 and 2 for pipeline version id and pipeline id parsed from the pipeline name
func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec model.PipelineSpec, displayName string, description string, namespace string) (*model.PipelineVersion, error) {
func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec model.PipelineSpec) (*model.PipelineVersion, error) {
// Fetch or create a pipeline version
var pipelineVersion *model.PipelineVersion
if pipelineSpec.PipelineVersionId != "" {
pv, err := r.GetPipelineVersion(pipelineSpec.PipelineVersionId)
pipelineVersion, err := r.GetPipelineVersion(pipelineSpec.PipelineVersionId)
if err != nil {
return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline version %v", pipelineSpec.PipelineVersionId)
}
pipelineVersion = pv
return pipelineVersion, nil
} else if pipelineSpec.PipelineId != "" {
pv, err := r.GetLatestPipelineVersion(pipelineSpec.PipelineId)
pipelineVersion, err := r.GetLatestPipelineVersion(pipelineSpec.PipelineId)
if err != nil {
return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v", pipelineSpec.PipelineId)
}
pipelineVersion = pv
return pipelineVersion, nil
} else if pipelineSpec.PipelineName != "" {
resourceNames := common.ParseResourceIdsFromFullName(pipelineSpec.PipelineName)
if resourceNames["PipelineVersionId"] == "" && resourceNames["PipelineId"] == "" {
return nil, util.Wrapf(util.NewInvalidInputError("Pipeline spec source is missing"), "Failed to fetch a pipeline version and its manifest due to an empty pipeline spec source: %v", pipelineSpec.PipelineName)
}
if resourceNames["PipelineVersionId"] != "" {
pv, err := r.GetPipelineVersion(resourceNames["PipelineVersionId"])
pipelineVersion, err := r.GetPipelineVersion(resourceNames["PipelineVersionId"])
if err != nil {
return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v. Check if pipeline version %v exists", pipelineSpec.PipelineName, resourceNames["PipelineVersionId"])
}
pipelineVersion = pv
return pipelineVersion, nil
} else {
pv, err := r.GetLatestPipelineVersion(resourceNames["PipelineId"])
pipelineVersion, err := r.GetLatestPipelineVersion(resourceNames["PipelineId"])
if err != nil {
return nil, util.Wrapf(err, "Failed to fetch a pipeline version and its manifest from pipeline %v. Check if pipeline %v exists", pipelineSpec.PipelineName, resourceNames["PipelineId"])
}
pipelineVersion = pv
}
} else {
return nil, nil
}
return pipelineVersion, nil
}
// Creates a pipeline and pipeline version with the following priority if does not exists.
// Returns a pipeline version and a workflow template.
// 1. Uses an existing pipeline version with the same name, namespace, and manifest (checks the last 10 pipeline version)
// 2. Creates a new pipeline version under an existing pipeline with the same name, namespace
// 3. Creates a new pipeline and a new pipeline version
func (r ResourceManager) createPipelineFromSpecIfNoExisting(manifest string, namespace string, displayName string, description string) (*model.PipelineVersion, *template.Template, error) {
// Read manifest and extract name and IDs
tmpl, err := template.New([]byte(manifest))
if err != nil {
return nil, nil, err
}
wfName := tmpl.V2PipelineName()
resourceNames := common.ParseResourceIdsFromFullName(wfName)
pipelineVersionId := resourceNames["PipelineVersionId"]
pipelineId := resourceNames["PipelineId"]
pipelineName := ""
if pipelineId == "" && pipelineVersionId == "" {
pipelineName = wfName
}
if pipelineName == "" && displayName != "" {
pipelineName = displayName
}
fetchingError := ""
// Quickly return if a pipeline version exists
if pipelineVersionId != "" {
pv, err := r.GetPipelineVersion(pipelineVersionId)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline version with id %v: %v", fetchingError, pipelineVersionId, err.Error())
} else {
return pv, &tmpl, nil
return pipelineVersion, nil
}
}
// Try fetching an existing pipeline by ID
var existingPipeline *model.Pipeline
var newPipeline *model.Pipeline
if pipelineId != "" {
existingPipeline, err = r.GetPipeline(pipelineId)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with id %v: %v", fetchingError, pipelineId, err.Error())
}
}
// Try fetching an existing pipeline by name and namespace
if pipelineName != "" {
existingPipeline, err = r.GetPipelineByNameAndNamespace(pipelineName, namespace)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with name %v and namespace %v: %v", fetchingError, pipelineName, namespace, err.Error())
if pipelineName != displayName && displayName != "" {
existingPipeline, err = r.GetPipelineByNameAndNamespace(displayName, namespace)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to fetch a pipeline with name %v and namespace %v: %v", fetchingError, displayName, namespace, err.Error())
}
}
}
}
// Create a new pipeline if not found
if existingPipeline == nil {
newPipeline = &model.Pipeline{
Name: pipelineName,
Description: description,
Namespace: namespace,
}
newPipeline, err = r.CreatePipeline(newPipeline)
if err != nil {
return nil, nil, util.Wrap(util.Wrap(err, fetchingError), "Failed to fetch a pipeline version and its manifest due to error creating a new pipeline")
}
}
// Try fetching existing pipeline versions
var pipelineVersion *model.PipelineVersion
if existingPipeline != nil {
opts, err := list.NewOptions(&model.PipelineVersion{}, 10, "created_at DESC", nil)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to prepare pipeline version listing request: %v", fetchingError, err.Error())
}
existingVersions, _, _, err := r.ListPipelineVersions(existingPipeline.UUID, opts)
if err != nil {
fetchingError = fmt.Sprintf("%v: Failed to list pipeline versions for pipeline %v: %v", fetchingError, existingPipeline.UUID, err.Error())
}
for _, version := range existingVersions {
if version.PipelineSpec == manifest {
pipelineVersion = version
break
}
}
}
// Create a new pipeline version
if pipelineVersion == nil {
pId := ""
if existingPipeline != nil {
pId = existingPipeline.UUID
} else {
pId = newPipeline.UUID
}
pipelineVersion = &model.PipelineVersion{
PipelineId: pId,
Name: fmt.Sprintf("%v-%v", pipelineName, r.time.Now().Unix()),
PipelineSpec: manifest,
Description: description,
}
pipelineVersion, err = r.CreatePipelineVersion(pipelineVersion)
if err != nil {
return nil, nil, util.Wrap(util.Wrap(err, fetchingError), "Failed to fetch a pipeline version and its manifest due to error creating a new pipeline version")
}
}
return pipelineVersion, &tmpl, nil
return nil, nil
}
// Checks if experiment exists and whether it belongs to the specified namespace.
@ -1047,56 +895,13 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
job.ExperimentId = expId
job.Namespace = expNs
// Fetch pipeline version based on pipeline spec (do not create anything yet)
var wfTemplate *template.Template
pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(job.PipelineSpec, job.DisplayName, job.Description, job.Namespace)
// Create a template based on the manifest of an existing pipeline version or used-provided manifest.
// Update the job.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
if err != nil {
return nil, util.Wrapf(err, "Failed to create a recurring run. Specify a valid pipeline spec")
}
manifest := job.PipelineSpec.PipelineSpecManifest
if manifest == "" {
manifest = job.PipelineSpec.WorkflowSpecManifest
}
// If pipeline version does not exist, create it
if pipelineVersion == nil {
pipelineVersion, wfTemplate, err = r.createPipelineFromSpecIfNoExisting(manifest, job.Namespace, job.DisplayName, job.Description)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run due to error fetching pipeline version from pipeline spec")
}
}
job.PipelineSpec.PipelineId = pipelineVersion.PipelineId
job.PipelineSpec.PipelineVersionId = pipelineVersion.UUID
job.PipelineSpec.PipelineName = pipelineVersion.Name
if manifest == "" {
manifest = pipelineVersion.PipelineSpec
}
// Get manifest from either of the two places:
// (1) raw manifest in pipeline_spec
// (2) pipeline version in resource_references
// And the latter takes priority over the former when the manifest is from pipeline_spec.pipeline_id
// workflow/pipeline manifest and pipeline id/version will not exist at the same time, guaranteed by the validation phase
var tmpl template.Template
// This should only happen when creating a run from an existing pipeline or pipeline version
if wfTemplate == nil {
tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run with an empty pipeline spec manifest")
}
// If manifest is empty in the existing pipeline version (KFP 2.0.0-alpha.6 and prior to that)
if manifest == "" {
manifest = string(tempBytes)
}
// Prevent creating runs with inconsistent manifests
if string(tempBytes) != manifest {
return nil, util.NewInvalidInputError("Failed to create a recurring run due to mismatch in the provided manifest and pipeline version. You need to create a new parent pipeline version. Or submit a run with an empty pipeline spec manifest (or matching one with the parent pipeline version)")
}
tmpl, err = template.New(tempBytes)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}
} else {
tmpl = *wfTemplate
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}
// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
@ -1415,53 +1220,41 @@ func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWor
return r.jobStore.UpdateJob(swf)
}
// Fetches PipelineSpec's manifest as []byte array.
// It attempts to fetch PipelineSpec manifest in the following order:
// 1. Directly read from PipelineSpec's PipelineSpecManifest field.
// 2. Directly read from PipelineSpec's WorkflowSpecManifest field.
// 3. Fetch pipeline spec manifest from the pipeline version for PipelineSpec's PipelineVersionId field.
// 4. Fetch pipeline spec manifest from the latest pipeline version for PipelineSpec's PipelineId field.
func (r *ResourceManager) fetchTemplateFromPipelineSpec(p *model.PipelineSpec) ([]byte, error) {
if p == nil {
return nil, util.NewInvalidInputError("Failed to read pipeline spec manifest from nil")
}
if len(p.PipelineSpecManifest) != 0 {
return []byte(p.PipelineSpecManifest), nil
}
if len(p.WorkflowSpecManifest) != 0 {
return []byte(p.WorkflowSpecManifest), nil
}
var errPv, errP error
if p.PipelineVersionId != "" {
pv, errPv1 := r.GetPipelineVersion(p.PipelineVersionId)
if errPv1 == nil {
bytes, _, errPv2 := r.fetchTemplateFromPipelineVersion(pv)
if errPv2 == nil {
return bytes, nil
} else {
errPv = errPv2
}
} else {
errPv = errPv1
// Returns a workflow template based on the manifest in the following priority:
// 1. Pipeline spec manifest from an existing pipeline version,
// 2. Pipeline spec manifest or workflow spec manifest provided by a user.
// If an existing pipeline version is found, the referenced pipeline and pipeline version are updated.
func (r *ResourceManager) fetchTemplateFromPipelineSpec(pipelineSpec *model.PipelineSpec) (template.Template, string, error) {
manifest := ""
pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(*pipelineSpec)
if err != nil {
return nil, "", util.Wrapf(err, "Failed to fetch a template due to error retrieving pipeline version")
} else if pipelineVersion != nil {
// Update references to the existing pipeline version
pipelineSpec.PipelineId = pipelineVersion.PipelineId
pipelineSpec.PipelineVersionId = pipelineVersion.UUID
pipelineSpec.PipelineName = pipelineVersion.Name
// Fetch the template from PipelineSpec field or the corresponding YAML file
tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion)
if err != nil {
return nil, "", util.Wrapf(err, "Failed to fetch a template due invalid manifest in pipeline version %v", pipelineSpec.PipelineVersionId)
}
manifest = string(tempBytes)
} else {
// Read the provided manifest and fail if it is empty
manifest = pipelineSpec.PipelineSpecManifest
if manifest == "" {
manifest = pipelineSpec.WorkflowSpecManifest
}
if manifest == "" {
return nil, "", util.NewInvalidInputError("Failed to fetch a template with an empty pipeline spec manifest")
}
}
if p.PipelineId != "" {
pv, errP1 := r.GetLatestPipelineVersion(p.PipelineId)
if errP1 == nil {
bytes, _, errP2 := r.fetchTemplateFromPipelineVersion(pv)
if errP2 == nil {
return bytes, nil
} else {
errP = errP2
}
} else {
errP = errP1
}
tmpl, err := template.New([]byte(manifest))
if err != nil {
return nil, "", util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest")
}
return nil, util.Wrap(
util.Wrapf(errPv, "Failed to read pipeline spec for pipeline version id %v", p.PipelineVersionId),
util.Wrapf(errP, "Failed to read pipeline spec for pipeline id %v", p.PipelineId).Error(),
)
return tmpl, manifest, nil
}
// Fetches PipelineSpec as []byte array and a new URI of PipelineSpec.

View File

@ -1573,19 +1573,16 @@ func TestCreateRun_ThroughWorkflowSpecV2(t *testing.T) {
Namespace: runDetail.Namespace,
StorageState: model.StorageStateAvailable,
PipelineSpec: model.PipelineSpec{
PipelineId: "123e4567-e89b-12d3-a456-426655440000",
PipelineVersionId: "123e4567-e89b-12d3-a456-426655440000",
PipelineName: "hello-world-3",
PipelineSpecManifest: v2SpecHelloWorld,
},
RunDetails: model.RunDetails{
CreatedAtInSec: 5,
ScheduledAtInSec: 5,
CreatedAtInSec: 2,
ScheduledAtInSec: 2,
Conditions: "Pending",
State: model.RuntimeStatePending,
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStatePending,
},
},
@ -1624,20 +1621,17 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) {
ServiceAccount: "pipeline-runner",
StorageState: model.StorageStateAvailable,
PipelineSpec: model.PipelineSpec{
PipelineId: "123e4567-e89b-12d3-a456-426655440000",
PipelineVersionId: "123e4567-e89b-12d3-a456-426655440000",
PipelineName: "run1-3",
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]",
},
RunDetails: model.RunDetails{
CreatedAtInSec: 5,
ScheduledAtInSec: 5,
CreatedAtInSec: 2,
ScheduledAtInSec: 2,
Conditions: "Pending",
State: "PENDING",
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStatePending,
},
},
@ -1678,12 +1672,12 @@ func TestCreateRun_ThroughWorkflowSpecWithPatch(t *testing.T) {
ServiceAccount: "pipeline-runner",
StorageState: model.StorageStateAvailable,
RunDetails: model.RunDetails{
CreatedAtInSec: 5,
ScheduledAtInSec: 5,
CreatedAtInSec: 2,
ScheduledAtInSec: 2,
Conditions: "Pending",
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStatePending,
},
},
@ -1694,9 +1688,7 @@ func TestCreateRun_ThroughWorkflowSpecWithPatch(t *testing.T) {
Parameters: "[{\"name\":\"param1\",\"value\":\"{{kfp-default-bucket}}\"}]",
},
}
expectedRunDetail.PipelineSpec.PipelineId = runDetail.PipelineSpec.PipelineId
expectedRunDetail.PipelineSpec.PipelineName = runDetail.PipelineSpec.PipelineName
expectedRunDetail.PipelineSpec.PipelineVersionId = runDetail.PipelineSpec.PipelineVersionId
expectedRunDetail = expectedRunDetail.ToV2().ToV1()
assert.Equal(t, expectedRunDetail.ToV1(), runDetail.ToV1(), "The CreateRun return has unexpected value")
assert.Equal(t, 1, store.ExecClientFake.GetWorkflowCount(), "Workflow CRD is not created")
@ -1727,14 +1719,12 @@ func TestCreateRun_ThroughWorkflowSpecSameManifest(t *testing.T) {
},
)
assert.Nil(t, err)
assert.NotEmpty(t, newRun.PipelineVersionId)
assert.NotEmpty(t, newRun.PipelineId)
assert.Equal(t, "run1", newRun.DisplayName)
assert.Equal(t, runDetail.PipelineId, newRun.PipelineId)
assert.Equal(t, runDetail.PipelineVersionId, newRun.PipelineVersionId)
assert.Empty(t, newRun.PipelineId)
assert.Empty(t, newRun.PipelineVersionId)
assert.NotEqual(t, runDetail.WorkflowRuntimeManifest, newRun.WorkflowRuntimeManifest)
assert.Equal(t, runDetail.WorkflowSpecManifest, newRun.WorkflowSpecManifest)
assert.Equal(t, runDetail.PipelineSpecManifest, newRun.PipelineSpecManifest)
assert.Empty(t, newRun.PipelineSpecManifest)
manager.uuid = util.NewFakeUUIDGeneratorOrFatal(DefaultFakePipelineIdThree, nil)
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(DefaultFakePipelineIdThree, nil))
@ -1750,14 +1740,11 @@ func TestCreateRun_ThroughWorkflowSpecSameManifest(t *testing.T) {
},
)
assert.Nil(t, err)
assert.NotEmpty(t, newRun2.PipelineVersionId)
assert.NotEmpty(t, newRun2.PipelineId)
assert.Equal(t, "run1", newRun2.DisplayName)
assert.Equal(t, newRun.PipelineId, newRun2.PipelineId)
assert.NotEqual(t, newRun.PipelineVersionId, newRun2.PipelineVersionId)
assert.NotEqual(t, newRun.WorkflowRuntimeManifest, newRun2.WorkflowRuntimeManifest)
assert.NotEqual(t, newRun.WorkflowSpecManifest, newRun2.WorkflowSpecManifest)
assert.NotEqual(t, newRun.PipelineSpecManifest, newRun2.PipelineSpecManifest)
assert.Empty(t, newRun2.PipelineId)
assert.Empty(t, newRun2.PipelineVersionId)
assert.Empty(t, newRun2.WorkflowRuntimeManifest)
assert.Equal(t, v2SpecHelloWorld, newRun2.WorkflowSpecManifest)
assert.Equal(t, v2SpecHelloWorld, newRun2.PipelineSpecManifest)
}
@ -1976,7 +1963,7 @@ func TestCreateRun_EmptyPipelineSpec(t *testing.T) {
}
_, err := manager.CreateRun(context.Background(), apiRun)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid")
assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest")
}
func TestCreateRun_InvalidWorkflowSpec(t *testing.T) {
@ -2308,16 +2295,14 @@ func TestCreateJob_ThroughWorkflowSpec(t *testing.T) {
ServiceAccount: "pipeline-runner",
ExperimentId: DefaultFakeUUID,
Enabled: true,
CreatedAtInSec: 5,
UpdatedAtInSec: 5,
CreatedAtInSec: 2,
UpdatedAtInSec: 2,
Conditions: "STATUS_UNSPECIFIED",
PipelineSpec: model.PipelineSpec{
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
},
}
expectedJob.PipelineSpec.PipelineId = job.PipelineSpec.PipelineId
expectedJob.PipelineSpec.PipelineName = job.PipelineSpec.PipelineName
expectedJob.PipelineSpec.PipelineVersionId = job.PipelineSpec.PipelineVersionId
assert.Equal(t, expectedJob.ToV1(), job.ToV1())
}
@ -2332,8 +2317,8 @@ func TestCreateJob_ThroughWorkflowSpecV2(t *testing.T) {
ServiceAccount: "pipeline-runner",
Enabled: true,
ExperimentId: DefaultFakeUUID,
CreatedAtInSec: 5,
UpdatedAtInSec: 5,
CreatedAtInSec: 2,
UpdatedAtInSec: 2,
Conditions: "STATUS_UNSPECIFIED",
PipelineSpec: model.PipelineSpec{
PipelineSpecManifest: v2SpecHelloWorld,
@ -2343,9 +2328,7 @@ func TestCreateJob_ThroughWorkflowSpecV2(t *testing.T) {
},
},
}
expectedJob.PipelineSpec.PipelineId = job.PipelineSpec.PipelineId
expectedJob.PipelineSpec.PipelineName = job.PipelineSpec.PipelineName
expectedJob.PipelineSpec.PipelineVersionId = job.PipelineSpec.PipelineVersionId
assert.Equal(t, expectedJob.ToV1(), job.ToV1())
fetchedJob, err := manager.GetJob(job.UUID)
assert.Nil(t, err)
@ -2531,7 +2514,7 @@ func TestCreateJob_EmptyPipelineSpec(t *testing.T) {
}
_, err := manager.CreateJob(context.Background(), job)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid")
assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest")
}
func TestCreateJob_InvalidWorkflowSpec(t *testing.T) {
@ -2619,8 +2602,8 @@ func TestEnableJob(t *testing.T) {
Namespace: "ns1",
ServiceAccount: "pipeline-runner",
Enabled: false,
CreatedAtInSec: 5,
UpdatedAtInSec: 6,
CreatedAtInSec: 2,
UpdatedAtInSec: 3,
Conditions: "STATUS_UNSPECIFIED",
ExperimentId: DefaultFakeUUID,
PipelineSpec: model.PipelineSpec{
@ -2773,16 +2756,16 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
ServiceAccount: "pipeline-runner",
StorageState: model.StorageStateAvailable,
RunDetails: model.RunDetails{
CreatedAtInSec: 5,
ScheduledAtInSec: 5,
CreatedAtInSec: 2,
ScheduledAtInSec: 2,
Conditions: "Running",
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStatePending,
},
{
UpdateTimeInSec: 7,
UpdateTimeInSec: 4,
State: model.RuntimeStateRunning,
},
},
@ -2792,8 +2775,6 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]",
},
}
expectedRun.PipelineSpec.PipelineId = run.PipelineSpec.PipelineId
expectedRun.PipelineSpec.PipelineVersionId = run.PipelineSpec.PipelineVersionId
expectedRun.PipelineSpec.PipelineName = run.PipelineSpec.PipelineName
expectedRun.RunDetails.WorkflowRuntimeManifest = run.RunDetails.WorkflowRuntimeManifest
assert.Equal(t, expectedRun.ToV1(), run.ToV1())
@ -2849,7 +2830,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_Success(t *testing.T
State: model.RuntimeStateUnspecified,
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStateUnspecified,
},
},
@ -2920,7 +2901,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success
State: model.RuntimeStatePending,
StateHistory: []*model.RuntimeStatus{
{
UpdateTimeInSec: 6,
UpdateTimeInSec: 3,
State: model.RuntimeStatePending,
},
},
@ -3093,12 +3074,10 @@ func TestReportScheduledWorkflowResource_Success(t *testing.T) {
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
Parameters: "[]",
PipelineSpecManifest: actualJob.PipelineSpec.PipelineSpecManifest,
PipelineId: actualJob.PipelineSpec.PipelineId,
PipelineName: actualJob.PipelineSpec.PipelineName,
PipelineVersionId: actualJob.PipelineSpec.PipelineVersionId,
},
CreatedAtInSec: 5,
UpdatedAtInSec: 6,
CreatedAtInSec: 2,
UpdatedAtInSec: 3,
}
expectedJob.Conditions = "STATUS_UNSPECIFIED"
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
@ -3991,3 +3970,28 @@ root:
schemaVersion: 2.0.0
sdkVersion: kfp-1.6.5
`
var v2SpecHelloWorldMutated = `
components:
comp-hello-world:
executorLabel: exec-hello-world
deploymentSpec:
executors:
exec-hello-world:
container:
image: python:3.7
pipelineInfo:
name: pipelines/p1/versions/v1
root:
dag:
tasks:
hello-world:
cachingOptions:
enableCache: true
componentRef:
name: comp-hello-world
taskInfo:
name: hello-world
schemaVersion: 2.0.0
sdkVersion: kfp-1.6.5
`

View File

@ -103,7 +103,7 @@ var (
)
func TestCreateJob_WrongInput(t *testing.T) {
clients, manager, experiment, pipelineVersion := initWithExperimentAndPipelineVersion(t)
clients, manager, experiment, _ := initWithExperimentAndPipelineVersion(t)
defer clients.Close()
server := NewJobServer(manager, &JobServerOptions{CollectMetrics: false})
tests := []struct {
@ -151,37 +151,7 @@ func TestCreateJob_WrongInput(t *testing.T) {
}}},
ResourceReferences: validReference,
},
"InvalidInputError: unknown template format: pipeline spec is invalid",
},
{
"duplicate pipeline spec",
&apiv1beta1.Job{
Name: "job1",
Enabled: true,
MaxConcurrency: 1,
Trigger: &apiv1beta1.Trigger{
Trigger: &apiv1beta1.Trigger_CronSchedule{CronSchedule: &apiv1beta1.CronSchedule{
StartTime: &timestamp.Timestamp{Seconds: 1},
Cron: "1 * * * *",
}}},
PipelineSpec: &apiv1beta1.PipelineSpec{
WorkflowManifest: v2SpecHelloWorld,
},
ResourceReferences: []*api.ResourceReference{
{
Key: &apiv1beta1.ResourceKey{
Type: apiv1beta1.ResourceType_EXPERIMENT,
Id: DefaultFakeUUID,
},
Relationship: apiv1beta1.Relationship_OWNER,
},
{
Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: pipelineVersion.UUID},
Relationship: apiv1beta1.Relationship_CREATOR,
},
},
},
"Failed to create a recurring run due to mismatch in the provided manifest and pipeline version",
"Failed to fetch a template with an empty pipeline spec manifest",
},
{
"invalid pipeline spec",
@ -374,13 +344,6 @@ func TestCreateJob_NoResRefs(t *testing.T) {
},
Relationship: apiv1beta1.Relationship_OWNER,
},
{
Key: &apiv1beta1.ResourceKey{
Type: apiv1beta1.ResourceType_PIPELINE_VERSION,
Id: DefaultFakeIdTwo,
},
Relationship: apiv1beta1.Relationship_CREATOR,
},
}
expectedJob := &apiv1beta1.Job{
Id: DefaultFakeIdOne,
@ -396,8 +359,6 @@ func TestCreateJob_NoResRefs(t *testing.T) {
ServiceAccount: "pipeline-runner",
Status: "STATUS_UNSPECIFIED",
PipelineSpec: &apiv1beta1.PipelineSpec{
PipelineId: DefaultFakeIdTwo,
PipelineName: "job1-7",
WorkflowManifest: testWorkflow.ToStringForStore(),
Parameters: []*apiv1beta1.Parameter{{Name: "param1", Value: "world"}},
},
@ -674,15 +635,11 @@ func TestListJobs_Multiuser(t *testing.T) {
assert.Nil(t, err)
var expectedJobs []*apiv1beta1.Job
commonExpectedJob.PipelineSpec.PipelineId = "123e4567-e89b-12d3-a456-426655440000"
commonExpectedJob.PipelineSpec.PipelineName = "job1-3"
commonExpectedJob.CreatedAt = &timestamp.Timestamp{Seconds: 5}
commonExpectedJob.UpdatedAt = &timestamp.Timestamp{Seconds: 5}
commonExpectedJob.CreatedAt = &timestamp.Timestamp{Seconds: 2}
commonExpectedJob.UpdatedAt = &timestamp.Timestamp{Seconds: 2}
commonExpectedJob.ResourceReferences = []*apiv1beta1.ResourceReference{
{Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_NAMESPACE, Id: "ns1"}, Relationship: apiv1beta1.Relationship_OWNER},
{Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_EXPERIMENT, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_OWNER},
{Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_CREATOR},
{Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: DefaultFakeIdOne}, Relationship: apiv1beta1.Relationship_CREATOR},
}
expectedJobs = append(expectedJobs, commonExpectedJob)
expectedJobsEmpty := []*apiv1beta1.Job{}
@ -940,10 +897,10 @@ func TestCreateRecurringRun(t *testing.T) {
Cron: "1 * * * *",
}},
},
CreatedAt: &timestamp.Timestamp{Seconds: 5},
UpdatedAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
UpdatedAt: &timestamp.Timestamp{Seconds: 2},
Status: apiv2beta1.RecurringRun_ENABLED,
PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: recurringRun.GetPipelineVersionId()},
PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct},
RuntimeConfig: &apiv2beta1.RuntimeConfig{
PipelineRoot: "model-pipeline-root",
Parameters: make(map[string]*structpb.Value),
@ -994,10 +951,10 @@ func TestGetRecurringRun(t *testing.T) {
Cron: "1 * * * *",
}},
},
CreatedAt: &timestamp.Timestamp{Seconds: 5},
UpdatedAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
UpdatedAt: &timestamp.Timestamp{Seconds: 2},
Status: apiv2beta1.RecurringRun_ENABLED,
PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: createdRecurringRun.GetPipelineVersionId()},
PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct},
RuntimeConfig: &apiv2beta1.RuntimeConfig{
PipelineRoot: "model-pipeline-root",
Parameters: make(map[string]*structpb.Value),
@ -1035,7 +992,7 @@ func TestListRecurringRuns(t *testing.T) {
ExperimentId: "123e4567-e89b-12d3-a456-426655440000",
}
createdRecurringRun, err := server.CreateRecurringRun(nil, &apiv2beta1.CreateRecurringRunRequest{RecurringRun: apiRecurringRun})
_, err := server.CreateRecurringRun(nil, &apiv2beta1.CreateRecurringRunRequest{RecurringRun: apiRecurringRun})
assert.Nil(t, err)
expectedRecurringRun := &apiv2beta1.RecurringRun{
@ -1051,9 +1008,9 @@ func TestListRecurringRuns(t *testing.T) {
Cron: "1 * * * *",
}},
},
CreatedAt: &timestamp.Timestamp{Seconds: 5},
UpdatedAt: &timestamp.Timestamp{Seconds: 5},
PipelineSource: &apiv2beta1.RecurringRun_PipelineVersionId{PipelineVersionId: createdRecurringRun.GetPipelineVersionId()},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
UpdatedAt: &timestamp.Timestamp{Seconds: 2},
PipelineSource: &apiv2beta1.RecurringRun_PipelineSpec{PipelineSpec: pipelineSpecStruct},
RuntimeConfig: &apiv2beta1.RuntimeConfig{
PipelineRoot: "model-pipeline-root",
Parameters: make(map[string]*structpb.Value),

View File

@ -123,7 +123,7 @@ func TestCreateRunV1_no_pipeline_source(t *testing.T) {
runDetail, err := server.CreateRunV1(nil, &apiv1beta1.CreateRunRequest{Run: run})
assert.NotNil(t, err)
assert.Nil(t, runDetail)
assert.Contains(t, err.Error(), "unknown template format: pipeline spec is invalid")
assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest")
}
func TestCreateRunV1_invalid_spec(t *testing.T) {
@ -216,24 +216,6 @@ func TestCreateRunV1_pipelineversion(t *testing.T) {
assert.Equal(t, exp.UUID, runDetail.Run.ResourceReferences[0].Key.Id)
}
func TestCreateRunV1_MismatchedManifests(t *testing.T) {
clients, manager, _, _ := initWithExperimentAndPipelineVersion(t)
defer clients.Close()
server := NewRunServer(manager, &RunServerOptions{CollectMetrics: false})
run := &apiv1beta1.Run{
Name: "run1",
ResourceReferences: validReferencesOfExperimentAndPipelineVersion,
PipelineSpec: &apiv1beta1.PipelineSpec{
WorkflowManifest: testWorkflow2.ToStringForStore(),
Parameters: []*apiv1beta1.Parameter{{Name: "param1", Value: "world"}},
},
}
runDetail, err := server.CreateRunV1(nil, &apiv1beta1.CreateRunRequest{Run: run})
assert.NotNil(t, err)
assert.Nil(t, runDetail)
assert.Contains(t, err.Error(), "Invalid input error: Failed to create a run due to mismatch in the provided manifest and pipeline version")
}
func TestCreateRunV1_Manifest_and_pipeline_version(t *testing.T) {
clients, manager, exp, _ := initWithExperimentAndPipelineVersion(t)
defer clients.Close()
@ -646,8 +628,8 @@ func TestCreateRun(t *testing.T) {
DisplayName: "run1",
ServiceAccount: "pipeline-runner",
StorageState: apiv2beta1.Run_AVAILABLE,
CreatedAt: &timestamp.Timestamp{Seconds: 5},
ScheduledAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
ScheduledAt: &timestamp.Timestamp{Seconds: 2},
FinishedAt: &timestamp.Timestamp{},
PipelineSource: &apiv2beta1.Run_PipelineSpec{
PipelineSpec: run.GetPipelineSpec(),
@ -659,7 +641,7 @@ func TestCreateRun(t *testing.T) {
State: apiv2beta1.RuntimeState_PENDING,
StateHistory: []*apiv2beta1.RuntimeStatus{
{
UpdateTime: &timestamp.Timestamp{Seconds: 6},
UpdateTime: &timestamp.Timestamp{Seconds: 3},
State: apiv2beta1.RuntimeState_PENDING,
},
},
@ -745,8 +727,8 @@ func TestGetRun(t *testing.T) {
DisplayName: "run1",
ServiceAccount: "pipeline-runner",
StorageState: apiv2beta1.Run_AVAILABLE,
CreatedAt: &timestamp.Timestamp{Seconds: 5},
ScheduledAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
ScheduledAt: &timestamp.Timestamp{Seconds: 2},
FinishedAt: &timestamp.Timestamp{},
PipelineSource: &apiv2beta1.Run_PipelineSpec{
PipelineSpec: returnedRun.GetPipelineSpec(),
@ -758,7 +740,7 @@ func TestGetRun(t *testing.T) {
State: apiv2beta1.RuntimeState_PENDING,
StateHistory: []*apiv2beta1.RuntimeStatus{
{
UpdateTime: &timestamp.Timestamp{Seconds: 6},
UpdateTime: &timestamp.Timestamp{Seconds: 3},
State: apiv2beta1.RuntimeState_PENDING,
},
},
@ -900,8 +882,8 @@ func TestListRunsV1_Multiuser(t *testing.T) {
Name: "run1",
ServiceAccount: "pipeline-runner",
StorageState: apiv1beta1.Run_STORAGESTATE_AVAILABLE,
CreatedAt: &timestamp.Timestamp{Seconds: 5},
ScheduledAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
ScheduledAt: &timestamp.Timestamp{Seconds: 2},
FinishedAt: &timestamp.Timestamp{},
Status: "Pending",
PipelineSpec: &apiv1beta1.PipelineSpec{
@ -919,10 +901,6 @@ func TestListRunsV1_Multiuser(t *testing.T) {
Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_NAMESPACE, Id: experiment.Namespace},
Relationship: apiv1beta1.Relationship_OWNER,
},
{
Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_PIPELINE_VERSION, Id: createdRun.Run.PipelineSpec.GetPipelineId()},
Relationship: apiv1beta1.Relationship_CREATOR,
},
},
}}
expectedRunsEmpty := []*apiv1beta1.Run{}
@ -978,7 +956,7 @@ func TestListRunsV1_Multiuser(t *testing.T) {
expectedRunsEmpty,
},
{
"Inalid - invalid filter type",
"Invalid - invalid filter type",
&apiv1beta1.ListRunsRequest{
ResourceReferenceKey: &apiv1beta1.ResourceKey{
Type: apiv1beta1.ResourceType_UNKNOWN_RESOURCE_TYPE,
@ -1036,8 +1014,8 @@ func TestListRuns(t *testing.T) {
DisplayName: "run1",
ServiceAccount: "pipeline-runner",
StorageState: apiv2beta1.Run_AVAILABLE,
CreatedAt: &timestamp.Timestamp{Seconds: 5},
ScheduledAt: &timestamp.Timestamp{Seconds: 5},
CreatedAt: &timestamp.Timestamp{Seconds: 2},
ScheduledAt: &timestamp.Timestamp{Seconds: 2},
FinishedAt: &timestamp.Timestamp{},
PipelineSource: &apiv2beta1.Run_PipelineSpec{
PipelineSpec: createdRun.GetPipelineSpec(),
@ -1049,7 +1027,7 @@ func TestListRuns(t *testing.T) {
State: apiv2beta1.RuntimeState_PENDING,
StateHistory: []*apiv2beta1.RuntimeStatus{
{
UpdateTime: &timestamp.Timestamp{Seconds: 6},
UpdateTime: &timestamp.Timestamp{Seconds: 3},
State: apiv2beta1.RuntimeState_PENDING,
},
},