chore(sdk): merge v2 dsl code back to v1. (#5227)

* Merge v2 dsl back to v1 dsl

* fix tests

* fix tests and always attach container_spec to container_op

* address review comments

* fix a bad indentation
Authored by Chen Sun on 2021-03-04 13:48:48 -08:00, committed by GitHub
parent 430bd6b096
commit d7eb168347
22 changed files with 512 additions and 1025 deletions

View File

@ -15,7 +15,7 @@
from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
from ._pipeline import Pipeline, pipeline, get_pipeline_conf, PipelineConf
from ._container_op import ContainerOp, InputArgumentPath, UserContainer, Sidecar
from ._container_op import BaseOp, ContainerOp, InputArgumentPath, UserContainer, Sidecar
from ._resource_op import ResourceOp
from ._volume_op import (
VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM
@ -28,4 +28,4 @@ from ._component import python_component, graph_component, component
EXECUTION_ID_PLACEHOLDER = '{{workflow.uid}}-{{pod.name}}'
RUN_ID_PLACEHOLDER = '{{workflow.uid}}'
ROOT_PARAMETER_NAME = 'pipeline-output-directory'
ROOT_PARAMETER_NAME = 'pipeline-output-directory'

View File

@ -14,71 +14,89 @@
import collections
import copy
import inspect
from typing import Any, Mapping, Optional
from ..components.structures import ComponentSpec, ComponentReference
from ..components._components import _default_component_name, _resolve_command_line_and_paths
from ..components._naming import _sanitize_python_function_name, generate_unique_name_conversion_table
from .. import dsl
from kfp.components import _structures
from kfp.components import _components
from kfp.components import _naming
from kfp import dsl
from kfp.dsl import _pipeline_param
from kfp.dsl import types
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import _container_op
from kfp.dsl import dsl_utils
from kfp.dsl import importer_node
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
def _create_container_op_from_component_and_arguments(
component_spec: ComponentSpec,
component_spec: _structures.ComponentSpec,
arguments: Mapping[str, Any],
component_ref: Optional[ComponentReference] = None,
) -> 'dsl.ContainerOp':
component_ref: Optional[_structures.ComponentReference] = None,
) -> _container_op.ContainerOp:
"""Instantiates ContainerOp object.
Args:
component_spec: The component spec object.
arguments: The dictionary of component arguments.
component_ref: (only for v1) The component references.
Returns:
A ContainerOp instance.
"""
# Check types of the reference arguments and serialize PipelineParams
original_arguments = arguments
arguments = arguments.copy()
for input_name, argument_value in arguments.items():
if isinstance(argument_value, dsl.PipelineParam):
if isinstance(argument_value, _pipeline_param.PipelineParam):
input_type = component_spec._inputs_dict[input_name].type
reference_type = argument_value.param_type
dsl.types.verify_type_compatibility(
types.verify_type_compatibility(
reference_type, input_type,
'Incompatible argument passed to the input "{}" of component "{}": '
.format(input_name, component_spec.name))
arguments[input_name] = str(argument_value)
if isinstance(argument_value, dsl.ContainerOp):
if isinstance(argument_value, _container_op.ContainerOp):
raise TypeError(
'ContainerOp object was passed to component as an input argument. '
'Pass a single output instead.')
resolved_cmd = _resolve_command_line_and_paths(
resolved_cmd = _components._resolve_command_line_and_paths(
component_spec=component_spec,
arguments=arguments,
)
container_spec = component_spec.implementation.container
old_warn_value = dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
# Merge output_paths and output_uris to get the file_outputs.
file_outputs = collections.OrderedDict(resolved_cmd.output_paths or {})
for name, output_uri in resolved_cmd.output_uris.items():
file_outputs[name] = output_uri
old_warn_value = _container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
output_paths_and_uris = collections.OrderedDict(resolved_cmd.output_paths or
{})
output_paths_and_uris.update(resolved_cmd.output_uris)
input_paths_and_uris = collections.OrderedDict(resolved_cmd.input_paths or {})
input_paths_and_uris.update(resolved_cmd.input_uris)
artifact_argument_paths = [
dsl.InputArgumentPath(
argument=arguments[input_name],
input=input_name,
path=path,
) for input_name, path in resolved_cmd.input_paths.items()
path=path_or_uri,
) for input_name, path_or_uri in input_paths_and_uris.items()
]
for input_name, input_uri in resolved_cmd.input_uris.items():
artifact_argument_paths.append(
dsl.InputArgumentPath(
argument=arguments[input_name], input=input_name, path=input_uri))
task = dsl.ContainerOp(
name=component_spec.name or _default_component_name,
task = _container_op.ContainerOp(
name=component_spec.name or _components._default_component_name,
image=container_spec.image,
command=resolved_cmd.command,
arguments=resolved_cmd.args,
file_outputs=file_outputs,
file_outputs=output_paths_and_uris,
artifact_argument_paths=artifact_argument_paths,
)
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value
component_meta = copy.copy(component_spec)
task._set_metadata(component_meta)
@ -99,8 +117,8 @@ def _create_container_op_from_component_and_arguments(
output_names = [
output_spec.name for output_spec in component_spec.outputs or []
] # Stabilizing the ordering
output_name_to_python = generate_unique_name_conversion_table(
output_names, _sanitize_python_function_name)
output_name_to_python = _naming.generate_unique_name_conversion_table(
output_names, _naming._sanitize_python_function_name)
for output_name in output_names:
pythonic_output_name = output_name_to_python[output_name]
# Note: Some component outputs are currently missing from task.outputs
@ -124,4 +142,251 @@ def _create_container_op_from_component_and_arguments(
if annotations.get('volatile_component', 'false') == 'true':
task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
_attach_v2_specs(task, component_spec, original_arguments)
return task
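
For reference, a minimal sketch (with a hypothetical component YAML and pipeline name) of how the merged code path above is exercised: loading a component and calling it inside a pipeline function goes through _create_container_op_from_component_and_arguments, which now always attaches the v2 specs via _attach_v2_specs below.

```python
import kfp.components as components
import kfp.dsl as dsl

# Hypothetical lightweight component definition.
echo_op = components.load_component_from_text("""
name: Echo
inputs:
- {name: msg, type: String}
implementation:
  container:
    image: alpine
    command: [echo, {inputValue: msg}]
""")

@dsl.pipeline(name='echo-pipeline')  # hypothetical pipeline name
def echo_pipeline(msg='hello'):
    # When a compiler executes this function, echo_op(...) is routed through
    # _create_container_op_from_component_and_arguments, and the returned
    # dsl.ContainerOp carries task_spec, component_spec and container_spec.
    echo_task = echo_op(msg)
```
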
def _attach_v2_specs(
task: _container_op.ContainerOp,
component_spec: _structures.ComponentSpec,
arguments: Mapping[str, Any],
) -> None:
"""Attaches v2 specs to a ContainerOp object.
Args:
task: The ContainerOp object to attach IR specs.
component_spec: The component spec object.
arguments: The dictionary of component arguments.
"""
# Attach v2 specs to the ContainerOp object regardless of whether the pipeline
# is being compiled to v1 (Argo YAML) or v2 (IR JSON).
# However, the two cases behave differently, for example in how commands and
# arguments are resolved and in how errors are handled.
# Regarding the difference in error handling, v2 has a stricter requirement on
# input type annotations. For instance, an input without any type annotation is
# viewed as an artifact, and if it's paired with InputValuePlaceholder, an
# error will be thrown at compile time. However, we cannot raise such an error
# in v1, as doing so would break existing pipelines.
is_compiling_for_v2 = False
for frame in inspect.stack():
if '_create_pipeline_v2' in frame:
is_compiling_for_v2 = True
break
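# For illustration: inspect.stack() returns FrameInfo named tuples whose
# `function` field holds the enclosing function's name, so the loop above is
# equivalent to
#   any('_create_pipeline_v2' in frame for frame in inspect.stack())
# and only matches when Compiler._create_pipeline_v2 is on the call stack.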
def _resolve_commands_and_args_v2(
component_spec: _structures.ComponentSpec,
arguments: Mapping[str, Any],
) -> _components._ResolvedCommandLineAndPaths:
"""Resolves the command line argument placeholders for v2 (IR).
Args:
component_spec: The component spec object.
arguments: The dictionary of component arguments.
Returns:
A named tuple: _components._ResolvedCommandLineAndPaths.
"""
inputs_dict = {
input_spec.name: input_spec
for input_spec in component_spec.inputs or []
}
outputs_dict = {
output_spec.name: output_spec
for output_spec in component_spec.outputs or []
}
def _input_artifact_uri_placeholder(input_key: str) -> str:
if is_compiling_for_v2 and type_utils.is_parameter_type(
inputs_dict[input_key].type):
raise TypeError('Input "{}" with type "{}" cannot be paired with '
'InputUriPlaceholder.'.format(
input_key, inputs_dict[input_key].type))
else:
return "{{{{$.inputs.artifacts['{}'].uri}}}}".format(input_key)
def _input_artifact_path_placeholder(input_key: str) -> str:
if is_compiling_for_v2 and type_utils.is_parameter_type(
inputs_dict[input_key].type):
raise TypeError('Input "{}" with type "{}" cannot be paired with '
'InputPathPlaceholder.'.format(
input_key, inputs_dict[input_key].type))
elif is_compiling_for_v2 and input_key in importer_specs:
raise TypeError(
'Input "{}" with type "{}" is not connected to any upstream output. '
'However it is used with InputPathPlaceholder. '
'If you want to import an existing artifact using a system-connected'
' importer node, use InputUriPlaceholder instead. '
'Or if you just want to pass a string parameter, use string type and'
' InputValuePlaceholder instead.'.format(
input_key, inputs_dict[input_key].type))
else:
return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key)
def _input_parameter_placeholder(input_key: str) -> str:
if is_compiling_for_v2 and not type_utils.is_parameter_type(
inputs_dict[input_key].type):
raise TypeError('Input "{}" with type "{}" cannot be paired with '
'InputValuePlaceholder.'.format(
input_key, inputs_dict[input_key].type))
else:
return "{{{{$.inputs.parameters['{}']}}}}".format(input_key)
def _output_artifact_uri_placeholder(output_key: str) -> str:
if is_compiling_for_v2 and type_utils.is_parameter_type(
outputs_dict[output_key].type):
raise TypeError('Output "{}" with type "{}" cannot be paired with '
'OutputUriPlaceholder.'.format(
output_key, outputs_dict[output_key].type))
else:
return "{{{{$.outputs.artifacts['{}'].uri}}}}".format(output_key)
def _output_artifact_path_placeholder(output_key: str) -> str:
return "{{{{$.outputs.artifacts['{}'].path}}}}".format(output_key)
def _output_parameter_path_placeholder(output_key: str) -> str:
return "{{{{$.outputs.parameters['{}'].output_file}}}}".format(output_key)
def _resolve_output_path_placeholder(output_key: str) -> str:
if type_utils.is_parameter_type(outputs_dict[output_key].type):
return _output_parameter_path_placeholder(output_key)
else:
return _output_artifact_path_placeholder(output_key)
resolved_cmd = _components._resolve_command_line_and_paths(
component_spec=component_spec,
arguments=arguments,
input_value_generator=_input_parameter_placeholder,
input_uri_generator=_input_artifact_uri_placeholder,
output_uri_generator=_output_artifact_uri_placeholder,
input_path_generator=_input_artifact_path_placeholder,
output_path_generator=_resolve_output_path_placeholder,
)
return resolved_cmd
pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec()
# Keep track of auto-injected importer spec.
importer_specs = {}
# Check types of the reference arguments and serialize PipelineParams
original_arguments = arguments
arguments = arguments.copy()
# Preserve input params for ContainerOp.inputs
input_params = list(
set([
param for param in arguments.values()
if isinstance(param, _pipeline_param.PipelineParam)
]))
for input_name, argument_value in arguments.items():
if isinstance(argument_value, _pipeline_param.PipelineParam):
input_type = component_spec._inputs_dict[input_name].type
reference_type = argument_value.param_type
types.verify_type_compatibility(
reference_type, input_type,
'Incompatible argument passed to the input "{}" of component "{}": '
.format(input_name, component_spec.name))
arguments[input_name] = str(argument_value)
if type_utils.is_parameter_type(input_type):
if argument_value.op_name:
pipeline_task_spec.inputs.parameters[
input_name].task_output_parameter.producer_task = (
dsl_utils.sanitize_task_name(argument_value.op_name))
pipeline_task_spec.inputs.parameters[
input_name].task_output_parameter.output_parameter_key = (
argument_value.name)
else:
pipeline_task_spec.inputs.parameters[
input_name].component_input_parameter = argument_value.name
else:
if argument_value.op_name:
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = (
dsl_utils.sanitize_task_name(argument_value.op_name))
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.output_artifact_key = (
argument_value.name)
else:
# argument_value.op_name could be none, in which case an importer node
# will be inserted later.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema,
pipeline_param_name=argument_value.name)
elif isinstance(argument_value, str):
pipeline_params = _pipeline_param.extract_pipelineparams_from_any(
argument_value)
if pipeline_params and is_compiling_for_v2:
# argument_value contains PipelineParam placeholders.
raise NotImplementedError(
'Currently, a component input can only accept either a constant '
'value or a reference to another pipeline parameter. It cannot be a '
'combination of both. Got: {} for input {}'.format(
argument_value, input_name))
input_type = component_spec._inputs_dict[input_name].type
if type_utils.is_parameter_type(input_type):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
argument_value)
else:
# An importer node with constant value artifact_uri will be inserted.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema, constant_value=argument_value)
elif isinstance(argument_value, int):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = argument_value
elif isinstance(argument_value, float):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.double_value = argument_value
elif isinstance(argument_value, _container_op.ContainerOp):
raise TypeError(
'ContainerOp object {} was passed to component as an input argument. '
'Pass a single output instead.'.format(input_name))
else:
if is_compiling_for_v2:
raise NotImplementedError(
'Input argument supports only the following types: PipelineParam'
', str, int, float. Got: "{}".'.format(argument_value))
if not component_spec.name:
component_spec.name = _components._default_component_name
# task.name is unique at this point.
pipeline_task_spec.task_info.name = (dsl_utils.sanitize_task_name(task.name))
pipeline_task_spec.component_ref.name = (
dsl_utils.sanitize_component_name(component_spec.name))
task.task_spec = pipeline_task_spec
task.importer_specs = importer_specs
task.component_spec = dsl_component_spec.build_component_spec_from_structure(
component_spec)
resolved_cmd = _resolve_commands_and_args_v2(
component_spec=component_spec, arguments=original_arguments)
task.container_spec = (
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
image=component_spec.implementation.container.image,
command=resolved_cmd.command,
args=resolved_cmd.args))
# Override command and arguments if compiling to v2.
if is_compiling_for_v2:
task.command = resolved_cmd.command
task.arguments = resolved_cmd.args
# limit this to v2 compiling only to avoid possible behavior change in v1.
task.inputs = input_params
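
A note on the quadruple-brace placeholders emitted by the generator functions in _resolve_commands_and_args_v2 above: the doubled braces are str.format escapes, so the strings written into the IR command line look like the right-hand sides below (the input/output names are hypothetical).

```python
# A small sketch of the resolved v2 placeholder strings; 'msg' and 'model'
# are hypothetical input/output names.
input_key, output_key = 'msg', 'model'

assert ("{{{{$.inputs.parameters['{}']}}}}".format(input_key)
        == "{{$.inputs.parameters['msg']}}")
assert ("{{{{$.inputs.artifacts['{}'].uri}}}}".format(input_key)
        == "{{$.inputs.artifacts['msg'].uri}}")
assert ("{{{{$.outputs.parameters['{}'].output_file}}}}".format(output_key)
        == "{{$.outputs.parameters['model'].output_file}}")
```
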

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import re
import warnings
from typing import Any, Dict, List, TypeVar, Union, Callable, Optional, Sequence
@ -23,8 +24,9 @@ from kubernetes.client.models import (V1Container, V1EnvVar, V1EnvFromSource,
V1VolumeMount, V1ContainerPort,
V1Lifecycle, V1Volume)
from . import _pipeline_param
from ..components.structures import ComponentSpec, ExecutionOptionsSpec, CachingStrategySpec
from kfp.components import _structures
from kfp.dsl import _pipeline_param
from kfp.pipeline_spec import pipeline_spec_pb2
# generics
T = TypeVar('T')
@ -37,6 +39,26 @@ ALLOWED_RETRY_POLICIES = (
'OnFailure',
)
# Shorthand for PipelineContainerSpec
_PipelineContainerSpec = pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
# Unit constants for k8s size string.
_E = 10**18 # Exa
_EI = 1 << 60 # Exa: power-of-two approximate
_P = 10**15 # Peta
_PI = 1 << 50 # Peta: power-of-two approximate
# noinspection PyShadowingBuiltins
_T = 10**12 # Tera
_TI = 1 << 40 # Tera: power-of-two approximate
_G = 10**9 # Giga
_GI = 1 << 30 # Giga: power-of-two approximate
_M = 10**6 # Mega
_MI = 1 << 20 # Mega: power-of-two approximate
_K = 10**3 # Kilo
_KI = 1 << 10 # Kilo: power-of-two approximate
_GKE_ACCELERATOR_LABEL = 'cloud.google.com/gke-accelerator'
# util functions
def deprecation_warning(func: Callable, op_name: str,
@ -174,6 +196,9 @@ class Container(V1Container):
if not kwargs.get('name'):
kwargs['name'] = ''
# v2 container_spec
self._container_spec = None
super(Container, self).__init__(
image=image, command=command, args=args, **kwargs)
@ -273,6 +298,8 @@ class Container(V1Container):
"E", "P", "T", "G", "M", "K".
"""
self._validate_size_string(memory)
if self._container_spec:
self._container_spec.resources.memory_limit = _get_resource_number(memory)
return self.add_resource_limit('memory', memory)
def set_ephemeral_storage_request(self, size) -> 'Container':
@ -313,8 +340,9 @@ class Container(V1Container):
cpu: A string which can be a number or a number followed by "m", which
means 1/1000.
"""
self._validate_cpu_string(cpu)
if self._container_spec:
self._container_spec.resources.cpu_limit = _get_cpu_number(cpu)
return self.add_resource_limit('cpu', cpu)
def set_gpu_limit(self, gpu, vendor='nvidia') -> 'Container':
@ -328,11 +356,15 @@ class Container(V1Container):
Args:
gpu: A string which must be a positive number.
vendor: Optional. A string which is the vendor of the requested gpu.
The supported values
are: 'nvidia' (default), and 'amd'.
The supported values are: 'nvidia' (default), and 'amd'. The value is
ignored in v2.
"""
self._validate_positive_number(gpu, 'gpu')
if self._container_spec:
# For backward compatibility, allow `gpu` to be a string.
self._container_spec.resources.accelerator.count = int(gpu)
if vendor != 'nvidia' and vendor != 'amd':
raise ValueError('vendor can only be nvidia or amd.')
@ -916,8 +948,7 @@ class BaseOp(object):
the backoff strategy.
"""
if policy is not None and policy not in ALLOWED_RETRY_POLICIES:
raise ValueError('policy must be one of: %r'
% (ALLOWED_RETRY_POLICIES,))
raise ValueError('policy must be one of: %r' % (ALLOWED_RETRY_POLICIES,))
self.num_retries = num_retries
self.retry_policy = policy
@ -1071,8 +1102,8 @@ class ContainerOp(BaseOp):
sidecars=sidecars,
is_exit_handler=is_exit_handler)
if not ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING and '--component_launcher_class_path' not in (
arguments or []):
if (not ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING) and (
'--component_launcher_class_path' not in (arguments or [])):
# The warning is suppressed for pipelines created using the TFX SDK.
warnings.warn(
'Please create reusable components instead of constructing ContainerOp instances directly.'
@ -1177,8 +1208,8 @@ class ContainerOp(BaseOp):
self._metadata = None
self._parameter_arguments = None
self.execution_options = ExecutionOptionsSpec(
caching_strategy=CachingStrategySpec(),)
self.execution_options = _structures.ExecutionOptionsSpec(
caching_strategy=_structures.CachingStrategySpec(),)
self.outputs = {}
if file_outputs:
@ -1201,6 +1232,18 @@ class ContainerOp(BaseOp):
self.pvolumes = {}
self.add_pvolumes(pvolumes)
# v2 container spec
@property
def container_spec(self):
return self._container._container_spec
@container_spec.setter
def container_spec(self, spec: _PipelineContainerSpec):
if not isinstance(spec, _PipelineContainerSpec):
raise TypeError('container_spec can only be PipelineContainerSpec. '
'Got: {}'.format(spec))
self._container._container_spec = spec
@property
def command(self):
return self._container.command
@ -1244,7 +1287,7 @@ class ContainerOp(BaseOp):
Args:
metadata (ComponentSpec): component metadata
"""
if not isinstance(metadata, ComponentSpec):
if not isinstance(metadata, _structures.ComponentSpec):
raise ValueError('_set_metadata is expecting ComponentSpec.')
self._metadata = metadata
@ -1281,6 +1324,36 @@ class ContainerOp(BaseOp):
self.pvolume = list(self.pvolumes.values())[0]
return self
def add_node_selector_constraint(self, label_name: str,
value: str) -> 'ContainerOp':
"""Sets accelerator type requirement for this task.
When compiling for v2, this function can optionally be used together with
set_gpu_limit to set the number of accelerators required. Otherwise, the
number requested defaults to 1.
Args:
label_name: The name of the constraint label.
For v2, only 'cloud.google.com/gke-accelerator' is supported now.
value: The name of the accelerator.
For v2, available values include 'nvidia-tesla-k80', 'tpu-v3'.
Returns:
self, to allow chained calls with other resource specifications.
"""
if self.container_spec:
accelerator_cnt = 1
if self.container_spec.resources.accelerator.count > 1:
# Preserve the count if it was already set.
accelerator_cnt = self.container_spec.resources.accelerator.count
accelerator_config = _PipelineContainerSpec.ResourceSpec.AcceleratorConfig(
type=_sanitize_gpu_type(value), count=accelerator_cnt)
self.container_spec.resources.accelerator.CopyFrom(accelerator_config)
super(ContainerOp, self).add_node_selector_constraint(label_name, value)
return self
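
Below is a minimal sketch of the v2 behavior described in the docstring above; the task name and image are hypothetical, and container_spec is assigned by hand here only for illustration (the component bridge normally attaches it).

```python
from kfp.dsl import _container_op
from kfp.pipeline_spec import pipeline_spec_pb2

_PipelineContainerSpec = (
    pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec)

task = _container_op.ContainerOp(name='train', image='python:3.7')
task.container_spec = _PipelineContainerSpec(image='python:3.7')

task.set_gpu_limit(2)  # sets container_spec.resources.accelerator.count to 2
task.add_node_selector_constraint('cloud.google.com/gke-accelerator',
                                  'nvidia-tesla-k80')
# The accelerator type is sanitized to 'NVIDIA_TESLA_K80' and the count set
# above is kept rather than being reset to the default of 1.
```
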
# proxy old ContainerOp properties to ContainerOp.container
# with PendingDeprecationWarning.
@ -1301,3 +1374,56 @@ class _MultipleOutputsError:
def __str__(self):
_MultipleOutputsError.raise_error()
def _get_cpu_number(cpu_string: str) -> float:
"""Converts the cpu string to number of vCPU core."""
# dsl.ContainerOp._validate_cpu_string guaranteed that cpu_string is either
# 1) a string can be converted to a float; or
# 2) a string followed by 'm', and it can be converted to a float.
if cpu_string.endswith('m'):
return float(cpu_string[:-1]) / 1000
else:
return float(cpu_string)
def _get_resource_number(resource_string: str) -> float:
"""Converts the resource string to number of resource in GB."""
# dsl.ContainerOp._validate_size_string guaranteed that memory_string
# represents an integer, optionally followed by one of (E, Ei, P, Pi, T, Ti,
# G, Gi, M, Mi, K, Ki).
# See the meaning of different suffix at
# https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
# Also, ResourceSpec in pipeline IR expects a number in GB.
if resource_string.endswith('E'):
return float(resource_string[:-1]) * _E / _G
elif resource_string.endswith('Ei'):
return float(resource_string[:-2]) * _EI / _G
elif resource_string.endswith('P'):
return float(resource_string[:-1]) * _P / _G
elif resource_string.endswith('Pi'):
return float(resource_string[:-2]) * _PI / _G
elif resource_string.endswith('T'):
return float(resource_string[:-1]) * _T / _G
elif resource_string.endswith('Ti'):
return float(resource_string[:-2]) * _TI / _G
elif resource_string.endswith('G'):
return float(resource_string[:-1])
elif resource_string.endswith('Gi'):
return float(resource_string[:-2]) * _GI / _G
elif resource_string.endswith('M'):
return float(resource_string[:-1]) * _M / _G
elif resource_string.endswith('Mi'):
return float(resource_string[:-2]) * _MI / _G
elif resource_string.endswith('K'):
return float(resource_string[:-1]) * _K / _G
elif resource_string.endswith('Ki'):
return float(resource_string[:-2]) * _KI / _G
else:
# By default interpret as a plain integer, in the unit of Bytes.
return float(resource_string) / _G
def _sanitize_gpu_type(gpu_type: str) -> str:
"""Converts the GPU type to conform the enum style."""
return gpu_type.replace('-', '_').upper()
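
Worked examples for the two conversion helpers above (a sketch; the helpers are private to kfp.dsl._container_op, and the expected values follow from the unit constants defined earlier in this file).

```python
from kfp.dsl._container_op import _get_cpu_number, _get_resource_number

assert _get_cpu_number('500m') == 0.5               # 500 millicores -> 0.5 vCPU
assert _get_cpu_number('2') == 2.0                  # plain number -> vCPU cores
assert _get_resource_number('4G') == 4.0            # 'G' is already in GB
assert _get_resource_number('1Gi') == (1 << 30) / 10**9          # ~1.074 GB
assert _get_resource_number('512Mi') == 512 * (1 << 20) / 10**9  # ~0.537 GB
```
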

View File

@ -14,7 +14,7 @@
"""Tests for kfp.v2.dsl.container_op"""
import unittest
from kfp.v2.dsl import container_op
from kfp.dsl import _container_op
from kfp.pipeline_spec import pipeline_spec_pb2
from google.protobuf import text_format
@ -36,22 +36,9 @@ _PipelineContainerSpec = pipeline_spec_pb2.PipelineDeploymentConfig.PipelineCont
class ContainerOpTest(unittest.TestCase):
def test_illegal_resource_setter_fail(self):
task = container_op.ContainerOp(name='test_task', image='python:3.7')
with self.assertRaisesRegex(TypeError, 'ContainerOp.container_spec '
'is expected to be'):
task.set_cpu_limit('1')
def test_illegal_label_fail(self):
task = container_op.ContainerOp(name='test_task', image='python:3.7')
task.container_spec = _PipelineContainerSpec()
with self.assertRaisesRegex(
ValueError, 'Currently add_node_selector_constraint only supports'):
task.add_node_selector_constraint('cloud.google.com/test-label',
'test-value')
def test_chained_call_resource_setter(self):
task = container_op.ContainerOp(name='test_task', image='python:3.7')
task = _container_op.ContainerOp(name='test_task', image='python:3.7')
task.container_spec = _PipelineContainerSpec()
(task.
set_cpu_limit('1').

View File

@ -14,14 +14,12 @@
from typing import Callable, Optional, Union
from kubernetes.client.models import V1PodDNSConfig
from . import _container_op
from . import _resource_op
from . import _ops_group
from ._component_bridge import \
_create_container_op_from_component_and_arguments, \
_sanitize_python_function_name
from ..components import _components
from ..components._naming import _make_name_unique_by_adding_index
from kfp.dsl import _container_op
from kfp.dsl import _resource_op
from kfp.dsl import _ops_group
from kfp.dsl import _component_bridge
from kfp.components import _components
from kfp.components import _naming
import sys
# This handler is called whenever the @pipeline decorator is applied.
@ -193,8 +191,8 @@ class PipelineConf():
from kubernetes.client.models import V1PodDNSConfig, V1PodDNSConfigOption
pipeline_conf = kfp.dsl.PipelineConf()
pipeline_conf.set_dns_config(dns_config=V1PodDNSConfig(
nameservers=["1.2.3.4"], options=[V1PodDNSConfigOption(name="ndots",
value="2")]
nameservers=["1.2.3.4"],
options=[V1PodDNSConfigOption(name="ndots", value="2")],
))
"""
self.dns_config = dns_config
@ -286,8 +284,10 @@ class Pipeline():
raise Exception('Nested pipelines are not allowed.')
Pipeline._default_pipeline = self
self._old_container_task_constructor = _components._container_task_constructor
_components._container_task_constructor = _create_container_op_from_component_and_arguments
self._old_container_task_constructor = (
_components._container_task_constructor)
_components._container_task_constructor = (
_component_bridge._create_container_op_from_component_and_arguments)
def register_op_and_generate_id(op):
return self.add_op(op, op.is_exit_handler)
@ -299,7 +299,8 @@ class Pipeline():
def __exit__(self, *args):
Pipeline._default_pipeline = None
_container_op._register_op_handler = self._old__register_op_handler
_components._container_task_constructor = self._old_container_task_constructor
_components._container_task_constructor = (
self._old_container_task_constructor)
def add_op(self, op: _container_op.BaseOp, define_only: bool):
"""Add a new operator.
@ -312,13 +313,15 @@ class Pipeline():
# Sanitizing the op name.
# Technically this could be delayed to the compilation stage, but string
# serialization of PipelineParams make unsanitized names problematic.
op_name = _sanitize_python_function_name(op.human_name).replace('_', '-')
op_name = _naming._sanitize_python_function_name(op.human_name).replace(
'_', '-')
#If there is an existing op with this name then generate a new name.
op_name = _make_name_unique_by_adding_index(op_name, list(self.ops.keys()),
' ')
op_name = _naming._make_name_unique_by_adding_index(op_name,
list(self.ops.keys()),
' ')
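# For illustration (hypothetical example): an op with human_name 'Echo Text'
# is sanitized to 'echo-text' above; a second op with the same human_name then
# gets a unique key such as 'echo-text 2' from _make_name_unique_by_adding_index.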
if op_name == '':
op_name = _make_name_unique_by_adding_index('task', list(self.ops.keys()),
' ')
op_name = _naming._make_name_unique_by_adding_index(
'task', list(self.ops.keys()), ' ')
self.ops[op_name] = op
if not define_only:

View File

@ -16,9 +16,9 @@
from typing import List
from kfp.components import _structures as structures
from kfp.v2 import dsl
from kfp.v2.dsl import dsl_utils
from kfp.v2.dsl import type_utils
from kfp.dsl import _pipeline_param
from kfp.dsl import dsl_utils
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
@ -60,7 +60,7 @@ def build_component_spec_from_structure(
def build_component_inputs_spec(
component_spec: pipeline_spec_pb2.ComponentSpec,
pipeline_params: List[dsl.PipelineParam],
pipeline_params: List[_pipeline_param.PipelineParam],
) -> None:
"""Builds component inputs spec from pipeline params.
@ -82,7 +82,7 @@ def build_component_inputs_spec(
def build_component_outputs_spec(
component_spec: pipeline_spec_pb2.ComponentSpec,
pipeline_params: List[dsl.PipelineParam],
pipeline_params: List[_pipeline_param.PipelineParam],
) -> None:
"""Builds component outputs spec from pipeline params.
@ -103,7 +103,7 @@ def build_component_outputs_spec(
def build_task_inputs_spec(
task_spec: pipeline_spec_pb2.PipelineTaskSpec,
pipeline_params: List[dsl.PipelineParam],
pipeline_params: List[_pipeline_param.PipelineParam],
tasks_in_current_dag: List[str],
) -> None:
"""Builds task inputs spec from pipeline params.

View File

@ -11,13 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for kfp.v2.dsl.component_spec."""
"""Tests for kfp.dsl.component_spec."""
import unittest
from kfp.components import _structures as structures
from kfp.v2 import dsl
from kfp.v2.dsl import component_spec as dsl_component_spec
from kfp.dsl import _pipeline_param
from kfp.dsl import component_spec as dsl_component_spec
from kfp.pipeline_spec import pipeline_spec_pb2
from google.protobuf import json_format
@ -83,10 +83,10 @@ class ComponentSpecTest(unittest.TestCase):
def test_build_component_inputs_spec(self):
pipeline_params = [
dsl.PipelineParam(name='input1', param_type='Dataset'),
dsl.PipelineParam(name='input2', param_type='Integer'),
dsl.PipelineParam(name='input3', param_type='String'),
dsl.PipelineParam(name='input4', param_type='Float'),
_pipeline_param.PipelineParam(name='input1', param_type='Dataset'),
_pipeline_param.PipelineParam(name='input2', param_type='Integer'),
_pipeline_param.PipelineParam(name='input3', param_type='String'),
_pipeline_param.PipelineParam(name='input4', param_type='Float'),
]
expected_dict = {
'inputDefinitions': {
@ -122,10 +122,10 @@ class ComponentSpecTest(unittest.TestCase):
def test_build_component_outputs_spec(self):
pipeline_params = [
dsl.PipelineParam(name='output1', param_type='Dataset'),
dsl.PipelineParam(name='output2', param_type='Integer'),
dsl.PipelineParam(name='output3', param_type='String'),
dsl.PipelineParam(name='output4', param_type='Float'),
_pipeline_param.PipelineParam(name='output1', param_type='Dataset'),
_pipeline_param.PipelineParam(name='output2', param_type='Integer'),
_pipeline_param.PipelineParam(name='output3', param_type='String'),
_pipeline_param.PipelineParam(name='output4', param_type='Float'),
]
expected_dict = {
'outputDefinitions': {
@ -161,10 +161,10 @@ class ComponentSpecTest(unittest.TestCase):
def test_build_task_inputs_spec(self):
pipeline_params = [
dsl.PipelineParam(name='output1', param_type='Dataset', op_name='op-1'),
dsl.PipelineParam(name='output2', param_type='Integer', op_name='op-2'),
dsl.PipelineParam(name='output3', param_type='Model', op_name='op-3'),
dsl.PipelineParam(name='output4', param_type='Double', op_name='op-4'),
_pipeline_param.PipelineParam(name='output1', param_type='Dataset', op_name='op-1'),
_pipeline_param.PipelineParam(name='output2', param_type='Integer', op_name='op-2'),
_pipeline_param.PipelineParam(name='output3', param_type='Model', op_name='op-3'),
_pipeline_param.PipelineParam(name='output4', param_type='Double', op_name='op-4'),
]
tasks_in_current_dag = ['op-1', 'op-2']
expected_dict = {

View File

@ -11,11 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for kfp.v2.dsl.dsl_utils."""
"""Tests for kfp.dsl.dsl_utils."""
import unittest
from kfp.v2.dsl import dsl_utils
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
from google.protobuf import json_format

View File

@ -15,7 +15,7 @@
from typing import Optional
from kfp.v2.dsl import dsl_utils
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
OUTPUT_KEY = 'result'

View File

@ -13,7 +13,7 @@
# limitations under the License.
import unittest
from kfp.v2.dsl import importer_node
from kfp.dsl import importer_node
from kfp.pipeline_spec import pipeline_spec_pb2 as pb
from google.protobuf import json_format

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for kfp.dsl.v2.serialization_utils module."""
"""Tests for kfp.dsl.serialization_utils module."""
import unittest
from kfp.dsl import serialization_utils

View File

@ -15,7 +15,7 @@
import sys
import unittest
from kfp.components import structures
from kfp.v2.dsl import type_utils
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2 as pb
_PARAMETER_TYPES = ['String', 'str', 'Integer', 'int', 'Float', 'Double']

View File

@ -28,15 +28,14 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Uni
import kfp
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.components import _python_op
from kfp.dsl import _container_op
from kfp import dsl
from kfp.dsl import _for_loop
from kfp.dsl import _pipeline_param
from kfp.v2 import dsl
from kfp.v2.compiler import compiler_utils
from kfp.v2.dsl import component_spec as dsl_component_spec
from kfp.v2.dsl import dsl_utils
from kfp.v2.dsl import importer_node
from kfp.v2.dsl import type_utils
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import dsl_utils
from kfp.dsl import importer_node
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
from google.protobuf import json_format
@ -677,6 +676,14 @@ class Compiler(object):
getattr(subgroup, 'human_name', subgroup_key))
subgroup_task_spec.component_ref.name = subgroup_component_name
if isinstance(subgroup, dsl.OpsGroup) and subgroup.type == 'graph':
raise NotImplementedError(
'dsl.graph_component is not yet supported in KFP v2 compiler.')
if isinstance(subgroup, dsl.OpsGroup) and subgroup.type == 'exit_handler':
raise NotImplementedError(
'dsl.ExitHandler is not yet supported in KFP v2 compiler.')
if isinstance(subgroup, dsl.OpsGroup) and subgroup.type == 'condition':
condition = subgroup.condition
operand_values = []
@ -713,6 +720,11 @@ class Compiler(object):
[dsl_utils.sanitize_task_name(dep) for dep in group_dependencies])
if isinstance(subgroup, dsl.ParallelFor):
if subgroup.parallelism is not None:
warnings.warn(
'Setting parallelism in ParallelFor is not supported yet. '
'The setting is ignored.')
# Remove loop arguments related inputs from parent group component spec.
input_names = [param.full_name for param, _ in inputs[subgroup.name]]
for input_name in input_names:
@ -816,7 +828,7 @@ class Compiler(object):
executor_label = subgroup_component_spec.executor_label
if executor_label not in deployment_config.executors:
deployment_config.executors[
executor_label].custom_job.custom_job.update(custom_job_spec)
executor_label].custom_job.custom_job.update(custom_job_spec)
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))
@ -887,7 +899,9 @@ class Compiler(object):
return pipeline_spec
def _create_pipeline(
# The name of this method is used to check if compiling for v2.
# See `is_compiling_for_v2` in `kfp/dsl/_component_bridge.py`
def _create_pipeline_v2(
self,
pipeline_func: Callable[..., Any],
pipeline_root: Optional[str] = None,
@ -989,7 +1003,7 @@ class Compiler(object):
type_check_old_value = kfp.TYPE_CHECK
try:
kfp.TYPE_CHECK = type_check
pipeline_job = self._create_pipeline(
pipeline_job = self._create_pipeline_v2(
pipeline_func=pipeline_func,
pipeline_root=pipeline_root,
pipeline_name=pipeline_name,

View File

@ -102,7 +102,7 @@ class CompilerTest(unittest.TestCase):
- {inputValue: msg}
""")
@dsl.pipeline()
@dsl.pipeline(name='pipeline-with-exit-handler')
def download_and_print(url='gs://ml-pipeline/shakespeare/shakespeare1.txt'):
"""A sample pipeline showing exit handler."""
@ -142,7 +142,7 @@ class CompilerTest(unittest.TestCase):
command=['sh', '-c'],
arguments=['echo "$0"', text2])
@dsl.pipeline()
@dsl.pipeline(name='pipeline-with-graph-component')
def opsgroups_pipeline(text1='message 1', text2='message 2'):
step1_graph_component = echo1_graph_component(text1)
step2_graph_component = echo2_graph_component(text2)

View File

@ -12,15 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp.dsl._container_op import ContainerOp
from kfp.dsl._container_op import BaseOp
from kfp.dsl._pipeline_param import PipelineParam
from kfp.dsl._pipeline import pipeline
from kfp.v2.dsl._component import graph_component
from kfp.v2.dsl._pipeline import Pipeline
from kfp.v2.dsl._ops_group import OpsGroup
from kfp.v2.dsl._ops_group import Graph
from kfp.v2.dsl._ops_group import Condition
from kfp.v2.dsl._ops_group import ExitHandler
from kfp.v2.dsl._ops_group import ParallelFor
from kfp.dsl import *

View File

@ -1,19 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def graph_component(func):
"""Decorator for graph component functions."""
raise NotImplementedError(
'dsl.graph_component is not yet supported in KFP v2 compiler.')

View File

@ -1,214 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
import warnings
from kfp.dsl import _container_op
from kfp.dsl import _for_loop
from kfp.dsl import _pipeline_param
from kfp.v2.dsl import _pipeline
class OpsGroup(object):
"""Represents a logical group of ops and group of OpsGroups.
This class is the base class for groups of ops, such as ops sharing an exit
handler,
a condition branch, or a loop. This class is not supposed to be used by
pipeline authors.
It is useful for implementing a compiler.
"""
def __init__(self,
group_type: str,
name: Optional[str] = None,
parallelism: Optional[int] = None):
"""Create a new instance of OpsGroup.
Args:
group_type (str): one of 'pipeline', 'exit_handler', 'condition',
'for_loop', and 'graph'.
name (str): name of the opsgroup
parallelism (int): parallelism for the sub-DAGs
"""
#TODO: declare the group_type to be strongly typed
self.type = group_type
self.ops = list()
self.groups = list()
self.name = name
self.dependencies = []
self.parallelism = parallelism
# recursive_ref points to the opsgroups with the same name if exists.
self.recursive_ref = None
self.loop_args = None
@staticmethod
def _get_matching_opsgroup_already_in_pipeline(group_type, name):
"""Retrieves the opsgroup when the pipeline already contains it.
the opsgroup might be already in the pipeline in case of recursive calls.
Args:
group_type (str): one of 'pipeline', 'exit_handler', 'condition', and
'graph'.
name (str): the name before conversion.
"""
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
if name is None:
return None
name_pattern = '^' + (group_type + '-' + name + '-').replace('_',
'-') + '[\d]+$'
for ops_group_already_in_pipeline in _pipeline.Pipeline.get_default_pipeline(
).groups:
import re
if ops_group_already_in_pipeline.type == group_type \
and re.match(name_pattern ,ops_group_already_in_pipeline.name):
return ops_group_already_in_pipeline
return None
def _make_name_unique(self):
"""Generate a unique opsgroup name in the pipeline"""
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
self.name = (
self.type + '-' + ('' if self.name is None else self.name + '-') +
str(_pipeline.Pipeline.get_default_pipeline().get_next_group_id()))
self.name = self.name.replace('_', '-')
def __enter__(self):
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
self.recursive_ref = self._get_matching_opsgroup_already_in_pipeline(
self.type, self.name)
if not self.recursive_ref:
self._make_name_unique()
_pipeline.Pipeline.get_default_pipeline().push_ops_group(self)
return self
def __exit__(self, *args):
_pipeline.Pipeline.get_default_pipeline().pop_ops_group()
def after(self, *ops):
"""Specify explicit dependency on other ops."""
for op in ops:
self.dependencies.append(op)
return self
def remove_op_recursive(self, op):
if self.ops and op in self.ops:
self.ops.remove(op)
for sub_group in self.groups or []:
sub_group.remove_op_recursive(op)
class SubGraph(OpsGroup):
TYPE_NAME = 'subgraph'
def __init__(self, parallelism: int):
raise NotImplementedError('Graph is not yet supported in KFP v2 compiler.')
class Condition(OpsGroup):
"""Represents an condition group with a condition.
Args:
condition (ConditionOperator): the condition.
name (str): name of the condition
Example: ::
with Condition(param1=='pizza', '[param1 is pizza]'): op1 =
ContainerOp(...) op2 = ContainerOp(...)
"""
def __init__(self,
condition: _pipeline_param.ConditionOperator,
name: Optional[str] = None):
super(Condition, self).__init__('condition', name)
self.condition = condition
class Graph(OpsGroup):
"""Graph DAG with inputs, recursive_inputs, and outputs.
This is not used directly by the users but auto generated when the
graph_component decoration exists
Args:
name: Name of the graph.
"""
def __init__(self, name):
raise NotImplementedError('Graph is not yet supported in KFP v2 compiler.')
class ExitHandler(OpsGroup):
"""Represents an exit handler that is invoked upon exiting a group of ops."""
def __init__(self, exit_op: _container_op.ContainerOp):
raise NotImplementedError(
'dsl.ExitHandler is not yet supported in KFP v2 compiler.')
class ParallelFor(OpsGroup):
"""Represents a parallel for loop over a static set of items.
Example:
In this case :code:`op1` would be executed twice, once with case
:code:`args=['echo 1']` and once with case :code:`args=['echo 2']`::
with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
"""
TYPE_NAME = 'for_loop'
def __init__(self,
loop_args: Union[_for_loop.ItemList,
_pipeline_param.PipelineParam],
parallelism: Optional[int] = None):
if parallelism is not None:
warnings.warn('Setting parallelism in ParallelFor is not supported yet.'
'The setting is ignored.')
if parallelism and parallelism < 1:
raise ValueError(
'ParallelFor parallelism set to < 1, allowed values are > 0')
self.items_is_pipeline_param = isinstance(loop_args,
_pipeline_param.PipelineParam)
super().__init__(self.TYPE_NAME, parallelism=parallelism)
if self.items_is_pipeline_param:
loop_args = _for_loop.LoopArguments.from_pipeline_param(loop_args)
elif not self.items_is_pipeline_param and not isinstance(
loop_args, _for_loop.LoopArguments):
# we were passed a raw list, wrap it in loop args
loop_args = _for_loop.LoopArguments(
loop_args,
code=str(
_pipeline.Pipeline.get_default_pipeline().get_next_group_id()),
)
self.loop_args = loop_args
def __enter__(self) -> _for_loop.LoopArguments:
_ = super().__enter__()
return self.loop_args

View File

@ -1,154 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline class and decorator function definition."""
import collections
from typing import Any, Callable
from kfp.components import _naming
from kfp.components import _components as components
from kfp.dsl import _container_op
from kfp.v2.dsl import _ops_group
from kfp.v2.dsl import component_bridge
# TODO: Pipeline is in fact an opsgroup, refactor the code.
class Pipeline():
"""A pipeline contains a list of operators.
This class is not supposed to be used by pipeline authors since pipeline
authors can use
pipeline functions (decorated with @pipeline) to reference their pipelines.
This class
is useful for implementing a compiler. For example, the compiler can use the
following
to get the pipeline object and its ops:
Example:
::
with Pipeline() as p:
pipeline_func(*args_list)
traverse(p.ops)
"""
# _default_pipeline is set when it (usually a compiler) runs "with Pipeline()"
_default_pipeline = None
@staticmethod
def get_default_pipeline():
"""Get default pipeline."""
return Pipeline._default_pipeline
@staticmethod
def add_pipeline(name, description, func):
"""Add a pipeline function with the specified name and description."""
# Applying the @pipeline decorator to the pipeline function
func = pipeline(name=name, description=description)(func)
def __init__(self, name: str):
"""Create a new instance of Pipeline.
Args:
name: the name of the pipeline. Once deployed, the name will show up in
Pipeline System UI.
"""
self.name = name
self.ops = collections.OrderedDict()
# Add the root group.
self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
self.group_id = 0
self._metadata = None
def __enter__(self):
if Pipeline._default_pipeline:
raise Exception('Nested pipelines are not allowed.')
Pipeline._default_pipeline = self
self._old_container_task_constructor = (
components._container_task_constructor)
components._container_task_constructor = (
component_bridge.create_container_op_from_component_and_arguments)
def register_op_and_generate_id(op):
return self.add_op(op, op.is_exit_handler)
self._old__register_op_handler = _container_op._register_op_handler
_container_op._register_op_handler = register_op_and_generate_id
return self
def __exit__(self, *args):
Pipeline._default_pipeline = None
_container_op._register_op_handler = self._old__register_op_handler
components._container_task_constructor = (
self._old_container_task_constructor)
def add_op(self, op: _container_op.BaseOp, define_only: bool) -> str:
"""Add a new operator.
Args:
op: An operator of ContainerOp, ResourceOp or their inherited types.
Returns:
The name of the op.
"""
# Sanitizing the op name.
# Technically this could be delayed to the compilation stage, but string serialization of PipelineParams make unsanitized names problematic.
op_name = component_bridge._sanitize_python_function_name(
op.human_name).replace('_', '-')
#If there is an existing op with this name then generate a new name.
op_name = _naming._make_name_unique_by_adding_index(op_name,
list(self.ops.keys()),
' ')
if op_name == '':
op_name = _naming._make_name_unique_by_adding_index(
'task', list(self.ops.keys()), ' ')
self.ops[op_name] = op
if not define_only:
self.groups[-1].ops.append(op)
return op_name
def push_ops_group(self, group: _ops_group.OpsGroup):
"""Push an OpsGroup into the stack.
Args:
group: An OpsGroup. Typically it is one of ExitHandler, Branch, and Loop.
"""
self.groups[-1].groups.append(group)
self.groups.append(group)
def pop_ops_group(self):
"""Remove the current OpsGroup from the stack."""
del self.groups[-1]
def remove_op_from_groups(self, op):
for group in self.groups:
group.remove_op_recursive(op)
def get_next_group_id(self):
"""Get next id for a new group. """
self.group_id += 1
return self.group_id
def _set_metadata(self, metadata):
"""_set_metadata passes the containerop the metadata information
Args:
metadata (ComponentMeta): component metadata
"""
self._metadata = metadata

View File

@ -1,291 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Function for creating ContainerOp instances from component spec."""
import copy
from typing import Any, Mapping, Optional
from kfp import dsl
from kfp.components import _structures as structures
from kfp.components._components import _default_component_name
from kfp.components._components import _resolve_command_line_and_paths
from kfp.components._naming import _sanitize_python_function_name
from kfp.components._naming import generate_unique_name_conversion_table
from kfp.dsl import _pipeline_param
from kfp.dsl import types
from kfp.v2.dsl import component_spec as dsl_component_spec
from kfp.v2.dsl import container_op
from kfp.v2.dsl import dsl_utils
from kfp.v2.dsl import importer_node
from kfp.v2.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
# TODO: cleanup unused code.
def create_container_op_from_component_and_arguments(
component_spec: structures.ComponentSpec,
arguments: Mapping[str, Any],
component_ref: Optional[structures.ComponentReference] = None,
) -> container_op.ContainerOp:
"""Instantiates ContainerOp object.
Args:
component_spec: The component spec object.
arguments: The dictionary of component arguments.
component_ref: (not used in v2)
Returns:
A ContainerOp instance.
"""
pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec()
# Keep track of auto-injected importer spec.
importer_specs = {}
# Check types of the reference arguments and serialize PipelineParams
arguments = arguments.copy()
# Preserve input params for ContainerOp.inputs
input_params = list(
set([
param for param in arguments.values()
if isinstance(param, dsl.PipelineParam)
]))
for input_name, argument_value in arguments.items():
if isinstance(argument_value, dsl.PipelineParam):
input_type = component_spec._inputs_dict[input_name].type
reference_type = argument_value.param_type
types.verify_type_compatibility(
reference_type, input_type,
'Incompatible argument passed to the input "{}" of component "{}": '
.format(input_name, component_spec.name))
arguments[input_name] = str(argument_value)
if type_utils.is_parameter_type(input_type):
if argument_value.op_name:
pipeline_task_spec.inputs.parameters[
input_name].task_output_parameter.producer_task = (
dsl_utils.sanitize_task_name(argument_value.op_name))
pipeline_task_spec.inputs.parameters[
input_name].task_output_parameter.output_parameter_key = (
argument_value.name)
else:
pipeline_task_spec.inputs.parameters[
input_name].component_input_parameter = argument_value.name
else:
if argument_value.op_name:
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = (
dsl_utils.sanitize_task_name(argument_value.op_name))
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.output_artifact_key = (
argument_value.name)
else:
# argument_value.op_name could be none, in which case an importer node
# will be inserted later.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema,
pipeline_param_name=argument_value.name)
elif isinstance(argument_value, str):
pipeline_params = _pipeline_param.extract_pipelineparams_from_any(
argument_value)
if pipeline_params:
# argument_value contains PipelineParam placeholders.
raise NotImplementedError(
'Currently, a component input can only accept either a constant '
'value or a reference to another pipeline parameter. It cannot be a '
'combination of both. Got: {} for input {}'.format(
argument_value, input_name))
input_type = component_spec._inputs_dict[input_name].type
if type_utils.is_parameter_type(input_type):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
argument_value)
else:
# An importer node with constant value artifact_uri will be inserted.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema, constant_value=argument_value)
elif isinstance(argument_value, int):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = argument_value
elif isinstance(argument_value, float):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.double_value = argument_value
elif isinstance(argument_value, dsl.ContainerOp):
raise TypeError(
'ContainerOp object {} was passed to component as an input argument. '
'Pass a single output instead.'.format(input_name))
else:
raise NotImplementedError(
'Input argument supports only the following types: PipelineParam'
', str, int, float. Got: "{}".'.format(argument_value))
inputs_dict = {
input_spec.name: input_spec for input_spec in component_spec.inputs or []
}
outputs_dict = {
output_spec.name: output_spec
for output_spec in component_spec.outputs or []
}
def _input_artifact_uri_placeholder(input_key: str) -> str:
if type_utils.is_parameter_type(inputs_dict[input_key].type):
raise TypeError(
'Input "{}" with type "{}" cannot be paired with InputUriPlaceholder.'
.format(input_key, inputs_dict[input_key].type))
else:
return "{{{{$.inputs.artifacts['{}'].uri}}}}".format(input_key)
def _input_artifact_path_placeholder(input_key: str) -> str:
if type_utils.is_parameter_type(inputs_dict[input_key].type):
raise TypeError(
'Input "{}" with type "{}" cannot be paired with InputPathPlaceholder.'
.format(input_key, inputs_dict[input_key].type))
elif input_key in importer_specs:
raise TypeError(
'Input "{}" with type "{}" is not connected to any upstream output. '
'However it is used with InputPathPlaceholder. '
'If you want to import an existing artifact using a system-connected '
'importer node, use InputUriPlaceholder instead. '
'Or if you just want to pass a string parameter, use string type and '
'InputValuePlaceholder instead.'.format(input_key,
inputs_dict[input_key].type))
else:
return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key)
def _input_parameter_placeholder(input_key: str) -> str:
if type_utils.is_parameter_type(inputs_dict[input_key].type):
return "{{{{$.inputs.parameters['{}']}}}}".format(input_key)
else:
raise TypeError(
'Input "{}" with type "{}" cannot be paired with InputValuePlaceholder.'
.format(input_key, inputs_dict[input_key].type))
def _output_artifact_uri_placeholder(output_key: str) -> str:
if type_utils.is_parameter_type(outputs_dict[output_key].type):
raise TypeError(
'Output "{}" with type "{}" cannot be paired with OutputUriPlaceholder.'
.format(output_key, outputs_dict[output_key].type))
else:
return "{{{{$.outputs.artifacts['{}'].uri}}}}".format(output_key)
def _output_artifact_path_placeholder(output_key: str) -> str:
return "{{{{$.outputs.artifacts['{}'].path}}}}".format(output_key)
def _output_parameter_path_placeholder(output_key: str) -> str:
return "{{{{$.outputs.parameters['{}'].output_file}}}}".format(output_key)
def _resolve_output_path_placeholder(output_key: str) -> str:
if type_utils.is_parameter_type(outputs_dict[output_key].type):
return _output_parameter_path_placeholder(output_key)
else:
return _output_artifact_path_placeholder(output_key)
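  # Illustrative sketch (not part of this commit): for hypothetical inputs
  # 'dataset' and 'num_steps' and outputs 'model' and 'accuracy', the helpers
  # above emit runtime placeholders such as:
  #
  #   _input_artifact_uri_placeholder('dataset')
  #       -> "{{$.inputs.artifacts['dataset'].uri}}"
  #   _input_parameter_placeholder('num_steps')
  #       -> "{{$.inputs.parameters['num_steps']}}"
  #   _output_artifact_path_placeholder('model')
  #       -> "{{$.outputs.artifacts['model'].path}}"
  #   _output_parameter_path_placeholder('accuracy')
  #       -> "{{$.outputs.parameters['accuracy'].output_file}}"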
resolved_cmd = _resolve_command_line_and_paths(
component_spec=component_spec,
arguments=arguments,
input_value_generator=_input_parameter_placeholder,
input_uri_generator=_input_artifact_uri_placeholder,
output_uri_generator=_output_artifact_uri_placeholder,
input_path_generator=_input_artifact_path_placeholder,
output_path_generator=_resolve_output_path_placeholder,
)
container_spec = component_spec.implementation.container
output_uris_and_paths = resolved_cmd.output_uris.copy()
output_uris_and_paths.update(resolved_cmd.output_paths)
input_uris_and_paths = resolved_cmd.input_uris.copy()
input_uris_and_paths.update(resolved_cmd.input_paths)
old_warn_value = dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
task = container_op.ContainerOp(
name=component_spec.name or _default_component_name,
image=container_spec.image,
command=resolved_cmd.command,
arguments=resolved_cmd.args,
file_outputs=output_uris_and_paths,
artifact_argument_paths=[
dsl.InputArgumentPath(
argument=arguments[input_name],
input=input_name,
path=path,
) for input_name, path in input_uris_and_paths.items()
],
)
# task.name is unique at this point.
pipeline_task_spec.task_info.name = (dsl_utils.sanitize_task_name(task.name))
pipeline_task_spec.component_ref.name = (
dsl_utils.sanitize_component_name(component_spec.name))
task.task_spec = pipeline_task_spec
task.importer_specs = importer_specs
task.component_spec = dsl_component_spec.build_component_spec_from_structure(
component_spec)
task.container_spec = (
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
image=container_spec.image,
command=resolved_cmd.command,
args=resolved_cmd.args))
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value
component_meta = copy.copy(component_spec)
task._set_metadata(component_meta)
task.inputs = input_params
  # Previously, ContainerOp had strict requirements for the output names, so we
  # had to convert all the names before passing them to the ContainerOp
  # constructor. Outputs with non-pythonic names could not be accessed using
  # their original names. Now that ContainerOp supports any output names, we
  # use the original output names. However, to support legacy pipelines, we
  # also add output references with pythonic names.
  # TODO: Add warning when people use the legacy output names.
output_names = [
output_spec.name for output_spec in component_spec.outputs or []
] # Stabilizing the ordering
output_name_to_python = generate_unique_name_conversion_table(
output_names, _sanitize_python_function_name)
for output_name in output_names:
pythonic_output_name = output_name_to_python[output_name]
# Note: Some component outputs are currently missing from task.outputs
# (e.g. MLPipeline UI Metadata)
if pythonic_output_name not in task.outputs and output_name in task.outputs:
task.outputs[pythonic_output_name] = task.outputs[output_name]
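  # Illustrative sketch (not part of this commit): given a hypothetical output
  # declared as 'Trained model', both references below would resolve to the same
  # PipelineParam after the aliasing above (the pythonic alias is roughly the
  # lower-cased, underscore-separated form produced by the name sanitizer):
  #
  #   task.outputs['Trained model']    # original (preferred) name
  #   task.outputs['trained_model']    # legacy pythonic alias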
if component_spec.metadata:
annotations = component_spec.metadata.annotations or {}
for key, value in annotations.items():
task.add_pod_annotation(key, value)
for key, value in (component_spec.metadata.labels or {}).items():
task.add_pod_label(key, value)
    # Disable caching for volatile components by default.
if annotations.get('volatile_component', 'false') == 'true':
task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
return task
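# Illustrative usage sketch (not part of this commit): the bridge above is
# normally invoked indirectly when a loaded component is called inside a
# pipeline function. The file path and names below are hypothetical.
#
#   from kfp import components, dsl
#
#   train_op = components.load_component_from_file('train/component.yaml')
#
#   @dsl.pipeline(name='demo-pipeline')
#   def demo_pipeline(num_steps: int = 10):
#     # Calling train_op(...) routes through
#     # _create_container_op_from_component_and_arguments(), which builds the
#     # ContainerOp plus its task/component/container specs for the v2 IR.
#     train_task = train_op(num_steps=num_steps)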

View File

@ -1,219 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""IR-based ContainerOp."""
from typing import Callable, Text
from kfp import dsl
from kfp.components import _structures as structures
from kfp.pipeline_spec import pipeline_spec_pb2
# Unit constants for k8s size string.
_E = 10**18 # Exa
_EI = 1 << 60 # Exa: power-of-two approximate
_P = 10**15 # Peta
_PI = 1 << 50 # Peta: power-of-two approximate
# noinspection PyShadowingBuiltins
_T = 10**12 # Tera
_TI = 1 << 40 # Tera: power-of-two approximate
_G = 10**9 # Giga
_GI = 1 << 30 # Giga: power-of-two approximate
_M = 10**6 # Mega
_MI = 1 << 20 # Mega: power-of-two approximate
_K = 10**3 # Kilo
_KI = 1 << 10 # Kilo: power-of-two approximate
_GKE_ACCELERATOR_LABEL = 'cloud.google.com/gke-accelerator'
# Shorthand for PipelineContainerSpec
_PipelineContainerSpec = pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
def resource_setter(func: Callable):
"""Function decorator for common validation before setting resource spec."""
def resource_setter_wrapper(container_op: 'ContainerOp', *args,
**kwargs) -> 'ContainerOp':
# Validate the container_op has right format of container_spec set.
if not hasattr(container_op, 'container_spec'):
raise ValueError('Expecting container_spec attribute of the container_op:'
' {}'.format(container_op))
if not isinstance(container_op.container_spec, _PipelineContainerSpec):
raise TypeError('ContainerOp.container_spec is expected to be a '
'PipelineContainerSpec proto. Got: {} for {}'.format(
type(container_op.container_spec),
container_op.container_spec))
# Run the resource setter function
return func(container_op, *args, **kwargs)
return resource_setter_wrapper
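# Illustrative sketch (not part of this commit): the decorator above rejects
# resource setter calls made before the component bridge has attached a
# PipelineContainerSpec to the op. Roughly:
#
#   op = ContainerOp(name='no-spec', image='python:3.7')
#   op.set_cpu_limit('1')   # raises: container_spec is still None, not a
#                           # PipelineContainerSpec proto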
def _get_cpu_number(cpu_string: Text) -> float:
"""Converts the cpu string to number of vCPU core."""
# dsl.ContainerOp._validate_cpu_string guaranteed that cpu_string is either
# 1) a string can be converted to a float; or
# 2) a string followed by 'm', and it can be converted to a float.
if cpu_string.endswith('m'):
return float(cpu_string[:-1]) / 1000
else:
return float(cpu_string)
def _get_resource_number(resource_string: Text) -> float:
"""Converts the resource string to number of resource in GB."""
# dsl.ContainerOp._validate_size_string guaranteed that memory_string
# represents an integer, optionally followed by one of (E, Ei, P, Pi, T, Ti,
# G, Gi, M, Mi, K, Ki).
# See the meaning of different suffix at
# https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
# Also, ResourceSpec in pipeline IR expects a number in GB.
if resource_string.endswith('E'):
return float(resource_string[:-1]) * _E / _G
elif resource_string.endswith('Ei'):
return float(resource_string[:-2]) * _EI / _G
elif resource_string.endswith('P'):
return float(resource_string[:-1]) * _P / _G
elif resource_string.endswith('Pi'):
return float(resource_string[:-2]) * _PI / _G
elif resource_string.endswith('T'):
return float(resource_string[:-1]) * _T / _G
elif resource_string.endswith('Ti'):
return float(resource_string[:-2]) * _TI / _G
elif resource_string.endswith('G'):
return float(resource_string[:-1])
elif resource_string.endswith('Gi'):
return float(resource_string[:-2]) * _GI / _G
elif resource_string.endswith('M'):
return float(resource_string[:-1]) * _M / _G
elif resource_string.endswith('Mi'):
return float(resource_string[:-2]) * _MI / _G
elif resource_string.endswith('K'):
return float(resource_string[:-1]) * _K / _G
elif resource_string.endswith('Ki'):
return float(resource_string[:-2]) * _KI / _G
else:
    # By default, interpret the value as a plain integer in bytes.
return float(resource_string) / _G
def _sanitize_gpu_type(gpu_type: Text) -> Text:
"""Converts the GPU type to conform the enum style."""
return gpu_type.replace('-', '_').upper()
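# Illustrative sketch (not part of this commit): expected conversions from the
# helpers above.
#
#   _get_cpu_number('500m')                  -> 0.5
#   _get_cpu_number('2')                     -> 2.0
#   _get_resource_number('512Mi')            -> 0.536870912   # 512 * 2**20 / 10**9
#   _get_resource_number('4G')               -> 4.0
#   _get_resource_number('1Gi')              -> 1.073741824   # 2**30 / 10**9
#   _sanitize_gpu_type('nvidia-tesla-k80')   -> 'NVIDIA_TESLA_K80'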
class ContainerOp(dsl.ContainerOp):
"""V2 ContainerOp class.

  This class behaves almost identically to the previous ContainerOp class. It
  differs in two aspects:
  - The source of truth is migrating to the PipelineContainerSpec proto.
  - The implementation (and impact) of several APIs are different. For example,
    the resource spec is set in the pipeline IR proto instead of via the k8s API.
"""
def __init__(self, **kwargs):
super(ContainerOp, self).__init__(**kwargs)
self._container_spec = None
@property
def container_spec(self):
return self._container_spec
@container_spec.setter
def container_spec(self, spec: _PipelineContainerSpec):
if not isinstance(spec, _PipelineContainerSpec):
raise TypeError('container_spec can only be PipelineContainerSpec. '
'Got: {}'.format(spec))
self._container_spec = spec
# Override resource specification calls.
@resource_setter
def set_cpu_limit(self, cpu: Text) -> 'ContainerOp':
"""Sets the cpu provisioned for this task.
Args:
cpu: a string indicating the amount of vCPU required by this task. Please
refer to dsl.ContainerOp._validate_cpu_string regarding its format.
Returns:
      self, to allow chained calls with other resource specifications.
"""
self.container._validate_cpu_string(cpu)
self.container_spec.resources.cpu_limit = _get_cpu_number(cpu)
return self
@resource_setter
def set_memory_limit(self, memory: Text) -> 'ContainerOp':
"""Sets the memory provisioned for this task.
Args:
      memory: a string describing the amount of memory required by this task.
Please refer to dsl.ContainerOp._validate_size_string regarding its
format.
Returns:
      self, to allow chained calls with other resource specifications.
"""
self.container._validate_size_string(memory)
self.container_spec.resources.memory_limit = _get_resource_number(memory)
return self
@resource_setter
def add_node_selector_constraint(self, label_name: Text,
value: Text) -> 'ContainerOp':
"""Sets accelerator type requirement for this task.

    This function is designed to let users specify an accelerator using DSL
    syntax similar to KFP V1. Under the hood, it directly specifies the required
    accelerator in the IR proto instead of relying on the k8s node selector API.

    This function can optionally be used with set_gpu_limit to set the number of
    accelerators required. Otherwise, the number requested defaults to 1.
Args:
      label_name: only 'cloud.google.com/gke-accelerator' is supported for now.
value: name of the accelerator. For example, 'nvidia-tesla-k80', or
'tpu-v3'.
Returns:
      self, to allow chained calls with other resource specifications.
"""
if label_name != _GKE_ACCELERATOR_LABEL:
raise ValueError(
'Currently add_node_selector_constraint only supports '
'accelerator spec, with node label {}. Got {} instead'.format(
_GKE_ACCELERATOR_LABEL, label_name))
accelerator_cnt = 1
if self.container_spec.resources.accelerator.count > 1:
      # Preserve the count if it was already set.
accelerator_cnt = self.container_spec.resources.accelerator.count
accelerator_config = _PipelineContainerSpec.ResourceSpec.AcceleratorConfig(
type=_sanitize_gpu_type(value), count=accelerator_cnt)
self.container_spec.resources.accelerator.CopyFrom(accelerator_config)
return self
@resource_setter
def set_gpu_limit(self, count: int) -> 'ContainerOp':
"""Sets the number of accelerator needed for this task."""
if count < 1:
      raise ValueError('Accelerator count must be positive. Got: '
                       '{}'.format(count))
self.container_spec.resources.accelerator.count = count
return self
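  # Illustrative usage sketch (not part of this commit): chaining the resource
  # setters above on a v2 ContainerOp whose container_spec has already been
  # attached by the component bridge. The task below is hypothetical.
  #
  #   train_task = train_op(num_steps=10)
  #   train_task.set_cpu_limit('4') \
  #       .set_memory_limit('16G') \
  #       .add_node_selector_constraint(
  #           'cloud.google.com/gke-accelerator', 'nvidia-tesla-k80') \
  #       .set_gpu_limit(2)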