Fix the logic of passing default values of pipeline parameters. (#2098)

* Fix the logic of passing default values.

* Modify unit test

* Solve.
This commit is contained in:
Jiaxiao Zheng 2019-09-12 17:10:33 -07:00 committed by Kubernetes Prow Robot
parent 97b01b8f4a
commit 1449d08aee
3 changed files with 16 additions and 4 deletions

View File

@ -718,7 +718,7 @@ class Compiler(object):
pipeline_func: Callable,
pipeline_name: Text=None,
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=()) -> Dict[Text, Any]:
params_list: List[dsl.PipelineParam]=None) -> Dict[Text, Any]:
""" Create workflow spec from pipeline function and specified pipeline
params/metadata. Currently, the pipeline params are either specified in
the signature of the pipeline function or by passing a list of
@ -730,6 +730,7 @@ class Compiler(object):
:param params_list: list of pipeline params to append to the pipeline.
:return: workflow dict.
"""
params_list = params_list or []
argspec = inspect.getfullargspec(pipeline_func)
# Create the arg list with no default values and call pipeline function.
@ -739,6 +740,13 @@ class Compiler(object):
pipeline_meta.description = pipeline_description or pipeline_meta.description
pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_meta.name)
# Need to first clear the default value of dsl.PipelineParams. Otherwise, it
# will be resolved immediately in place when being to each component.
default_param_values = {}
for param in params_list:
default_param_values[param.name] = param.value
param.value = None
# Currently only allow specifying pipeline params at one place.
if params_list and pipeline_meta.inputs:
raise ValueError('Either specify pipeline params in the pipeline function, or in "params_list", but not both.')
@ -767,6 +775,9 @@ class Compiler(object):
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
else:
# Or, if args are provided by params_list, fill in pipeline_meta.
for param in params_list:
param.value = default_param_values[param.name]
args_list_with_defaults = params_list
pipeline_meta.inputs = [
ParameterMeta(
@ -790,7 +801,7 @@ class Compiler(object):
def _compile(self, pipeline_func):
"""Compile the given pipeline function into workflow."""
return self.create_workflow(pipeline_func, [])
return self.create_workflow(pipeline_func=pipeline_func)
def compile(self, pipeline_func, package_path, type_check=True):
"""Compile the given pipeline function into workflow yaml.

View File

@ -18,7 +18,7 @@ import kfp.gcp as gcp
message_param = dsl.PipelineParam(name='message')
output_path_param = dsl.PipelineParam(name='outputpath')
output_path_param = dsl.PipelineParam(name='outputpath', value='default_output')
class GetFrequentWordOp(dsl.ContainerOp):
"""A get frequent word class representing a component in ML Pipelines.

View File

@ -15,13 +15,14 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}, {"name": "outputpath"}], "name": "Save Most Frequent"}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}, {"default": "default_output", "name": "outputpath"}], "name": "Save Most Frequent"}'
generateName: save-most-frequent-
spec:
arguments:
parameters:
- name: message
- name: outputpath
value: default_output
entrypoint: save-most-frequent
serviceAccountName: pipeline-runner
onExit: exiting