[SDK-compiler] Refactor Compiler to expose an API to write out yaml spec of pipeline. (#2146)

* Refactor.

* Remove redundant code.

* Fix.

* Move the implementation of create_workflow into a private API.

* Change write_workflow to private.

* Add deprecation warning.
Authored by Jiaxiao Zheng on 2019-10-03 16:45:56 -07:00; committed by Kubernetes Prow Robot
parent b850360cd6
commit 9a9bd904ac
1 changed file with 68 additions and 38 deletions
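For orientation, a minimal sketch of how the refactored pieces fit together from the caller's side. The pipeline below (echo_pipeline) is purely illustrative; only Compiler, compile, _create_workflow, and _write_workflow come from this change:

    import kfp.dsl as dsl
    from kfp.compiler import Compiler

    @dsl.pipeline(name='echo', description='illustrative pipeline')
    def echo_pipeline():
        dsl.ContainerOp(name='echo', image='alpine', command=['echo', 'hello'])

    # The public entry point is unchanged; compile() still builds the workflow dict
    # internally and now hands it to _write_workflow() for serialization.
    Compiler().compile(echo_pipeline, 'pipeline.tar.gz')

    # The workflow spec itself is available as a plain dict through the private API.
    workflow = Compiler()._create_workflow(echo_pipeline)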


@@ -13,6 +13,7 @@
 # limitations under the License.
 import json
 from collections import defaultdict
+from deprecated import deprecated
 import inspect
 import tarfile
 import zipfile
@@ -728,25 +729,14 @@ class Compiler(object):
       sanitized_ops[sanitized_name] = op
     pipeline.ops = sanitized_ops
 
-  def create_workflow(self,
+  def _create_workflow(self,
       pipeline_func: Callable,
       pipeline_name: Text=None,
       pipeline_description: Text=None,
       params_list: List[dsl.PipelineParam]=None,
       pipeline_conf: dsl.PipelineConf = None,
       ) -> Dict[Text, Any]:
-    """ Create workflow spec from pipeline function and specified pipeline
-    params/metadata. Currently, the pipeline params are either specified in
-    the signature of the pipeline function or by passing a list of
-    dsl.PipelineParam. Conflict will cause ValueError.
-
-    :param pipeline_func: pipeline function where ContainerOps are invoked.
-    :param pipeline_name:
-    :param pipeline_description:
-    :param params_list: list of pipeline params to append to the pipeline.
-    :param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
-    :return: workflow dict.
-    """
+    """ Internal implementation of create_workflow."""
 
     params_list = params_list or []
     argspec = inspect.getfullargspec(pipeline_func)
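As the docstring (now carried by the deprecated wrapper below) explains, pipeline parameters may come from the pipeline function's signature or from an explicit params_list, and a conflict between the two raises a ValueError. A hedged sketch of the two routes; the pipeline and parameter names (echo_param_pipeline, message, extra_flag) are illustrative:

    import kfp.dsl as dsl
    from kfp.compiler import Compiler

    @dsl.pipeline(name='echo-param', description='illustrative pipeline with one parameter')
    def echo_param_pipeline(message='hello'):
        dsl.ContainerOp(name='echo', image='alpine', command=['echo', message])

    # Parameter taken from the function signature.
    workflow = Compiler()._create_workflow(echo_param_pipeline)

    # Additional parameter appended explicitly via params_list.
    workflow = Compiler()._create_workflow(
        echo_param_pipeline,
        params_list=[dsl.PipelineParam(name='extra_flag', value='off')])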
@@ -824,6 +814,30 @@ class Compiler(object):
 
     return workflow
 
+  @deprecated(
+      version='0.1.32',
+      reason='Workflow spec is not intended to be handled by user, please '
+             'switch to _create_workflow')
+  def create_workflow(self,
+                      pipeline_func: Callable,
+                      pipeline_name: Text=None,
+                      pipeline_description: Text=None,
+                      params_list: List[dsl.PipelineParam]=None,
+                      pipeline_conf: dsl.PipelineConf = None) -> Dict[Text, Any]:
+    """ Create workflow spec from pipeline function and specified pipeline
+    params/metadata. Currently, the pipeline params are either specified in
+    the signature of the pipeline function or by passing a list of
+    dsl.PipelineParam. Conflict will cause ValueError.
+
+    :param pipeline_func: pipeline function where ContainerOps are invoked.
+    :param pipeline_name:
+    :param pipeline_description:
+    :param params_list: list of pipeline params to append to the pipeline.
+    :param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
+    :return: workflow dict.
+    """
+    return self._create_workflow(pipeline_func, pipeline_name, pipeline_description, params_list, pipeline_conf)
+
   def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow."""
     return self.create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
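Because the old name is kept behind the deprecated decorator, existing callers keep working but receive a DeprecationWarning steering them toward _create_workflow. A minimal sketch of the expected behaviour, reusing the illustrative echo_pipeline from the earlier sketch (the Deprecated package issues the warning when the wrapped method is called):

    import warnings
    from kfp.compiler import Compiler

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        workflow = Compiler().create_workflow(echo_pipeline)  # still returns the workflow dict

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)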
@@ -842,33 +856,49 @@ class Compiler(object):
     try:
       kfp.TYPE_CHECK = type_check
       workflow = self._compile(pipeline_func, pipeline_conf)
-      yaml.Dumper.ignore_aliases = lambda *args : True
-      yaml_text = yaml.dump(workflow, default_flow_style=False)
-
-      if '{{pipelineparam' in yaml_text:
-        raise RuntimeError('Internal compiler error: Found unresolved PipelineParam. Please create a new issue at https://github.com/kubeflow/pipelines/issues attaching the pipeline code and the pipeline package.' )
-
-      if package_path.endswith('.tar.gz') or package_path.endswith('.tgz'):
-        from contextlib import closing
-        from io import BytesIO
-        with tarfile.open(package_path, "w:gz") as tar:
-          with closing(BytesIO(yaml_text.encode())) as yaml_file:
-            tarinfo = tarfile.TarInfo('pipeline.yaml')
-            tarinfo.size = len(yaml_file.getvalue())
-            tar.addfile(tarinfo, fileobj=yaml_file)
-      elif package_path.endswith('.zip'):
-        with zipfile.ZipFile(package_path, "w") as zip:
-          zipinfo = zipfile.ZipInfo('pipeline.yaml')
-          zipinfo.compress_type = zipfile.ZIP_DEFLATED
-          zip.writestr(zipinfo, yaml_text)
-      elif package_path.endswith('.yaml') or package_path.endswith('.yml'):
-        with open(package_path, 'w') as yaml_file:
-          yaml_file.write(yaml_text)
-      else:
-        raise ValueError('The output path '+ package_path + ' should ends with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]')
+      self._write_workflow(workflow, package_path)
     finally:
       kfp.TYPE_CHECK = type_check_old_value
+
+  @staticmethod
+  def _write_workflow(workflow: Dict[Text, Any], package_path: Text = None):
+    """Dump pipeline workflow into yaml spec and write out in the format specified by the user.
+
+    Args:
+      workflow: Workflow spec of the pipline, dict.
+      package_path: file path to be written. If not specified, a yaml_text string
+        will be returned.
+    """
+    yaml.Dumper.ignore_aliases = lambda *args : True
+    yaml_text = yaml.dump(workflow, default_flow_style=False)
+
+    if '{{pipelineparam' in yaml_text:
+      raise RuntimeError(
+          'Internal compiler error: Found unresolved PipelineParam. '
+          'Please create a new issue at https://github.com/kubeflow/pipelines/issues '
+          'attaching the pipeline code and the pipeline package.' )
+
+    if package_path is None:
+      return yaml_text
+
+    if package_path.endswith('.tar.gz') or package_path.endswith('.tgz'):
+      from contextlib import closing
+      from io import BytesIO
+      with tarfile.open(package_path, "w:gz") as tar:
+        with closing(BytesIO(yaml_text.encode())) as yaml_file:
+          tarinfo = tarfile.TarInfo('pipeline.yaml')
+          tarinfo.size = len(yaml_file.getvalue())
+          tar.addfile(tarinfo, fileobj=yaml_file)
+    elif package_path.endswith('.zip'):
+      with zipfile.ZipFile(package_path, "w") as zip:
+        zipinfo = zipfile.ZipInfo('pipeline.yaml')
+        zipinfo.compress_type = zipfile.ZIP_DEFLATED
+        zip.writestr(zipinfo, yaml_text)
+    elif package_path.endswith('.yaml') or package_path.endswith('.yml'):
+      with open(package_path, 'w') as yaml_file:
+        yaml_file.write(yaml_text)
+    else:
+      raise ValueError(
+          'The output path '+ package_path +
+          ' should ends with one of the following formats: '
+          '[.tar.gz, .tgz, .zip, .yaml, .yml]')
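With the writing logic pulled into the static _write_workflow, the output format is decided purely by the file extension, and omitting the path returns the YAML text instead of touching disk. A short usage sketch, reusing a workflow dict produced by _create_workflow:

    from kfp.compiler import Compiler

    yaml_text = Compiler._write_workflow(workflow)          # no path: the YAML string is returned
    Compiler._write_workflow(workflow, 'pipeline.yaml')     # plain YAML file (.yml also accepted)
    Compiler._write_workflow(workflow, 'pipeline.tar.gz')   # gzipped tarball containing pipeline.yaml (.tgz also accepted)
    Compiler._write_workflow(workflow, 'pipeline.zip')      # zip archive containing pipeline.yaml
    # Any other extension raises a ValueError listing the supported formats.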