feat(sdk): Always add pipeline root as a pipeline parameter (#5122)

* refactor pipeline root passing

* fix test
This commit is contained in:
Jiaxiao Zheng 2021-02-11 08:29:57 +08:00 committed by GitHub
parent b5e820e217
commit 846423a870
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 37 deletions

View File

@ -834,14 +834,17 @@ class Compiler(object):
# Need to first clear the default value of dsl.PipelineParams. Otherwise, it
# will be resolved immediately in place when being passed to each component.
default_param_values = OrderedDict()
if getattr(pipeline_func, 'output_directory', None):
dsl_pipeline_root = dsl.PipelineParam(
name=dsl.ROOT_PARAMETER_NAME, value=pipeline_func.output_directory)
pipeline_func.output_directory = dsl_pipeline_root
params_list.append(dsl_pipeline_root)
for param in params_list:
default_param_values[param.name] = param.value
param.value = None
# Currently only allow specifying pipeline params in one place.
if params_list and pipeline_meta.inputs:
raise ValueError('Either specify pipeline params in the pipeline function, or in "params_list", but not both.')
args_list = []
kwargs_dict = dict()
signature = inspect.signature(pipeline_func)
@ -865,24 +868,26 @@ class Compiler(object):
self._validate_exit_handler(dsl_pipeline)
self._sanitize_and_inject_artifact(dsl_pipeline, pipeline_conf)
# Fill in the default values.
# Fill in the default values by merging two param lists.
args_list_with_defaults = OrderedDict()
if pipeline_meta.inputs:
args_list_with_defaults = OrderedDict([
(sanitize_k8s_name(input_spec.name, True), input_spec.default)
for input_spec in pipeline_meta.inputs
])
elif params_list:
if params_list:
# Or, if args are provided by params_list, fill in pipeline_meta.
args_list_with_defaults = default_param_values
pipeline_meta.inputs = [
InputSpec(
name=param.name,
type=param.param_type,
default=default_param_values[param.name]
)
for param in params_list
]
for k, v in default_param_values.items():
args_list_with_defaults[k] = v
pipeline_meta.inputs = pipeline_meta.inputs or []
for param in params_list:
pipeline_meta.inputs.append(
InputSpec(
name=param.name,
type=param.param_type,
default=default_param_values[param.name]))
op_transformers = [add_pod_env]
op_transformers.extend(pipeline_conf.op_transformers)
@ -898,8 +903,8 @@ class Compiler(object):
workflow = fix_big_data_passing(workflow)
output_directory = getattr(pipeline_func, 'output_directory', None)
workflow = _data_passing_rewriter.add_pod_name_passing(workflow,
output_directory)
workflow = _data_passing_rewriter.add_pod_name_passing(
workflow, str(output_directory))
if pipeline_conf and pipeline_conf.data_passing_method != None:
workflow = pipeline_conf.data_passing_method(workflow)

View File

@ -196,7 +196,11 @@ def _generate_output_file_name(port_name):
# Placeholder to represent the output directory hosting all the generated URIs.
# Its actual value will be specified during pipeline compilation.
OUTPUT_DIR_PLACEHOLDER = '{{kfp.output_dir}}'
# The format of OUTPUT_DIR_PLACEHOLDER is serialized dsl.PipelineParam, to
# ensure being extracted as a pipeline parameter during compilation.
# Note that we cannot directly import the dsl module here due to circular
# dependencies.
OUTPUT_DIR_PLACEHOLDER = '{{pipelineparam:op=;name=pipeline-output-directory}}'
# Placeholder to represent the UID of the current pipeline at runtime.
# Will be replaced by engine-specific placeholder during compilation.
RUN_ID_PLACEHOLDER = '{{kfp.run_uid}}'

View File

@ -748,9 +748,9 @@ implementation:
[
'program',
'--in1-uri',
'{{kfp.output_dir}}/{{kfp.run_uid}}/{{inputs.parameters.In1-producer-pod-id-}}/In1',
'{{pipelineparam:op=;name=pipeline-output-directory}}/{{kfp.run_uid}}/{{inputs.parameters.In1-producer-pod-id-}}/In1',
'--out1-uri',
'{{kfp.output_dir}}/{{kfp.run_uid}}/{{pod.name}}/Out1',
'{{pipelineparam:op=;name=pipeline-output-directory}}/{{kfp.run_uid}}/{{pod.name}}/Out1',
],
resolved_cmd.command
)
@ -789,13 +789,13 @@ implementation:
self.assertEqual(
['--a',
'{{kfp.output_dir}}/{{kfp.run_uid}}/{{inputs.parameters.a-producer-pod-id-}}/executor_output.json',
'{{pipelineparam:op=;name=pipeline-output-directory}}/{{kfp.run_uid}}/{{inputs.parameters.a-producer-pod-id-}}/executor_output.json',
'--c',
'bar',
'--b',
'{{kfp.input-output-name.a}}',
'--metadata-location',
'{{kfp.output_dir}}/{{kfp.run_uid}}/executor_output.json'],
'{{pipelineparam:op=;name=pipeline-output-directory}}/{{kfp.run_uid}}/executor_output.json'],
resolved_cmd.args
)

View File

@ -27,3 +27,5 @@ from ._component import python_component, graph_component, component
EXECUTION_ID_PLACEHOLDER = '{{workflow.uid}}-{{pod.name}}'
RUN_ID_PLACEHOLDER = '{{workflow.uid}}'
ROOT_PARAMETER_NAME = 'pipeline-output-directory'

View File

@ -2,16 +2,18 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: uri-artifact-pipeline-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.2.0, pipelines.kubeflow.org/pipeline_compilation_time: '2020-12-23T10:49:24.672741',
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "Hello world!",
"name": "text", "optional": true}], "name": "uri-artifact-pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.2.0}
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.4.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-02-10T16:46:54.476610',
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "Hello world!",
"name": "text", "optional": true}, {"default": "gs://my-bucket/my-output-dir",
"name": "pipeline-output-directory"}], "name": "uri-artifact-pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.4.0}
spec:
entrypoint: uri-artifact-pipeline
templates:
- name: condition-3
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: input_gcs_path-producer-pod-id-}
dag:
tasks:
@ -19,6 +21,7 @@ spec:
template: read-from-gcs-3
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: input_gcs_path-producer-pod-id-, value: '{{inputs.parameters.input_gcs_path-producer-pod-id-}}'}
- name: flip-coin
container:
@ -35,6 +38,7 @@ spec:
- name: for-loop-2
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: input_gcs_path-producer-pod-id-}
dag:
tasks:
@ -42,6 +46,7 @@ spec:
template: read-from-gcs-2
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: input_gcs_path-producer-pod-id-, value: '{{inputs.parameters.input_gcs_path-producer-pod-id-}}'}
- name: read-from-gcs
container:
@ -52,17 +57,18 @@ spec:
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
- '{{inputs.parameters.pipeline-output-directory}}/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path'
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name": "Read
from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "d87d6c97f22a73ec8cd2e086a7727b2e46c391f09612da1b5001dfd9b824e5c7"}'}
- name: read-from-gcs-2
container:
args: []
@ -72,17 +78,18 @@ spec:
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
- '{{inputs.parameters.pipeline-output-directory}}/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path'
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name": "Read
from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "d87d6c97f22a73ec8cd2e086a7727b2e46c391f09612da1b5001dfd9b824e5c7"}'}
- name: read-from-gcs-3
container:
args: []
@ -92,20 +99,22 @@ spec:
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
- '{{inputs.parameters.pipeline-output-directory}}/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path'
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
"GCS file path", "name": "input_gcs_path", "type": "String"}], "name": "Read
from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "d87d6c97f22a73ec8cd2e086a7727b2e46c391f09612da1b5001dfd9b824e5c7"}'}
- name: uri-artifact-pipeline
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: text}
dag:
tasks:
@ -115,6 +124,7 @@ spec:
dependencies: [flip-coin, write-to-gcs]
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
- {name: flip-coin, template: flip-coin}
- name: for-loop-2
@ -122,6 +132,7 @@ spec:
dependencies: [write-to-gcs]
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
withItems: [1, 2, 3, 4]
- name: read-from-gcs
@ -129,11 +140,13 @@ spec:
dependencies: [write-to-gcs]
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
- name: write-to-gcs
template: write-to-gcs
arguments:
parameters:
- {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
- {name: text, value: '{{inputs.parameters.text}}'}
- name: write-to-gcs
container:
@ -145,10 +158,11 @@ spec:
set -e -x
echo "$0" | gsutil cp - "$1"
- '{{inputs.parameters.text}}'
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{pod.name}}/output_gcs_path
- '{{inputs.parameters.pipeline-output-directory}}/{{workflow.uid}}/{{pod.name}}/output_gcs_path'
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: pipeline-output-directory}
- {name: text}
outputs:
parameters:
@ -160,9 +174,10 @@ spec:
"inputs": [{"description": "Content to be written to GCS", "name": "text",
"type": "String"}], "name": "Write to GCS", "outputs": [{"description":
"GCS file path", "name": "output_gcs_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"8ec42d2ddcbbce0607719e277bbe8e40f3649a894e46c8e92309951733bc7766"}', pipelines.kubeflow.org/arguments.parameters: '{"text":
"aad0cd48834955bdd0d570a625283957f9c6ff3b77f59a2c47af933ae4b8ef64"}', pipelines.kubeflow.org/arguments.parameters: '{"text":
"{{inputs.parameters.text}}"}'}
arguments:
parameters:
- {name: text, value: Hello world!}
- {name: pipeline-output-directory, value: 'gs://my-bucket/my-output-dir'}
serviceAccountName: pipeline-runner