WithItems Support (#1868)

* hacking

* hacking 2

* moved withitems to opsgroup

* basic loop test working

* fixed nested loop bug, added tests

* cleanup

* gitignore; compiler tests

* cleanup

* tests fixup

* removed format strings

* removed uuid override from test

* cleanup

* responding to comments

* removed compiler withitems test

* removed pipeline param typemeta
This commit is contained in:
Kevin Bache 2019-08-23 21:00:28 -07:00 committed by Kubernetes Prow Robot
parent e48d563cb9
commit 96fd19356c
16 changed files with 768 additions and 77 deletions

4
.gitignore vendored
View File

@ -52,5 +52,9 @@ bazel-*
# VSCode
.vscode
# test yaml
sdk/python/tests/compiler/pipeline.yaml
sdk/python/tests/compiler/testdata/testpackage/pipeline.yaml
# Test temporary files
_artifacts

View File

@ -65,8 +65,7 @@ def _process_obj(obj: Any, map_to_tmpl_var: dict):
# pipelineparam
if isinstance(obj, dsl.PipelineParam):
# if not found in unsanitized map, then likely to be sanitized
return map_to_tmpl_var.get(
str(obj), '{{inputs.parameters.%s}}' % obj.full_name)
return map_to_tmpl_var.get(str(obj), '{{inputs.parameters.%s}}' % obj.full_name)
# k8s objects (generated from swaggercodegen)
if hasattr(obj, 'swagger_types') and isinstance(obj.swagger_types, dict):

View File

@ -15,10 +15,12 @@
from collections import defaultdict
import inspect
import re
import tarfile
import zipfile
from typing import Set, List, Text, Dict
import yaml
from kfp.dsl import _container_op, _for_loop
from .. import dsl
from ._k8s_helper import K8sHelper
@ -28,6 +30,7 @@ from ._default_transformers import add_pod_env
from ..dsl._metadata import _extract_pipeline_metadata
from ..dsl._ops_group import OpsGroup
class Compiler(object):
"""DSL Compiler.
@ -157,7 +160,6 @@ class Compiler(object):
def _get_condition_params_for_ops(self, root_group):
"""Get parameters referenced in conditions of ops."""
conditions = defaultdict(set)
def _get_condition_params_for_ops_helper(group, current_conditions_params):
@ -182,7 +184,49 @@ class Compiler(object):
_get_condition_params_for_ops_helper(root_group, [])
return conditions
def _get_inputs_outputs(self, pipeline, root_group, op_groups, opsgroup_groups, condition_params):
def _get_next_group_or_op(cls, to_visit: List, already_visited: Set):
"""Get next group or op to visit."""
if len(to_visit) == 0:
return None
next = to_visit.pop(0)
while next in already_visited:
next = to_visit.pop(0)
already_visited.add(next)
return next
def _get_for_loop_ops(self, new_root) -> Dict[Text, dsl.ParallelFor]:
    """Traverse the group tree under new_root and collect every ParallelFor
    ops_group found, keyed by its group name."""
    found = {}
    visited = set()
    frontier = self._get_all_subgroups_and_ops(new_root)
    while frontier:
        node = self._get_next_group_or_op(frontier, visited)
        if node is None:
            # everything remaining was already visited
            break
        # expand the search with this node's children
        frontier.extend(self._get_all_subgroups_and_ops(node))
        if isinstance(node, dsl.ParallelFor):
            found[node.name] = node
    return found
def _get_all_subgroups_and_ops(self, op):
"""Get all ops and groups contained within this group."""
subgroups = []
if hasattr(op, 'ops'):
subgroups.extend(op.ops)
if hasattr(op, 'groups'):
subgroups.extend(op.groups)
return subgroups
def _get_inputs_outputs(
self,
pipeline,
root_group,
op_groups,
opsgroup_groups,
condition_params,
op_name_to_for_loop_op: Dict[Text, dsl.ParallelFor],
):
"""Get inputs and outputs of each group and op.
Returns:
@ -202,31 +246,42 @@ class Compiler(object):
# it as input for its parent groups.
if param.value:
continue
full_name = self._pipelineparam_full_name(param)
if param.op_name:
upstream_op = pipeline.ops[param.op_name]
upstream_groups, downstream_groups = self._get_uncommon_ancestors(
op_groups, opsgroup_groups, upstream_op, op)
for i, g in enumerate(downstream_groups):
upstream_groups, downstream_groups = \
self._get_uncommon_ancestors(op_groups, opsgroup_groups, upstream_op, op)
for i, group_name in enumerate(downstream_groups):
if i == 0:
# If it is the first uncommon downstream group, then the input comes from
# the first uncommon upstream group.
inputs[g].add((full_name, upstream_groups[0]))
inputs[group_name].add((param.full_name, upstream_groups[0]))
else:
# If not the first downstream group, then the input is passed down from
# its ancestor groups so the upstream group is None.
inputs[g].add((full_name, None))
for i, g in enumerate(upstream_groups):
inputs[group_name].add((param.full_name, None))
for i, group_name in enumerate(upstream_groups):
if i == len(upstream_groups) - 1:
# If last upstream group, it is an operator and output comes from container.
outputs[g].add((full_name, None))
outputs[group_name].add((param.full_name, None))
else:
# If not last upstream group, output value comes from one of its child.
outputs[g].add((full_name, upstream_groups[i+1]))
outputs[group_name].add((param.full_name, upstream_groups[i+1]))
else:
if not op.is_exit_handler:
for g in op_groups[op.name]:
inputs[g].add((full_name, None))
for group_name in op_groups[op.name][::-1]:
# if group is for loop group and param is that loop's param, then the param
# is created by that for loop ops_group and it shouldn't be an input to
# any of its parent groups.
inputs[group_name].add((param.full_name, None))
if group_name in op_name_to_for_loop_op:
# for example:
# loop_group.loop_args.name = 'loop-item-param-99ca152e'
# param.name = 'loop-item-param-99ca152e--a'
loop_group = op_name_to_for_loop_op[group_name]
if loop_group.loop_args.name in param.name:
break
# Generate the input/output for recursive opsgroups
# It propagates the recursive opsgroups IO to their ancestor opsgroups
def _get_inputs_outputs_recursive_opsgroup(group):
@ -240,8 +295,8 @@ class Compiler(object):
full_name = self._pipelineparam_full_name(param)
if param.op_name:
upstream_op = pipeline.ops[param.op_name]
upstream_groups, downstream_groups = self._get_uncommon_ancestors(
op_groups, opsgroup_groups, upstream_op, group)
upstream_groups, downstream_groups = \
self._get_uncommon_ancestors(op_groups, opsgroup_groups, upstream_op, group)
for i, g in enumerate(downstream_groups):
if i == 0:
inputs[g].add((full_name, upstream_groups[0]))
@ -261,7 +316,9 @@ class Compiler(object):
inputs[g].add((full_name, None))
for subgroup in group.groups:
_get_inputs_outputs_recursive_opsgroup(subgroup)
_get_inputs_outputs_recursive_opsgroup(root_group)
return inputs, outputs
def _get_dependencies(self, pipeline, root_group, op_groups, opsgroups_groups, opsgroups, condition_params):
@ -291,8 +348,8 @@ class Compiler(object):
else:
raise ValueError('compiler cannot find the ' + op_name)
upstream_groups, downstream_groups = self._get_uncommon_ancestors(
op_groups, opsgroups_groups, upstream_op, op)
upstream_groups, downstream_groups = \
self._get_uncommon_ancestors(op_groups, opsgroups_groups, upstream_op, op)
dependencies[downstream_groups[0]].add(upstream_groups[0])
# Generate dependencies based on the recursive opsgroups
@ -311,8 +368,8 @@ class Compiler(object):
upstream_op = opsgroups_groups[op_name]
else:
raise ValueError('compiler cannot find the ' + op_name)
upstream_groups, downstream_groups = self._get_uncommon_ancestors(
op_groups, opsgroups_groups, upstream_op, group)
upstream_groups, downstream_groups = \
self._get_uncommon_ancestors(op_groups, opsgroups_groups, upstream_op, group)
dependencies[downstream_groups[0]].add(upstream_groups[0])
for subgroup in group.groups:
@ -345,7 +402,7 @@ class Compiler(object):
else:
return str(value_or_reference)
def _group_to_template(self, group, inputs, outputs, dependencies):
def _group_to_dag_template(self, group, inputs, outputs, dependencies):
"""Generate template given an OpsGroup.
inputs, outputs, dependencies are all helper dicts.
@ -359,6 +416,7 @@ class Compiler(object):
template['inputs'] = {
'parameters': template_inputs
}
# Generate outputs section.
if outputs.get(group.name, None):
template_outputs = []
@ -436,9 +494,24 @@ class Compiler(object):
'value': '{{inputs.parameters.%s}}' % param_name
})
else:
if isinstance(sub_group, dsl.ParallelFor):
if sub_group.loop_args.name in param_name:
if _for_loop.LoopArgumentVariable.name_is_loop_arguments_variable(param_name):
subvar_name = _for_loop.LoopArgumentVariable.get_subvar_name(param_name)
value = '{{item.%s}}' % subvar_name
elif _for_loop.LoopArguments.name_is_loop_arguments(param_name):
value = '{{item}}'
else:
raise ValueError("Failed to match loop args with param. param_name: {}, ".format(param_name) +
"sub_group.loop_args.name: {}.".format(sub_group.loop_args.name))
else:
value = '{{inputs.parameters.%s}}' % param_name
task['withItems'] = sub_group.loop_args.to_list_for_task_yaml()
else:
value = '{{inputs.parameters.%s}}' % param_name
arguments.append({
'name': param_name,
'value': '{{inputs.parameters.%s}}' % param_name
'value': value,
})
arguments.sort(key=lambda x: x['name'])
task['arguments'] = {'parameters': arguments}
@ -447,7 +520,7 @@ class Compiler(object):
template['dag'] = {'tasks': tasks}
return template
def _create_templates(self, pipeline, op_transformers=None, op_to_templates_handler=None):
def _create_dag_templates(self, pipeline, op_transformers=None, op_to_templates_handler=None):
"""Create all groups and ops templates in the pipeline.
Args:
@ -455,9 +528,8 @@ class Compiler(object):
op_transformers: A list of functions that are applied to all ContainerOp instances that are being processed.
op_to_templates_handler: Handler which converts a base op into a list of argo templates.
"""
op_to_templates_handler = op_to_templates_handler or (lambda op : [_op_to_template(op)])
new_root_group = pipeline.groups[0]
root_group = pipeline.groups[0]
# Call the transformation functions before determining the inputs/outputs, otherwise
# the user would not be able to use pipeline parameters in the container definition
@ -467,25 +539,40 @@ class Compiler(object):
transformer(op)
# Generate core data structures to prepare for argo yaml generation
# op_groups: op name -> list of ancestor groups including the current op
# op_name_to_parent_groups: op name -> list of ancestor groups including the current op
# opsgroups: a dictionary of ospgroup.name -> opsgroup
# inputs, outputs: group/op names -> list of tuples (full_param_name, producing_op_name)
# condition_params: recursive_group/op names -> list of pipelineparam
# dependencies: group/op name -> list of dependent groups/ops.
# Special Handling for the recursive opsgroup
# op_groups also contains the recursive opsgroups
# op_name_to_parent_groups also contains the recursive opsgroups
# condition_params from _get_condition_params_for_ops also contains the recursive opsgroups
# groups does not include the recursive opsgroups
opsgroups = self._get_groups(new_root_group)
op_groups = self._get_groups_for_ops(new_root_group)
opsgroups_groups = self._get_groups_for_opsgroups(new_root_group)
condition_params = self._get_condition_params_for_ops(new_root_group)
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups, opsgroups_groups, condition_params)
dependencies = self._get_dependencies(pipeline, new_root_group, op_groups, opsgroups_groups, opsgroups, condition_params)
opsgroups = self._get_groups(root_group)
op_name_to_parent_groups = self._get_groups_for_ops(root_group)
opgroup_name_to_parent_groups = self._get_groups_for_opsgroups(root_group)
condition_params = self._get_condition_params_for_ops(root_group)
op_name_to_for_loop_op = self._get_for_loop_ops(root_group)
inputs, outputs = self._get_inputs_outputs(
pipeline,
root_group,
op_name_to_parent_groups,
opgroup_name_to_parent_groups,
condition_params,
op_name_to_for_loop_op,
)
dependencies = self._get_dependencies(
pipeline,
root_group,
op_name_to_parent_groups,
opgroup_name_to_parent_groups,
opsgroups,
condition_params,
)
templates = []
for opsgroup in opsgroups.keys():
template = self._group_to_template(opsgroups[opsgroup], inputs, outputs, dependencies)
template = self._group_to_dag_template(opsgroups[opsgroup], inputs, outputs, dependencies)
templates.append(template)
for op in pipeline.ops.values():
@ -519,7 +606,7 @@ class Compiler(object):
input_params.append(param)
# Templates
templates = self._create_templates(pipeline, op_transformers)
templates = self._create_dag_templates(pipeline, op_transformers)
templates.sort(key=lambda x: x['name'])
# Exit Handler
@ -601,7 +688,7 @@ class Compiler(object):
if arg_name == input.name:
arg_type = input.param_type
break
args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type = arg_type))
args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type=arg_type))
with dsl.Pipeline(pipeline_name) as p:
pipeline_func(*args_list)
@ -677,6 +764,9 @@ class Compiler(object):
yaml.Dumper.ignore_aliases = lambda *args : True
yaml_text = yaml.dump(workflow, default_flow_style=False)
if package_path is None:
return yaml_text
if package_path.endswith('.tar.gz') or package_path.endswith('.tgz'):
from contextlib import closing
from io import BytesIO

View File

@ -22,6 +22,6 @@ from ._volume_op import (
)
from ._pipeline_volume import PipelineVolume
from ._volume_snapshot_op import VolumeSnapshotOp
from ._ops_group import OpsGroup, ExitHandler, Condition
from ._ops_group import OpsGroup, ExitHandler, Condition, ParallelFor
from ._component import python_component, graph_component, component
from ._artifact_location import ArtifactLocation

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import re
import warnings
from typing import Any, Dict, List, TypeVar, Union, Callable, Optional, Sequence
@ -729,6 +729,9 @@ class BaseOp(object):
self.init_containers = init_containers or []
self.sidecars = sidecars or []
# used to mark this op with loop arguments
self.loop_args = None
# attributes specific to `BaseOp`
self._inputs = []
self.dependent_names = []
@ -745,10 +748,7 @@ class BaseOp(object):
self._inputs = []
# TODO replace with proper k8s obj?
for key in self.attrs_with_pipelineparams:
self._inputs += [
param for param in _pipeline_param.
extract_pipelineparams_from_any(getattr(self, key))
]
self._inputs += _pipeline_param.extract_pipelineparams_from_any(getattr(self, key))
# keep only unique
self._inputs = list(set(self._inputs))
return self._inputs
@ -900,7 +900,7 @@ class BaseOp(object):
return str({self.__class__.__name__: self.__dict__})
from ._pipeline_volume import PipelineVolume #The import is here to prevent circular reference problems.
from ._pipeline_volume import PipelineVolume # The import is here to prevent circular reference problems.
class ContainerOp(BaseOp):
@ -952,20 +952,21 @@ class ContainerOp(BaseOp):
# Excludes `file_outputs` and `outputs` as they are handled separately
# in the compilation process to generate the DAGs and task io parameters.
def __init__(self,
name: str,
image: str,
command: StringOrStringList = None,
arguments: StringOrStringList = None,
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
container_kwargs: Dict = None,
file_outputs: Dict[str, str] = None,
output_artifact_paths : Dict[str, str]=None,
artifact_location: V1alpha1ArtifactLocation=None,
is_exit_handler=False,
pvolumes: Dict[str, V1Volume] = None,
):
def __init__(
self,
name: str,
image: str,
command: StringOrStringList = None,
arguments: StringOrStringList = None,
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
container_kwargs: Dict = None,
file_outputs: Dict[str, str] = None,
output_artifact_paths : Dict[str, str]=None,
artifact_location: V1alpha1ArtifactLocation=None,
is_exit_handler=False,
pvolumes: Dict[str, V1Volume] = None,
):
"""Create a new instance of ContainerOp.
Args:
@ -1060,6 +1061,7 @@ class ContainerOp(BaseOp):
self.pvolumes = {}
self.add_pvolumes(pvolumes)
@property
def command(self):
return self._container.command

View File

@ -0,0 +1,128 @@
import re
from typing import List, Union, Dict, Text, Any, Tuple
from kfp import dsl
ItemList = List[Union[int, float, str, Dict[Text, Any]]]
class LoopArguments(dsl.PipelineParam):
    """Class representing the arguments that are looped over in a ParallelFor loop in the KFP DSL.
    This doesn't need to be instantiated by the end user, rather it will be automatically created by a
    ParallelFor ops group."""
    LOOP_ITEM_PARAM_NAME_BASE = 'loop-item-param'
    # number of characters in the code which is passed to the constructor
    NUM_CODE_CHARS = 8
    LEGAL_SUBVAR_NAME_REGEX = re.compile(r'[a-zA-Z_][0-9a-zA-Z_]*')

    @classmethod
    def _subvar_name_is_legal(cls, proposed_variable_name: Text):
        """Return True if proposed_variable_name is a legal Python identifier."""
        # fullmatch (anchored) so names whose *prefix* is legal but whose tail is
        # not (e.g. 'a-b', 'a b') are correctly rejected; plain re.match only
        # anchored the start and let such names through.
        return re.fullmatch(cls.LEGAL_SUBVAR_NAME_REGEX, proposed_variable_name) is not None

    def __init__(self, items: ItemList, code: Text):
        """LoopArguments represent the set of items to loop over in a ParallelFor loop. This class shouldn't be
        instantiated by the user but rather is created by _ops_group.ParallelFor.

        Args:
            items: List of items to loop over.  If a list of dicts then, all dicts must have the same keys and every
                key must be a legal Python variable name.
            code: A unique code used to identify these loop arguments.  Should match the code for the ParallelFor
                ops_group which created these LoopArguments.  This prevents parameter name collisions.

        Raises:
            TypeError: if items is not a list or tuple.
            ValueError: if items is empty, if dict items do not all share the same keys, or if a key
                is not a legal Python variable name.
        """
        super().__init__(name=self._make_name(code))

        if not isinstance(items, (list, tuple)):
            raise TypeError("Expected list or tuple, got {}.".format(type(items)))
        if not items:
            # previously fell through to items[0] and raised an opaque IndexError
            raise ValueError("Expected at least one item to loop over, got none.")

        if isinstance(items[0], dict):
            subvar_names = set(items[0].keys())
            for item in items:
                if not set(item.keys()) == subvar_names:
                    raise ValueError("If you input a list of dicts then all dicts should have the same keys. "
                                     "Got: {}.".format(items))

            # then this block creates loop_args.variable_a and loop_args.variable_b
            for subvar_name in subvar_names:
                if not self._subvar_name_is_legal(subvar_name):
                    raise ValueError("Tried to create subvariable named {} but that's not a legal Python variable "
                                     "name.".format(subvar_name))
                setattr(self, subvar_name, LoopArgumentVariable(self.name, subvar_name))

        self.items = items

    def to_list_for_task_yaml(self):
        """Return the raw items, used as the task's `withItems` value in the argo YAML."""
        return self.items

    @classmethod
    def _make_name(cls, code: Text):
        """Make the parameter name for this LoopArguments.  `code` is a unique-per-loop hex string."""
        return '{}-{}'.format(cls.LOOP_ITEM_PARAM_NAME_BASE, code)

    @classmethod
    def name_is_loop_arguments(cls, param_name: Text) -> bool:
        """Return True if the given parameter name looks like it came from a loop arguments parameter."""
        # NOTE: deliberately unanchored at the end — subvariable names share this
        # prefix, and callers test name_is_loop_arguments_variable first.
        return re.match(
            '%s-[0-9a-f]{%s}' % (cls.LOOP_ITEM_PARAM_NAME_BASE, cls.NUM_CODE_CHARS),
            param_name
        ) is not None
class LoopArgumentVariable(dsl.PipelineParam):
    """Represents a subvariable for loop arguments. This is used for cases where we're looping over maps,
    each of which contains several variables."""
    SUBVAR_NAME_DELIMITER = '-subvar-'

    def __init__(self, loop_args_name: Text, this_variable_name: Text):
        """
        If the user ran:
            with dsl.ParallelFor([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]) as item:
                ...
        Then there'd be one LoopArgumentVariable for 'a' and another for 'b'.

        Args:
            loop_args_name: the name of the LoopArguments object that this is a subvariable to.
            this_variable_name: the name of this subvariable, which is the name of the dict key that spawned
                this subvariable.
        """
        full_name = self.get_name(loop_args_name=loop_args_name, this_variable_name=this_variable_name)
        super().__init__(name=full_name)

    @classmethod
    def get_name(cls, loop_args_name: Text, this_variable_name: Text) -> Text:
        """Build the full parameter name for a loop-args subvariable.

        Args:
            loop_args_name: the name of the loop args parameter that this LoopArgsVariable is attached to.
            this_variable_name: the name of this LoopArgumentsVariable subvar.

        Returns: The name of this loop args variable.
        """
        return loop_args_name + cls.SUBVAR_NAME_DELIMITER + this_variable_name

    @classmethod
    def name_is_loop_arguments_variable(cls, param_name: Text) -> bool:
        """Return True if the given parameter name looks like it came from a LoopArgumentsVariable."""
        pattern = '%s-[0-9a-f]{%s}%s.*' % (
            LoopArguments.LOOP_ITEM_PARAM_NAME_BASE,
            LoopArguments.NUM_CODE_CHARS,
            cls.SUBVAR_NAME_DELIMITER,
        )
        return re.match(pattern, param_name) is not None

    @classmethod
    def parse_loop_args_name_and_this_var_name(cls, t: Text):
        """Get the loop arguments param name and this subvariable name from the given parameter name.

        Returns None when the delimiter is not present in `t`.
        """
        pattern = '(?P<loop_args_name>.*){}(?P<this_var_name>.*)'.format(cls.SUBVAR_NAME_DELIMITER)
        match = re.match(pattern, t)
        if match is None:
            return None
        parts = match.groupdict()
        return parts['loop_args_name'], parts['this_var_name']

    @classmethod
    def get_subvar_name(cls, t: Text) -> Text:
        """Get the subvariable name from a given LoopArgumentsVariable parameter name."""
        parsed = cls.parse_loop_args_name_and_this_var_name(t)
        if parsed is None:
            raise ValueError("Couldn't parse variable name: {}".format(t))
        return parsed[1]

View File

@ -16,6 +16,7 @@ from typing import Dict, List
from abc import ABCMeta, abstractmethod
from .types import BaseType, _check_valid_type_dict, _instance_to_dict
class BaseMeta(object):
__metaclass__ = ABCMeta
def __init__(self):
@ -50,6 +51,7 @@ class ParameterMeta(BaseMeta):
'type': self.param_type or '',
'default': self.default}
class ComponentMeta(BaseMeta):
def __init__(
self,
@ -70,6 +72,7 @@ class ComponentMeta(BaseMeta):
'outputs': [ output.to_dict() for output in self.outputs ]
}
# Add a pipeline-level metadata class here.
# If one day we combine the component and pipeline yaml, ComponentMeta and PipelineMeta will become one, too.
class PipelineMeta(BaseMeta):

View File

@ -11,7 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
import uuid
from kfp.dsl import _for_loop
from . import _container_op
from . import _pipeline
@ -28,7 +31,7 @@ class OpsGroup(object):
def __init__(self, group_type: str, name: str=None):
"""Create a new instance of OpsGroup.
Args:
group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'.
group_type (str): one of 'pipeline', 'exit_handler', 'condition', 'for_loop', and 'graph'.
name (str): name of the opsgroup
"""
#TODO: declare the group_type to be strongly typed
@ -40,8 +43,11 @@ class OpsGroup(object):
# recursive_ref points to the opsgroups with the same name if exists.
self.recursive_ref = None
self.loop_args = None
@staticmethod
def _get_opsgroup_pipeline(group_type, name):
def _get_matching_opsgroup_already_in_pipeline(group_type, name):
"""retrieves the opsgroup when the pipeline already contains it.
the opsgroup might be already in the pipeline in case of recursive calls.
Args:
@ -52,10 +58,11 @@ class OpsGroup(object):
if name is None:
return None
name_pattern = '^' + (group_type + '-' + name + '-').replace('_', '-') + '[\d]+$'
for ops_group in _pipeline.Pipeline.get_default_pipeline().groups:
for ops_group_already_in_pipeline in _pipeline.Pipeline.get_default_pipeline().groups:
import re
if ops_group.type == group_type and re.match(name_pattern ,ops_group.name):
return ops_group
if ops_group_already_in_pipeline.type == group_type \
and re.match(name_pattern ,ops_group_already_in_pipeline.name):
return ops_group_already_in_pipeline
return None
def _make_name_unique(self):
@ -71,7 +78,7 @@ class OpsGroup(object):
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')
self.recursive_ref = self._get_opsgroup_pipeline(self.type, self.name)
self.recursive_ref = self._get_matching_opsgroup_already_in_pipeline(self.type, self.name)
if not self.recursive_ref:
self._make_name_unique()
@ -132,6 +139,7 @@ class Condition(OpsGroup):
super(Condition, self).__init__('condition')
self.condition = condition
class Graph(OpsGroup):
"""Graph DAG with inputs, recursive_inputs, and outputs.
This is not used directly by the users but auto generated when the graph_component decoration exists
@ -141,3 +149,36 @@ class Graph(OpsGroup):
self.inputs = []
self.outputs = {}
self.dependencies = []
class ParallelFor(OpsGroup):
    """Represents a parallel for loop over a static set of items.

    Example usage:
    ```python
    with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
        op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
        op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
    ```
    and op1 would be executed twice, once with args=['echo 1'] and once with args=['echo 2']
    """
    TYPE_NAME = 'for_loop'

    @staticmethod
    def _get_unique_id_code():
        # short random hex code used to make this loop's group and parameter
        # names unique within the pipeline
        return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS]

    def __init__(self, loop_args: _for_loop.ItemList):
        # random code to id this loop
        code = self._get_unique_id_code()
        group_name = 'for-loop-{}'.format(code)
        super().__init__(self.TYPE_NAME, name=group_name)

        if not isinstance(loop_args, _for_loop.LoopArguments):
            # coerce a raw item list into LoopArguments tagged with this loop's code
            loop_args = _for_loop.LoopArguments(loop_args, code)
        self.loop_args = loop_args

    def __enter__(self) -> _for_loop.LoopArguments:
        # enter the OpsGroup as usual, but expose the loop arguments (not the
        # group itself) as the `with ... as` target
        _ = super().__enter__()
        return self.loop_args

View File

@ -11,11 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections import namedtuple
from typing import Dict, List, Union
from typing import List, Dict, Union
# TODO: Move this to a separate class
@ -44,11 +42,12 @@ def match_serialized_pipelineparam(payload: str):
for match in matches:
pattern = '{{pipelineparam:op=%s;name=%s}}' % (match[0], match[1])
param_tuples.append(PipelineParamTuple(
name=sanitize_k8s_name(match[1]),
op=sanitize_k8s_name(match[0]),
pattern=pattern))
name=sanitize_k8s_name(match[1]),
op=sanitize_k8s_name(match[0]),
pattern=pattern))
return param_tuples
def _extract_pipelineparams(payloads: str or List[str]):
"""_extract_pipelineparam extract a list of PipelineParam instances from the payload string.
Note: this function removes all duplicate matches.
@ -122,7 +121,6 @@ def extract_pipelineparams_from_any(payload) -> List['PipelineParam']:
return list(set(pipeline_params))
# return empty list
return []
@ -153,7 +151,8 @@ class PipelineParam(object):
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name))
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with a letter. '
'Got name: {}'.format(name))
if op_name and value:
raise ValueError('op_name and value cannot be both set.')

View File

@ -639,6 +639,12 @@ implementation:
if container:
self.assertEqual(template['retryStrategy']['limit'], 5)
def test_withitem_basic(self):
    """Compile a pipeline with a single dsl.ParallelFor and compare to its golden YAML."""
    self._test_py_compile_yaml('withitem_basic')

def test_withitem_nested(self):
    """Compile a pipeline with nested dsl.ParallelFor loops and compare to its golden YAML."""
    self._test_py_compile_yaml('withitem_nested')
def test_add_pod_env(self):
self._test_py_compile_yaml('add_pod_env')

View File

@ -15,7 +15,6 @@
import kfp.dsl as dsl
@dsl.pipeline(
name='Default Value',
description='A pipeline with parameter and default value.'

View File

@ -23,7 +23,8 @@ def pipelineparams_pipeline(tag: str = 'latest', sleep_ms: int = 10):
echo = dsl.Sidecar(
name='echo',
image='hashicorp/http-echo:%s' % tag,
args=['-text="hello world"'])
args=['-text="hello world"'],
)
op1 = dsl.ContainerOp(
name='download',

View File

@ -0,0 +1,59 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kfp.dsl as dsl
from kfp.dsl import _for_loop
class Coder:
    """Deterministic stand-in for ParallelFor's random unique-id codes, so the
    compiled YAML is stable across test runs."""

    def __init__(self):
        self._code_id = 0

    def get_code(self):
        # sequential counter, zero-padded to the expected code width
        self._code_id += 1
        return str(self._code_id).zfill(_for_loop.LoopArguments.NUM_CODE_CHARS)


# Replace the random code generator with the deterministic one for this test.
dsl.ParallelFor._get_unique_id_code = Coder().get_code
# NOTE: documentation is kept in comments (not a function docstring) so the
# extracted pipeline metadata — and hence the golden YAML — stays unchanged.
@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param=10):
    # All dict items share keys 'a' and 'b', which become item.a / item.b
    # subvariables inside the loop (argo withItems).
    loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
    with dsl.ParallelFor(loop_args) as item:
        # runs once per loop item, consuming item.a and the pipeline param
        op1 = dsl.ContainerOp(
            name="my-in-coop1",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo op1 %s %s" % (item.a, my_pipe_param)],
        )

        # runs once per loop item, consuming item.b only
        op2 = dsl.ContainerOp(
            name="my-in-coop2",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo op2 %s" % item.b],
        )

    # outside the loop: executes exactly once, independent of the loop items
    op_out = dsl.ContainerOp(
        name="my-out-cop",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=["echo %s" % my_pipe_param],
    )


if __name__ == '__main__':
    # package_path=None makes compile() return the YAML text instead of writing a file
    from kfp import compiler
    print(compiler.Compiler().compile(pipeline, package_path=None))

View File

@ -0,0 +1,122 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": null, "inputs": [{"default":
10, "description": "", "name": "my_pipe_param", "type": ""}], "name": "my-pipeline"}'
generateName: my-pipeline-
spec:
arguments:
parameters:
- name: my-pipe-param
value: '10'
entrypoint: my-pipeline
serviceAccountName: pipeline-runner
templates:
- dag:
tasks:
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{inputs.parameters.loop-item-param-00000001-subvar-a}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: my-in-coop1
template: my-in-coop1
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-b
value: '{{inputs.parameters.loop-item-param-00000001-subvar-b}}'
name: my-in-coop2
template: my-in-coop2
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: loop-item-param-00000001-subvar-b
- name: my-pipe-param
name: for-loop-for-loop-00000001-1
- container:
args:
- echo op1 {{inputs.parameters.loop-item-param-00000001-subvar-a}} {{inputs.parameters.my-pipe-param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: my-pipe-param
name: my-in-coop1
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- echo op2 {{inputs.parameters.loop-item-param-00000001-subvar-b}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-b
name: my-in-coop2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- echo {{inputs.parameters.my-pipe-param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: my-pipe-param
name: my-out-cop
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{item.a}}'
- name: loop-item-param-00000001-subvar-b
value: '{{item.b}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: for-loop-for-loop-00000001-1
template: for-loop-for-loop-00000001-1
withItems:
- a: 1
b: 2
- a: 10
b: 20
- arguments:
parameters:
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: my-out-cop
template: my-out-cop
inputs:
parameters:
- name: my-pipe-param
name: my-pipeline

View File

@ -0,0 +1,62 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kfp.dsl as dsl
from kfp.dsl import _for_loop
class Coder:
    """Deterministic sequence of loop-ID codes for reproducible compiler tests.

    An instance is monkey-patched over ``dsl.ParallelFor._get_unique_id_code``
    so that compiled workflow/template names (e.g. ``for-loop-00000001``) are
    stable across test runs instead of containing random characters.
    """

    def __init__(self):
        # Monotonically increasing counter; the first issued code is 1.
        self._code_id = 0

    def get_code(self):
        """Return the next sequential code, zero-padded to NUM_CODE_CHARS digits."""
        self._code_id += 1
        width = _for_loop.LoopArguments.NUM_CODE_CHARS
        return str(self._code_id).zfill(width)
# Replace ParallelFor's unique-ID generator with the deterministic Coder above
# so the compiled output matches the checked-in golden YAML on every run.
dsl.ParallelFor._get_unique_id_code = Coder().get_code
@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param=10):
    """Compiler-test pipeline exercising nested ``dsl.ParallelFor`` loops.

    Structure (must stay in sync with the expected golden YAML):
      - an outer loop over two dicts, whose body runs ``my-in-coop1`` and
        ``my-in-coop2`` and contains an inner loop,
      - an inner loop over three scalars running ``my-inner-inner-coop``,
      - a loop-independent op ``my-out-cop`` at pipeline level.

    Args:
      my_pipe_param: pipeline parameter threaded into several op arguments
        (default 10).
    """
    # Outer withItems argument: each item has subvariables `a` and `b`.
    loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
    with dsl.ParallelFor(loop_args) as item:
        # References both a loop subvariable and the pipeline parameter.
        op1 = dsl.ContainerOp(
            name="my-in-coop1",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo op1 %s %s" % (item.a, my_pipe_param)],
        )

        # Inner loop over plain scalars; `inner_item` is the whole item
        # (no subvariables), while `item.a` comes from the enclosing loop.
        with dsl.ParallelFor([100, 200, 300]) as inner_item:
            op11 = dsl.ContainerOp(
                name="my-inner-inner-coop",
                image="library/bash:4.4.23",
                command=["sh", "-c"],
                arguments=["echo op1 %s %s %s" % (item.a, inner_item, my_pipe_param)],
            )

        # Uses only the outer loop's `b` subvariable.
        op2 = dsl.ContainerOp(
            name="my-in-coop2",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo op2 %s" % item.b],
        )

    # Outside both loops: depends only on the pipeline parameter.
    op_out = dsl.ContainerOp(
        name="my-out-cop",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=["echo %s" % my_pipe_param],
    )

View File

@ -0,0 +1,176 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": null, "inputs": [{"default":
10, "description": "", "name": "my_pipe_param", "type": ""}], "name": "my-pipeline"}'
generateName: my-pipeline-
spec:
arguments:
parameters:
- name: my-pipe-param
value: '10'
entrypoint: my-pipeline
serviceAccountName: pipeline-runner
templates:
- dag:
tasks:
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{inputs.parameters.loop-item-param-00000001-subvar-a}}'
- name: loop-item-param-00000002
value: '{{item}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: for-loop-for-loop-00000002-2
template: for-loop-for-loop-00000002-2
withItems:
- 100
- 200
- 300
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{inputs.parameters.loop-item-param-00000001-subvar-a}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: my-in-coop1
template: my-in-coop1
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-b
value: '{{inputs.parameters.loop-item-param-00000001-subvar-b}}'
name: my-in-coop2
template: my-in-coop2
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: loop-item-param-00000001-subvar-b
- name: my-pipe-param
name: for-loop-for-loop-00000001-1
- dag:
tasks:
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{inputs.parameters.loop-item-param-00000001-subvar-a}}'
- name: loop-item-param-00000002
value: '{{inputs.parameters.loop-item-param-00000002}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: my-inner-inner-coop
template: my-inner-inner-coop
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: loop-item-param-00000002
- name: my-pipe-param
name: for-loop-for-loop-00000002-2
- container:
args:
- echo op1 {{inputs.parameters.loop-item-param-00000001-subvar-a}} {{inputs.parameters.my-pipe-param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: my-pipe-param
name: my-in-coop1
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- echo op2 {{inputs.parameters.loop-item-param-00000001-subvar-b}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-b
name: my-in-coop2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- echo op1 {{inputs.parameters.loop-item-param-00000001-subvar-a}} {{inputs.parameters.loop-item-param-00000002}}
{{inputs.parameters.my-pipe-param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: loop-item-param-00000002
- name: my-pipe-param
name: my-inner-inner-coop
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- echo {{inputs.parameters.my-pipe-param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: my-pipe-param
name: my-out-cop
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{item.a}}'
- name: loop-item-param-00000001-subvar-b
value: '{{item.b}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: for-loop-for-loop-00000001-1
template: for-loop-for-loop-00000001-1
withItems:
- a: 1
b: 2
- a: 10
b: 20
- arguments:
parameters:
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
name: my-out-cop
template: my-out-cop
inputs:
parameters:
- name: my-pipe-param
name: my-pipeline