chore: Clean up KFP SDK docstrings, make formatting a little more consistent (#4218)

* Prepare the SDK docs environment so it's easier to understand how to build the docs locally and keep them consistent with ReadTheDocs.

* Clean up docstrings for kfp.Client

* Add in updates to the docs for compiler and components

* Update components area to add in code references and make formatting a little more consistent.

* Clean up containers, add in custom CSS to ensure we do not overflow on inline code blocks

* Clean up containers, add in custom CSS to ensure we do not overflow on inline code blocks

* Remove unused kfp.notebook package links

* Clean up a few more errant references

* Clean up the DSL docs some more

* Update SDK docs for KFP extensions to follow Sphinx guidelines

* Clean up formatting of docstrings after Ark-Kun's comments
Alex Latchford 2020-08-03 09:33:47 -07:00, committed by GitHub
parent 335323353f
commit 704c8c7660
32 changed files with 694 additions and 628 deletions

.readthedocs.yml (new file)

@ -0,0 +1,8 @@
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
version: 2
sphinx:
configuration: docs/conf.py
python:
version: 3.7
install:
- requirements: sdk/python/requirements.txt

docs/_static/custom.css (new file)

@ -0,0 +1,3 @@
.rst-content code, .rst-content tt, code {
white-space: normal;
}


@ -55,7 +55,7 @@ sys.path.insert(0, os.path.abspath('../sdk/python/'))
# -- Project information -----------------------------------------------------
project = 'Kubeflow Pipelines'
copyright = '2019, Google'
copyright = '2020, Google'
author = 'Google'
# The short X.Y version
@ -106,7 +106,6 @@ exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = None
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
@ -140,6 +139,10 @@ html_static_path = ['_static']
#
# html_sidebars = {}
html_css_files = [
'custom.css',
]
# -- Options for HTMLHelp output ---------------------------------------------


@ -3,8 +3,8 @@
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Kubeflow Pipelines SDK API reference
================================================
Kubeflow Pipelines SDK API
==========================
Main documentation: https://www.kubeflow.org/docs/pipelines/
@ -12,11 +12,11 @@ Source code: https://github.com/kubeflow/pipelines/
.. toctree::
:maxdepth: 3
:caption: Contents:
:caption: Contents
self
source/kfp
.. * :ref:`modindex`
.. * :ref:`kfp-ref`
.. * :ref:`search`

docs/requirements.txt (new file)

@ -0,0 +1,2 @@
sphinx==3.1.2
sphinx_rtd_theme==0.5.0


@ -1,5 +1,5 @@
kfp.Client class
------------------
================
.. autoclass:: kfp.Client
:members:
@ -7,7 +7,7 @@ kfp.Client class
:show-inheritance:
Generated APIs
------------------
--------------
.. toctree::
:maxdepth: 2


@ -9,5 +9,6 @@ kfp.components.structures package
:imported-members:
.. toctree::
:maxdepth: 2
kfp.components.structures.kubernetes


@ -1,8 +0,0 @@
kfp.notebook package
====================
.. automodule:: kfp.notebook
:members:
:undoc-members:
:show-inheritance:
:imported-members:


@ -7,12 +7,11 @@ kfp package
.. toctree::
:maxdepth: 2
kfp.client
kfp.compiler
kfp.components
kfp.containers
kfp.dsl
kfp.client
kfp.notebook
kfp.extensions
.. automodule:: kfp


@ -1,7 +0,0 @@
kfp
===
.. toctree::
:maxdepth: 4
kfp


@ -50,9 +50,9 @@ _FILTER_OPERATIONS = {"UNKNOWN": 0,
"LESS_THAN_EQUALS": 7}
def _add_generated_apis(target_struct, api_module, api_client):
'''Initializes a hierarchical API object based on the generated API module.
"""Initializes a hierarchical API object based on the generated API module.
PipelineServiceApi.create_pipeline becomes target_struct.pipelines.create_pipeline
'''
"""
Struct = type('Struct', (), {})
def camel_case_to_snake_case(name):
@ -88,7 +88,26 @@ KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME = 'KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME'
class Client(object):
""" API Client for KubeFlow Pipeline.
"""API Client for KubeFlow Pipeline.
Args:
host: The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster
service DNS name will be used, which only works if the current environment is a pod
in the same cluster (such as a Jupyter instance spawned by Kubeflow's
JupyterHub). If you have a different connection to cluster, such as a kubectl
proxy connection, then set it to something like "127.0.0.1:8080/pipeline".
If you connect to an IAP enabled cluster, set it to
"https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline".
client_id: The client ID used by Identity-Aware Proxy.
namespace: The namespace where the kubeflow pipeline system is run.
other_client_id: The client ID used to obtain the auth codes and refresh tokens.
Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app.
other_client_secret: The client secret used to obtain the auth codes and refresh tokens.
existing_token: Pass in a token directly. Useful when the token is better obtained outside of the SDK, e.g. in GCP Cloud Functions,
or when the caller already has a token.
cookies: CookieJar object containing cookies that will be passed to the pipelines API.
proxy: HTTP or HTTPS proxy server
ssl_ca_cert: Cert for proxy
"""
# in-cluster DNS name of the pipeline service
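For illustration, a minimal sketch of constructing a client as described in the Args above; the host value below is hypothetical and depends on how the deployment is exposed (e.g. a kubectl proxy or port-forward):

```python
# Minimal sketch: construct a client against a locally proxied Pipelines endpoint.
# The host value is hypothetical.
import kfp

client = kfp.Client(host='http://127.0.0.1:8080/pipeline')
```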
@ -100,25 +119,6 @@ class Client(object):
# TODO: Wrap the configurations for different authentication methods.
def __init__(self, host=None, client_id=None, namespace='kubeflow', other_client_id=None, other_client_secret=None, existing_token=None, cookies=None, proxy=None, ssl_ca_cert=None):
"""Create a new instance of kfp client.
Args:
host: the host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster
service DNS name will be used, which only works if the current environment is a pod
in the same cluster (such as a Jupyter instance spawned by Kubeflow's
JupyterHub). If you have a different connection to cluster, such as a kubectl
proxy connection, then set it to something like "127.0.0.1:8080/pipeline.
If you connect to an IAP enabled cluster, set it to
https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline".
client_id: The client ID used by Identity-Aware Proxy.
namespace: the namespace where the kubeflow pipeline system is run.
other_client_id: The client ID used to obtain the auth codes and refresh tokens.
Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app.
other_client_secret: The client secret used to obtain the auth codes and refresh tokens.
existing_token: pass in token directly, it's used for cases better get token outside of SDK, e.x. GCP Cloud Functions
or caller already has a token
cookies: CookieJar object containing cookies that will be passed to the pipelines API.
proxy: HTTP or HTTPS proxy server
ssl_ca_cert: cert for proxy
"""
host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV)
self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, host)
@ -265,7 +265,9 @@ class Client(object):
def set_user_namespace(self, namespace):
"""Set user namespace into local context setting file.
This function should only be used when Kubeflow Pipelines is in the multi-user mode.
This function should only be used when Kubeflow Pipelines is in the multi-user mode.
Args:
namespace: kubernetes namespace the user has access to.
"""
@ -275,6 +277,7 @@ class Client(object):
def get_user_namespace(self):
"""Get user namespace in context config.
Returns:
namespace: kubernetes namespace from the local context file or empty if it wasn't set.
"""
@ -282,12 +285,14 @@ class Client(object):
def create_experiment(self, name, description=None, namespace=None):
"""Create a new experiment.
Args:
name: the name of the experiment.
description: description of the experiment.
namespace: kubernetes namespace where the experiment should be created.
name: The name of the experiment.
description: Description of the experiment.
namespace: Kubernetes namespace where the experiment should be created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
An Experiment object. Most important field is id.
"""
@ -323,11 +328,13 @@ class Client(object):
return experiment
def get_pipeline_id(self, name):
"""Returns the pipeline id if a pipeline with the name exsists.
"""Find the id of a pipeline by name.
Args:
name: pipeline name
name: Pipeline name.
Returns:
A response object including a list of experiments and next page token.
Returns the pipeline id if a pipeline with the name exists.
"""
pipeline_filter = json.dumps({
"predicates": [
@ -347,13 +354,15 @@ class Client(object):
def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=None):
"""List experiments.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: can be '[field_name]', '[field_name] des'. For example, 'name desc'.
namespace: kubernetes namespace where the experiment was created.
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: Can be '[field_name]', '[field_name] desc'. For example, 'name desc'.
namespace: Kubernetes namespace where the experiment was created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
A response object including a list of experiments and next page token.
"""
@ -368,15 +377,19 @@ class Client(object):
def get_experiment(self, experiment_id=None, experiment_name=None, namespace=None):
"""Get details of an experiment
Either experiment_id or experiment_name is required
Args:
experiment_id: id of the experiment. (Optional)
experiment_name: name of the experiment. (Optional)
namespace: kubernetes namespace where the experiment was created.
experiment_id: Id of the experiment. (Optional)
experiment_name: Name of the experiment. (Optional)
namespace: Kubernetes namespace where the experiment was created.
For single user deployment, leave it as None;
For multi user, input the namespace where the user is authorized.
Returns:
A response object including details of an experiment.
Throws:
Exception if experiment is not found or none of the arguments is provided.
"""
@ -426,10 +439,12 @@ class Client(object):
def list_pipelines(self, page_token='', page_size=10, sort_by=''):
"""List pipelines.
Args:
page_token: token for starting of the page.
page_size: size of the page.
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'.
Returns:
A response object including a list of pipelines and next page token.
"""
@ -437,13 +452,15 @@ class Client(object):
def list_pipeline_versions(self, pipeline_id: str, page_token='', page_size=10, sort_by=''):
"""List all versions of a given pipeline.
Args:
pipeline_id: the string ID of a pipeline.
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'.
pipeline_id: The id of a pipeline.
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'.
Returns:
A response object including a list of pipelines and next page token.
A response object including a list of pipeline versions and next page token.
"""
return self._pipelines_api.list_pipeline_versions(
resource_key_type="PIPELINE",
@ -458,12 +475,12 @@ class Client(object):
"""Run a specified pipeline.
Args:
experiment_id: The string id of an experiment.
job_name: name of the job.
pipeline_package_path: local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as as param value.
pipeline_id: the string ID of a pipeline.
version_id: the string ID of a pipeline version.
experiment_id: The id of an experiment.
job_name: Name of the job.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The id of a pipeline.
version_id: The id of a pipeline version.
If both pipeline_id and version_id are specified, version_id will take precedence.
If only pipeline_id is specified, the default version of this pipeline is used to create the run.
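A hedged sketch of submitting a compiled package to an experiment; the package path and parameter values are hypothetical, and `client` and `experiment` come from the earlier sketches:

```python
# Sketch: run a compiled pipeline package in an existing experiment.
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name='my-first-run',
    pipeline_package_path='pipeline.tar.gz',
    params={'learning_rate': '0.01'},
)
```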
@ -490,9 +507,10 @@ class Client(object):
def create_recurring_run(self, experiment_id, job_name, description=None, start_time=None, end_time=None, interval_second=None, cron_expression=None, max_concurrency=1, no_catchup=None, params={}, pipeline_package_path=None, pipeline_id=None, version_id=None, enabled=True):
"""Create a recurring run.
Args:
experiment_id: The string id of an experiment.
job_name: name of the job.
job_name: Name of the job.
description: An optional job description.
start_time: The RFC3339 time string of the time when to start the job.
end_time: The RFC3339 time string of the time when to end the job.
@ -513,6 +531,7 @@ class Client(object):
If both pipeline_id and version_id are specified, pipeline_id will take precedence.
This will change in a future version, so it is recommended to use version_id by itself.
enabled: A bool indicating whether the recurring run is enabled or disabled.
Returns:
A Job object. Most important field is id.
"""
@ -549,14 +568,16 @@ class Client(object):
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, version_id):
"""Create a JobConfig with spec and resource_references.
Args:
experiment_id: The string id of an experiment.
experiment_id: The id of an experiment.
pipeline_package_path: Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .yaml, .yml).
params: A dictionary with key (string) as param name and value (string) as param value.
pipeline_id: The string ID of a pipeline.
version_id: The string ID of a pipeline version.
pipeline_id: The id of a pipeline.
version_id: The id of a pipeline version.
If both pipeline_id and version_id are specified, pipeline_id will take precedence.
This will change in a future version, so it is recommended to use version_id by itself.
Returns:
A JobConfig object with attributes spec and resource_reference.
"""
@ -594,7 +615,8 @@ class Client(object):
return JobConfig(spec=spec, resource_references=resource_references)
def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None, namespace=None):
'''Runs pipeline on KFP-enabled Kubernetes cluster.
"""Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
Args:
@ -602,10 +624,10 @@ class Client(object):
arguments: Arguments to the pipeline function provided as a dict.
run_name: Optional. Name of the run to be shown in the UI.
experiment_name: Optional. Name of the experiment to add the run to.
namespace: kubernetes namespace where the pipeline runs are created.
namespace: Kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized
'''
"""
#TODO: Check arguments against the pipeline function
pipeline_name = pipeline_func.__name__
run_name = run_name or pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
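A hedged sketch of the one-call compile-and-submit path; `my_pipeline` is a hypothetical function decorated with @dsl.pipeline:

```python
# Sketch: compile the pipeline function and submit it in a single call.
result = client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={'learning_rate': '0.01'},
    experiment_name='my-experiment',  # optional
)
```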
@ -615,7 +637,8 @@ class Client(object):
return self.create_run_from_pipeline_package(pipeline_package_path, arguments, run_name, experiment_name, namespace)
def create_run_from_pipeline_package(self, pipeline_file: str, arguments: Mapping[str, str], run_name=None, experiment_name=None, namespace=None):
'''Runs pipeline on KFP-enabled Kubernetes cluster.
"""Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
Args:
@ -623,10 +646,10 @@ class Client(object):
arguments: Arguments to the pipeline function provided as a dict.
run_name: Optional. Name of the run to be shown in the UI.
experiment_name: Optional. Name of the experiment to add the run to.
namespace: kubernetes namespace where the pipeline runs are created.
namespace: Kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized
'''
"""
class RunPipelineResult:
def __init__(self, client, run_info):
@ -657,15 +680,17 @@ class Client(object):
return RunPipelineResult(self, run_info)
def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None, namespace=None):
"""List runs.
"""List runs, optionally can be filtered by experiment or namespace.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'.
experiment_id: experiment id to filter upon
namespace: kubernetes namespace to filter upon.
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'.
experiment_id: Experiment id to filter upon
namespace: Kubernetes namespace to filter upon.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
A response object including a list of experiments and next page token.
"""
@ -680,11 +705,13 @@ class Client(object):
def list_recurring_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None):
"""List recurring runs.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'.
experiment_id: experiment id to filter upon
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'.
experiment_id: Experiment id to filter upon.
Returns:
A response object including a list of recurring_runs and next page token.
"""
@ -696,10 +723,13 @@ class Client(object):
def get_recurring_run(self, job_id):
"""Get recurring_run details.
Args:
id of the recurring_run.
job_id: id of the recurring_run.
Returns:
A response object including details of a recurring_run.
Throws:
Exception if recurring_run is not found.
"""
@ -708,10 +738,13 @@ class Client(object):
def get_run(self, run_id):
"""Get run details.
Args:
id of the run.
run_id: id of the run.
Returns:
A response object including details of a run.
Throws:
Exception if run is not found.
"""
@ -719,14 +752,16 @@ class Client(object):
def wait_for_run_completion(self, run_id, timeout):
"""Waits for a run to complete.
Args:
run_id: run id, returned from run_pipeline.
timeout: timeout in seconds.
run_id: Run id, returned from run_pipeline.
timeout: Timeout in seconds.
Returns:
A run detail object: Most important fields are run and pipeline_runtime.
Raises:
TimeoutError: if the pipeline run failed to finish before the specified
timeout.
TimeoutError: if the pipeline run failed to finish before the specified timeout.
"""
status = 'Running:'
start_time = datetime.datetime.now()
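A hedged sketch of waiting for the run submitted earlier; accessing `run.id` assumes the object returned by run_pipeline exposes an id attribute:

```python
# Sketch: block until the run finishes, or raise TimeoutError after one hour.
run_detail = client.wait_for_run_completion(run_id=run.id, timeout=3600)
print(run_detail.run.status)  # run and pipeline_runtime are the documented fields
```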
@ -750,10 +785,12 @@ class Client(object):
def _get_workflow_json(self, run_id):
"""Get the workflow json.
Args:
run_id: run id, returned from run_pipeline.
Returns:
workflow: json workflow
workflow: JSON workflow.
"""
get_run_response = self._run_api.get_run(run_id=run_id)
workflow = get_run_response.pipeline_runtime.workflow_manifest
@ -767,10 +804,12 @@ class Client(object):
description: str = None,
):
"""Uploads the pipeline to the Kubeflow Pipelines cluster.
Args:
pipeline_package_path: Local path to the pipeline package.
pipeline_name: Optional. Name of the pipeline to be shown in the UI.
description: Optional. Description of the pipeline to be shown in the UI.
Returns:
Server response object containing pipeline id and other information.
"""
@ -822,10 +861,13 @@ class Client(object):
def get_pipeline(self, pipeline_id):
"""Get pipeline details.
Args:
id of the pipeline.
pipeline_id: id of the pipeline.
Returns:
A response object including details of a pipeline.
Throws:
Exception if pipeline is not found.
"""
@ -833,11 +875,13 @@ class Client(object):
def delete_pipeline(self, pipeline_id):
"""Delete pipeline.
Args:
id of the pipeline.
pipeline_id: id of the pipeline.
Returns:
Object. If the method is called asynchronously,
returns the request thread.
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if pipeline is not found.
"""
@ -845,11 +889,13 @@ class Client(object):
def list_pipeline_versions(self, pipeline_id, page_token='', page_size=10, sort_by=''):
"""Lists pipeline versions.
Args:
pipeline_id: id of the pipeline to list versions
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name des'. For example, 'name des'.
pipeline_id: Id of the pipeline to list versions
page_token: Token for starting of the page.
page_size: Size of the page.
sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'.
Returns:
A response object including a list of versions and next page token.
"""


@ -15,9 +15,11 @@
def use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'):
"""An operator that configures the container to use AWS credentials.
AWS doesn't create secret along with kubeflow deployment and it requires users
to manually create credential secret with proper permissions.
---
AWS doesn't create secret along with kubeflow deployment and it requires users
to manually create credential secret with proper permissions.
::
apiVersion: v1
kind: Secret
metadata:
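A hedged sketch of applying this operator to a task; `my_op` is a hypothetical component factory and the credential secret is assumed to already exist in the cluster:

```python
# Sketch: attach the AWS credential secret to a pipeline task.
from kfp.aws import use_aws_secret

task = my_op().apply(use_aws_secret(secret_name='aws-secret'))
```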


@ -15,11 +15,10 @@
def use_azure_secret(secret_name='azcreds'):
"""An operator that configures the container to use Azure user credentials.
The azcreds secret is created as part of the kubeflow deployment that
stores the client ID and secrets for the kubeflow azure service principal.
The azcreds secret is created as part of the kubeflow deployment that
stores the client ID and secrets for the kubeflow azure service principal.
With this service principal, the container has a range of Azure APIs to
access to.
With this service principal, the container has access to a range of Azure APIs.
"""
def _use_azure_secret(task):
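A hedged sketch of applying this operator; `my_op` is a hypothetical component factory:

```python
# Sketch: apply the azcreds secret to a pipeline task.
from kfp.azure import use_azure_secret

task = my_op().apply(use_azure_secret())
```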


@ -36,19 +36,19 @@ from ..dsl._ops_group import OpsGroup
class Compiler(object):
"""DSL Compiler.
"""DSL Compiler that compiles pipeline functions into workflow yaml.
Example:
How to use the compiler to construct workflow yaml::
It compiles DSL pipeline functions into workflow yaml. Example usage:
```python
@dsl.pipeline(
name='name',
description='description'
)
def my_pipeline(a: int = 1, b: str = "default value"):
...
@dsl.pipeline(
name='name',
description='description'
)
def my_pipeline(a: int = 1, b: str = "default value"):
...
Compiler().compile(my_pipeline, 'path/to/workflow.yaml')
```
Compiler().compile(my_pipeline, 'path/to/workflow.yaml')
"""
def _pipelineparam_full_name(self, param):
@ -875,17 +875,20 @@ class Compiler(object):
pipeline_description: Text=None,
params_list: List[dsl.PipelineParam]=None,
pipeline_conf: dsl.PipelineConf = None) -> Dict[Text, Any]:
""" Create workflow spec from pipeline function and specified pipeline
"""Create workflow spec from pipeline function and specified pipeline
params/metadata. Currently, the pipeline params are either specified in
the signature of the pipeline function or by passing a list of
dsl.PipelineParam. Conflict will cause ValueError.
:param pipeline_func: pipeline function where ContainerOps are invoked.
:param pipeline_name:
:param pipeline_description:
:param params_list: list of pipeline params to append to the pipeline.
:param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
:return: workflow dict.
Args:
pipeline_func: Pipeline function where ContainerOps are invoked.
pipeline_name: The name of the pipeline to compile.
pipeline_description: The description of the pipeline.
params_list: List of pipeline params to append to the pipeline.
pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
Returns:
The created workflow dictionary.
"""
return self._create_workflow(pipeline_func, pipeline_name, pipeline_description, params_list, pipeline_conf)
@ -900,9 +903,9 @@ class Compiler(object):
"""Compile the given pipeline function into workflow yaml.
Args:
pipeline_func: pipeline functions with @dsl.pipeline decorator.
package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
type_check: whether to enable the type check or not, default: False.
pipeline_func: Pipeline functions with @dsl.pipeline decorator.
package_path: The output workflow tar.gz file path. For example, "~/a.tar.gz".
type_check: Whether to enable the type check or not, default: False.
pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
"""
import kfp
@ -922,8 +925,7 @@ class Compiler(object):
Args:
workflow: Workflow spec of the pipeline, dict.
package_path: file path to be written. If not specified, a yaml_text string
will be returned.
package_path: file path to be written. If not specified, a yaml_text string will be returned.
"""
yaml_text = dump_yaml(workflow)


@ -33,26 +33,45 @@ class ComponentStore:
self._url_to_info_db = KeyValueStore(cache_dir=cache_base_dir / 'url_to_info')
def load_component_from_url(self, url):
"""Loads a component from a URL.
Args:
url: The url of the component specification.
Returns:
A factory function with a strongly-typed signature.
"""
return comp.load_component_from_url(url=url, auth=self._auth)
def load_component_from_file(self, path):
"""Loads a component from a path.
Args:
path: The path of the component specification.
Returns:
A factory function with a strongly-typed signature.
"""
return comp.load_component_from_file(path)
def load_component(self, name, digest=None, tag=None):
'''
"""
Loads a component from a local file or URL and creates a task factory function
Search locations:
<local-search-path>/<name>/component.yaml
<url-search-prefix>/<name>/component.yaml
* :code:`<local-search-path>/<name>/component.yaml`
* :code:`<url-search-prefix>/<name>/component.yaml`
If the digest is specified, then the search locations are:
<local-search-path>/<name>/versions/sha256/<digest>
<url-search-prefix>/<name>/versions/sha256/<digest>
* :code:`<local-search-path>/<name>/versions/sha256/<digest>`
* :code:`<url-search-prefix>/<name>/versions/sha256/<digest>`
If the tag is specified, then the search locations are:
<local-search-path>/<name>/versions/tags/<digest>
<url-search-prefix>/<name>/versions/tags/<digest>
* :code:`<local-search-path>/<name>/versions/tags/<digest>`
* :code:`<url-search-prefix>/<name>/versions/tags/<digest>`
Args:
name: Component name used to search and load the component artifact containing the component definition.
@ -63,7 +82,7 @@ class ComponentStore:
Returns:
A factory function with a strongly-typed signature.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
'''
"""
#This function should be called load_task_factory since it returns a factory function.
#The real load_component function should produce an object with component properties (e.g. name, description, inputs/outputs).
#TODO: Change this function to return component spec object but it should be callable to construct tasks.
@ -78,10 +97,10 @@ class ComponentStore:
self,
component_ref: ComponentReference,
) -> ComponentReference:
'''Takes component_ref, finds the component spec and returns component_ref with .spec set to the component spec.
"""Takes component_ref, finds the component spec and returns component_ref with .spec set to the component spec.
See ComponentStore.load_component for the details of the search logic.
'''
"""
if component_ref.spec:
return component_ref
@ -144,7 +163,7 @@ class ComponentStore:
return comp._create_task_factory_from_component_spec(component_spec=component_ref.spec, component_ref=component_ref)
def search(self, name: str):
'''Searches for components by name in the configured component store.
"""Searches for components by name in the configured component store.
Prints the component name and URL for components that match the given name.
Only components on GitHub are currently supported.
@ -152,10 +171,11 @@ class ComponentStore:
Example::
kfp.components.ComponentStore.default_store.search('xgboost')
>>> Xgboost train https://raw.githubusercontent.com/.../components/XGBoost/Train/component.yaml
>>> Xgboost predict https://raw.githubusercontent.com/.../components/XGBoost/Predict/component.yaml
'''
# Returns results:
# Xgboost train https://raw.githubusercontent.com/.../components/XGBoost/Train/component.yaml
# Xgboost predict https://raw.githubusercontent.com/.../components/XGBoost/Predict/component.yaml
"""
self._refresh_component_cache()
for url in self._url_to_info_db.keys():
component_info = json.loads(self._url_to_info_db.try_get_value_bytes(url))


@ -33,20 +33,19 @@ _default_component_name = 'Component'
def load_component(filename=None, url=None, text=None):
'''
Loads component from text, file or URL and creates a task factory function
"""Loads component from text, file or URL and creates a task factory function
Only one argument should be specified.
Args:
filename: Path of local file containing the component definition.
url: The URL of the component file data
url: The URL of the component file data.
text: A string containing the component file data.
Returns:
A factory function with a strongly-typed signature.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
'''
"""
#This function should be called load_task_factory since it returns a factory function.
#The real load_component function should produce an object with component properties (e.g. name, description, inputs/outputs).
#TODO: Change this function to return component spec object but it should be callable to construct tasks.
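A self-contained sketch of the loading functions described above, using inline YAML text and an illustrative component spec; the returned factory is then used inside a pipeline function:

```python
# Sketch: load a component from text and call the factory inside a pipeline.
from kfp import components, dsl

echo_op = components.load_component_from_text('''
name: Echo
inputs:
- {name: message, type: String}
implementation:
  container:
    image: alpine
    command: [echo, {inputValue: message}]
''')

@dsl.pipeline(name='echo-pipeline')
def echo_pipeline(message: str = 'hello'):
    echo_task = echo_op(message=message)  # the factory call constructs a ContainerOp
```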
@ -64,8 +63,7 @@ def load_component(filename=None, url=None, text=None):
def load_component_from_url(url: str, auth=None):
'''
Loads component from URL and creates a task factory function
"""Loads component from URL and creates a task factory function
Args:
url: The URL of the component file data
@ -74,7 +72,7 @@ def load_component_from_url(url: str, auth=None):
Returns:
A factory function with a strongly-typed signature.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
'''
"""
component_spec = _load_component_spec_from_url(url, auth)
url = _fix_component_uri(url)
component_ref = ComponentReference(url=url)
@ -86,8 +84,7 @@ def load_component_from_url(url: str, auth=None):
def load_component_from_file(filename):
'''
Loads component from file and creates a task factory function
"""Loads component from file and creates a task factory function
Args:
filename: Path of local file containing the component definition.
@ -95,7 +92,7 @@ def load_component_from_file(filename):
Returns:
A factory function with a strongly-typed signature.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
'''
"""
component_spec = _load_component_spec_from_file(path=filename)
return _create_task_factory_from_component_spec(
component_spec=component_spec,
@ -104,8 +101,7 @@ def load_component_from_file(filename):
def load_component_from_text(text):
'''
Loads component from text and creates a task factory function
"""Loads component from text and creates a task factory function
Args:
text: A string containing the component file data.
@ -113,7 +109,7 @@ def load_component_from_text(text):
Returns:
A factory function with a strongly-typed signature.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp).
'''
"""
if text is None:
raise TypeError
component_spec = _load_component_spec_from_component_text(text)
@ -149,10 +145,10 @@ _COMPONENT_FILE_NAME_IN_ARCHIVE = 'component.yaml'
def _load_component_spec_from_yaml_or_zip_bytes(data: bytes):
'''Loads component spec from binary data.
"""Loads component spec from binary data.
The data can be a YAML file or a zip file with a component.yaml file inside.
'''
"""
import zipfile
import io
stream = io.BytesIO(data)


@ -42,40 +42,37 @@ T = TypeVar('T')
# InputPath(list) or InputPath('JsonObject')
class InputPath:
'''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
'''When creating component from function, :class:`.InputPath` should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
def __init__(self, type=None):
self.type = type
class InputTextFile:
'''When creating component from function, InputTextFile should be used as function parameter annotation to tell the system to pass the *text data stream* object (`io.TextIOWrapper`) to the function instead of passing the actual data.'''
'''When creating component from function, :class:`.InputTextFile` should be used as function parameter annotation to tell the system to pass the *text data stream* object (`io.TextIOWrapper`) to the function instead of passing the actual data.'''
def __init__(self, type=None):
self.type = type
class InputBinaryFile:
'''When creating component from function, InputBinaryFile should be used as function parameter annotation to tell the system to pass the *binary data stream* object (`io.BytesIO`) to the function instead of passing the actual data.'''
'''When creating component from function, :class:`.InputBinaryFile` should be used as function parameter annotation to tell the system to pass the *binary data stream* object (`io.BytesIO`) to the function instead of passing the actual data.'''
def __init__(self, type=None):
self.type = type
#OutputFile[GcsPath[Gzipped[Text]]]
class OutputPath:
'''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
'''When creating component from function, :class:`.OutputPath` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
def __init__(self, type=None):
self.type = type
class OutputTextFile:
'''When creating component from function, OutputTextFile should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given text file stream (`io.TextIOWrapper`) instead of returning the data from the function.'''
'''When creating component from function, :class:`.OutputTextFile` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given text file stream (`io.TextIOWrapper`) instead of returning the data from the function.'''
def __init__(self, type=None):
self.type = type
class OutputBinaryFile:
'''When creating component from function, OutputBinaryFile should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given binary file stream (`io.BytesIO`) instead of returning the data from the function.'''
'''When creating component from function, :class:`.OutputBinaryFile` should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a given binary file stream (:code:`io.BytesIO`) instead of returning the data from the function.'''
def __init__(self, type=None):
self.type = type
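A short sketch of a lightweight component function using the annotations above; the system passes file paths, and the function reads and writes those files itself:

```python
# Sketch: consume one file and produce another via path annotations.
from kfp.components import InputPath, OutputPath

def copy_text(source_path: InputPath('Text'), target_path: OutputPath('Text')):
    with open(source_path) as source, open(target_path, 'w') as target:
        target.write(source.read())
```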
@ -405,7 +402,7 @@ def _extract_component_interface(func) -> ComponentSpec:
def _func_to_component_spec(func, extra_code='', base_image : str = None, packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False) -> ComponentSpec:
'''Takes a self-contained python function and converts it to component
'''Takes a self-contained python function and converts it to component.
Args:
func: Required. The function to be converted
@ -413,8 +410,11 @@ def _func_to_component_spec(func, extra_code='', base_image : str = None, packag
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Python source code that gets placed before the function code. Can be used as workaround to define types used in function signature.
packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured.
use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image.
Returns:
A :py:class:`kfp.components.structures.ComponentSpec` instance.
'''
decorator_base_image = getattr(func, '_component_base_image', None)
if decorator_base_image is not None:
@ -650,11 +650,10 @@ def _func_to_component_dict(func, extra_code='', base_image: str = None, package
def func_to_component_text(func, extra_code='', base_image: str = None, packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False):
'''
Converts a Python function to a component definition and returns its textual representation
'''Converts a Python function to a component definition and returns its textual representation.
Function docstring is used as component description. Argument and return annotations are used as component input/output types.
Function docstring is used as component description.
Argument and return annotations are used as component input/output types.
To declare a function with multiple return values, use the NamedTuple return annotation syntax::
from typing import NamedTuple
@ -667,7 +666,7 @@ def func_to_component_text(func, extra_code='', base_image: str = None, packages
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is python:3.7
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image.
Returns:
@ -685,11 +684,10 @@ def func_to_component_text(func, extra_code='', base_image: str = None, packages
def func_to_component_file(func, output_component_file, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False) -> None:
'''
Converts a Python function to a component definition and writes it to a file
'''Converts a Python function to a component definition and writes it to a file.
Function docstring is used as component description. Argument and return annotations are used as component input/output types.
Function docstring is used as component description.
Argument and return annotations are used as component input/output types.
To declare a function with multiple return values, use the NamedTuple return annotation syntax::
from typing import NamedTuple
@ -703,7 +701,7 @@ def func_to_component_file(func, output_component_file, base_image: str = None,
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.13.2-py3
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the :code:`dependency.__module__` is in the :code:`modules_to_capture` list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image.
'''
@ -720,12 +718,11 @@ def func_to_component_file(func, output_component_file, base_image: str = None,
def func_to_container_op(func, output_component_file=None, base_image: str = None, extra_code='', packages_to_install: List[str] = None, modules_to_capture: List[str] = None, use_code_pickling=False):
'''
Converts a Python function to a component and returns a task (ContainerOp) factory
'''Converts a Python function to a component and returns a task (:class:`kfp.dsl.ContainerOp`) factory.
Function docstring is used as component description.
Argument and return annotations are used as component input/output types.
To declare a function with multiple return values, use the NamedTuple return annotation syntax::
Function docstring is used as component description. Argument and return annotations are used as component input/output types.
To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax::
from typing import NamedTuple
def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
@ -738,12 +735,12 @@ def func_to_container_op(func, output_component_file=None, base_image: str = Non
output_component_file: Optional. Write a component definition to a local file. Can be used for sharing.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependecy.__module__ is in the modules_to_capture list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
modules_to_capture: Optional. List of module names that will be captured (instead of just referencing) during the dependency scan. By default the :code:`func.__module__` is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the :code:`dependency.__module__` is in the :code:`modules_to_capture` list then it's captured and it's dependencies are traversed. Otherwise the dependency is only referenced instead of capturing and its dependencies are not traversed.
use_code_pickling: Specifies whether the function code should be captured using pickling as opposed to source code manipulation. Pickling has better support for capturing dependencies, but is sensitive to version mismatch between python in component creation environment and runtime image.
Returns:
A factory function with a strongly-typed signature taken from the python function.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) that can run the original function in a container.
Once called with the required arguments, the factory constructs a pipeline task instance (:class:`kfp.dsl.ContainerOp`) that can run the original function in a container.
'''
component_spec = _func_to_component_spec(
@ -769,98 +766,95 @@ def create_component_from_func(
base_image: str = None,
packages_to_install: List[str] = None,
):
'''
Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object).
Function name and docstring are used as component name and description.
Argument and return annotations are used as component input/output types.
Example::
def add(a: float, b: float) -> float:
"""Returns sum of two arguments"""
return a + b
# add_op is a task factory function that creates a task object when given arguments
add_op = create_component_from_func(
func=add,
base_image='python:3.7', # Optional
output_component_file='add.component.yaml', # Optional
packages_to_install=['pandas==0.24'], # Optional
)
# The component spec can be accessed through the .component_spec attribute:
add_op.component_spec.save('add.component.yaml')
# The component function can be called with arguments to create a task:
add_task = add_op(1, 3)
# The resulting task has output references, corresponding to the component outputs.
# When the function only has a single anonymous return value, the output name is "Output":
sum_output_ref = add_task.outputs['Output']
# These task output references can be passed to other component functions, constructing a computation graph:
task2 = add_op(sum_output_ref, 5)
`create_component_from_func` function can also be used as decorator::
@create_component_from_func
def add_op(a: float, b: float) -> float:
"""Returns sum of two arguments"""
return a + b
To declare a function with multiple return values, use the NamedTuple return annotation syntax::
from typing import NamedTuple
def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
"""Returns sum and product of two arguments"""
return (a + b, a * b)
add_multiply_op = create_component_from_func(add_multiply_two_numbers)
# The component function can be called with arguments to create a task:
add_multiply_task = add_multiply_op(1, 3)
# The resulting task has output references, corresponding to the component outputs:
sum_output_ref = add_multiply_task.outputs['sum']
# These task output references can be passed to other component functions, constructing a computation graph:
task2 = add_multiply_op(sum_output_ref, 5)
Bigger data should be read from files and written to files.
Use the `InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.
Use the `OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.
You can specify the type of the consumed/produced data by specifying the type argument to `InputPath` and `OutputPath`. The type can be a python type or an arbitrary type name string. `OutputPath('CatBoostModel')` means that the function states that the data it has written to a file has type 'CatBoostModel'. `InputPath('CatBoostModel')` means that the function states that it expect the data it reads from a file to have type 'CatBoostModel'. When the pipeline author connects inputs to outputs the system checks whether the types match.
Every kind of data can be consumed as a file input. Conversely, bigger data should not be consumed by value as all value inputs pass through the command line.
Example of a component function declaring file input and output::
def catboost_train_classifier(
training_data_path: InputPath('CSV'), # Path to input data file of type "CSV"
trained_model_path: OutputPath('CatBoostModel'), # Path to output data file of type "CatBoostModel"
number_of_trees: int = 100, # Small output of type "Integer"
) -> NamedTuple('Outputs', [
('Accuracy', float), # Small output of type "Float"
('Precision', float), # Small output of type "Float"
('JobUri', 'URI'), # Small output of type "URI"
]):
"""Trains CatBoost classification model"""
...
return (accuracy, precision, recall)
'''Converts a Python function to a component and returns a task factory (a function that accepts arguments and returns a task object).
Args:
func: The python function to convert
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is the python image corresponding to the current python environment.
output_component_file: Optional. Write a component definition to a local file. The produced component file can be loaded back by calling `load_component_from_file` or `load_component_from_uri`.
output_component_file: Optional. Write a component definition to a local file. The produced component file can be loaded back by calling :code:`load_component_from_file` or :code:`load_component_from_uri`.
packages_to_install: Optional. List of [versioned] python packages to pip install before executing the user function.
Returns:
A factory function with a strongly-typed signature taken from the python function.
Once called with the required arguments, the factory constructs a task instance that can run the original function in a container.
Examples:
The function name and docstring are used as component name and description. Argument and return annotations are used as component input/output types::
def add(a: float, b: float) -> float:
"""Returns sum of two arguments"""
return a + b
# add_op is a task factory function that creates a task object when given arguments
add_op = create_component_from_func(
func=add,
base_image='python:3.7', # Optional
output_component_file='add.component.yaml', # Optional
packages_to_install=['pandas==0.24'], # Optional
)
# The component spec can be accessed through the .component_spec attribute:
add_op.component_spec.save('add.component.yaml')
# The component function can be called with arguments to create a task:
add_task = add_op(1, 3)
# The resulting task has output references, corresponding to the component outputs.
# When the function only has a single anonymous return value, the output name is "Output":
sum_output_ref = add_task.outputs['Output']
# These task output references can be passed to other component functions, constructing a computation graph:
task2 = add_op(sum_output_ref, 5)
:code:`create_component_from_func` function can also be used as decorator::
@create_component_from_func
def add_op(a: float, b: float) -> float:
"""Returns sum of two arguments"""
return a + b
To declare a function with multiple return values, use the :code:`NamedTuple` return annotation syntax::
from typing import NamedTuple
def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('Outputs', [('sum', float), ('product', float)]):
"""Returns sum and product of two arguments"""
return (a + b, a * b)
add_multiply_op = create_component_from_func(add_multiply_two_numbers)
# The component function can be called with arguments to create a task:
add_multiply_task = add_multiply_op(1, 3)
# The resulting task has output references, corresponding to the component outputs:
sum_output_ref = add_multiply_task.outputs['sum']
# These task output references can be passed to other component functions, constructing a computation graph:
task2 = add_multiply_op(sum_output_ref, 5)
Bigger data should be read from files and written to files.
Use the :py:class:`kfp.components.InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.
Use the :py:class:`kfp.components.OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.
You can specify the type of the consumed/produced data by specifying the type argument to :py:class:`kfp.components.InputPath` and :py:class:`kfp.components.OutputPath`. The type can be a python type or an arbitrary type name string. :code:`OutputPath('CatBoostModel')` means that the function states that the data it has written to a file has type :code:`CatBoostModel`. :code:`InputPath('CatBoostModel')` means that the function states that it expect the data it reads from a file to have type 'CatBoostModel'. When the pipeline author connects inputs to outputs the system checks whether the types match.
Every kind of data can be consumed as a file input. Conversely, bigger data should not be consumed by value as all value inputs pass through the command line.
Example of a component function declaring file input and output::
def catboost_train_classifier(
training_data_path: InputPath('CSV'), # Path to input data file of type "CSV"
trained_model_path: OutputPath('CatBoostModel'), # Path to output data file of type "CatBoostModel"
number_of_trees: int = 100, # Small output of type "Integer"
) -> NamedTuple('Outputs', [
('Accuracy', float), # Small output of type "Float"
('Precision', float), # Small output of type "Float"
('JobUri', 'URI'), # Small output of type "URI"
]):
"""Trains CatBoost classification model"""
...
return (accuracy, precision, recall)
'''
component_spec = _func_to_component_spec(

View File

@ -33,11 +33,16 @@ def create_graph_component_from_pipeline_func(
output_component_file: str = None,
embed_component_specs: bool = False,
) -> Callable:
'''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing.
'''Creates graph component definition from a python pipeline function. The component file can be published for sharing.
Pipeline function is a function that only calls component functions and passes outputs to inputs.
This feature is experimental and lacks support for some of the DSL features like conditions and loops.
Only pipelines consisting of loaded components or python components are currently supported (no manually created ContainerOps or ResourceOps).
.. warning::
Please note this feature is considered experimental!
Args:
pipeline_func: Python function to convert
output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing.
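Example (an illustrative sketch; :code:`add_op` is assumed to be an existing component factory, e.g. one created with :code:`create_component_from_func`)::

    from kfp.components import create_graph_component_from_pipeline_func

    def add_twice_pipeline(a: float, b: float):
        first = add_op(a, b)
        add_op(first.outputs['Output'], b)

    add_twice_op = create_graph_component_from_pipeline_func(
        add_twice_pipeline,
        output_component_file='add_twice.component.yaml',
    )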

View File

@ -64,11 +64,12 @@ def _generate_dockerfile_text(context_dir: str, dockerfile_path: str, base_image
def build_image_from_working_dir(image_name: str = None, working_dir: str = None, file_filter_re: str = r'.*\.py', timeout: int = 1000, base_image: str = None, builder: ContainerBuilder = None) -> str:
'''build_image_from_working_dir builds and pushes a new container image that captures the current python working directory.
'''Builds and pushes a new container image that captures the current python working directory.
This function recursively scans the working directory and captures the following files in the container image context:
* requirements.txt files
* all python files (can be overridden by passing a different `file_filter_re` argument)
* :code:`requirements.txt` files
* All python files (can be overridden by passing a different `file_filter_re` argument)
The function generates a Dockerfile that starts from a python container image, installs packages from requirements.txt (if present) and copies all the captured python files to the container image.
The Dockerfile can be overridden by placing a custom Dockerfile in the root of the working directory.
@ -79,17 +80,19 @@ def build_image_from_working_dir(image_name: str = None, working_dir: str = None
file_filter_re: Optional. A regular expression that will be used to decide which files to include in the container building context.
timeout: Optional. The image building timeout in seconds.
base_image: Optional. The container image to use as the base for the new image. If not set, the Google Deep Learning Tensorflow CPU image will be used.
builder: Optional. An instance of ContainerBuilder or compatible class that will be used to build the image.
builder: Optional. An instance of :py:class:`kfp.containers.ContainerBuilder` or compatible class that will be used to build the image.
The default builder uses "kubeflow-pipelines-container-builder" service account in "kubeflow" namespace. It works with Kubeflow Pipelines clusters installed in "kubeflow" namespace using Google Cloud Marketplace or Standalone with version > 0.4.0.
If your Kubeflow Pipelines is installed in a different namespace, you should use ContainerBuilder(namespace='<your-kfp-namespace>', ...).
Depending on how you installed Kubeflow Pipelines, you need to configure your ContainerBuilder instance's namespace and service_account:
For clusters installed with Kubeflow >= 0.7, use ContainerBuidler(namespace='<your-user-namespace>', service_account='default-editor', ...). You can omit the namespace if you use kfp sdk from in-cluster notebook, it uses notebook namespace by default.
For clusters installed with Kubeflow < 0.7, use ContainerBuilder(service_account='default', ...).
For clusters installed using Google Cloud Marketplace or Standalone with version <= 0.4.0, use ContainerBuilder(namespace='<your-kfp-namespace>' service_account='default')
You may refer to https://www.kubeflow.org/docs/pipelines/installation/overview/ for more details about different installation options.
If your Kubeflow Pipelines is installed in a different namespace, you should use :code:`ContainerBuilder(namespace='<your-kfp-namespace>', ...)`.
Depending on how you installed Kubeflow Pipelines, you need to configure your :code:`ContainerBuilder` instance's namespace and service_account:
* For clusters installed with Kubeflow >= 0.7, use :code:`ContainerBuilder(namespace='<your-user-namespace>', service_account='default-editor', ...)`. You can omit the namespace if you use the kfp sdk from an in-cluster notebook; it uses the notebook namespace by default.
* For clusters installed with Kubeflow < 0.7, use :code:`ContainerBuilder(service_account='default', ...)`.
* For clusters installed using Google Cloud Marketplace or Standalone with version <= 0.4.0, use :code:`ContainerBuilder(namespace='<your-kfp-namespace>', service_account='default')`
You may refer to `installation guide <https://www.kubeflow.org/docs/pipelines/installation/overview/>`_ for more details about different installation options.
Returns:
The full name of the container image including the hash digest. E.g. gcr.io/my-org/my-image@sha256:86c1...793c.
The full name of the container image including the hash digest. E.g. :code:`gcr.io/my-org/my-image@sha256:86c1...793c`.
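Example (an illustrative sketch; the image name and namespace are placeholders that depend on your installation)::

    from kfp.containers import build_image_from_working_dir, ContainerBuilder

    # Builds an image from the current working directory and pushes it.
    image = build_image_from_working_dir(
        image_name='gcr.io/my-org/my-image:latest',
        builder=ContainerBuilder(namespace='kubeflow'),
    )
    print(image)  # the returned name includes the sha256 digest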
'''
current_dir = working_dir or os.getcwd()
with tempfile.TemporaryDirectory() as context_dir:

View File

@ -170,8 +170,7 @@ def _configure_logger(logger):
@deprecated(version='0.1.32', reason='`build_python_component` is deprecated. Use `kfp.containers.build_image_from_working_dir` + `kfp.components.func_to_container_op` instead.')
def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, timeout=600, namespace=None, target_component_file=None, python_version='python3'):
""" build_component automatically builds a container image for the component_func
based on the base_image and pushes to the target_image.
"""build_component automatically builds a container image for the component_func based on the base_image and pushes to the target_image.
Args:
component_func (python function): The python function to build components upon
@ -184,6 +183,7 @@ def build_python_component(component_func, target_image, base_image=None, depend
job is running on GKE and value is None, the underlying functions will use the default namespace from GKE.
dependency (list): a list of VersionedDependency, which includes the package name and versions, default is empty
python_version (str): choose python2 or python3, default is python3
Raises:
ValueError: The function is not decorated with python_component decorator or the python_version is neither python2 nor python3
"""
@ -271,7 +271,7 @@ def build_python_component(component_func, target_image, base_image=None, depend
@deprecated(version='0.1.32', reason='`build_docker_image` is deprecated. Use `kfp.containers.build_image_from_working_dir` instead.')
def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=600, namespace=None):
""" build_docker_image automatically builds a container image based on the specification in the dockerfile and
"""build_docker_image automatically builds a container image based on the specification in the dockerfile and
pushes to the target_image.
Args:

View File

@ -37,16 +37,16 @@ def python_component(name, description=None, base_image=None, target_component_f
Returns:
The same function (with some metadata fields set).
Usage:
```python
@dsl.python_component(
name='my awesome component',
description='Come, Let's play',
base_image='tensorflow/tensorflow:1.11.0-py3',
)
def my_component(a: str, b: int) -> str:
...
```
Example:
::
@dsl.python_component(
name='my awesome component',
description="Come, Let's play",
base_image='tensorflow/tensorflow:1.11.0-py3',
)
def my_component(a: str, b: int) -> str:
...
"""
def _python_component(func):
func._component_human_name = name
@ -64,11 +64,12 @@ def component(func):
"""Decorator for component functions that returns a ContainerOp.
This is useful to enable type checking in the DSL compiler
Usage:
```python
@dsl.component
def foobar(model: TFModel(), step: MLStep()):
return dsl.ContainerOp()
Example:
::
@dsl.component
def foobar(model: TFModel(), step: MLStep()):
return dsl.ContainerOp()
"""
from functools import wraps
@wraps(func)
@ -103,19 +104,20 @@ def graph_component(func):
"""Decorator for graph component functions.
This decorator returns an ops_group.
Usage:
```python
# Warning: caching is tricky when recursion is involved. Please be careful and
# set proper max_cache_staleness in case of infinite loop.
import kfp.dsl as dsl
@dsl.graph_component
def flip_component(flip_result):
print_flip = PrintOp(flip_result)
flipA = FlipCoinOp().after(print_flip)
flipA.execution_options.caching_strategy.max_cache_staleness = "P0D"
with dsl.Condition(flipA.output == 'heads'):
flip_component(flipA.output)
return {'flip_result': flipA.output}
Example:
::
# Warning: caching is tricky when recursion is involved. Please be careful and
# set proper max_cache_staleness in case of infinite loop.
import kfp.dsl as dsl
@dsl.graph_component
def flip_component(flip_result):
print_flip = PrintOp(flip_result)
flipA = FlipCoinOp().after(print_flip)
flipA.execution_options.caching_strategy.max_cache_staleness = "P0D"
with dsl.Condition(flipA.output == 'heads'):
flip_component(flipA.output)
return {'flip_result': flipA.output}
"""
from functools import wraps
@wraps(func)

View File

@ -106,28 +106,26 @@ class Container(V1Container):
required property).
See:
- https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py
- https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json
* https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py
* https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json
Example:
::
from kfp.dsl import ContainerOp
from kubernetes.client.models import V1EnvVar
from kfp.dsl import ContainerOp
from kubernetes.client.models import V1EnvVar
# creates a operation
op = ContainerOp(name='bash-ops',
image='busybox:latest',
command=['echo'],
arguments=['$MSG'])
# creates a operation
op = ContainerOp(name='bash-ops',
image='busybox:latest',
command=['echo'],
arguments=['$MSG'])
# returns a `Container` object from `ContainerOp`
# and add an environment variable to `Container`
op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world'))
# returns a `Container` object from `ContainerOp`
# and add an environment variable to `Container`
op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world'))
"""
"""
Attributes:
attribute_map (dict): The key is attribute name
and the value is json key in definition.
@ -556,7 +554,23 @@ class UserContainer(Container):
See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json
Example
Args:
name: unique name for the user container
image: image to use for the user container, e.g. redis:alpine
command: entrypoint array. Not executed within a shell.
args: arguments to the entrypoint.
mirror_volume_mounts: MirrorVolumeMounts will mount the same
volumes specified in the main container to the container (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
volume binding
**kwargs: keyword arguments available for `Container`
Attributes:
swagger_types (dict): The key is attribute name
and the value is attribute type.
Example::
from kfp.dsl import ContainerOp, UserContainer
@ -564,14 +578,6 @@ class UserContainer(Container):
op = (ContainerOp(name='foo-op', image='busybox:latest')
.add_init_container(
UserContainer(name='redis', image='redis:alpine')))
"""
"""
Attributes:
swagger_types (dict): The key is attribute name
and the value is attribute type.
attribute_map (dict): The key is attribute name
and the value is json key in definition.
"""
# adds `mirror_volume_mounts` to `UserContainer` swagger definition
# NOTE inherits definition from `V1Container` rather than `Container`
@ -592,21 +598,6 @@ class UserContainer(Container):
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
"""Creates a new instance of `UserContainer`.
Args:
name {str}: unique name for the user container
image {str}: image to use for the user container, e.g. redis:alpine
command {StringOrStringList}: entrypoint array. Not executed within a shell.
args {StringOrStringList}: arguments to the entrypoint.
mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same
volumes specified in the main container to the container (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
volume binding
**kwargs: keyword arguments available for `Container`
"""
super().__init__(
name=name,
image=image,
@ -638,6 +629,20 @@ class UserContainer(Container):
class Sidecar(UserContainer):
"""Creates a new instance of `Sidecar`.
Args:
name: unique name for the sidecar container
image: image to use for the sidecar container, e.g. redis:alpine
command: entrypoint array. Not executed within a shell.
args: arguments to the entrypoint.
mirror_volume_mounts: MirrorVolumeMounts will mount the same
volumes specified in the main container to the sidecar (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
volume binding
**kwargs: keyword arguments available for `Container`
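Example (an illustrative sketch; the op name and images are placeholders)::

    from kfp.dsl import ContainerOp, Sidecar

    # attach a redis sidecar that runs alongside the main container
    op = (ContainerOp(name='foo-op', image='busybox:latest')
          .add_sidecar(Sidecar(name='redis', image='redis:alpine')))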
"""
def __init__(self,
name: str,
@ -646,21 +651,6 @@ class Sidecar(UserContainer):
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
"""Creates a new instance of `Sidecar`.
Args:
name {str}: unique name for the sidecar container
image {str}: image to use for the sidecar container, e.g. redis:alpine
command {StringOrStringList}: entrypoint array. Not executed within a shell.
args {StringOrStringList}: arguments to the entrypoint.
mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same
volumes specified in the main container to the sidecar (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
volume binding
**kwargs: keyword arguments available for `Container`
"""
super().__init__(
name=name,
image=image,
@ -680,6 +670,17 @@ _register_op_handler = _make_hash_based_id_for_op
class BaseOp(object):
"""Base operator
Args:
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a unique new name in case of conflicts.
init_containers: the list of `UserContainer` objects describing the InitContainer
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
is_exit_handler: Deprecated.
"""
# list of attributes that might have pipeline params - used to generate
# the input parameters during compilation.
@ -695,18 +696,6 @@ class BaseOp(object):
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
is_exit_handler: bool = False):
"""Create a new instance of BaseOp
Args:
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generates a unique new name in case of conflicts.
init_containers: the list of `UserContainer` objects describing the InitContainer
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
is_exit_handler: Deprecated.
"""
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError(
@ -772,14 +761,15 @@ class BaseOp(object):
"""Applies a modifier function to self. The function should return the passed object.
This is needed to chain "extension methods" to this class.
Example:
from kfp.gcp import use_gcp_secret
task = (
train_op(...)
.set_memory_request('1G')
.apply(use_gcp_secret('user-gcp-sa'))
.set_memory_limit('2G')
)
Example::
from kfp.gcp import use_gcp_secret
task = (
train_op(...)
.set_memory_request('1G')
.apply(use_gcp_secret('user-gcp-sa'))
.set_memory_limit('2G')
)
"""
return mod_func(self) or self
@ -813,16 +803,20 @@ class BaseOp(object):
def add_affinity(self, affinity: V1Affinity):
"""Add K8s Affinity
Args:
affinity: Kubernetes affinity
For detailed spec, check affinity definition
https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py
example: V1Affinity(
node_affinity=V1NodeAffinity(
required_during_scheduling_ignored_during_execution=V1NodeSelector(
node_selector_terms=[V1NodeSelectorTerm(
match_expressions=[V1NodeSelectorRequirement(
key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])])))
Example::
V1Affinity(
node_affinity=V1NodeAffinity(
required_during_scheduling_ignored_during_execution=V1NodeSelector(
node_selector_terms=[V1NodeSelectorTerm(
match_expressions=[V1NodeSelectorRequirement(
key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])])))
"""
self.affinity = affinity
return self
@ -921,15 +915,44 @@ class InputArgumentPath:
class ContainerOp(BaseOp):
"""
Represents an op implemented by a container image.
"""Represents an op implemented by a container image.
Args:
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a unique new name in case of conflicts.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses the default CMD defined in the container.
arguments: the arguments of the command. The command can include "%s" and supply
a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
At container run time the argument will be 'echo param_value'.
init_containers: the list of `UserContainer` objects describing the InitContainer
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
container_kwargs: the dict of additional keyword arguments to pass to the
op's `Container` definition.
artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
This parameter is only needed when the input file paths are hard-coded in the program.
Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
file_outputs: Maps output labels to local file paths. At pipeline run time,
the value of a PipelineParam is saved to its corresponding local file. It's
one way for outside world to receive outputs of the container.
output_artifact_paths: Maps output artifact labels to local artifact file paths.
It has the following default artifact paths during compile time.
{'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json',
'mlpipeline-metrics': '/mlpipeline-metrics.json'}
is_exit_handler: Deprecated. This is no longer needed.
pvolumes: Dictionary for the user to match a path on the op's fs with a
V1Volume or an inherited type.
E.g. {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}.
Example::
from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1SecretKeySelector
@dsl.pipeline(
name='foo',
description='hello world')
@ -952,7 +975,6 @@ class ContainerOp(BaseOp):
# add sidecar with parameterized image tag
# sidecar follows the argo sidecar swagger spec
op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always'))
"""
# list of attributes that might have pipeline params - used to generate
@ -977,40 +999,6 @@ class ContainerOp(BaseOp):
is_exit_handler=False,
pvolumes: Dict[str, V1Volume] = None,
):
"""Create a new instance of ContainerOp.
Args:
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generates a unique new name in case of conflicts.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses default CMD in defined in container.
arguments: the arguments of the command. The command can include "%s" and supply
a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
At container run time the argument will be 'echo param_value'.
init_containers: the list of `UserContainer` objects describing the InitContainer
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
container_kwargs: the dict of additional keyword arguments to pass to the
op's `Container` definition.
artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
This parameter is only needed when the input file paths are hard-coded in the program.
Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
file_outputs: Maps output labels to local file paths. At pipeline run time,
the value of a PipelineParam is saved to its corresponding local file. It's
one way for outside world to receive outputs of the container.
output_artifact_paths: Maps output artifact labels to local artifact file paths.
It has the following default artifact paths during compile time.
{'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json',
'mlpipeline-metrics': '/mlpipeline-metrics.json'}
is_exit_handler: Deprecated. This is no longer needed.
pvolumes: Dictionary for the user to match a path on the op's fs with a
V1Volume or it inherited type.
E.g {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}.
"""
super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler)
if not ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING and '--component_launcher_class_path' not in (arguments or []):
@ -1149,14 +1137,15 @@ class ContainerOp(BaseOp):
`io.argoproj.workflow.v1alpha1.Template`. Can be used to update the
container configurations.
Example:
Example::
import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar
@dsl.pipeline(name='example_pipeline')
def immediate_value_pipeline():
op1 = (dsl.ContainerOp(name='example', image='nginx:alpine')
.container
.container
.add_env_variable(V1EnvVar(name='HOST', value='foo.bar'))
.add_env_variable(V1EnvVar(name='PORT', value='80'))
.parent # return the parent `ContainerOp`
@ -1165,8 +1154,9 @@ class ContainerOp(BaseOp):
return self._container
def _set_metadata(self, metadata):
'''_set_metadata passes the containerop the metadata information
'''Passes the ContainerOp the metadata information
and configures the right output
Args:
metadata (ComponentSpec): component metadata
'''

View File

@ -20,11 +20,13 @@ from ..components.structures import ComponentSpec, InputSpec, OutputSpec
def _annotation_to_typemeta(annotation):
'''_annotation_to_type_meta converts an annotation to a type structure
Args:
annotation(BaseType/str/dict): input/output annotations
BaseType: registered in kfp.dsl.types
str: either a string of a dict serialization or a string of the type name
dict: type name and properties. Note that the property values can be dicts.
Returns:
dict or string representing the type
'''

View File

@ -30,6 +30,7 @@ class OpsGroup(object):
def __init__(self, group_type: str, name: str=None, parallelism: int=None):
"""Create a new instance of OpsGroup.
Args:
group_type (str): one of 'pipeline', 'exit_handler', 'condition', 'for_loop', and 'graph'.
name (str): name of the opsgroup
@ -50,11 +51,13 @@ class OpsGroup(object):
@staticmethod
def _get_matching_opsgroup_already_in_pipeline(group_type, name):
"""retrieves the opsgroup when the pipeline already contains it.
"""Retrieves the opsgroup when the pipeline already contains it.
The opsgroup might already be in the pipeline in case of recursive calls.
Args:
group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'.
name (str): the name before conversion. """
name (str): the name before conversion.
"""
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
if name is None:
@ -104,23 +107,22 @@ class OpsGroup(object):
class ExitHandler(OpsGroup):
"""Represents an exit handler that is invoked upon exiting a group of ops.
Example usage:
```python
exit_op = ContainerOp(...)
with ExitHandler(exit_op):
op1 = ContainerOp(...)
op2 = ContainerOp(...)
```
Args:
exit_op: An operator invoked at exiting a group of ops.
Raises:
ValueError: Raised if the exit_op is invalid.
Example:
::
exit_op = ContainerOp(...)
with ExitHandler(exit_op):
op1 = ContainerOp(...)
op2 = ContainerOp(...)
"""
def __init__(self, exit_op: _container_op.ContainerOp):
"""Create a new instance of ExitHandler.
Args:
exit_op: an operator invoked at exiting a group of ops.
Raises:
ValueError is the exit_op is invalid.
"""
super(ExitHandler, self).__init__('exit_handler')
if exit_op.dependent_names:
raise ValueError('exit_op cannot depend on any other ops.')
@ -137,20 +139,19 @@ class ExitHandler(OpsGroup):
class Condition(OpsGroup):
"""Represents a condition group with a condition.
Example usage:
```python
with Condition(param1=='pizza', '[param1 is pizza]'):
op1 = ContainerOp(...)
op2 = ContainerOp(...)
```
Args:
condition (ConditionOperator): the condition.
name (str): name of the condition
Example:
::
with Condition(param1=='pizza', '[param1 is pizza]'):
op1 = ContainerOp(...)
op2 = ContainerOp(...)
"""
def __init__(self, condition, name = None):
"""Create a new instance of Condition.
Args:
condition (ConditionOperator): the condition.
name (str): name of the condition
"""
super(Condition, self).__init__('condition', name)
self.condition = condition
@ -158,6 +159,9 @@ class Condition(OpsGroup):
class Graph(OpsGroup):
"""Graph DAG with inputs, recursive_inputs, and outputs.
This is not used directly by users but is auto-generated when the graph_component decorator is used
Args:
name: Name of the graph.
"""
def __init__(self, name):
super(Graph, self).__init__(group_type='graph', name=name)
@ -169,13 +173,12 @@ class Graph(OpsGroup):
class ParallelFor(OpsGroup):
"""Represents a parallel for loop over a static set of items.
Example usage:
```python
with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
op2 = ContainerOp(..., args=['echo {}'.format(item.b])
```
and op1 would be executed twice, once with args=['echo 1'] and once with args=['echo 2']
Example:
In this case :code:`op1` would be executed twice, once with :code:`args=['echo 1']` and once with :code:`args=['echo 2']`::
with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
"""
TYPE_NAME = 'for_loop'

View File

@ -30,15 +30,15 @@ _pipeline_decorator_handler = None
def pipeline(name : str = None, description : str = None):
"""Decorator of pipeline functions.
Usage:
```python
@pipeline(
name='my awesome pipeline',
description='Is it really awesome?'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
...
```
Example
::
@pipeline(
name='my awesome pipeline',
description='Is it really awesome?'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
...
"""
def _pipeline(func):
if name:
@ -71,8 +71,8 @@ class PipelineConf():
Args:
image_pull_secrets: a list of Kubernetes V1LocalObjectReference
For detailed description, check Kubernetes V1LocalObjectReference definition
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md
For detailed description, check Kubernetes V1LocalObjectReference definition
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md
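Example (an illustrative sketch; 'regcred' is a placeholder secret name)::

    from kfp import dsl
    from kubernetes.client.models import V1LocalObjectReference

    pipeline_conf = dsl.PipelineConf()
    pipeline_conf.set_image_pull_secrets(
        [V1LocalObjectReference(name='regcred')])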
"""
self.image_pull_secrets = image_pull_secrets
return self
@ -90,7 +90,7 @@ class PipelineConf():
"""Configures the max number of total parallel pods that can execute at the same time in a workflow.
Args:
max_num_pods (int): max number of total parallel pods.
max_num_pods: max number of total parallel pods.
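Example (an illustrative sketch)::

    from kfp import dsl

    pipeline_conf = dsl.PipelineConf()
    pipeline_conf.set_parallelism(10)  # at most 10 pods run at the same time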
"""
self.parallelism = max_num_pods
return self
@ -129,10 +129,10 @@ class PipelineConf():
def add_op_transformer(self, transformer):
"""Configures the op_transformers which will be applied to all ops in the pipeline.
The ops can be ResourceOp, VolumenOp, or ContainerOp.
The ops can be ResourceOp, VolumeOp, or ContainerOp.
Args:
transformer: a function that takes a kfp Op as input and returns a kfp Op
transformer: A function that takes a kfp Op as input and returns a kfp Op
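Example (an illustrative sketch; the label key and value are placeholders)::

    from kfp import dsl

    def add_team_label(op):
        # ContainerOp, ResourceOp and VolumeOp all support add_pod_label
        return op.add_pod_label('team', 'ml-platform')

    pipeline_conf = dsl.PipelineConf()
    pipeline_conf.add_op_transformer(add_team_label)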
"""
self.op_transformers.append(transformer)
@ -142,20 +142,22 @@ class PipelineConf():
@data_passing_method.setter
def data_passing_method(self, value):
'''Sets the object representing the method used for intermediate data passing.
Example::
"""Sets the object representing the method used for intermediate data passing.
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaim
pipeline_conf = PipelineConf()
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
volume=V1Volume(
name='data',
persistent_volume_claim=V1PersistentVolumeClaim('data-volume'),
),
path_prefix='artifact_data/',
)
'''
Example:
::
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaim
pipeline_conf = PipelineConf()
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
volume=V1Volume(
name='data',
persistent_volume_claim=V1PersistentVolumeClaim('data-volume'),
),
path_prefix='artifact_data/',
)
"""
self._data_passing_method = value
def get_pipeline_conf():
@ -173,12 +175,13 @@ class Pipeline():
is useful for implementing a compiler. For example, the compiler can use the following
to get the pipeline object and its ops:
```python
with Pipeline() as p:
pipeline_func(*args_list)
Example:
::
traverse(p.ops)
```
with Pipeline() as p:
pipeline_func(*args_list)
traverse(p.ops)
"""
# _default_pipeline is set when it (usually a compiler) runs "with Pipeline()"
@ -271,8 +274,9 @@ class Pipeline():
return self.group_id
def _set_metadata(self, metadata):
'''_set_metadata passes the containerop the metadata information
"""Passes the pipeline the metadata information
Args:
metadata (ComponentMeta): component metadata
'''
"""
self._metadata = metadata

View File

@ -23,16 +23,14 @@ PipelineParamTuple = namedtuple('PipelineParamTuple', 'name op pattern')
def sanitize_k8s_name(name, allow_capital_underscore=False):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
"""Cleans and converts the names in the workflow.
Args:
name: original name,
allow_capital_underscore: whether to allow capital letter and underscore
in this name.
allow_capital_underscore: whether to allow capital letters and underscores in this name.
Returns:
sanitized name.
A sanitized name.
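Example (illustrative)::

    sanitize_k8s_name('My Pipeline_Step')                                 # -> 'my-pipeline-step'
    sanitize_k8s_name('My Pipeline_Step', allow_capital_underscore=True)  # -> 'My-Pipeline_Step'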
"""
if allow_capital_underscore:
return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z]+', '-', name)).lstrip('-').rstrip('-')
@ -40,13 +38,14 @@ def sanitize_k8s_name(name, allow_capital_underscore=False):
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
def match_serialized_pipelineparam(payload: str):
"""match_serialized_pipelineparam matches the serialized pipelineparam.
def match_serialized_pipelineparam(payload: str) -> List[PipelineParamTuple]:
"""Matches the supplied serialized pipelineparam.
Args:
payloads (str): a string that contains the serialized pipelineparam.
payload: The search space for the serialized pipelineparams.
Returns:
PipelineParamTuple
The matched pipeline params we found in the supplied payload.
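Example (illustrative)::

    payload = 'echo {{pipelineparam:op=my-op;name=msg}}'
    match_serialized_pipelineparam(payload)
    # -> a list with one PipelineParamTuple for op 'my-op' and parameter 'msg'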
"""
matches = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+)}}', payload)
param_tuples = []
@ -59,14 +58,15 @@ def match_serialized_pipelineparam(payload: str):
return param_tuples
def _extract_pipelineparams(payloads: str or List[str]):
"""_extract_pipelineparam extract a list of PipelineParam instances from the payload string.
def _extract_pipelineparams(payloads: Union[str, List[str]]) -> List['PipelineParam']:
"""Extracts a list of PipelineParam instances from the payload string.
Note: this function removes all duplicate matches.
Args:
payload (str or list[str]): a string/a list of strings that contains serialized pipelineparams
payloads: a string or a list of strings that contain serialized pipelineparams
Return:
List[PipelineParam]
A list of PipelineParam instances found in the payload.
"""
if isinstance(payloads, str):
payloads = [payloads]
@ -81,7 +81,7 @@ def _extract_pipelineparams(payloads: str or List[str]):
return pipeline_params
def extract_pipelineparams_from_any(payload) -> List['PipelineParam']:
def extract_pipelineparams_from_any(payload: Union['PipelineParam', str, list, tuple, dict]) -> List['PipelineParam']:
"""Recursively extract PipelineParam instances or serialized string from any object or list of objects.
Args:
@ -135,24 +135,23 @@ class PipelineParam(object):
A PipelineParam object can be used as a pipeline function argument so that it will be a
pipeline parameter that shows up in ML Pipelines system UI. It can also represent an intermediate
value passed between components.
"""
def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None):
"""Create a new instance of PipelineParam.
Args:
name: name of the pipeline parameter.
op_name: the name of the operation that produces the PipelineParam. None means
it is not produced by any operator, so if None, either user constructs it
directly (for providing an immediate value), or it is a pipeline function
argument.
value: The actual value of the PipelineParam. If provided, the PipelineParam is
"resolved" immediately. For now, we support string only.
param_type: the type of the PipelineParam.
pattern: the serialized string regex pattern this pipeline parameter created from.
Raises: ValueError in name or op_name contains invalid characters, or both op_name
and value are set.
"""
Args:
name: name of the pipeline parameter.
op_name: the name of the operation that produces the PipelineParam. None means
it is not produced by any operator, so if None, either user constructs it
directly (for providing an immediate value), or it is a pipeline function
argument.
value: The actual value of the PipelineParam. If provided, the PipelineParam is
"resolved" immediately. For now, we support string only.
param_type: the type of the PipelineParam.
pattern: the serialized string regex pattern this pipeline parameter created from.
Raises: ValueError if name or op_name contains invalid characters, or if both op_name
and value are set.
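Example (an illustrative sketch; the pipeline name and image are placeholders)::

    import kfp.dsl as dsl

    @dsl.pipeline(name='my-pipeline')
    def my_pipeline(msg: str = 'hello'):
        # at compile time `msg` is a PipelineParam, not a plain string
        echo_op = dsl.ContainerOp(
            name='echo',
            image='busybox:latest',
            command=['echo', msg],
        )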
"""
def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None):
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with a letter. '

View File

@ -38,21 +38,19 @@ class PipelineVolume(V1Volume):
A PipelineVolume object can be used as an extension of the pipeline
function's filesystem. It may then be passed between ContainerOps,
exposing dependencies.
Args:
pvc: The name of an existing PVC
volume: Create a deep copy out of a V1Volume or PipelineVolume with no deps
Raises:
ValueError: If volume is not None and kwargs is not None
If pvc is not None and kwargs.pop("name") is not None
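Example (an illustrative sketch; the images and paths are placeholders)::

    vop = dsl.VolumeOp(name='create-volume', resource_name='my-pvc', size='1Gi')
    step1 = dsl.ContainerOp(
        name='step1', image='busybox:latest',
        command=['sh', '-c', 'echo 1 > /data/out.txt'],
        pvolumes={'/data': vop.volume})
    # step2 depends on step1 through the shared volume
    step2 = dsl.ContainerOp(
        name='step2', image='busybox:latest',
        command=['cat', '/data/out.txt'],
        pvolumes={'/data': step1.pvolume})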
"""
def __init__(self,
pvc: str = None,
volume: V1Volume = None,
**kwargs):
"""Create a new instance of PipelineVolume.
Args:
pvc: The name of an existing PVC
volume: Create a deep copy out of a V1Volume or PipelineVolume
with no deps
Raises:
ValueError: if volume is not None and kwargs is not None
if pvc is not None and kwargs.pop("name") is not None
"""
if volume and kwargs:
raise ValueError("You can't pass a volume along with other "
"kwargs.")
@ -91,6 +89,7 @@ class PipelineVolume(V1Volume):
def after(self, *ops):
"""Creates a duplicate of self with the required dependencies excluding
the redundant dependencies.
Args:
*ops: Pipeline operators to add as dependencies
"""

View File

@ -63,7 +63,27 @@ class Resource(object):
class ResourceOp(BaseOp):
"""Represents an op which will be translated into a resource template"""
"""Represents an op which will be translated into a resource template
Args:
k8s_resource: A k8s resource which will be submitted to the cluster
action: One of "create"/"delete"/"apply"/"patch"
(default is "create")
merge_strategy: The merge strategy for the "apply" action
success_condition: The successCondition of the template
failure_condition: The failureCondition of the template
For more info see:
https://github.com/argoproj/argo/blob/master/examples/k8s-jobs.yaml
attribute_outputs: Maps output labels to resource's json paths,
similarly to file_outputs of ContainerOp
kwargs: name, sidecars. See BaseOp definition
Raises:
ValueError: if not inside a pipeline
if the name is an invalid string
if no k8s_resource is provided
if merge_strategy is set without "apply" action
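Example (an illustrative sketch; the Job manifest below is a placeholder)::

    from kfp import dsl

    _job_manifest = {
        'apiVersion': 'batch/v1',
        'kind': 'Job',
        'metadata': {'generateName': 'my-job-'},
        'spec': {'template': {'spec': {
            'restartPolicy': 'Never',
            'containers': [{'name': 'main', 'image': 'busybox:latest',
                            'command': ['echo', 'hello']}]}}},
    }

    rop = dsl.ResourceOp(
        name='create-job',
        k8s_resource=_job_manifest,
        action='create',
        success_condition='status.succeeded > 0',
        attribute_outputs={'job_name': '{.metadata.name}'},
    )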
"""
def __init__(self,
k8s_resource=None,
@ -73,26 +93,6 @@ class ResourceOp(BaseOp):
failure_condition: str = None,
attribute_outputs: Dict[str, str] = None,
**kwargs):
"""Create a new instance of ResourceOp.
Args:
k8s_resource: A k8s resource which will be submitted to the cluster
action: One of "create"/"delete"/"apply"/"patch"
(default is "create")
merge_strategy: The merge strategy for the "apply" action
success_condition: The successCondition of the template
failure_condition: The failureCondition of the template
For more info see:
https://github.com/argoproj/argo/blob/master/examples/k8s-jobs.yaml
attribute_outputs: Maps output labels to resource's json paths,
similarly to file_outputs of ContainerOp
kwargs: name, sidecars. See BaseOp definition
Raises:
ValueError: if not inside a pipeline
if the name is an invalid string
if no k8s_resource is provided
if merge_strategy is set without "apply" action
"""
super().__init__(**kwargs)
self.attrs_with_pipelineparams = list(self.attrs_with_pipelineparams)

View File

@ -35,6 +35,29 @@ VOLUME_MODE_ROM = ["ReadOnlyMany"]
class VolumeOp(ResourceOp):
"""Represents an op which will be translated into a resource template
which will be creating a PVC.
Args:
resource_name: A desired name for the PVC which will be created
size: The size of the PVC which will be created
storage_class: The storage class to use for the dynamically created PVC
modes: The access modes for the PVC
annotations: Annotations to be patched in the PVC
data_source: May be a V1TypedLocalObjectReference, and then it is
used in the data_source field of the PVC as is. Can also be a
string/PipelineParam, and in that case it will be used as a
VolumeSnapshot name (Alpha feature)
volume_name: VolumeName is the binding reference to the PersistentVolume
backing this claim.
kwargs: See :py:class:`kfp.dsl.ResourceOp`
Raises:
ValueError: if k8s_resource is provided along with other arguments
if k8s_resource is not a V1PersistentVolumeClaim
if size is None
if size is an invalid memory string (when not a
PipelineParam)
if data_source is not one of (str, PipelineParam,
V1TypedLocalObjectReference)
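Example (an illustrative sketch; the PVC name and size are placeholders)::

    vop = dsl.VolumeOp(
        name='create-pvc',
        resource_name='my-pvc',
        size='1Gi',
        modes=dsl.VOLUME_MODE_RWO,
    )
    step = dsl.ContainerOp(name='step', image='busybox:latest',
                           pvolumes={'/mnt': vop.volume})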
"""
def __init__(self,
@ -46,31 +69,6 @@ class VolumeOp(ResourceOp):
data_source=None,
volume_name=None,
**kwargs):
"""Create a new instance of VolumeOp.
Args:
resource_name: A desired name for the PVC which will be created
size: The size of the PVC which will be created
storage_class: The storage class to use for the dynamically created
PVC
modes: The access modes for the PVC
annotations: Annotations to be patched in the PVC
data_source: May be a V1TypedLocalObjectReference, and then it is
used in the data_source field of the PVC as is. Can also be a
string/PipelineParam, and in that case it will be used as a
VolumeSnapshot name (Alpha feature)
volume_name: VolumeName is the binding reference to the PersistentVolume
backing this claim.
kwargs: See ResourceOp definition
Raises:
ValueError: if k8s_resource is provided along with other arguments
if k8s_resource is not a V1PersistentVolumeClaim
if size is None
if size is an invalid memory string (when not a
PipelineParam)
if data_source is not one of (str, PipelineParam,
V1TypedLocalObjectReference)
"""
# Add size to attribute outputs
self.attribute_outputs = {"size": "{.status.capacity.storage}"}

View File

@ -29,6 +29,22 @@ class VolumeSnapshotOp(ResourceOp):
At the time that this feature is written, VolumeSnapshots are an Alpha
feature in Kubernetes. You should check with your Kubernetes Cluster admin
if they have it enabled.
Args:
resource_name: A desired name for the VolumeSnapshot which will be created
pvc: The name of the PVC which will be snapshotted
snapshot_class: The snapshot class to use for the dynamically created VolumeSnapshot
annotations: Annotations to be patched in the VolumeSnapshot
volume: An instance of V1Volume
kwargs: See :py:class:`kfp.dsl.ResourceOp`
Raises:
ValueError: if k8s_resource is provided along with other arguments
if k8s_resource is not a VolumeSnapshot
if pvc and volume are None
if pvc and volume are not None
if volume does not reference a PVC
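Example (an illustrative sketch; assumes :code:`vop` is a VolumeOp that created the PVC earlier)::

    snapshot = dsl.VolumeSnapshotOp(
        name='snapshot-pvc',
        resource_name='my-snapshot',
        pvc=vop.outputs['name'],
    )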
"""
def __init__(self,
@ -38,24 +54,6 @@ class VolumeSnapshotOp(ResourceOp):
annotations: Dict[str, str] = None,
volume: V1Volume = None,
**kwargs):
"""Create a new instance of VolumeSnapshotOp.
Args:
resource_name: A desired name for the VolumeSnapshot which will be
created
pvc: The name of the PVC which will be snapshotted
snapshot_class: The snapshot class to use for the dynamically
created VolumeSnapshot
annotations: Annotations to be patched in the VolumeSnapshot
volume: An instance of V1Volume
kwargs: See ResourceOp definition
Raises:
ValueError: if k8s_resource is provided along with other arguments
if k8s_resource is not a VolumeSnapshot
if pvc and volume are None
if pvc and volume are not None
if volume does not reference a PVC
"""
# Add size to output params
self.attribute_outputs = {"size": "{.status.restoreSize}"}
# Add default success_condition if None provided

View File

@ -17,9 +17,9 @@ from kubernetes.client import V1Toleration, V1Affinity, V1NodeAffinity, \
def use_gcp_secret(secret_name='user-gcp-sa', secret_file_path_in_volume=None, volume_name=None, secret_volume_mount_path='/secret/gcp-credentials'):
"""An operator that configures the container to use GCP service account by service account key
stored in a Kubernetes secret.
stored in a Kubernetes secret.
For cluster setup and alternatives to using service account key, check https://www.kubeflow.org/docs/gke/authentication-pipelines/.
For cluster setup and alternatives to using service account key, check https://www.kubeflow.org/docs/gke/authentication-pipelines/.
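Example (an illustrative sketch; :code:`train_op` is a placeholder for an existing component)::

    from kfp.gcp import use_gcp_secret

    task = train_op(...)
    task.apply(use_gcp_secret('user-gcp-sa'))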
"""
# permitted values for secret_name = ['admin-gcp-sa', 'user-gcp-sa']
@ -95,9 +95,10 @@ def use_preemptible_nodepool(toleration: V1Toleration = V1Toleration(effect='NoS
value='true'),
hard_constraint: bool = False):
"""An operator that configures the GKE preemptible in a container op.
Args:
toleration (V1Toleration): toleration to pods, default is the preemptible label.
hard_constraint (bool): the constraint of scheduling the pods on preemptible
toleration: toleration to pods, default is the preemptible label.
hard_constraint: the constraint of scheduling the pods on preemptible
nodepools is hard. (Default: False)
"""
@ -127,7 +128,7 @@ def add_gpu_toleration(toleration: V1Toleration = V1Toleration(
"""An operator that configures the GKE GPU nodes in a container op.
Args:
toleration {V1Toleration} -- toleration to pods, default is the nvidia.com/gpu label.
toleration: toleration to pods, default is the nvidia.com/gpu label.
"""
def _set_toleration(task):

View File

@ -1,9 +1,11 @@
def mount_pvc(pvc_name='pipeline-claim', volume_name='pipeline', volume_mount_path='/mnt/pipeline'):
"""
Modifier function to apply to a Container Op to simplify volume, volume mount addition and
enable better reuse of volumes, volume claims across container ops.
Usage:
"""Modifier function to apply to a Container Op to simplify volume, volume mount addition and
enable better reuse of volumes, volume claims across container ops.
Example:
::
train = train_op(...)
train.apply(mount_pvc('claim-name', 'pipeline', '/mnt/pipeline'))
"""