docs(sdk): refresh SDK public API docstrings (#8057)

* add module-level docstrings

* update compiler docstrings

* update registry module docstrings

* add BaseComponent and children to public api, but discourage use

* update artifact docstrings and type annotations

* update dsl docstrings

* update client docstrings

* clean up kfp.__init__

* add dsl placeholder docstrings

* many more docstring updates

* document type aliases in dsl module
Connor McCarthy 2022-07-21 13:51:18 -06:00 committed by GitHub
parent 693c3b1986
commit c6a5e387dd
27 changed files with 880 additions and 688 deletions


@ -9,7 +9,6 @@ repos:
- id: trailing-whitespace
- id: debug-statements
- id: check-merge-conflict
- id: check-docstring-first
- id: name-tests-test
- id: double-quote-string-fixer
- id: no-commit-to-branch


@ -6,3 +6,6 @@ kfp.dsl
:undoc-members:
:show-inheritance:
:noindex:
.. autodata:: Input
.. autodata:: Output


@ -20,9 +20,4 @@ __version__ = '2.0.0-beta.1'
TYPE_CHECK = True
from kfp.client import Client # pylint: disable=wrong-import-position
# TODO: clean up COMPILING_FOR_V2
# COMPILING_FOR_V2 is True when using kfp.compiler or use (v1) kfp.compiler
# with V2_COMPATIBLE or V2_ENGINE mode
COMPILING_FOR_V2 = False
from kfp.client import Client


@ -128,7 +128,7 @@ class TestGetParamDescr(unittest.TestCase):
self.assertEqual(
host_descr,
"The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow's JupyterHub). Set the host based on https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/"
"Host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow's JupyterHub). (`More information on connecting. <https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/>`_)"
)


@ -1,3 +1,4 @@
"""The `kfp.client` module contains the KFP API client."""
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");

File diff suppressed because it is too large

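The client docstring updates above (including the suppressed `kfp/client/client.py` diff) describe connecting to a KFP deployment via ``kfp.client.Client``. A minimal usage sketch; the host URL, package path, and argument name are illustrative assumptions, not taken from the diff:

::

    from kfp.client import Client

    # Host and pipeline package path are assumptions for this sketch.
    client = Client(host='http://localhost:8080')
    run = client.create_run_from_pipeline_package(
        'pipeline.yaml',
        arguments={'message': 'hello'},
    )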

@ -1,3 +1,5 @@
"""The `kfp.compiler` module contains the compiler for compiling pipeline
definitions."""
# Copyright 2020 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");


@ -32,6 +32,7 @@ from kfp.compiler.pipeline_spec_builder import GroupOrTaskType
from kfp.components import base_component
from kfp.components import component_factory
from kfp.components import for_loop
from kfp.components import pipeline_channel
from kfp.components import pipeline_context
from kfp.components import pipeline_task
from kfp.components import structures
@ -43,27 +44,24 @@ import yaml
class Compiler:
"""Experimental DSL compiler that targets the PipelineSpec IR.
"""Compiles pipelines composed using the KFP SDK DSL to a YAML pipeline
definition.
It compiles pipeline function into PipelineSpec json string.
PipelineSpec is the IR protobuf message that defines a pipeline:
https://github.com/kubeflow/pipelines/blob/237795539f7b85bac77435e2464367226ee19391/api/v2alpha1/pipeline_spec.proto#L8
In this initial implementation, we only support components authored through
Component yaml spec. And we don't support advanced features like conditions,
static and dynamic loops, etc.
The pipeline definition is `PipelineSpec IR <https://github.com/kubeflow/pipelines/blob/2060e38c5591806d657d85b53eed2eef2e5de2ae/api/v2alpha1/pipeline_spec.proto#L50>`_, the protobuf message that defines a pipeline.
Example::
Example:
::
@dsl.pipeline(
name='name',
description='description',
)
def my_pipeline(a: int = 1, b: str = "default value"):
def my_pipeline(a: int, b: str = 'default value'):
...
kfp.compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path='path/to/pipeline.json',
package_path='path/to/pipeline.yaml',
pipeline_parameters={'a': 1},
)
"""
@ -72,20 +70,17 @@ class Compiler:
pipeline_func: Union[Callable[..., Any], base_component.BaseComponent],
package_path: str,
pipeline_name: Optional[str] = None,
pipeline_parameters: Optional[Mapping[str, Any]] = None,
pipeline_parameters: Optional[Dict[str, Any]] = None,
type_check: bool = True,
) -> None:
"""Compile the given pipeline function or component into pipeline job
json.
"""Compiles the pipeline or component function into IR YAML.
Args:
pipeline_func: Pipeline function with @dsl.pipeline or component with @dsl.component decorator.
package_path: The output pipeline spec .yaml file path. For example, "~/pipeline.yaml" or "~/component.yaml".
pipeline_name: Optional; the name of the pipeline.
pipeline_parameters: Optional; the mapping from parameter names to
values.
type_check: Optional; whether to enable the type check or not.
Default is True.
pipeline_func: Pipeline function constructed with the ``@dsl.pipeline`` or component constructed with the ``@dsl.component`` decorator.
package_path: Output YAML file path. For example, ``'~/my_pipeline.yaml'`` or ``'~/my_component.yaml'``.
pipeline_name: Name of the pipeline.
pipeline_parameters: Map of parameter names to argument values.
type_check: Whether to enable type checking of component interfaces during compilation.
"""
with type_utils.TypeCheckManager(enable=type_check):
@ -148,7 +143,7 @@ class Compiler:
builder.make_invalid_input_type_error_msg(
arg_name, arg_type))
args_list.append(
dsl.PipelineParameterChannel(
pipeline_channel.PipelineParameterChannel(
name=arg_name, channel_type=arg_type))
with pipeline_context.Pipeline(pipeline_name) as dsl_pipeline:
@ -172,7 +167,7 @@ class Compiler:
# Fill in the default values.
args_list_with_defaults = [
dsl.PipelineParameterChannel(
pipeline_channel.PipelineParameterChannel(
name=input_name,
channel_type=input_spec.type,
value=pipeline_parameters_override.get(input_name) or
@ -217,7 +212,7 @@ class Compiler:
raise TypeError(
builder.make_invalid_input_type_error_msg(
arg_name, arg_type))
args_dict[arg_name] = dsl.PipelineParameterChannel(
args_dict[arg_name] = pipeline_channel.PipelineParameterChannel(
name=arg_name, channel_type=arg_type)
task = pipeline_task.PipelineTask(component_spec, args_dict)
@ -232,7 +227,7 @@ class Compiler:
# Fill in the default values.
args_list_with_defaults = [
dsl.PipelineParameterChannel(
pipeline_channel.PipelineParameterChannel(
name=input_name,
channel_type=input_spec.type,
value=input_spec.default,
@ -287,7 +282,7 @@ class Compiler:
def _create_pipeline_spec(
self,
pipeline_args: List[dsl.PipelineChannel],
pipeline_args: List[pipeline_channel.PipelineChannel],
pipeline: pipeline_context.Pipeline,
) -> pipeline_spec_pb2.PipelineSpec:
"""Creates a pipeline spec object.
@ -436,7 +431,7 @@ class Compiler:
def _get_condition_channels_for_tasks(
self,
root_group: tasks_group.TasksGroup,
) -> Mapping[str, Set[dsl.PipelineChannel]]:
) -> Mapping[str, Set[pipeline_channel.PipelineChannel]]:
"""Gets channels referenced in conditions of tasks' parents.
Args:
@ -457,11 +452,11 @@ class Compiler:
new_current_conditions_channels = list(
current_conditions_channels)
if isinstance(group.condition.left_operand,
dsl.PipelineChannel):
pipeline_channel.PipelineChannel):
new_current_conditions_channels.append(
group.condition.left_operand)
if isinstance(group.condition.right_operand,
dsl.PipelineChannel):
pipeline_channel.PipelineChannel):
new_current_conditions_channels.append(
group.condition.right_operand)
for task in group.tasks:
@ -477,13 +472,14 @@ class Compiler:
def _get_inputs_for_all_groups(
self,
pipeline: pipeline_context.Pipeline,
pipeline_args: List[dsl.PipelineChannel],
pipeline_args: List[pipeline_channel.PipelineChannel],
root_group: tasks_group.TasksGroup,
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
group_name_to_parent_groups: Mapping[str, List[tasks_group.TasksGroup]],
condition_channels: Mapping[str, Set[dsl.PipelineParameterChannel]],
condition_channels: Mapping[
str, Set[pipeline_channel.PipelineParameterChannel]],
name_to_for_loop_group: Mapping[str, dsl.ParallelFor],
) -> Mapping[str, List[Tuple[dsl.PipelineChannel, str]]]:
) -> Mapping[str, List[Tuple[pipeline_channel.PipelineChannel, str]]]:
"""Get inputs and outputs of each group and op.
Args:
@ -567,7 +563,7 @@ class Compiler:
else:
channel_to_add = channel_to_add.items_or_pipeline_channel
if isinstance(channel_to_add, dsl.PipelineChannel):
if isinstance(channel_to_add, pipeline_channel.PipelineChannel):
channels_to_add.append(channel_to_add)
if channel.task_name:
@ -710,7 +706,7 @@ class Compiler:
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
group_name_to_parent_groups: Mapping[str, List[tasks_group.TasksGroup]],
group_name_to_group: Mapping[str, tasks_group.TasksGroup],
condition_channels: Dict[str, dsl.PipelineChannel],
condition_channels: Dict[str, pipeline_channel.PipelineChannel],
) -> Mapping[str, List[GroupOrTaskType]]:
"""Gets dependent groups and tasks for all tasks and groups.

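A self-contained sketch of the ``Compiler.compile()`` signature documented above, assuming a trivial component and pipeline; the names and output path are illustrative:

::

    from kfp import compiler, dsl

    @dsl.component
    def add(a: int, b: int) -> int:
        return a + b

    @dsl.pipeline(name='add-pipeline')
    def add_pipeline(a: int = 1, b: int = 2):
        add(a=a, b=b)

    # Writes the PipelineSpec IR YAML described in the Compiler docstring.
    compiler.Compiler().compile(
        pipeline_func=add_pipeline,
        package_path='add_pipeline.yaml',
        pipeline_parameters={'a': 5},
    )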

@ -122,18 +122,18 @@ def build_task_spec_for_task(
"""
pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec()
pipeline_task_spec.task_info.name = (
task.task_spec.display_name or task.name)
task._task_spec.display_name or task.name)
# Use task.name for component_ref.name because we may customize component
# spec for individual tasks to work around the lack of optional inputs
# support in IR.
pipeline_task_spec.component_ref.name = (
component_utils.sanitize_component_name(task.name))
pipeline_task_spec.caching_options.enable_cache = (
task.task_spec.enable_caching)
task._task_spec.enable_caching)
if task.task_spec.retry_policy is not None:
if task._task_spec.retry_policy is not None:
pipeline_task_spec.retry_policy.CopyFrom(
task.task_spec.retry_policy.to_proto())
task._task_spec.retry_policy.to_proto())
for input_name, input_value in task.inputs.items():
if isinstance(input_value, pipeline_channel.PipelineArtifactChannel):
@ -975,7 +975,7 @@ def build_spec_by_group(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
deployment_config: pipeline_spec_pb2.PipelineDeploymentConfig,
group: tasks_group.TasksGroup,
inputs: Mapping[str, List[Tuple[dsl.PipelineChannel, str]]],
inputs: Mapping[str, List[Tuple[pipeline_channel.PipelineChannel, str]]],
dependencies: Dict[str, List[GroupOrTaskType]],
rootgroup_name: str,
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
@ -1127,7 +1127,7 @@ def build_spec_by_group(
subgroup.condition.left_operand,
subgroup.condition.right_operand,
]:
if isinstance(operand, dsl.PipelineChannel):
if isinstance(operand, pipeline_channel.PipelineChannel):
condition_subgroup_channels.append(operand)
subgroup_component_spec = builder.build_component_spec_for_group(
@ -1269,7 +1269,8 @@ def validate_pipeline_name(name: str) -> None:
def create_pipeline_spec_for_component(
pipeline_name: str, pipeline_args: List[dsl.PipelineChannel],
pipeline_name: str,
pipeline_args: List[pipeline_channel.PipelineChannel],
task_group: tasks_group.TasksGroup) -> pipeline_spec_pb2.PipelineSpec:
"""Creates a pipeline spec object for a component (single-component
pipeline).


@ -1,3 +1,5 @@
"""The `kfp.components` module contains functions for loading components from
compiled YAML."""
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -16,8 +18,14 @@ __all__ = [
'load_component_from_text',
'load_component_from_file',
'load_component_from_url',
'PythonComponent',
'BaseComponent',
'YamlComponent',
]
from kfp.components.base_component import BaseComponent
from kfp.components.python_component import PythonComponent
from kfp.components.yaml_component import load_component_from_file
from kfp.components.yaml_component import load_component_from_text
from kfp.components.yaml_component import load_component_from_url
from kfp.components.yaml_component import YamlComponent

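A brief sketch of the loading functions newly documented in ``kfp.components``; the file path and the ``text`` input name are assumptions about the loaded component:

::

    from kfp import components, dsl

    # 'my_component.yaml' is an assumed path to a previously compiled component.
    my_comp = components.load_component_from_file('my_component.yaml')

    @dsl.pipeline(name='pipeline-with-loaded-component')
    def my_pipeline():
        # Keyword arguments depend on the inputs declared in the loaded YAML.
        my_comp(text='hello')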

@ -20,12 +20,14 @@ from kfp.components import structures
from kfp.components.types import type_utils
class BaseComponent(metaclass=abc.ABCMeta):
class BaseComponent(abc.ABC):
"""Base class for a component.
**Note:** ``BaseComponent`` is not intended to be used to construct components directly. Use ``@kfp.dsl.component`` or ``kfp.components.load_component_from_*()`` instead.
Attributes:
name: The name of the component.
component_spec: The component definition.
name: Name of the component.
component_spec: Component definition.
"""
def __init__(self, component_spec: structures.ComponentSpec):
@ -93,11 +95,5 @@ class BaseComponent(metaclass=abc.ABCMeta):
@abc.abstractmethod
def execute(self, **kwargs):
"""Executes the component given the required inputs.
Subclasses of BaseComponent must override this abstract method
in order to be instantiated. For Python function-based
component, the implementation of this method could be calling
the function. For "Bring your own container" component, the
implementation of this method could be `docker run`.
"""
"""Executes the component locally if implemented by the inheriting
subclass."""

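Since the note above discourages constructing ``BaseComponent`` directly, a small sketch of how its public subclasses typically appear in practice, assuming the standard component factories:

::

    from kfp import components, dsl

    @dsl.component
    def echo(text: str) -> str:
        return text

    # Components come from factories such as @dsl.component or
    # load_component_from_*(), not from direct construction.
    assert isinstance(echo, components.PythonComponent)
    assert isinstance(echo, components.BaseComponent)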

@ -28,74 +28,76 @@ def component(func: Optional[Callable] = None,
output_component_file: Optional[str] = None,
install_kfp_package: bool = True,
kfp_package_path: Optional[str] = None):
"""Decorator for Python-function based components in KFP v2.
"""Decorator for Python-function based components.
A KFP v2 component can either be a lightweight component, or a containerized
one.
A KFP component can either be a lightweight component or a containerized
component.
If target_image is not specified, this function creates a lightweight
If ``target_image`` is not specified, this function creates a lightweight
component. A lightweight component is a self-contained Python function that
includes all necessary imports and dependencies. In lightweight components,
packages_to_install will be used to install dependencies at runtime. The
parameters install_kfp_package and kfp_package_path can be used to control
how KFP should be installed when the lightweight component is executed.
``packages_to_install`` will be used to install dependencies at runtime. The
parameters ``install_kfp_package`` and ``kfp_package_path`` can be used to control
how and from where KFP should be installed when the lightweight component is executed.
If target_image is specified, this function creates a component definition
based around the target_image. The assumption is that the function in func
will be packaged by KFP into this target_image. Use the KFP CLI's `build`
command to package func into target_image.
Example usage:
from kfp import dsl
@dsl.component
def my_function_one(input: str, output: Output[Model]):
...
@dsl.component(
base_image='python:3.9',
output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Mode])):
...
@dsl.pipeline(pipeline_root='...',
name='my-pipeline')
def pipeline():
my_function_one_task = my_function_one(input=...)
my_function_two_task = my_function_two(input=my_function_one_task.outputs..
If ``target_image`` is specified, this function creates a component definition
based around the ``target_image``. The assumption is that the function in ``func``
will be packaged by KFP into this ``target_image``. You can use the KFP CLI's ``build``
command to package the function into ``target_image``.
Args:
func: The python function to create a component from. The function
func: Python function from which to create a component. The function
should have type annotations for all its arguments, indicating how
it is intended to be used (e.g. as an input/output Artifact object,
each argument is intended to be used (e.g. as an input/output artifact,
a plain parameter, or a path to a file).
base_image: The image to use when executing func. It should
base_image: Image to use when executing the Python function. It should
contain a default Python interpreter that is compatible with KFP.
packages_to_install: A list of optional packages to install before
executing func. These will always be installed at component runtime.
pip_index_urls: Python Package Index base URLS from which to
install `packages_to_install`. Defaults to installing from only
"https://pypi.org/simple". For more information, see:
https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0.
target_image: Image to use when creating containerized components.
packages_to_install: List of packages to install before
executing the Python function. These will always be installed at component runtime.
pip_index_urls: Python Package Index base URLs from which to
install ``packages_to_install``. Defaults to installing from only PyPI
(``'https://pypi.org/simple'``). For more information, see `pip install docs <https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0>`_.
output_component_file: If specified, this function will write a
shareable/loadable version of the component spec into this file.
install_kfp_package: Specifies if we should add a KFP Python package to
packages_to_install. Lightweight Python functions always require
an installation of KFP in base_image to work. If you specify
a base_image that already contains KFP, you can set this to False.
This flag is ignored when target_image is specified, which implies
we're building a containerized component. Containerized components
**Warning:** This compilation approach is deprecated.
install_kfp_package: Specifies if the KFP SDK should add the ``kfp`` Python package to
``packages_to_install``. Lightweight Python functions always require
an installation of KFP in ``base_image`` to work. If you specify
a ``base_image`` that already contains KFP, you can set this to ``False``.
This flag is ignored when ``target_image`` is specified, which implies
a choice to build a containerized component. Containerized components
will always install KFP as part of the build process.
kfp_package_path: Specifies the location from which to install KFP. By
default, this will try to install from PyPi using the same version
as that used when this component was created. KFP developers can
choose to override this to point to a Github pull request or
other pip-compatible location when testing changes to lightweight
Python functions.
default, this will try to install from PyPI using the same version
as that used when this component was created. Component authors can
choose to override this to point to a GitHub pull request or
other pip-compatible package server.
Returns:
A component task factory that can be used in pipeline definitions.
Example:
::
from kfp import dsl
@dsl.component
def my_function_one(input: str, output: Output[Model]):
...
@dsl.component(
base_image='python:3.9',
output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Model]):
...
@dsl.pipeline(name='my-pipeline', pipeline_root='...')
def pipeline():
my_function_one_task = my_function_one(input=...)
my_function_two_task = my_function_two(input=my_function_one_task.outputs['output'])
"""
if output_component_file is not None:
warnings.warn(

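The ``target_image`` path described above has no example in the updated docstring; a hedged sketch, assuming a hypothetical image URI and that the image is later built with the KFP CLI's ``build`` command:

::

    from kfp import dsl
    from kfp.dsl import Dataset, Output

    # 'gcr.io/my-project/my-component:v1' is an assumed image URI.
    @dsl.component(
        base_image='python:3.9',
        target_image='gcr.io/my-project/my-component:v1',
    )
    def make_dataset(text: str, dataset: Output[Dataset]):
        with open(dataset.path, 'w') as f:
            f.write(text)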

@ -32,21 +32,27 @@ def importer(
reimport: bool = False,
metadata: Optional[Mapping[str, Any]] = None,
) -> pipeline_task.PipelineTask:
"""dsl.importer for importing an existing artifact. Only for v2 pipeline.
"""Imports an existing artifact for use in a downstream component.
Args:
artifact_uri: The artifact uri to import from.
artifact_type_schema: The user specified artifact type schema of the
artifact to be imported.
reimport: Whether to reimport the artifact. Defaults to False.
artifact_uri: The URI of the artifact to import.
artifact_class: The artifact class being imported.
reimport: Whether to reimport the artifact.
metadata: Properties of the artifact.
Returns:
A PipelineTask instance.
A task with the artifact accessible via its ``.output`` attribute.
Raises:
ValueError if the passed in artifact_uri is neither a PipelineParam nor a
constant string value.
Examples::
@dsl.pipeline(name='pipeline-with-importer')
def pipeline_with_importer():
importer1 = importer(
artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
artifact_class=Dataset,
reimport=False)
train(dataset=importer1.output)
"""
component_spec = structures.ComponentSpec(
name='importer',

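A self-contained variant of the importer example above, assuming a hypothetical downstream component and the same publicly readable artifact URI:

::

    from kfp import dsl
    from kfp.dsl import Dataset, Input

    @dsl.component
    def count_lines(dataset: Input[Dataset]) -> int:
        with open(dataset.path) as f:
            return len(f.readlines())

    @dsl.pipeline(name='importer-pipeline')
    def importer_pipeline():
        importer_task = dsl.importer(
            artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
            artifact_class=Dataset,
            reimport=False)
        count_lines(dataset=importer_task.output)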

@ -130,7 +130,7 @@ class PipelineChannel(abc.ABC):
We make repr return the placeholder string so that if someone
uses str()-based serialization of complex objects containing
`PipelineChannel`, it works properly. (e.g. str([1, 2, 3,
kfp.dsl.PipelineParameterChannel("aaa"), 4, 5, 6,]))
kfp.pipeline_channel.PipelineParameterChannel("aaa"), 4, 5, 6,]))
"""
return str(self)


@ -28,8 +28,8 @@ pipeline_decorator_handler = None
def pipeline(name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None):
"""Decorator of pipeline functions.
pipeline_root: Optional[str] = None) -> Callable:
"""Decorator used to construct a pipeline.
Example
::
@ -43,12 +43,9 @@ def pipeline(name: Optional[str] = None,
...
Args:
name: The pipeline name. Default to a sanitized version of the function
name.
description: Optionally, a human-readable description of the pipeline.
pipeline_root: The root directory to generate input/output URI under this
pipeline. This is required if input/output URI placeholder is used in
this pipeline.
name: The pipeline name. Defaults to a sanitized version of the decorated function name.
description: A human-readable description of the pipeline.
pipeline_root: The root directory from which to read input and output parameters and artifacts.
"""
if callable(name):
strikethrough_decorator = '\u0336'.join('@pipeline') + '\u0336'
@ -62,7 +59,7 @@ def pipeline(name: Optional[str] = None,
...
"""))
def _pipeline(func: Callable):
def _pipeline(func: Callable) -> Callable:
func._is_pipeline = True
if name:
func._component_human_name = name
@ -138,15 +135,15 @@ class Pipeline:
add_to_group=not getattr(task, 'is_exit_handler', False))
self._old_register_task_handler = (
pipeline_task.PipelineTask.register_task_handler)
pipeline_task.PipelineTask.register_task_handler = (
pipeline_task.PipelineTask._register_task_handler)
pipeline_task.PipelineTask._register_task_handler = (
register_task_and_generate_id)
return self
def __exit__(self, *unused_args):
Pipeline._default_pipeline = None
pipeline_task.PipelineTask.register_task_handler = (
pipeline_task.PipelineTask._register_task_handler = (
self._old_register_task_handler)
def add_task(

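A short sketch of the decorator arguments documented above (``name``, ``description``, ``pipeline_root``); the bucket path and component are illustrative:

::

    from kfp import dsl

    @dsl.component
    def say(text: str) -> str:
        return text

    # 'gs://my-bucket/pipeline-root' is an assumed artifact storage location.
    @dsl.pipeline(
        name='greeting-pipeline',
        description='Echoes a greeting.',
        pipeline_root='gs://my-bucket/pipeline-root',
    )
    def greeting_pipeline(greeting: str = 'hello'):
        say(text=greeting)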

@ -33,26 +33,34 @@ def create_pipeline_task(
class PipelineTask:
"""Represents a pipeline task -- an instantiated component.
"""Represents a pipeline task (instantiated component).
Replaces `ContainerOp`. Holds operations available on a task object, such as
`.after()`, `.set_memory_limit()`, `enable_caching()`, etc.
**Note:** ``PipelineTask`` should not be constructed by pipeline authors directly, but instead obtained via an instantiated component (see example).
Attributes:
name: The name of the task. Unique within its parent group.
outputs:
task_spec: The task spec of the task.
component_spec: The component spec of the task.
container_spec: The resolved container spec of the task. Only one of
container_spec and importer_spec should be filled.
importer_spec: The resolved importer spec of the task. Only one of
container_spec and importer_spec should be filled.
Replaces ``ContainerOp`` from ``kfp`` v1. Holds operations available on a task object, such as
``.after()``, ``.set_memory_limit()``, ``.enable_caching()``, etc.
Args:
component_spec: The component definition.
args: The dictionary of arguments on which the component was called to instantiate this task.
Example:
::
@dsl.component
def identity(message: str) -> str:
return message
@dsl.pipeline(name='my_pipeline')
def my_pipeline():
# task is an instance of PipelineTask
task = identity(message='my string')
"""
# Fallback behavior for compiling a component. This should be overriden by
# pipeline `register_task_and_generate_id` if compiling a pipeline (more
# than one component).
register_task_handler = lambda task: utils.maybe_rename_for_k8s(
_register_task_handler = lambda task: utils.maybe_rename_for_k8s(
task.component_spec.name)
def __init__(
@ -60,12 +68,7 @@ class PipelineTask:
component_spec: structures.ComponentSpec,
args: Mapping[str, Any],
):
"""Initilizes a PipelineTask instance.
Args:
component_spec: The component definition.
args: The dictionary of component arguments.
"""
"""Initilizes a PipelineTask instance."""
args = args or {}
for input_name, argument_value in args.items():
@ -108,8 +111,8 @@ class PipelineTask:
self.component_spec = component_spec
self.task_spec = structures.TaskSpec(
name=self.register_task_handler(),
self._task_spec = structures.TaskSpec(
name=self._register_task_handler(),
inputs={input_name: value for input_name, value in args.items()},
dependent_tasks=[],
component_ref=component_spec.name,
@ -133,7 +136,7 @@ class PipelineTask:
output_name: pipeline_channel.create_pipeline_channel(
name=output_name,
channel_type=output_spec.type,
task_name=self.task_spec.name,
task_name=self._task_spec.name,
) for output_name, output_spec in (
component_spec.outputs or {}).items()
}
@ -150,38 +153,51 @@ class PipelineTask:
@property
def name(self) -> str:
"""Returns the name of the task."""
return self.task_spec.name
"""The name of the task.
Unique within its parent group.
"""
return self._task_spec.name
@property
def inputs(
self
) -> List[Union[type_utils.PARAMETER_TYPES,
pipeline_channel.PipelineChannel]]:
"""Returns the list of actual inputs passed to the task."""
"""The list of actual inputs passed to the task."""
return self._inputs
@property
def channel_inputs(self) -> List[pipeline_channel.PipelineChannel]:
"""Returns the list of all PipelineChannels passed to the task."""
"""The list of all channel inputs passed to the task.
:meta private:
"""
return self._channel_inputs
@property
def output(self) -> pipeline_channel.PipelineChannel:
"""Returns the single output object (a PipelineChannel) of the task."""
"""The single output of the task.
Used when a task has exactly one output parameter.
"""
if len(self._outputs) != 1:
raise AttributeError
return list(self._outputs.values())[0]
@property
def outputs(self) -> Mapping[str, pipeline_channel.PipelineChannel]:
"""Returns the dictionary of outputs (PipelineChannels) of the task."""
"""The dictionary of outputs of the task.
Used when a task has more than one output or uses an
``OutputPath`` or ``Output[Artifact]`` type annotation.
"""
return self._outputs
@property
def dependent_tasks(self) -> List[str]:
"""Returns the list of dependent task names."""
return self.task_spec.dependent_tasks
"""A list of the dependent task names."""
return self._task_spec.dependent_tasks
def _resolve_command_line_and_arguments(
self,
@ -323,23 +339,22 @@ class PipelineTask:
return resolved_container_spec
def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
"""Sets caching options for the Pipeline task.
"""Sets caching options for the task.
Args:
enable_caching: Whether or not to enable caching for this task.
enable_caching: Whether to enable caching.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.enable_caching = enable_caching
self._task_spec.enable_caching = enable_caching
return self
def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
"""Set cpu limit (maximum) for this operator.
"""Sets CPU limit (maximum) for the task.
Args:
cpu(str): A string which can be a
number or a number followed by "m", whichmeans 1/1000.
cpu: Maximum CPU requests allowed. This string should be a number or a number followed by an "m" to indicate millicores (1/1000). For more information, see `Specify a CPU Request and a CPU Limit <https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.
Returns:
Self return to allow chained setting calls.
@ -367,10 +382,10 @@ class PipelineTask:
return self
def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
"""Set gpu limit (maximum) for this operator.
"""Sets GPU limit (maximum) for the task.
Args:
gpu(str): Positive number required for number of GPUs.
gpu: The maximum GPU requests allowed. This string should be a positive integer number of GPUs.
Returns:
Self return to allow chained setting calls.
@ -393,12 +408,10 @@ class PipelineTask:
return self
def set_memory_limit(self, memory: str) -> 'PipelineTask':
"""Set memory limit (maximum) for this operator.
"""Sets memory limit (maximum) for the task.
Args:
memory(str): a string which can be a number or a number followed by
one of "E", "Ei", "P", "Pi", "T", "Ti", "G", "Gi", "M", "Mi",
"K", "Ki".
memory: The maximum memory requests allowed. This string should be a number or a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki".
Returns:
Self return to allow chained setting calls.
@ -458,15 +471,15 @@ class PipelineTask:
"""Sets task retry parameters.
Args:
num_retries (int): Number of times to retry on failure.
backoff_duration (Optional[int]): The the number of seconds to wait before triggering a retry. Defaults to '0s' (immediate retry).
backoff_factor (Optional[float]): The exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on. Defaults to 2.0.
backoff_max_duration (Optional[int]): The maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.
num_retries : Number of times to retry on failure.
backoff_duration: Number of seconds to wait before triggering a retry. Defaults to ``'0s'`` (immediate retry).
backoff_factor: Exponential backoff factor applied to ``backoff_duration``. For example, if ``backoff_duration="60"`` (60 seconds) and ``backoff_factor=2``, the first retry will happen after 60 seconds, then again after 120, 240, and so on. Defaults to ``2.0``.
backoff_max_duration: Maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to ``'3600s'``.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.retry_policy = structures.RetryPolicy(
self._task_spec.retry_policy = structures.RetryPolicy(
max_retry_count=num_retries,
backoff_duration=backoff_duration,
backoff_factor=backoff_factor,
@ -475,11 +488,11 @@ class PipelineTask:
return self
def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type requirement for this task.
"""Sets accelerator type to use when executing this task.
Args:
value(str): The name of the accelerator. Available values include
'NVIDIA_TESLA_K80', 'TPU_V3'.
value: The name of the accelerator. Available values include
``'NVIDIA_TESLA_K80'`` and ``'TPU_V3'``.
Returns:
Self return to allow chained setting calls.
@ -499,23 +512,23 @@ class PipelineTask:
return self
def set_display_name(self, name: str) -> 'PipelineTask':
"""Set display name for the pipelineTask.
"""Sets display name for the task.
Args:
name(str): display name for the task.
name: Display name.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.display_name = name
self._task_spec.display_name = name
return self
def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
"""Set environment variable for the pipelineTask.
"""Sets environment variable for the task.
Args:
name: The name of the environment variable.
value: The value of the environment variable.
name: Environment variable name.
value: Environment variable value.
Returns:
Self return to allow chained setting calls.
@ -527,14 +540,23 @@ class PipelineTask:
return self
def after(self, *tasks) -> 'PipelineTask':
"""Specify explicit dependency on other tasks.
"""Specifies an explicit dependency on other tasks by requiring this
task be executed after other tasks finish completion.
Args:
name(tasks): dependent tasks.
*tasks: Tasks after which this task should be executed.
Returns:
Self return to allow chained setting calls.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
task1 = my_component(text='1st task')
task2 = my_component(text='2nd task').after(task1)
"""
for task in tasks:
self.task_spec.dependent_tasks.append(task.name)
self._task_spec.dependent_tasks.append(task.name)
return self

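Since each setter documented above returns the task itself, configuration calls can be chained; a sketch with an assumed training component and arbitrary resource values:

::

    from kfp import dsl

    @dsl.component
    def train(epochs: int) -> str:
        return f'trained for {epochs} epochs'

    @dsl.pipeline(name='training-pipeline')
    def training_pipeline():
        task = train(epochs=10)
        # Each setter returns the task, so calls can be chained.
        (task
            .set_display_name('train model')
            .set_cpu_limit('4')
            .set_memory_limit('16G')
            .set_retry(num_retries=3)
            .set_caching_options(enable_caching=False))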

@ -114,7 +114,7 @@ class PipelineTaskTest(parameterized.TestCase):
V2_YAML),
args={'input1': 'value'},
)
self.assertEqual(task.task_spec, expected_task_spec)
self.assertEqual(task._task_spec, expected_task_spec)
self.assertEqual(task.component_spec, expected_component_spec)
self.assertEqual(task.container_spec, expected_container_spec)
@ -147,7 +147,7 @@ class PipelineTaskTest(parameterized.TestCase):
args={'input1': 'value'},
)
task.set_caching_options(False)
self.assertEqual(False, task.task_spec.enable_caching)
self.assertEqual(False, task._task_spec.enable_caching)
@parameterized.parameters(
{
@ -298,7 +298,7 @@ class PipelineTaskTest(parameterized.TestCase):
args={'input1': 'value'},
)
task.set_display_name('test_name')
self.assertEqual('test_name', task.task_spec.display_name)
self.assertEqual('test_name', task._task_spec.display_name)
if __name__ == '__main__':


@ -15,16 +15,18 @@
from typing import Callable
from kfp.components import base_component
from kfp import components
from kfp.components import structures
class PythonComponent(base_component.BaseComponent):
"""Component defined via Python function.
class PythonComponent(components.BaseComponent):
"""A component defined via Python function.
Attribute:
pipeline_func: The Python function that becomes the implementation of
this component.
**Note:** ``PythonComponent`` is not intended to be used to construct components directly. Use ``@kfp.dsl.component`` instead.
Args:
component_spec: Component definition.
python_func: Python function that becomes the implementation of this component.
"""
def __init__(
@ -36,4 +38,5 @@ class PythonComponent(base_component.BaseComponent):
self.python_func = python_func
def execute(self, **kwargs):
return python_func(**kwargs)
"""Executes the Python function that defines the component."""
return self.python_func(**kwargs)


@ -23,9 +23,9 @@ import uuid
from google.protobuf import json_format
import kfp
from kfp import dsl
from kfp.compiler import compiler
from kfp.components import base_model
from kfp.components import pipeline_channel
from kfp.components import placeholders
from kfp.components import utils
from kfp.components import v1_components
@ -671,7 +671,7 @@ class ComponentSpec(base_model.BaseModel):
raise TypeError(
builder.make_invalid_input_type_error_msg(
arg_name, arg_type))
args_dict[arg_name] = dsl.PipelineParameterChannel(
args_dict[arg_name] = pipeline_channel.PipelineParameterChannel(
name=arg_name, channel_type=arg_type)
task = pipeline_task.PipelineTask(self, args_dict)
@ -684,7 +684,7 @@ class ComponentSpec(base_model.BaseModel):
# Fill in the default values.
args_list_with_defaults = [
dsl.PipelineParameterChannel(
pipeline_channel.PipelineParameterChannel(
name=input_name,
channel_type=input_spec.type,
value=input_spec.default,


@ -19,25 +19,37 @@ from typing import Optional
@dataclasses.dataclass
class PipelineTaskFinalStatus:
"""The final status of a pipeline task.
"""A final status of a pipeline task. Annotate a component parameter with
this class to obtain a handle to a task's status (see example).
This is the Python representation of the proto: PipelineTaskFinalStatus
https://github.com/kubeflow/pipelines/blob/1c3e2768e6177d5d6e3f4b8eff8fafb9a3b76c1f/api/v2alpha1/pipeline_spec.proto#L886
This is the Python representation of the proto message `PipelineTaskFinalStatus <https://github.com/kubeflow/pipelines/blob/d8b9439ef92b88da3420df9e8c67db0f1e89d4ef/api/v2alpha1/pipeline_spec.proto#L929-L951>`_.
Attributes:
state: The final state of the task. The value could be one of
'SUCCEEDED', 'FAILED' or 'CANCELLED'.
pipeline_job_resource_name: The pipeline job resource name, in the format
of `projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}`.
pipeline_task_name: The pipeline task that produces this status.
error_code: In case of error, the oogle.rpc.Code
https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
If state is 'SUCCEEDED', this is None.
error_message: In case of error, the detailed error message.
If state is 'SUCCEEDED', this is None.
Examples:
::
@dsl.component
def task_status(user_input: str, status: PipelineTaskFinalStatus):
print('Pipeline status: ', status.state)
print('Job resource name: ', status.pipeline_job_resource_name)
print('Pipeline task name: ', status.pipeline_task_name)
print('Error code: ', status.error_code)
print('Error message: ', status.error_message)
@dsl.pipeline(name='my_pipeline')
def my_pipeline():
task = task_status(user_input='my_input')
"""
state: str
"""Final state of the task. The value could be one of ``'SUCCEEDED'``, ``'FAILED'`` or ``'CANCELLED'``."""
pipeline_job_resource_name: str
"""Pipeline job resource name, in the format of ``projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}``."""
pipeline_task_name: str
"""Name of the task that produced this status."""
error_code: Optional[int]
"""The `google.rpc.Code <github.com/googleapis/googleapis/blob/master/google/rpc/code.proto>`_ in case of error. If state is ``'SUCCEEDED'``, this is ``None``."""
error_message: Optional[str]
"""In case of error, the detailed error message. If state is ``'SUCCEEDED'``, this is ``None``."""

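A sketch combining the status handle above with ``dsl.ExitHandler``, under the assumption that the backend populates the ``PipelineTaskFinalStatus`` parameter when the component runs as an exit task; the component names are illustrative:

::

    from kfp import dsl
    from kfp.dsl import PipelineTaskFinalStatus

    @dsl.component
    def print_status(status: PipelineTaskFinalStatus):
        print(status.state)
        print(status.error_code)
        print(status.error_message)

    @dsl.component
    def flaky_step():
        raise RuntimeError('something went wrong')

    @dsl.pipeline(name='pipeline-with-status')
    def pipeline_with_status():
        status_task = print_status()
        with dsl.ExitHandler(exit_task=status_task):
            flaky_step()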

@ -94,8 +94,12 @@ class TasksGroup:
class ExitHandler(TasksGroup):
"""Represents an exit handler that is invoked upon exiting a group of
tasks.
"""A class for setting an exit handler task that is invoked upon exiting a
group of other tasks.
Args:
exit_task: The task that is invoked after exiting a group of other tasks.
name: The name of the exit handler group.
Example:
::
@ -104,9 +108,6 @@ class ExitHandler(TasksGroup):
with ExitHandler(exit_task):
task1 = MyComponent1(...)
task2 = MyComponent2(...)
Attributes:
exit_task: The exit handler task.
"""
def __init__(
@ -114,15 +115,7 @@ class ExitHandler(TasksGroup):
exit_task: pipeline_task.PipelineTask,
name: Optional[str] = None,
):
"""Initializes a Condition task group.
Args:
exit_task: An operator invoked at exiting a group of ops.
name: Optional; the name of the exit handler group.
Raises:
ValueError: Raised if the exit_task is invalid.
"""
"""Initializes a Condition task group."""
super().__init__(group_type=TasksGroupType.EXIT_HANDLER, name=name)
if exit_task.dependent_tasks:
@ -139,7 +132,12 @@ class ExitHandler(TasksGroup):
class Condition(TasksGroup):
"""Represents an condition group with a condition.
"""A class for creating conditional control flow within a pipeline
definition.
Args:
condition: The condition expression. Can be constructed using constants or outputs from upstream tasks.
name: The name of the condition group.
Example:
::
@ -147,9 +145,6 @@ class Condition(TasksGroup):
with Condition(param1=='pizza', '[param1 is pizza]'):
task1 = MyComponent1(...)
task2 = MyComponent2(...)
Attributes:
condition: The condition expression.
"""
def __init__(
@ -157,18 +152,18 @@ class Condition(TasksGroup):
condition: pipeline_channel.ConditionOperator,
name: Optional[str] = None,
):
"""Initializes a conditional task group.
Args:
condition: The condition expression.
name: Optional; the name of the condition group.
"""
"""Initializes a conditional task group."""
super().__init__(group_type=TasksGroupType.CONDITION, name=name)
self.condition = condition
class ParallelFor(TasksGroup):
"""Represents a parallel for loop over a static set of items.
"""A class for creating parallelized for loop control flow over a static
set of items within a pipeline definition.
Args:
items: The items to loop over. It can be either a constant Python list or a list output from an upstream task.
name: The name of the for loop group.
Example:
::
@ -177,14 +172,8 @@ class ParallelFor(TasksGroup):
task1 = MyComponent(..., item.a)
task2 = MyComponent(..., item.b)
In this case :code:`task1` would be executed twice, once with case
:code:`args=['echo 1']` and once with case :code:`args=['echo 2']`::
Attributes:
loop_argument: The argument for each loop iteration.
items_is_pipeline_channel: Whether the loop items is PipelineChannel
instead of raw items.
In the example, ``task1`` would be executed twice, once with case
``args=['echo 1']`` and once with case ``args=['echo 2']``.
"""
def __init__(
@ -192,13 +181,7 @@ class ParallelFor(TasksGroup):
items: Union[for_loop.ItemList, pipeline_channel.PipelineChannel],
name: Optional[str] = None,
):
"""Initializes a for loop task group.
Args:
items: The argument to loop over. It can be either a raw list or a
pipeline channel.
name: Optional; the name of the for loop group.
"""
"""Initializes a for loop task group."""
super().__init__(group_type=TasksGroupType.FOR_LOOP, name=name)
if isinstance(items, pipeline_channel.PipelineChannel):

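A combined sketch of the ``Condition`` and ``ParallelFor`` groups documented above, assuming two trivial components:

::

    from kfp import dsl

    @dsl.component
    def flip_coin() -> str:
        import random
        return random.choice(['heads', 'tails'])

    @dsl.component
    def announce(text: str):
        print(text)

    @dsl.pipeline(name='control-flow-pipeline')
    def control_flow_pipeline():
        flip_task = flip_coin()
        with dsl.Condition(flip_task.output == 'heads'):
            announce(text='got heads')
        with dsl.ParallelFor(items=['a', 'b', 'c']) as item:
            announce(text=item)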

@ -23,16 +23,45 @@ _MINIO_LOCAL_MOUNT_PREFIX = '/minio/'
_S3_LOCAL_MOUNT_PREFIX = '/s3/'
class Artifact(object):
"""Generic Artifact class.
class Artifact:
"""Represents a generic machine learning artifact.
This class is meant to represent the metadata around an input or output
machine-learning Artifact. Artifacts have URIs, which can either be a location
on disk (or Cloud storage) or some other resource identifier such as
an API resource name.
This class and all artifact classes store the name, uri, and metadata for a machine learning artifact. Use this artifact type when an artifact does not fit into another more specific artifact type (e.g., ``Model``, ``Dataset``).
Artifacts carry a `metadata` field, which is a dictionary for storing
metadata related to this artifact.
Args:
name: Name of the artifact.
uri: The artifact's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the artifact.
Example:
::
from kfp import dsl
from kfp.dsl import Output, Artifact, Input
@dsl.component
def create_artifact(
data: str,
output_artifact: Output[Artifact],
):
with open(output_artifact.path, 'w') as f:
f.write(data)
@dsl.component
def use_artifact(input_artifact: Input[Artifact]):
with open(input_artifact.path) as input_file:
artifact_contents = input_file.read()
print(artifact_contents)
@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my/storage')
def my_pipeline():
create_task = create_artifact(data='my data')
use_artifact(input_artifact=create_task.outputs['output_artifact'])
Note: Other artifacts are used similarly to the usage of ``Artifact`` in the example above (within ``Input[]`` and ``Output[]``).
"""
TYPE_NAME = 'system.Artifact'
VERSION = '0.0.1'
@ -40,18 +69,18 @@ class Artifact(object):
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
"""Initializes the Artifact with the given name, URI and metadata."""
self.uri = uri or ''
self.name = name or ''
self.metadata = metadata or {}
@property
def path(self):
def path(self) -> str:
return self._get_path()
@path.setter
def path(self, path):
def path(self, path: str) -> None:
self._set_path(path)
def _get_path(self) -> Optional[str]:
@ -63,7 +92,7 @@ class Artifact(object):
return _S3_LOCAL_MOUNT_PREFIX + self.uri[len('s3://'):]
return None
def _set_path(self, path):
def _set_path(self, path: str) -> None:
if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):
path = 'gs://' + path[len(_GCS_LOCAL_MOUNT_PREFIX):]
elif path.startswith(_MINIO_LOCAL_MOUNT_PREFIX):
@ -74,13 +103,19 @@ class Artifact(object):
class Model(Artifact):
"""An artifact representing an ML Model."""
"""An artifact representing a machine learning model.
Args:
name: Name of the model.
uri: The model's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the model.
"""
TYPE_NAME = 'system.Model'
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
super().__init__(uri=uri, name=name, metadata=metadata)
@property
@ -91,47 +126,64 @@ class Model(Artifact):
return self.metadata.get('framework', '')
@framework.setter
def framework(self, framework: str):
def framework(self, framework: str) -> None:
self._set_framework(framework)
def _set_framework(self, framework: str):
def _set_framework(self, framework: str) -> None:
self.metadata['framework'] = framework
class Dataset(Artifact):
"""An artifact representing an ML Dataset."""
"""An artifact representing a machine learning dataset.
Args:
name: Name of the dataset.
uri: The dataset's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the dataset.
"""
TYPE_NAME = 'system.Dataset'
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
super().__init__(uri=uri, name=name, metadata=metadata)
class Metrics(Artifact):
"""Represent a simple base Artifact type to store key-value scalar
metrics."""
"""An artifact for storing key-value scalar metrics.
Args:
name: Name of the metrics artifact.
uri: The metrics artifact's location on disk or cloud storage.
metadata: Key-value scalar metrics.
"""
TYPE_NAME = 'system.Metrics'
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
super().__init__(uri=uri, name=name, metadata=metadata)
def log_metric(self, metric: str, value: float):
"""Sets a custom scalar metric.
def log_metric(self, metric: str, value: float) -> None:
"""Sets a custom scalar metric in the artifact's metadata.
Args:
metric: Metric key
value: Value of the metric.
metric: The metric key.
value: The metric value.
"""
self.metadata[metric] = value
class ClassificationMetrics(Artifact):
"""Represents Artifact class to store Classification Metrics."""
"""An artifact for storing classification metrics.
Args:
name: Name of the metrics artifact.
uri: The metrics artifact's location on disk or cloud storage.
metadata: The key-value scalar metrics.
"""
TYPE_NAME = 'system.ClassificationMetrics'
def __init__(self,
@ -140,8 +192,9 @@ class ClassificationMetrics(Artifact):
metadata: Optional[Dict] = None):
super().__init__(uri=uri, name=name, metadata=metadata)
def log_roc_data_point(self, fpr: float, tpr: float, threshold: float):
"""Logs a single data point in the ROC Curve.
def log_roc_data_point(self, fpr: float, tpr: float,
threshold: float) -> None:
"""Logs a single data point in the ROC curve to metadata.
Args:
fpr: False positive rate value of the data point.
@ -160,15 +213,16 @@ class ClassificationMetrics(Artifact):
self.metadata['confidenceMetrics'].append(roc_reading)
def log_roc_curve(self, fpr: List[float], tpr: List[float],
threshold: List[float]):
"""Logs an ROC curve.
The list length of fpr, tpr and threshold must be the same.
threshold: List[float]) -> None:
"""Logs an ROC curve to metadata.
Args:
fpr: List of false positive rate values.
tpr: List of true positive rate values.
threshold: List of threshold values.
Raises:
ValueError: If the lists ``fpr``, ``tpr`` and ``threshold`` are not the same length.
"""
if len(fpr) != len(tpr) or len(fpr) != len(threshold) or len(
tpr) != len(threshold):
@ -181,8 +235,8 @@ class ClassificationMetrics(Artifact):
self.log_roc_data_point(
fpr=fpr[i], tpr=tpr[i], threshold=threshold[i])
def set_confusion_matrix_categories(self, categories: List[str]):
"""Stores confusion matrix categories.
def set_confusion_matrix_categories(self, categories: List[str]) -> None:
"""Stores confusion matrix categories to metadata.
Args:
categories: List of strings specifying the categories.
@ -204,16 +258,17 @@ class ClassificationMetrics(Artifact):
self._confusion_matrix['rows'] = self._matrix
self.metadata['confusionMatrix'] = self._confusion_matrix
def log_confusion_matrix_row(self, row_category: str, row: List[float]):
"""Logs a confusion matrix row.
def log_confusion_matrix_row(self, row_category: str,
row: List[float]) -> None:
"""Logs a confusion matrix row to metadata.
Args:
row_category: Category to which the row belongs.
row: List of integers specifying the values for the row.
Raises:
ValueError: If row_category is not in the list of categories
set in set_categories call.
Raises:
ValueError: If ``row_category`` is not in the list of categories
set in ``set_categories`` call.
"""
if row_category not in self._categories:
raise ValueError('Invalid category: {} passed. Expected one of: {}'.\
@ -227,17 +282,17 @@ class ClassificationMetrics(Artifact):
self.metadata['confusionMatrix'] = self._confusion_matrix
def log_confusion_matrix_cell(self, row_category: str, col_category: str,
value: int):
"""Logs a cell in the confusion matrix.
value: int) -> None:
"""Logs a cell in the confusion matrix to metadata.
Args:
row_category: String representing the name of the row category.
col_category: String representing the name of the column category.
value: Int value of the cell.
value: Value of the cell.
Raises:
ValueError: If row_category or col_category is not in the list of
categories set in set_categories.
ValueError: If ``row_category`` or ``col_category`` is not in the list of
categories set in ``set_categories``.
"""
if row_category not in self._categories:
raise ValueError('Invalid category: {} passed. Expected one of: {}'.\
@ -252,15 +307,15 @@ class ClassificationMetrics(Artifact):
self.metadata['confusionMatrix'] = self._confusion_matrix
def log_confusion_matrix(self, categories: List[str],
matrix: List[List[int]]):
"""Logs a confusion matrix.
matrix: List[List[int]]) -> None:
"""Logs a confusion matrix to metadata.
Args:
categories: List of the category names.
matrix: Complete confusion matrix.
Raises:
ValueError: Length of categories does not match number of rows or columns.
ValueError: If the length of ``categories`` does not match number of rows or columns of ``matrix``.
"""
self.set_confusion_matrix_categories(categories)
@ -279,12 +334,17 @@ class ClassificationMetrics(Artifact):
class SlicedClassificationMetrics(Artifact):
"""Metrics class representing Sliced Classification Metrics.
"""An artifact for storing sliced classification metrics.
Similar to ClassificationMetrics clients using this class are
Similar to ``ClassificationMetrics``, tasks using this class are
expected to use log methods of the class to log metrics with the
difference being each log method takes a slice to associate the
ClassificationMetrics.
``ClassificationMetrics``.
Args:
name: Name of the metrics artifact.
uri: The metrics artifact's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the metrics artifact.
"""
TYPE_NAME = 'system.SlicedClassificationMetrics'
@ -292,15 +352,15 @@ class SlicedClassificationMetrics(Artifact):
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
super().__init__(uri=uri, name=name, metadata=metadata)
def _upsert_classification_metrics_for_slice(self, slice: str):
def _upsert_classification_metrics_for_slice(self, slice: str) -> None:
"""Upserts the classification metrics instance for a slice."""
if slice not in self._sliced_metrics:
self._sliced_metrics[slice] = ClassificationMetrics()
def _update_metadata(self, slice: str):
def _update_metadata(self, slice: str) -> None:
"""Updates metadata to adhere to the metrics schema."""
self.metadata = {}
self.metadata['evaluationSlices'] = []
@ -314,8 +374,8 @@ class SlicedClassificationMetrics(Artifact):
self.metadata['evaluationSlices'].append(slice_metrics)
def log_roc_reading(self, slice: str, threshold: float, tpr: float,
fpr: float):
"""Logs a single data point in the ROC Curve of a slice.
fpr: float) -> None:
"""Logs a single data point in the ROC curve of a slice to metadata.
Args:
slice: String representing slice label.
@ -328,24 +388,23 @@ class SlicedClassificationMetrics(Artifact):
self._sliced_metrics[slice].log_roc_reading(threshold, tpr, fpr)
self._update_metadata(slice)
def load_roc_readings(self, slice: str, readings: List[List[float]]):
"""Supports bulk loading ROC Curve readings for a slice.
def load_roc_readings(self, slice: str,
readings: List[List[float]]) -> None:
"""Bulk loads ROC curve readings for a slice.
Args:
slice: String representing slice label.
readings: A 2-D list providing ROC Curve data points.
The expected order of the data points is: threshold,
true_positive_rate, false_positive_rate.
readings: A 2-dimensional list providing ROC curve data points. The expected order of the data points is: threshold, true positive rate, false positive rate.
"""
self._upsert_classification_metrics_for_slice(slice)
self._sliced_metrics[slice].load_roc_readings(readings)
self._update_metadata(slice)
def set_confusion_matrix_categories(self, slice: str,
categories: List[str]):
"""Stores confusion matrix categories for a slice..
categories: List[str]) -> None:
"""Logs confusion matrix categories for a slice to metadata.
Categories are stored in the internal metrics_utils.ConfusionMatrix
Categories are stored in the internal ``metrics_utils.ConfusionMatrix``
instance of the slice.
Args:
@ -357,10 +416,10 @@ class SlicedClassificationMetrics(Artifact):
self._update_metadata(slice)
def log_confusion_matrix_row(self, slice: str, row_category: str,
row: List[int]):
"""Logs a confusion matrix row for a slice.
row: List[int]) -> None:
"""Logs a confusion matrix row for a slice to metadata.
Row is updated on the internal metrics_utils.ConfusionMatrix
Row is updated on the internal ``metrics_utils.ConfusionMatrix``
instance of the slice.
Args:
@ -373,17 +432,17 @@ class SlicedClassificationMetrics(Artifact):
self._update_metadata(slice)
def log_confusion_matrix_cell(self, slice: str, row_category: str,
col_category: str, value: int):
"""Logs a confusion matrix cell for a slice..
col_category: str, value: int) -> None:
"""Logs a confusion matrix cell for a slice to metadata.
Cell is updated on the internal metrics_utils.ConfusionMatrix
Cell is updated on the internal ``metrics_utils.ConfusionMatrix``
instance of the slice.
Args:
slice: String representing slice label.
row_category: String representing the name of the row category.
col_category: String representing the name of the column category.
value: Int value of the cell.
value: Value of the cell.
"""
self._upsert_classification_metrics_for_slice(slice)
self._sliced_metrics[slice].log_confusion_matrix_cell(
@ -391,8 +450,8 @@ class SlicedClassificationMetrics(Artifact):
self._update_metadata(slice)
def load_confusion_matrix(self, slice: str, categories: List[str],
matrix: List[List[int]]):
"""Supports bulk loading the whole confusion matrix for a slice.
matrix: List[List[int]]) -> None:
"""Bulk loads the whole confusion matrix for a slice.
Args:
slice: String representing slice label.
@ -406,18 +465,30 @@ class SlicedClassificationMetrics(Artifact):
class HTML(Artifact):
"""An artifact representing an HTML file."""
"""An artifact representing an HTML file.
Args:
name: Name of the HTML file.
uri: The HTML file's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the HTML file.
"""
TYPE_NAME = 'system.HTML'
def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None):
metadata: Optional[Dict] = None) -> None:
super().__init__(uri=uri, name=name, metadata=metadata)
class Markdown(Artifact):
"""An artifact representing an Markdown file."""
"""An artifact representing a markdown file.
Args:
name: Name of the markdown file.
uri: The markdown file's location on disk or cloud storage.
metadata: Arbitrary key-value pairs about the markdown file.
"""
TYPE_NAME = 'system.Markdown'
def __init__(self,

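A sketch of the metrics logging methods documented above, with assumed metric names and confusion matrix values:

::

    from kfp import dsl
    from kfp.dsl import ClassificationMetrics, Metrics, Output

    @dsl.component
    def evaluate(metrics: Output[Metrics],
                 clf_metrics: Output[ClassificationMetrics]):
        # Scalar metrics and the confusion matrix are written to metadata.
        metrics.log_metric('accuracy', 0.92)
        metrics.log_metric('f1', 0.89)
        clf_metrics.log_confusion_matrix(
            categories=['cat', 'dog'],
            matrix=[[48, 2], [5, 45]])

    @dsl.pipeline(name='metrics-pipeline')
    def metrics_pipeline():
        evaluate()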

@ -28,7 +28,38 @@ T = TypeVar('T')
class OutputPath:
"""Annotation for indicating a variable is a path to an output."""
"""Type annotation used in component definitions for indicating a parameter
is a path to an output. The path parameter typed with this annotation can
be treated as a locally accessible filepath within the component body.
The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
Args:
type: The type of the value written to the output path.
Example:
::
@dsl.component
def create_parameter(
message: str,
output_parameter_path: OutputPath(str),
):
with open(output_parameter_path, 'w') as f:
f.write(message)
@dsl.component
def consume_parameter(message: str):
print(message)
@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline(message: str = 'default message'):
create_param_op = create_parameter(message=message)
consume_parameter(message=create_param_op.outputs['output_parameter_path'])
"""
def __init__(self, type=None):
self.type = type
@ -40,7 +71,30 @@ class OutputPath:
class InputPath:
"""Annotation for indicating a variable is a path to an input."""
"""Type annotation used in component definitions for indicating a parameter
is a path to an input.
Example:
::
@dsl.component
def create_dataset(dataset_path: OutputPath('Dataset'),):
import json
dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
with open(dataset_path, 'w') as f:
json.dump(dataset, f)
@dsl.component
def consume_dataset(dataset: InputPath('Dataset')):
print(dataset)
@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline():
create_dataset_op = create_dataset()
consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])
"""
def __init__(self, type=None):
self.type = type
@ -59,11 +113,57 @@ class OutputAnnotation():
"""Marker type for output artifacts."""
# Input represents an Input artifact of type T.
Input = Annotated[T, InputAnnotation]
Input.__doc__ = """Type generic used to represent an input artifact of type ``T``, where ``T`` is an artifact class.
Use ``Input[Artifact]`` or ``Output[Artifact]`` to indicate whether the enclosed artifact is a component input or output.
Args:
T: The type of the input artifact.
Example:
::
@dsl.component
def artifact_producer(model: Output[Artifact]):
with open(model.path, 'w') as f:
f.write('my model')
@dsl.component
def artifact_consumer(model: Input[Artifact]):
print(model)
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
producer_task = artifact_producer()
artifact_consumer(model=producer_task.output)
"""
# Output represents an Output artifact of type T.
Output = Annotated[T, OutputAnnotation]
Output.__doc__ = """A type generic used to represent an output artifact of type ``T``, where ``T`` is an artifact class. The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
Use ``Input[Artifact]`` or ``Output[Artifact]`` to indicate whether the enclosed artifact is a component input or output.
Args:
T: The type of the output artifact.
Example:
::
@dsl.component
def artifact_producer(model: Output[Artifact]):
with open(model.path, 'w') as f:
f.write('my model')
@dsl.component
def artifact_consumer(model: Input[Artifact]):
print(model)
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
producer_task = artifact_producer()
artifact_consumer(model=producer_task.output)
"""
def is_artifact_annotation(typ) -> bool:


@ -15,26 +15,33 @@
from typing import Optional, Tuple
from kfp.components import base_component
from kfp import components
from kfp.components import structures
import requests
class YamlComponent(base_component.BaseComponent):
"""A component loaded from YAML."""
class YamlComponent(components.BaseComponent):
"""A component loaded from a YAML file.
**Note:** ``YamlComponent`` is not intended to be used to construct components directly. Use ``kfp.components.load_component_from_*()`` instead.
Args:
component_spec: Component definition.
"""
def execute(self, *args, **kwargs):
pass
"""Not implemented."""
raise NotImplementedError
def load_component_from_text(text: str) -> YamlComponent:
"""Loads a component from text.
Args:
text (str): The component YAML text.
text (str): Component YAML text.
Returns:
YamlComponent: In-memory representation of a component loaded from YAML.
Component loaded from YAML.
"""
return YamlComponent(
structures.ComponentSpec.load_from_component_yaml(text))
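For illustration, a minimal sketch of loading a component from an inline YAML string. The YAML below is a hypothetical v1-style single-container component definition; whether this exact schema is accepted depends on the SDK version::
    import textwrap
    from kfp import components
    component_text = textwrap.dedent('''\
        name: Print text
        inputs:
        - {name: text, type: String}
        implementation:
          container:
            image: alpine
            command: [echo, {inputValue: text}]
        ''')
    print_text = components.load_component_from_text(component_text)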
@ -44,10 +51,17 @@ def load_component_from_file(file_path: str) -> YamlComponent:
"""Loads a component from a file.
Args:
file_path (str): The file path to a YAML component.
file_path (str): Filepath to a YAML component.
Returns:
YamlComponent: In-memory representation of a component loaded from YAML.
Component loaded from YAML.
Example:
::
from kfp import components
components.load_component_from_file('~/path/to/pipeline.yaml')
"""
with open(file_path, 'r') as component_stream:
return load_component_from_text(component_stream.read())
@ -56,14 +70,23 @@ def load_component_from_file(file_path: str) -> YamlComponent:
def load_component_from_url(url: str,
auth: Optional[Tuple[str,
str]] = None) -> YamlComponent:
"""Loads a component from a url.
"""Loads a component from a URL.
Args:
file_path (str): The url to a YAML component.
auth (Tuple[str, str], optional): The a ('username', 'password') tuple of authentication credentials necessary for url access. See Requests Authorization for more information: https://requests.readthedocs.io/en/latest/user/authentication/#authentication
url (str): URL to a YAML component.
auth (Tuple[str, str], optional): A ``('<username>', '<password>')`` tuple of authentication credentials necessary for URL access. See `Requests Authorization <https://requests.readthedocs.io/en/latest/user/authentication/#authentication>`_ for more information.
Returns:
YamlComponent: In-memory representation of a component loaded from YAML.
Component loaded from YAML.
Example:
::
from kfp import components
components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/7b49eadf621a9054e1f1315c86f95fb8cf8c17c3/sdk/python/kfp/compiler/test_data/components/identity.yaml')
components.load_component_from_url('gs://path/to/pipeline.yaml')
"""
if url is None:
raise ValueError('url must be a string.')


@ -1,3 +1,5 @@
"""The `kfp.dsl` module contains domain-specific language objects used to
compose pipelines."""
# Copyright 2020 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -15,9 +17,6 @@
__all__ = [
'component',
'importer',
'PipelineArtifactChannel',
'PipelineChannel',
'PipelineParameterChannel',
'pipeline',
'PipelineTask',
'PipelineTaskFinalStatus',
@ -45,9 +44,6 @@ __all__ = [
from kfp.components.component_decorator import component
from kfp.components.importer_node import importer
from kfp.components.pipeline_channel import PipelineArtifactChannel
from kfp.components.pipeline_channel import PipelineChannel
from kfp.components.pipeline_channel import PipelineParameterChannel
from kfp.components.pipeline_context import pipeline
from kfp.components.pipeline_task import PipelineTask
from kfp.components.task_final_status import PipelineTaskFinalStatus
@ -68,7 +64,71 @@ from kfp.components.types.type_annotations import Output
from kfp.components.types.type_annotations import OutputPath
PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'
"""A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
print_op(
msg='Job name:',
value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
)
"""
PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'
"""A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
print_op(
msg='Job resource name:',
value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
)
"""
PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'
"""A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
print_op(
msg='Job ID:',
value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
)
"""
PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'
"""A placeholder used to obtain a task name within a task at pipeline runtime.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
print_op(
msg='Task name:',
value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
)
"""
PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'
"""A placeholder used to obtain a task ID within a task at pipeline runtime.
Example:
::
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
print_op(
msg='Task ID:',
value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
)
"""


@ -1,3 +1,5 @@
"""The `kfp.registry` module contains objects for communicating with registry
client hosts."""
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");


@ -59,9 +59,20 @@ class _SafeDict(dict):
class ApiAuth(requests.auth.AuthBase):
"""Class for authentication using API token."""
"""Class for registry authentication using an API token.
Args:
token: The API token.
Example:
::
client = RegistryClient(
host='https://us-central1-kfp.pkg.dev/proj/repo', auth=ApiAuth('my_token'))
"""
def __init__(self, token: str) -> None:
"""Initializes the ApiAuth object."""
self._token = token
def __call__(self,
@ -71,7 +82,14 @@ class ApiAuth(requests.auth.AuthBase):
class RegistryClient:
"""Registry Client class for communicating with registry hosts."""
"""Class for communicating with registry hosts.
Args:
host: The address of the registry host. The host needs to be specified here or in the config file.
auth: Authentication using ``requests.auth.AuthBase`` or ``google.auth.credentials.Credentials``.
config_file: The location of the local config file. If not specified, defaults to ``'~/.config/kfp/context.json'`` (if it exists).
auth_file: The location of the local config file that contains the authentication token. If not specified, defaults to ``'~/.config/kfp/registry_credentials.json'`` (if it exists).
"""
def __init__(self,
host: Optional[str],
@ -79,22 +97,11 @@ class RegistryClient:
credentials.Credentials]] = None,
config_file: Optional[str] = None,
auth_file: Optional[str] = None) -> None:
"""Initializes the RegistryClient.
Args:
host: The address of the registry host. The host needs to be specified here
or in the config file.
auth: Optional. Authentication using python requests or google.auth.credentials.
config_file: Optional. The location of the local config file. If not specified,
defaults to ~/.config/kfp/context.json (if it exists).
auth_file: Optional. The location of the local config file that contains the
authentication token. If not specified, defaults to
~/.config/kfp/registry_credentials.json (if it exists).
"""
"""Initializes the RegistryClient."""
self._host = ''
self._known_host_key = ''
self._config = self.load_config(host, config_file)
self._auth = self.load_auth(auth, auth_file)
self._config = self._load_config(host, config_file)
self._auth = self._load_auth(auth, auth_file)
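For illustration, a minimal sketch of constructing a client against a hypothetical registry host, using the token-based ``ApiAuth`` helper described above (assumes both names are exported from ``kfp.registry``)::
    from kfp.registry import ApiAuth, RegistryClient
    client = RegistryClient(
        host='https://us-central1-kfp.pkg.dev/my-project/my-repo',
        auth=ApiAuth('my_token'))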
def _request(self,
request_url: str,
@ -161,7 +168,7 @@ class RegistryClient:
if tag.startswith(_VERSION_PREFIX):
raise ValueError('Tag should not start with \"sha256:\".')
def load_auth(
def _load_auth(
self,
auth: Optional[Union[requests.auth.AuthBase,
credentials.Credentials]] = None,
@ -170,10 +177,10 @@ class RegistryClient:
"""Loads the credentials for authentication.
Args:
auth: Optional. Authentication using python requests or google.auth credentials.
auth_file: Optional. The location of the local config file that contains the
auth: Authentication using ``requests.auth.AuthBase`` or ``google.auth.credentials.Credentials``.
auth_file: The location of the local config file that contains the
authentication token. If not specified, defaults to
~/.config/kfp/registry_credentials.json (if it exists).
``'~/.config/kfp/registry_credentials.json'`` (if it exists).
Returns:
The loaded authentication token.
@ -198,15 +205,14 @@ class RegistryClient:
return ApiAuth(auth_token)
return None
def load_config(self, host: Optional[str],
config_file: Optional[str]) -> dict:
def _load_config(self, host: Optional[str],
config_file: Optional[str]) -> dict:
"""Loads the config.
Args:
host: The address of the registry host. The host needs to be specified here
or in the config file.
config_file: Optional. The location of the local config file. If not specified,
defaults to ~/.config/kfp/context.json (if it exists).
host: The address of the registry host.
config_file: The location of the local config file. If not specified,
defaults to ``'~/.config/kfp/context.json'`` (if it exists).
Returns:
The loaded config.
@ -276,7 +282,7 @@ class RegistryClient:
Args:
config_file: The location of the config file.
config: Optional. An existing config to set as the default config.
config: An existing config to set as the default config.
Returns:
The loaded config.
@ -323,7 +329,7 @@ class RegistryClient:
extra_headers: Any extra headers required.
Returns:
A tuple representing the package name and the version
A tuple of the package name and the version.
"""
url = self._config['upload_url']
self._refresh_creds()
@ -384,7 +390,7 @@ class RegistryClient:
version: Optional[str] = None,
tag: Optional[str] = None,
file_name: Optional[str] = None) -> str:
"""Downloads a pipeline - either version or tag must be specified.
"""Downloads a pipeline. Either version or tag must be specified.
Args:
package_name: Name of the package.
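For illustration, a minimal sketch of downloading a package by tag; the client instance and host are hypothetical, and either ``version`` or ``tag`` must be supplied::
    from kfp.registry import RegistryClient
    client = RegistryClient(host='https://us-central1-kfp.pkg.dev/my-project/my-repo')
    # The annotated str return value is presumed to be the downloaded file.
    local_file = client.download_pipeline(package_name='my-pipeline', tag='latest')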
@ -517,7 +523,7 @@ class RegistryClient:
tag: Tag to be attached to the package version.
Returns:
The metadata for the created tag.
Metadata for the created tag.
"""
self._validate_version(version)
self._validate_tag(tag)