[SDK/Compiler] Add _create_and_write_workflow method (#2321)

* add _create_and_write_workflow

* Add pointer to TFX dag runner usage.
This commit is contained in:
Jiaxiao Zheng 2019-10-07 14:13:10 -07:00 committed by Kubernetes Prow Robot
parent 71c7100083
commit 092845d134
1 changed files with 28 additions and 2 deletions

View File

@ -814,6 +814,8 @@ class Compiler(object):
return workflow
# For now (0.1.31) this function is only used by TFX's KubeflowDagRunner.
# See https://github.com/tensorflow/tfx/blob/811e4c1cc0f7903d73d151b9d4f21f79f6013d4a/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L238
@deprecated(
version='0.1.32',
reason='Workflow spec is not intended to be handled by user, please '
@ -838,6 +840,9 @@ class Compiler(object):
"""
return self._create_workflow(pipeline_func, pipeline_name, pipeline_description, params_list, pipeline_conf)
@deprecated(
version='0.1.32',
reason='Switch to _create_workflow.')
def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
"""Compile the given pipeline function into workflow."""
return self._create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
@ -855,8 +860,10 @@ class Compiler(object):
type_check_old_value = kfp.TYPE_CHECK
try:
kfp.TYPE_CHECK = type_check
workflow = self._compile(pipeline_func, pipeline_conf)
self._write_workflow(workflow, package_path)
self._create_and_write_workflow(
pipeline_func=pipeline_func,
pipeline_conf=pipeline_conf,
package_path=package_path)
finally:
kfp.TYPE_CHECK = type_check_old_value
@ -902,3 +909,22 @@ class Compiler(object):
'The output path '+ package_path +
' should ends with one of the following formats: '
'[.tar.gz, .tgz, .zip, .yaml, .yml]')
def _create_and_write_workflow(
self,
pipeline_func: Callable,
pipeline_name: Text=None,
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf=None,
package_path: Text=None
) -> None:
"""Compile the given pipeline function and dump it to specified file format."""
workflow = self._create_workflow(
pipeline_func,
pipeline_name,
pipeline_description,
params_list,
pipeline_conf)
self._write_workflow(workflow, package_path)