feat(sdk): support for fan-in from `dsl.ParallelFor` (#8631)

* implement parallelfor fan-in

* temporarily block fan-in of artifacts

* clean up

* unify and clarify error messages

* remove comment

* remove unnecessary compiler condition

* make test cases more realistic

* use better ParallelFor variable name

* add and rename test cases

* add ParallelFor fan-in test cases

* fix return collected bug

* clean up compilation tests

* update docstrings; update error message

* remove fan-in artifacts read/write tests

* implement review feedback

* change use of additional_input_name_for_pipeline_channel

* move additional_input_name_for_pipeline_channel

* provide additional pipeline topology validation and tests; refactor existing validation

* add release note for parameter fan-in
Connor McCarthy 2023-02-04 20:50:51 -08:00 committed by GitHub
parent 8f052d0012
commit b575950a75
20 changed files with 2473 additions and 178 deletions
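
For context, a minimal sketch of the feature this commit adds, mirroring the `dsl.Collected` docstring example included in the diff below:

```python
from typing import List

from kfp import dsl


@dsl.component
def double(num: int) -> int:
    return 2 * num


@dsl.component
def add(nums: List[int]) -> int:
    return sum(nums)


@dsl.pipeline
def math_pipeline() -> int:
    with dsl.ParallelFor([1, 2, 3]) as x:
        t = double(num=x)
    # fan-in: collect each iteration's output parameter into a single list
    return add(nums=dsl.Collected(t.output)).output
```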


@ -1,6 +1,7 @@
# Current Version (in development)
## Features
* Support fanning-in parameter outputs from a task in a `dsl.ParallelFor` context using `dsl.Collected` [\#8631](https://github.com/kubeflow/pipelines/pull/8631)
## Breaking changes


@ -29,6 +29,7 @@ from kfp import components
from kfp import dsl
from kfp.cli import cli
from kfp.compiler import compiler
from kfp.compiler import compiler_utils
from kfp.components.types import type_utils
from kfp.dsl import Artifact
from kfp.dsl import ContainerSpec
@ -443,8 +444,9 @@ class TestCompilePipeline(parameterized.TestCase):
pass
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ParallelFor context unless the downstream is within that context too or the outputs are being fanned-in to a list using dsl\.Collected\. Found task dummy-op which depends on upstream task producer-op within an uncommon dsl\.ParallelFor context\.'
):
@dsl.pipeline(name='test-pipeline')
def my_pipeline(val: bool):
@ -453,27 +455,6 @@ class TestCompilePipeline(parameterized.TestCase):
dummy_op(msg=producer_task.output)
def test_valid_data_dependency_loop(self):
@dsl.component
def producer_op() -> str:
return 'a'
@dsl.component
def dummy_op(msg: str = ''):
pass
@dsl.pipeline(name='test-pipeline')
def my_pipeline(val: bool):
with dsl.ParallelFor(['a, b']):
producer_task = producer_op()
dummy_op(msg=producer_task.output)
with tempfile.TemporaryDirectory() as tmpdir:
package_path = os.path.join(tmpdir, 'pipeline.yaml')
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path=package_path)
def test_invalid_data_dependency_condition(self):
@dsl.component
@ -485,8 +466,9 @@ class TestCompilePipeline(parameterized.TestCase):
pass
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.Condition context unless the downstream is within that context too\. Found task dummy-op which depends on upstream task producer-op within an uncommon dsl\.Condition context\.'
):
@dsl.pipeline(name='test-pipeline')
def my_pipeline(val: bool):
@ -527,8 +509,9 @@ class TestCompilePipeline(parameterized.TestCase):
pass
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ExitHandler context unless the downstream is within that context too\. Found task dummy-op which depends on upstream task producer-op-2 within an uncommon dsl\.ExitHandler context\.'
):
@dsl.pipeline(name='test-pipeline')
def my_pipeline(val: bool):
@ -1533,8 +1516,9 @@ class TestValidLegalTopologies(unittest.TestCase):
def test_upstream_inside_deeper_condition_blocked(self):
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl.Condition context unless the downstream is within that context too\. Found task print-op-3 which depends on upstream task print-op-2 within an uncommon dsl\.Condition context\.'
):
@dsl.pipeline()
def my_pipeline():
@ -1587,8 +1571,9 @@ class TestValidLegalTopologies(unittest.TestCase):
self):
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl\.Condition context unless the downstream is within that context too\. Found task print-op-3 which depends on upstream task print-op-2 within an uncommon dsl\.Condition context\.'
):
@dsl.pipeline()
def my_pipeline():
@ -1627,8 +1612,9 @@ class TestValidLegalTopologies(unittest.TestCase):
def test_upstream_inside_deeper_nested_condition_blocked(self):
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.Condition context unless the downstream is within that context too\. Found task print-op-3 which depends on upstream task print-op-2 within an uncommon dsl\.Condition context\.'
):
@dsl.pipeline()
def my_pipeline():
@ -1663,8 +1649,9 @@ class TestValidLegalTopologies(unittest.TestCase):
def test_downstream_not_in_same_for_loop_with_upstream_blocked(self):
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ParallelFor context unless the downstream is within that context too or the outputs are being fanned-in to a list using dsl\.Collected\. Found task print-op-2 which depends on upstream task print-op within an uncommon dsl\.ParallelFor context\.'
):
@dsl.pipeline()
def my_pipeline():
@ -1683,8 +1670,9 @@ class TestValidLegalTopologies(unittest.TestCase):
self):
with self.assertRaisesRegex(
RuntimeError,
r'Tasks cannot depend on an upstream task inside'):
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ParallelFor context unless the downstream is within that context too or the outputs are being fanned-in to a list using dsl\.Collected\. Found task print-op-2 which depends on upstream task print-op within an uncommon dsl\.ParallelFor context\.'
):
@dsl.pipeline()
def my_pipeline():
@ -1704,7 +1692,7 @@ class TestValidLegalTopologies(unittest.TestCase):
def test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self):
with self.assertRaisesRegex(
RuntimeError,
compiler_utils.InvalidTopologyException,
r'Downstream tasks in a nested ParallelFor group cannot depend on an upstream task in a shallower ParallelFor group.'
):
@ -1775,7 +1763,10 @@ class TestValidLegalTopologies(unittest.TestCase):
class TestCannotUseAfterCrossDAG(unittest.TestCase):
def test_inner_task_prevented(self):
with self.assertRaisesRegex(RuntimeError, r'Task'):
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ExitHandler context unless the downstream is within that context too\. Found task print-op-4 which depends on upstream task print-op-2 within an uncommon dsl\.ExitHandler context\.'
):
@dsl.component
def print_op(message: str):
@ -1800,7 +1791,10 @@ class TestCannotUseAfterCrossDAG(unittest.TestCase):
pipeline_func=my_pipeline, package_path=package_path)
def test_exit_handler_task_prevented(self):
with self.assertRaisesRegex(RuntimeError, r'Task'):
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ExitHandler context unless the downstream is within that context too\. Found task print-op-4 which depends on upstream task print-op-2 within an uncommon dsl\.ExitHandler context\.'
):
@dsl.component
def print_op(message: str):
@ -1851,7 +1845,10 @@ class TestCannotUseAfterCrossDAG(unittest.TestCase):
pipeline_func=my_pipeline, package_path=package_path)
def test_outside_of_condition_blocked(self):
with self.assertRaisesRegex(RuntimeError, r'Task'):
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.Condition context unless the downstream is within that context too\. Found task print-op-3 which depends on upstream task print-op within an uncommon dsl\.Condition context\.'
):
@dsl.component
def print_op(message: str):
@ -2652,5 +2649,245 @@ class TestCompileOptionalArtifacts(unittest.TestCase):
comp()
class TestCrossTasksGroupFanInCollection(unittest.TestCase):
def test_missing_collected_with_correct_annotation(self):
from typing import List
from kfp import dsl
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
type_utils.InconsistentTypeException,
'Argument type "NUMBER_INTEGER" is incompatible with the input type "LIST"'
):
@dsl.pipeline
def math_pipeline() -> int:
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
return add(nums=t.output).output
def test_missing_collected_with_incorrect_annotation(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: int) -> int:
return nums
# the annotation is incorrect, but the user didn't use dsl.Collected
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. A downstream task cannot depend on an upstream task within a dsl\.ParallelFor context unless the downstream is within that context too or the outputs are being fanned-in to a list using dsl\.Collected\. Found task add which depends on upstream task double within an uncommon dsl\.ParallelFor context\.'
):
@dsl.pipeline
def math_pipeline() -> int:
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
return add(nums=t.output).output
def test_producer_condition_legal1(self):
from kfp import dsl
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
@dsl.pipeline
def math_pipeline(text: str) -> int:
with dsl.Condition(text == 'text'):
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
return add(nums=dsl.Collected(t.output)).output
def test_producer_condition_legal2(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
@dsl.pipeline
def my_pipeline(a: str):
with dsl.ParallelFor([1, 2, 3]) as v:
with dsl.Condition(v == 1):
t = double(num=v)
with dsl.Condition(a == 'a'):
x = add(nums=dsl.Collected(t.output))
def test_producer_condition_illegal1(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. When using dsl\.Collected to fan-in outputs from a task within a dsl\.ParallelFor context, the dsl\.ParallelFor context manager cannot be nested within a dsl.Condition context manager unless the consumer task is too\. Task add consumes from double within a dsl\.Condition context\.'
):
@dsl.pipeline
def my_pipeline(a: str = '', b: str = ''):
with dsl.Condition(a == 'a'):
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
with dsl.Condition(b == 'b'):
x = add(nums=dsl.Collected(t.output))
def test_producer_condition_illegal2(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. When using dsl\.Collected to fan-in outputs from a task within a dsl\.ParallelFor context, the dsl\.ParallelFor context manager cannot be nested within a dsl\.Condition context manager unless the consumer task is too\. Task add consumes from double within a dsl\.Condition context\.'
):
@dsl.pipeline
def my_pipeline(a: str = ''):
with dsl.Condition(a == 'a'):
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
add(nums=dsl.Collected(t.output))
def test_producer_exit_handler_illegal1(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def exit_comp():
print('Running exit task!')
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'Illegal task dependency across DSL context managers\. When using dsl\.Collected to fan-in outputs from a task within a dsl\.ParallelFor context, the dsl\.ParallelFor context manager cannot be nested within a dsl\.ExitHandler context manager unless the consumer task is too\. Task add consumes from double within a dsl\.ExitHandler context\.'
):
@dsl.pipeline
def my_pipeline():
with dsl.ExitHandler(exit_comp()):
with dsl.ParallelFor([1, 2, 3]) as v:
t = double(num=v)
add(nums=dsl.Collected(t.output))
def test_parallelfor_nested_legal_params1(self):
@dsl.component
def add_two_ints(num1: int, num2: int) -> int:
return num1 + num2
@dsl.component
def add(nums: List[List[int]]) -> int:
import itertools
return sum(itertools.chain(*nums))
@dsl.pipeline
def my_pipeline():
with dsl.ParallelFor([1, 2, 3]) as v1:
with dsl.ParallelFor([1, 2, 3]) as v2:
t = add_two_ints(num1=v1, num2=v2)
x = add(nums=dsl.Collected(t.output))
def test_parallelfor_nested_legal_params2(self):
@dsl.component
def add_two_ints(num1: int, num2: int) -> int:
return num1 + num2
@dsl.component
def add(nums: List[List[int]]) -> int:
import itertools
return sum(itertools.chain(*nums))
@dsl.pipeline
def my_pipeline():
with dsl.ParallelFor([1, 2, 3]) as v1:
with dsl.ParallelFor([1, 2, 3]) as v2:
t = add_two_ints(num1=v1, num2=v2)
x = add(nums=dsl.Collected(t.output))
def test_producer_and_consumer_in_same_context(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'dsl\.Collected can only be used to fan-in outputs produced by a task within a dsl\.ParallelFor context to a task outside of the dsl\.ParallelFor context\. Producer task double is either not in a dsl\.ParallelFor context or is only in a dsl\.ParallelFor that also contains consumer task add\.'
):
@dsl.pipeline
def math_pipeline():
with dsl.ParallelFor([1, 2, 3]) as x:
t = double(num=x)
add(nums=dsl.Collected(t.output))
def test_no_parallelfor_context(self):
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
with self.assertRaisesRegex(
compiler_utils.InvalidTopologyException,
r'dsl\.Collected can only be used to fan-in outputs produced by a task within a dsl\.ParallelFor context to a task outside of the dsl\.ParallelFor context\. Producer task double is either not in a dsl\.ParallelFor context or is only in a dsl\.ParallelFor that also contains consumer task add\.'
):
@dsl.pipeline
def math_pipeline():
t = double(num=1)
add(nums=dsl.Collected(t.output))
if __name__ == '__main__':
unittest.main()


@ -15,7 +15,7 @@
import collections
from copy import deepcopy
from typing import Dict, List, Mapping, Set, Tuple, Union
from typing import DefaultDict, Dict, List, Mapping, Set, Tuple, Union
from kfp.components import for_loop
from kfp.components import pipeline_channel
@ -25,6 +25,20 @@ from kfp.components import tasks_group
GroupOrTaskType = Union[tasks_group.TasksGroup, pipeline_task.PipelineTask]
ILLEGAL_CROSS_DAG_ERROR_PREFIX = 'Illegal task dependency across DSL context managers.'
def additional_input_name_for_pipeline_channel(
channel_or_name: Union[pipeline_channel.PipelineChannel, str]) -> str:
"""Gets the name for an additional (compiler-injected) input."""
# Adding a prefix to avoid (reduce chance of) name collision between the
# original component inputs and the injected input.
return 'pipelinechannel--' + (
channel_or_name.full_name if isinstance(
channel_or_name, pipeline_channel.PipelineChannel) else
channel_or_name)
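A quick usage sketch; the channel values mirror the unit tests added later in this diff:

```python
from kfp.compiler import compiler_utils
from kfp.components import pipeline_channel

channel = pipeline_channel.PipelineParameterChannel(
    name='output1', task_name='task1', channel_type='String')

# full_name is 'task1-output1', so the injected input name becomes:
assert compiler_utils.additional_input_name_for_pipeline_channel(
    channel) == 'pipelinechannel--task1-output1'

# plain strings get only the prefix
assert compiler_utils.additional_input_name_for_pipeline_channel(
    'param1') == 'pipelinechannel--param1'
```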
def get_all_groups(
root_group: tasks_group.TasksGroup,) -> List[tasks_group.TasksGroup]:
@ -52,8 +66,7 @@ def get_all_groups(
def get_parent_groups(
root_group: tasks_group.TasksGroup,
) -> Tuple[Mapping[str, List[GroupOrTaskType]], Mapping[str,
List[GroupOrTaskType]]]:
) -> Tuple[Mapping[str, List[str]], Mapping[str, List[str]]]:
"""Get parent groups that contain the specified tasks.
Each pipeline has a root group. Each group has a list of tasks (leaf)
@ -149,8 +162,8 @@ def get_condition_channels_for_tasks(
def get_inputs_for_all_groups(
pipeline: pipeline_context.Pipeline,
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
group_name_to_parent_groups: Mapping[str, List[tasks_group.TasksGroup]],
task_name_to_parent_groups: Mapping[str, List[str]],
group_name_to_parent_groups: Mapping[str, List[str]],
condition_channels: Mapping[str,
Set[pipeline_channel.PipelineParameterChannel]],
name_to_for_loop_group: Mapping[str, tasks_group.ParallelFor],
@ -325,9 +338,212 @@ def get_inputs_for_all_groups(
return inputs
class InvalidTopologyException(Exception):
pass
def validate_parallel_for_fan_in_consumption_legal(
consumer_task_name: str,
upstream_groups: List[str],
group_name_to_group: Dict[str, tasks_group.TasksGroup],
) -> None:
"""Checks that a dsl.Collected object is being used results in an
unambiguous pipeline topology and is therefore legal.
Args:
consumer_task_name: The name of the consumer task.
upstream_groups: The names of the producer task's upstream groups, ordered from outermost group at beginning to producer task at end. This is produced by _get_uncommon_ancestors.
group_name_to_group: Map of group name to TasksGroup, for fast lookups.
"""
# handles cases like this:
# @dsl.pipeline
# def my_pipeline():
# with dsl.ParallelFor([1, 2, 3]) as x:
# t = double(num=x)
# x = add(dsl.Collected(t.output))
#
# and this:
# @dsl.pipeline
# def my_pipeline():
# t = double(num=1)
# x = add(dsl.Collected(t.output))
producer_task_idx = -1
producer_task_name = upstream_groups[producer_task_idx]
if all(group_name_to_group[group_name].group_type !=
tasks_group.TasksGroupType.FOR_LOOP
for group_name in upstream_groups[:producer_task_idx]):
raise InvalidTopologyException(
f'dsl.{for_loop.Collected.__name__} can only be used to fan-in outputs produced by a task within a dsl.{tasks_group.ParallelFor.__name__} context to a task outside of the dsl.{tasks_group.ParallelFor.__name__} context. Producer task {producer_task_name} is either not in a dsl.{tasks_group.ParallelFor.__name__} context or is only in a dsl.{tasks_group.ParallelFor.__name__} that also contains consumer task {consumer_task_name}.'
)
# illegal if the producer has a parent conditional outside of its outermost for loop, since the for loop may or may not be executed
# for example, what happens if text == 'b'? the resulting execution behavior is ambiguous.
#
# @dsl.pipeline
# def my_pipeline(text: str = ''):
# with dsl.Condition(text == 'a'):
# with dsl.ParallelFor([1, 2, 3]) as x:
# t = double(num=x)
# x = add(nums=dsl.Collected(t.output))
outermost_uncommon_upstream_group = upstream_groups[0]
group = group_name_to_group[outermost_uncommon_upstream_group]
if group.group_type in [
tasks_group.TasksGroupType.CONDITION,
tasks_group.TasksGroupType.EXIT_HANDLER,
]:
raise InvalidTopologyException(
f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} When using dsl.{for_loop.Collected.__name__} to fan-in outputs from a task within a dsl.{tasks_group.ParallelFor.__name__} context, the dsl.{tasks_group.ParallelFor.__name__} context manager cannot be nested within a dsl.{group.__class__.__name__} context manager unless the consumer task is too. Task {consumer_task_name} consumes from {producer_task_name} within a dsl.{group.__class__.__name__} context.'
)
elif group.group_type != tasks_group.TasksGroupType.FOR_LOOP:
raise ValueError(
f'Got unexpected group type when validating fanning-in outputs from task in dsl.{tasks_group.ParallelFor.__name__}: {group.group_type}'
)
def make_new_channel_for_collected_outputs(
channel_name: str,
starting_channel: pipeline_channel.PipelineChannel,
task_name: str,
) -> pipeline_channel.PipelineChannel:
"""Creates a new PipelineParameterChannel/PipelineArtifactChannel (with
type List) from a Collected channel, a PipelineParameterChannel, or a
PipelineArtifactChannel."""
return starting_channel.__class__(
channel_name,
channel_type=starting_channel.channel_type if isinstance(
starting_channel, pipeline_channel.PipelineArtifactChannel) else
'LIST',
task_name=task_name,
)
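A usage sketch under the same assumptions (the task and channel names are illustrative): a parameter channel is widened to type LIST and attributed to the group that surfaces it, while an artifact channel would keep its original type:

```python
from kfp.compiler import compiler_utils
from kfp.components import pipeline_channel

param = pipeline_channel.PipelineParameterChannel(
    name='Output', task_name='double', channel_type='Integer')

collected = compiler_utils.make_new_channel_for_collected_outputs(
    channel_name='pipelinechannel--double-Output',
    starting_channel=param,
    task_name='for-loop-2',
)

# same channel class, but typed LIST and re-homed on the surfacing group
assert isinstance(collected, pipeline_channel.PipelineParameterChannel)
assert collected.channel_type == 'LIST'
assert collected.task_name == 'for-loop-2'
```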
def get_outputs_for_all_groups(
pipeline: pipeline_context.Pipeline,
task_name_to_parent_groups: Mapping[str, List[str]],
group_name_to_parent_groups: Mapping[str, List[str]],
all_groups: List[tasks_group.TasksGroup],
pipeline_outputs_dict: Dict[str, pipeline_channel.PipelineChannel]
) -> Tuple[DefaultDict[str, Dict[str, pipeline_channel.PipelineChannel]], Dict[
str, pipeline_channel.PipelineChannel]]:
"""Gets a dictionary of all TasksGroup names to an inner dictionary. The
inner dictionary is TasksGroup output keys to channels corresponding to
those keys.
It constructs this dictionary from both data passing within the pipeline body, as well as the outputs returned from the pipeline (e.g., return dsl.Collected(...)).
Also returns as the second item of tuple the updated pipeline_outputs_dict. This dict is modified so that the values (PipelineChannel) references the group that surfaces the task output, instead of the original task that produced it.
"""
# unlike inputs, which will be surfaced as component input parameters,
# consumers of surfaced outputs need to have a reference to what the parent
# component calls them when they surface them, which will be different than
# the producer task name and channel name (the information contained in the
# pipeline channel)
# for this reason, we use additional_input_name_for_pipeline_channel here
# to set the name of the surfaced output once
group_name_to_group = {group.name: group for group in all_groups}
group_name_to_children = {
group.name: [group.name for group in group.groups] +
[task.name for task in group.tasks] for group in all_groups
}
outputs = collections.defaultdict(dict)
# handle dsl.Collected consumed by tasks
for task in pipeline.tasks.values():
for channel in task.channel_inputs:
if not isinstance(channel, for_loop.Collected):
continue
producer_task = pipeline.tasks[channel.task_name]
consumer_task = task
upstream_groups, downstream_groups = (
_get_uncommon_ancestors(
task_name_to_parent_groups=task_name_to_parent_groups,
group_name_to_parent_groups=group_name_to_parent_groups,
task1=producer_task,
task2=consumer_task,
))
validate_parallel_for_fan_in_consumption_legal(
consumer_task_name=consumer_task.name,
upstream_groups=upstream_groups,
group_name_to_group=group_name_to_group,
)
# producer_task's immediate parent group and the name by which
# to surface the channel
surfaced_output_name = additional_input_name_for_pipeline_channel(
channel)
# the highest-level task group that "consumes" the
# collected output
parent_consumer = downstream_groups[0]
producer_task_name = upstream_groups.pop()
# process the upstream groups from the inside out
for upstream_name in reversed(upstream_groups):
outputs[upstream_name][
surfaced_output_name] = make_new_channel_for_collected_outputs(
channel_name=channel.name,
starting_channel=channel.output,
task_name=producer_task_name,
)
# on each iteration, mutate the channel being consumed so
# that it references the last parent group surfacer
channel.name = surfaced_output_name
channel.task_name = upstream_name
# for the next iteration, set the producer to the current
# surfacer (parent group)
producer_task_name = upstream_name
parent_of_current_surfacer = group_name_to_parent_groups[
upstream_name][-2]
if parent_consumer in group_name_to_children[
parent_of_current_surfacer]:
break
# handle dsl.Collected returned from pipeline
for output_key, channel in pipeline_outputs_dict.items():
if isinstance(channel, for_loop.Collected):
surfaced_output_name = additional_input_name_for_pipeline_channel(
channel)
upstream_groups = task_name_to_parent_groups[
channel.task_name][1:]
producer_task_name = upstream_groups.pop()
# process upstream groups from the inside out, until getting to the pipeline level
for upstream_name in reversed(upstream_groups):
new_channel = make_new_channel_for_collected_outputs(
channel_name=channel.name,
starting_channel=channel.output,
task_name=producer_task_name,
)
# on each iteration, mutate the channel being consumed so
# that it references the last parent group surfacer
channel.name = surfaced_output_name
channel.task_name = upstream_name
# for the next iteration, set the producer to the current
# surfacer (parent group)
producer_task_name = upstream_name
outputs[upstream_name][surfaced_output_name] = new_channel
# after surfacing from all inner TasksGroups, change the PipelineChannel output to also return from the correct TasksGroup
pipeline_outputs_dict[
output_key] = make_new_channel_for_collected_outputs(
channel_name=surfaced_output_name,
starting_channel=channel.output,
task_name=upstream_name,
)
return outputs, pipeline_outputs_dict
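For intuition, a standalone plain-data sketch of the inner-to-outer surfacing walk above, using group and task names assumed from the compiled math_pipeline YAML later in this diff:

```python
import collections

# the producer task's uncommon ancestors, outermost first, as returned by
# _get_uncommon_ancestors (names taken from the compiled YAML below)
upstream_groups = ['for-loop-2', 'condition-3']
surfaced_name = 'pipelinechannel--double-Output'
producer = 'double'

outputs = collections.defaultdict(dict)
for group_name in reversed(upstream_groups):  # inside out
    # each enclosing group surfaces the output under the same stable name,
    # pointing at whichever task/group produced it one level down
    outputs[group_name][surfaced_name] = {'task_name': producer}
    producer = group_name

# condition-3 surfaces double's output; for-loop-2 re-surfaces condition-3's
print(dict(outputs))
```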
def _get_uncommon_ancestors(
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
group_name_to_parent_groups: Mapping[str, List[tasks_group.TasksGroup]],
task_name_to_parent_groups: Mapping[str, List[str]],
group_name_to_parent_groups: Mapping[str, List[str]],
task1: GroupOrTaskType,
task2: GroupOrTaskType,
) -> Tuple[List[GroupOrTaskType], List[GroupOrTaskType]]:
@ -372,8 +588,8 @@ def _get_uncommon_ancestors(
def get_dependencies(
pipeline: pipeline_context.Pipeline,
task_name_to_parent_groups: Mapping[str, List[GroupOrTaskType]],
group_name_to_parent_groups: Mapping[str, List[tasks_group.TasksGroup]],
task_name_to_parent_groups: Mapping[str, List[str]],
group_name_to_parent_groups: Mapping[str, List[str]],
group_name_to_group: Mapping[str, tasks_group.TasksGroup],
condition_channels: Dict[str, pipeline_channel.PipelineChannel],
) -> Mapping[str, List[GroupOrTaskType]]:
@ -436,34 +652,37 @@ def get_dependencies(
if uncommon_upstream_groups:
dependent_group = group_name_to_group.get(
uncommon_upstream_groups[0], None)
if isinstance(dependent_group, tasks_group.ExitHandler):
task_group_type = 'an ' + tasks_group.ExitHandler.__name__
elif isinstance(dependent_group, tasks_group.Condition):
task_group_type = 'a ' + tasks_group.Condition.__name__
else:
task_group_type = 'a ' + tasks_group.ParallelFor.__name__
raise RuntimeError(
f'Tasks cannot depend on an upstream task inside {task_group_type} that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
)
if isinstance(dependent_group,
(tasks_group.Condition, tasks_group.ExitHandler)):
raise InvalidTopologyException(
f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} A downstream task cannot depend on an upstream task within a dsl.{dependent_group.__class__.__name__} context unless the downstream is within that context too. Found task {task.name} which depends on upstream task {upstream_task.name} within an uncommon dsl.{dependent_group.__class__.__name__} context.'
)
elif isinstance(dependent_group, tasks_group.ParallelFor):
raise InvalidTopologyException(
f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} A downstream task cannot depend on an upstream task within a dsl.{dependent_group.__class__.__name__} context unless the downstream is within that context too or the outputs are being fanned-in to a list using dsl.{for_loop.Collected.__name__}. Found task {task.name} which depends on upstream task {upstream_task.name} within an uncommon dsl.{dependent_group.__class__.__name__} context.'
)
# ParallelFor nested check
# if there is a ParallelFor group among the upstream task's parents and there also exists a ParallelFor in the uncommon_ancestors of downstream: this means a nested for loop exists in the DAG
upstream_parent_tasks = task_name_to_parent_groups[
upstream_task.name]
for group in downstream_groups:
if isinstance(
group_name_to_group.get(group, None),
tasks_group.ParallelFor):
for parent_task in upstream_parent_tasks:
if isinstance(
group_name_to_group.get(parent_task, None),
tasks_group.ParallelFor):
raise RuntimeError(
f'Downstream tasks in a nested {tasks_group.ParallelFor.__name__} group cannot depend on an upstream task in a shallower {tasks_group.ParallelFor.__name__} group. Task {task.name} depends on upstream task {upstream_task.name}, while {group} is nested in {parent_task}.'
)
# only check when upstream_task is a PipelineTask, since checking
# for TasksGroup would also catch dsl.Collected cases.
if isinstance(upstream_task, pipeline_task.PipelineTask):
upstream_parent_tasks = task_name_to_parent_groups[
upstream_task.name]
for group in downstream_groups:
if isinstance(
group_name_to_group.get(group, None),
tasks_group.ParallelFor):
for parent_task in upstream_parent_tasks:
if isinstance(
group_name_to_group.get(parent_task, None),
tasks_group.ParallelFor):
raise InvalidTopologyException(
f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} Downstream tasks in a nested {tasks_group.ParallelFor.__name__} group cannot depend on an upstream task in a shallower {tasks_group.ParallelFor.__name__} group. Task {task.name} depends on upstream task {upstream_task.name}, while {group} is nested in {parent_task}.'
)
dependencies[downstream_groups[0]].add(upstream_groups[0])


@ -0,0 +1,58 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from absl.testing import parameterized
from kfp.compiler import compiler_utils
from kfp.components import pipeline_channel
class TestAdditionalInputNameForPipelineChannel(parameterized.TestCase):
@parameterized.parameters(
{
'channel':
pipeline_channel.PipelineParameterChannel(
name='output1', task_name='task1', channel_type='String'),
'expected':
'pipelinechannel--task1-output1',
},
{
'channel':
pipeline_channel.PipelineArtifactChannel(
name='output1',
task_name='task1',
channel_type='system.Artifact@0.0.1',
),
'expected':
'pipelinechannel--task1-output1',
},
{
'channel':
pipeline_channel.PipelineParameterChannel(
name='param1', channel_type='String'),
'expected':
'pipelinechannel--param1',
},
)
def test_additional_input_name_for_pipeline_channel(self, channel,
expected):
self.assertEqual(
expected,
compiler_utils.additional_input_name_for_pipeline_channel(channel))
if __name__ == '__main__':
unittest.main()


@ -14,7 +14,9 @@
"""Functions for creating PipelineSpec proto objects."""
import json
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
import typing
from typing import (Any, DefaultDict, Dict, List, Mapping, Optional, Tuple,
Union)
import warnings
from google.protobuf import json_format
@ -45,18 +47,6 @@ group_type_to_dsl_class = {
_SINGLE_OUTPUT_NAME = 'Output'
def _additional_input_name_for_pipeline_channel(
channel_or_name: Union[pipeline_channel.PipelineChannel, str]) -> str:
"""Gets the name for an additional (compiler-injected) input."""
# Adding a prefix to avoid (reduce chance of) name collision between the
# original component inputs and the injected input.
return 'pipelinechannel--' + (
channel_or_name.full_name if isinstance(
channel_or_name, pipeline_channel.PipelineChannel) else
channel_or_name)
def to_protobuf_value(value: type_utils.PARAMETER_TYPES) -> struct_pb2.Value:
"""Creates a google.protobuf.struct_pb2.Value message out of a provide
value.
@ -137,7 +127,11 @@ def build_task_spec_for_task(
task._task_spec.retry_policy.to_proto())
for input_name, input_value in task.inputs.items():
if isinstance(input_value, pipeline_channel.PipelineArtifactChannel):
if isinstance(input_value,
pipeline_channel.PipelineArtifactChannel) or (
isinstance(input_value, for_loop.Collected) and
input_value.is_artifact_channel):
if input_value.task_name:
# Value is produced by an upstream task.
@ -152,8 +146,8 @@ def build_task_spec_for_task(
else:
# Dependent task not from the same DAG.
component_input_artifact = (
_additional_input_name_for_pipeline_channel(input_value)
)
compiler_utils.
additional_input_name_for_pipeline_channel(input_value))
assert component_input_artifact in parent_component_inputs.artifacts, \
f'component_input_artifact: {component_input_artifact} not found. All inputs: {parent_component_inputs}'
pipeline_task_spec.inputs.artifacts[
@ -163,15 +157,18 @@ def build_task_spec_for_task(
component_input_artifact = input_value.full_name
if component_input_artifact not in parent_component_inputs.artifacts:
component_input_artifact = (
_additional_input_name_for_pipeline_channel(input_value)
)
compiler_utils.
additional_input_name_for_pipeline_channel(input_value))
pipeline_task_spec.inputs.artifacts[
input_name].component_input_artifact = (
component_input_artifact)
elif isinstance(input_value, pipeline_channel.PipelineParameterChannel):
elif isinstance(input_value,
pipeline_channel.PipelineParameterChannel) or (
isinstance(input_value, for_loop.Collected) and
not input_value.is_artifact_channel):
if input_value.task_name:
# Value is produced by an upstream task.
if input_value.task_name in tasks_in_current_dag:
# Dependent task within the same DAG.
@ -184,8 +181,8 @@ def build_task_spec_for_task(
else:
# Dependent task not from the same DAG.
component_input_parameter = (
_additional_input_name_for_pipeline_channel(input_value)
)
compiler_utils.
additional_input_name_for_pipeline_channel(input_value))
assert component_input_parameter in parent_component_inputs.parameters, \
f'component_input_parameter: {component_input_parameter} not found. All inputs: {parent_component_inputs}'
pipeline_task_spec.inputs.parameters[
@ -196,8 +193,8 @@ def build_task_spec_for_task(
component_input_parameter = input_value.full_name
if component_input_parameter not in parent_component_inputs.parameters:
component_input_parameter = (
_additional_input_name_for_pipeline_channel(input_value)
)
compiler_utils.
additional_input_name_for_pipeline_channel(input_value))
pipeline_task_spec.inputs.parameters[
input_name].component_input_parameter = (
component_input_parameter)
@ -205,7 +202,8 @@ def build_task_spec_for_task(
elif isinstance(input_value, for_loop.LoopArgument):
component_input_parameter = (
_additional_input_name_for_pipeline_channel(input_value))
compiler_utils.additional_input_name_for_pipeline_channel(
input_value))
assert component_input_parameter in parent_component_inputs.parameters, \
f'component_input_parameter: {component_input_parameter} not found. All inputs: {parent_component_inputs}'
pipeline_task_spec.inputs.parameters[
@ -215,7 +213,7 @@ def build_task_spec_for_task(
elif isinstance(input_value, for_loop.LoopArgumentVariable):
component_input_parameter = (
_additional_input_name_for_pipeline_channel(
compiler_utils.additional_input_name_for_pipeline_channel(
input_value.loop_argument))
assert component_input_parameter in parent_component_inputs.parameters, \
f'component_input_parameter: {component_input_parameter} not found. All inputs: {parent_component_inputs}'
@ -238,7 +236,8 @@ def build_task_spec_for_task(
# Form the name for the compiler injected input, and make sure it
# doesn't collide with any existing input names.
additional_input_name = (
_additional_input_name_for_pipeline_channel(channel))
compiler_utils.additional_input_name_for_pipeline_channel(
channel))
# We don't expect collision to happen because we prefix the name
# of additional input with 'pipelinechannel--'. But just in case
@ -268,8 +267,8 @@ def build_task_spec_for_task(
else:
# Dependent task not from the same DAG.
component_input_parameter = (
_additional_input_name_for_pipeline_channel(channel)
)
compiler_utils.
additional_input_name_for_pipeline_channel(channel))
assert component_input_parameter in parent_component_inputs.parameters, \
f'component_input_parameter: {component_input_parameter} not found. All inputs: {parent_component_inputs}'
pipeline_task_spec.inputs.parameters[
@ -280,8 +279,8 @@ def build_task_spec_for_task(
component_input_parameter = channel.full_name
if component_input_parameter not in parent_component_inputs.parameters:
component_input_parameter = (
_additional_input_name_for_pipeline_channel(channel)
)
compiler_utils.
additional_input_name_for_pipeline_channel(channel))
pipeline_task_spec.inputs.parameters[
additional_input_name].component_input_parameter = (
component_input_parameter)
@ -400,7 +399,7 @@ def _connect_dag_outputs(
output_name: str,
output_channel: pipeline_channel.PipelineChannel,
) -> None:
"""Connects dag ouptut to a subtask output.
"""Connects dag output to a subtask output.
Args:
component_spec: The component spec to modify its dag outputs.
@ -409,7 +408,7 @@ def _connect_dag_outputs(
"""
if isinstance(output_channel, pipeline_channel.PipelineArtifactChannel):
if output_name not in component_spec.output_definitions.artifacts:
raise ValueError(f'Pipeline output not defined: {output_name}.')
raise ValueError(f'DAG output not defined: {output_name}.')
component_spec.dag.outputs.artifacts[
output_name].artifact_selectors.append(
pipeline_spec_pb2.DagOutputsSpec.ArtifactSelectorSpec(
@ -427,23 +426,11 @@ def _connect_dag_outputs(
def _build_dag_outputs(
component_spec: pipeline_spec_pb2.ComponentSpec,
dag_outputs: Optional[Any],
dag_outputs: Dict[str, pipeline_channel.PipelineChannel],
) -> None:
"""Builds DAG output spec."""
if dag_outputs is not None:
if isinstance(dag_outputs, pipeline_channel.PipelineChannel):
_connect_dag_outputs(
component_spec=component_spec,
output_name=_SINGLE_OUTPUT_NAME,
output_channel=dag_outputs,
)
elif isinstance(dag_outputs, tuple) and hasattr(dag_outputs, '_asdict'):
for output_name, output_channel in dag_outputs._asdict().items():
_connect_dag_outputs(
component_spec=component_spec,
output_name=output_name,
output_channel=output_channel,
)
for output_name, output_channel in dag_outputs.items():
_connect_dag_outputs(component_spec, output_name, output_channel)
# Validate that dag outputs cover all outputs in the component definition.
for output_name in component_spec.output_definitions.artifacts:
if output_name not in component_spec.dag.outputs.artifacts:
@ -561,7 +548,8 @@ def _fill_in_component_input_default_value(
def build_component_spec_for_group(
pipeline_channels: List[pipeline_channel.PipelineChannel]
input_pipeline_channels: List[pipeline_channel.PipelineChannel],
output_pipeline_channels: Dict[str, pipeline_channel.PipelineChannel],
) -> pipeline_spec_pb2.ComponentSpec:
"""Builds ComponentSpec for a TasksGroup.
@ -574,8 +562,9 @@ def build_component_spec_for_group(
"""
component_spec = pipeline_spec_pb2.ComponentSpec()
for channel in pipeline_channels:
input_name = _additional_input_name_for_pipeline_channel(channel)
for channel in input_pipeline_channels:
input_name = compiler_utils.additional_input_name_for_pipeline_channel(
channel)
if isinstance(channel, pipeline_channel.PipelineArtifactChannel):
component_spec.input_definitions.artifacts[
@ -588,6 +577,18 @@ def build_component_spec_for_group(
component_spec.input_definitions.parameters[
input_name].parameter_type = type_utils.get_parameter_type(
channel.channel_type)
for output_name, output in output_pipeline_channels.items():
if isinstance(output, pipeline_channel.PipelineArtifactChannel):
component_spec.output_definitions.artifacts[
output_name].artifact_type.CopyFrom(
type_utils.bundled_artifact_to_artifact_proto(
output.channel_type))
else:
component_spec.output_definitions.parameters[
output_name].parameter_type = type_utils.get_parameter_type(
output.channel_type)
return component_spec
@ -620,9 +621,9 @@ def _update_task_spec_for_loop_group(
"""
if group.items_is_pipeline_channel:
loop_items_channel = group.loop_argument.items_or_pipeline_channel
input_parameter_name = _additional_input_name_for_pipeline_channel(
input_parameter_name = compiler_utils.additional_input_name_for_pipeline_channel(
loop_items_channel)
loop_argument_item_name = _additional_input_name_for_pipeline_channel(
loop_argument_item_name = compiler_utils.additional_input_name_for_pipeline_channel(
group.loop_argument.full_name)
loop_arguments_item = f'{input_parameter_name}-{for_loop.LoopArgument.LOOP_ITEM_NAME_BASE}'
@ -642,11 +643,11 @@ def _update_task_spec_for_loop_group(
)
pipeline_task_spec.inputs.parameters[
input_parameter_name].component_input_parameter = (
_additional_input_name_for_pipeline_channel(
compiler_utils.additional_input_name_for_pipeline_channel(
loop_items_channel.loop_argument))
else:
input_parameter_name = _additional_input_name_for_pipeline_channel(
input_parameter_name = compiler_utils.additional_input_name_for_pipeline_channel(
group.loop_argument)
raw_values = group.loop_argument.items_or_pipeline_channel
@ -690,7 +691,7 @@ def _resolve_condition_operands(
pipeline_spec_pb2.ParameterType
.PARAMETER_TYPE_ENUM_UNSPECIFIED,
]:
input_name = _additional_input_name_for_pipeline_channel(
input_name = compiler_utils.additional_input_name_for_pipeline_channel(
value_or_reference)
raise ValueError(
f'Conditional requires scalar parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.'
@ -733,7 +734,7 @@ def _resolve_condition_operands(
operand_values = []
for value_or_reference in [left_operand, right_operand]:
if isinstance(value_or_reference, pipeline_channel.PipelineChannel):
input_name = _additional_input_name_for_pipeline_channel(
input_name = compiler_utils.additional_input_name_for_pipeline_channel(
value_or_reference)
operand_value = f"inputs.parameter_values['{input_name}']"
parameter_type = type_utils.get_parameter_type(
@ -866,7 +867,8 @@ def build_task_spec_for_group(
channel_full_name = channel.loop_argument.full_name
subvar_name = channel.subvar_name
input_name = _additional_input_name_for_pipeline_channel(channel)
input_name = compiler_utils.additional_input_name_for_pipeline_channel(
channel)
channel_name = channel.name
if subvar_name:
@ -900,7 +902,7 @@ def build_task_spec_for_group(
pipeline_task_spec.inputs.parameters[
input_name].component_input_parameter = (
channel_full_name if is_parent_component_root else
_additional_input_name_for_pipeline_channel(
compiler_utils.additional_input_name_for_pipeline_channel(
channel_full_name))
if isinstance(group, tasks_group.ParallelFor):
@ -1025,6 +1027,7 @@ def build_spec_by_group(
deployment_config: pipeline_spec_pb2.PipelineDeploymentConfig,
group: tasks_group.TasksGroup,
inputs: Mapping[str, List[Tuple[pipeline_channel.PipelineChannel, str]]],
outputs: DefaultDict[str, Dict[str, pipeline_channel.PipelineChannel]],
dependencies: Dict[str, List[compiler_utils.GroupOrTaskType]],
rootgroup_name: str,
task_name_to_parent_groups: Mapping[str,
@ -1072,8 +1075,10 @@ def build_spec_by_group(
subgroups = group.groups + group.tasks
for subgroup in subgroups:
subgroup_inputs = inputs.get(subgroup.name, [])
subgroup_channels = [channel for channel, _ in subgroup_inputs]
subgroup_input_channels = [
channel for channel, _ in inputs.get(subgroup.name, [])
]
subgroup_output_channels = outputs.get(subgroup.name, {})
subgroup_component_name = (utils.sanitize_component_name(subgroup.name))
@ -1128,7 +1133,7 @@ def build_spec_by_group(
# subgroups or tasks.
loop_subgroup_channels = []
for channel in subgroup_channels:
for channel in subgroup_input_channels:
# Skip 'withItems' loop arguments if it's from an inner loop.
if isinstance(
channel,
@ -1156,7 +1161,9 @@ def build_spec_by_group(
loop_subgroup_channels.append(subgroup.loop_argument)
subgroup_component_spec = build_component_spec_for_group(
pipeline_channels=loop_subgroup_channels)
input_pipeline_channels=loop_subgroup_channels,
output_pipeline_channels=subgroup_output_channels,
)
subgroup_task_spec = build_task_spec_for_group(
group=subgroup,
@ -1165,11 +1172,14 @@ def build_spec_by_group(
is_parent_component_root=is_parent_component_root,
)
_build_dag_outputs(subgroup_component_spec,
subgroup_output_channels)
elif isinstance(subgroup, tasks_group.Condition):
# "Punch the hole", adding inputs needed by its subgroups or
# tasks.
condition_subgroup_channels = list(subgroup_channels)
condition_subgroup_channels = list(subgroup_input_channels)
for operand in [
subgroup.condition.left_operand,
subgroup.condition.right_operand,
@ -1178,7 +1188,9 @@ def build_spec_by_group(
condition_subgroup_channels.append(operand)
subgroup_component_spec = build_component_spec_for_group(
pipeline_channels=condition_subgroup_channels)
input_pipeline_channels=condition_subgroup_channels,
output_pipeline_channels=subgroup_output_channels,
)
subgroup_task_spec = build_task_spec_for_group(
group=subgroup,
@ -1187,14 +1199,19 @@ def build_spec_by_group(
is_parent_component_root=is_parent_component_root,
)
_build_dag_outputs(subgroup_component_spec,
subgroup_output_channels)
elif isinstance(subgroup, tasks_group.ExitHandler):
subgroup_component_spec = build_component_spec_for_group(
pipeline_channels=subgroup_channels)
input_pipeline_channels=subgroup_input_channels,
output_pipeline_channels={},
)
subgroup_task_spec = build_task_spec_for_group(
group=subgroup,
pipeline_channels=subgroup_channels,
pipeline_channels=subgroup_input_channels,
tasks_in_current_dag=tasks_in_current_dag,
is_parent_component_root=is_parent_component_root,
)
@ -1483,9 +1500,10 @@ def create_pipeline_spec(
pipeline_spec.root.CopyFrom(
_build_component_spec_from_component_spec_structure(component_spec))
_build_dag_outputs(
component_spec=pipeline_spec.root, dag_outputs=pipeline_outputs)
# TODO: add validation of returned outputs -- it's possible to return
# an output from a task in a condition group, for example, which isn't
# caught until submission time using Vertex SDK client
pipeline_outputs_dict = convert_pipeline_outputs_to_dict(pipeline_outputs)
root_group = pipeline.groups[0]
all_groups = compiler_utils.get_all_groups(root_group)
@ -1506,6 +1524,12 @@ def create_pipeline_spec(
condition_channels=condition_channels,
name_to_for_loop_group=name_to_for_loop_group,
)
outputs, modified_pipeline_outputs_dict = compiler_utils.get_outputs_for_all_groups(
pipeline=pipeline,
task_name_to_parent_groups=task_name_to_parent_groups,
group_name_to_parent_groups=group_name_to_parent_groups,
all_groups=all_groups,
pipeline_outputs_dict=pipeline_outputs_dict)
dependencies = compiler_utils.get_dependencies(
pipeline=pipeline,
task_name_to_parent_groups=task_name_to_parent_groups,
@ -1520,6 +1544,7 @@ def create_pipeline_spec(
deployment_config=deployment_config,
group=group,
inputs=inputs,
outputs=outputs,
dependencies=dependencies,
rootgroup_name=root_group.name,
task_name_to_parent_groups=task_name_to_parent_groups,
@ -1533,9 +1558,31 @@ def create_pipeline_spec(
deployment_config=deployment_config,
)
_build_dag_outputs(
component_spec=pipeline_spec.root,
dag_outputs=modified_pipeline_outputs_dict,
)
return pipeline_spec
def convert_pipeline_outputs_to_dict(
pipeline_outputs: Union[pipeline_channel.PipelineChannel, typing.NamedTuple,
None]
) -> Dict[str, pipeline_channel.PipelineChannel]:
"""Converts the outputs from a pipeline function into a dictionary of
output name to PipelineChannel."""
if pipeline_outputs is None:
return {}
elif isinstance(pipeline_outputs, pipeline_channel.PipelineChannel):
return {_SINGLE_OUTPUT_NAME: pipeline_outputs}
elif isinstance(pipeline_outputs, tuple) and hasattr(
pipeline_outputs, '_asdict'):
return dict(pipeline_outputs._asdict())
else:
raise ValueError(f'Got unknown pipeline output: {pipeline_outputs}')
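A brief sketch of the three accepted shapes (the channel construction mirrors the unit-test channels elsewhere in this diff):

```python
from typing import NamedTuple

from kfp.compiler import pipeline_spec_builder
from kfp.components import pipeline_channel

single = pipeline_channel.PipelineParameterChannel(
    name='output1', task_name='task1', channel_type='String')

# a bare channel maps to the single default output name, 'Output'
assert pipeline_spec_builder.convert_pipeline_outputs_to_dict(
    single) == {'Output': single}

# a NamedTuple of channels maps field names to channels
Outputs = NamedTuple('Outputs', [('a', pipeline_channel.PipelineChannel)])
assert pipeline_spec_builder.convert_pipeline_outputs_to_dict(
    Outputs(a=single)) == {'a': single}

# no pipeline outputs yields an empty dict
assert pipeline_spec_builder.convert_pipeline_outputs_to_dict(None) == {}
```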
def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
pipeline_description: str,
package_path: str) -> None:


@ -19,7 +19,6 @@ from absl.testing import parameterized
from google.protobuf import json_format
from google.protobuf import struct_pb2
from kfp.compiler import pipeline_spec_builder
from kfp.components import pipeline_channel
from kfp.pipeline_spec import pipeline_spec_pb2
import yaml
@ -29,39 +28,6 @@ class PipelineSpecBuilderTest(parameterized.TestCase):
def setUp(self):
self.maxDiff = None
@parameterized.parameters(
{
'channel':
pipeline_channel.PipelineParameterChannel(
name='output1', task_name='task1', channel_type='String'),
'expected':
'pipelinechannel--task1-output1',
},
{
'channel':
pipeline_channel.PipelineArtifactChannel(
name='output1',
task_name='task1',
channel_type='system.Artifact@0.0.1',
),
'expected':
'pipelinechannel--task1-output1',
},
{
'channel':
pipeline_channel.PipelineParameterChannel(
name='param1', channel_type='String'),
'expected':
'pipelinechannel--param1',
},
)
def test_additional_input_name_for_pipeline_channel(self, channel,
expected):
self.assertEqual(
expected,
pipeline_spec_builder._additional_input_name_for_pipeline_channel(
channel))
@parameterized.parameters(
{
'parameter_type': pipeline_spec_pb2.ParameterType.NUMBER_INTEGER,


@ -272,3 +272,46 @@ class LoopArgumentVariable(pipeline_channel.PipelineChannel):
The name of this loop arg variable.
"""
return f'{loop_arg_name}{self.SUBVAR_NAME_DELIMITER}{subvar_name}'
class Collected(pipeline_channel.PipelineChannel):
"""For collecting into a list the output from a task in dsl.ParallelFor
loops.
Args:
output: The output of an upstream task within a dsl.ParallelFor loop.
Example:
::
@dsl.pipeline
def math_pipeline() -> int:
with dsl.ParallelFor([1, 2, 3]) as x:
t = double(num=x)
return add(nums=dsl.Collected(t.output)).output
"""
def __init__(
self,
output: pipeline_channel.PipelineChannel,
) -> None:
self.output = output
if isinstance(output, pipeline_channel.PipelineArtifactChannel):
channel_type = output.channel_type
self.is_artifact_channel = True
else:
channel_type = 'LIST'
self.is_artifact_channel = False
# TODO: remove to support artifact fan-in
if self.is_artifact_channel:
raise NotImplementedError(
'Fan-in of artifacts created in a ParallelFor loop is not yet supported.'
)
super().__init__(
output.name,
channel_type=channel_type,
task_name=output.task_name,
)


@ -964,6 +964,8 @@ class ComponentSpec:
deployment_config=deployment_config,
group=root_group,
inputs=inputs,
outputs=collections.defaultdict(
dict), # empty -- no sub-DAG outputs to surface
dependencies={}, # no dependencies for single-component pipeline
rootgroup_name=root_group.name,
task_name_to_parent_groups=task_name_to_parent_groups,


@ -23,6 +23,7 @@ __all__ = [
'Condition',
'ExitHandler',
'ParallelFor',
'Collected',
'Input',
'Output',
'InputPath',
@ -55,6 +56,7 @@ from typing import TypeVar
from kfp.components.component_decorator import component
from kfp.components.container_component_decorator import container_component
from kfp.components.for_loop import Collected
from kfp.components.importer_node import importer
from kfp.components.pipeline_context import pipeline
from kfp.components.pipeline_task import PipelineTask


@ -0,0 +1,31 @@
from typing import List
from kfp import compiler
from kfp import dsl
@dsl.component
def double(num: int) -> int:
return 2 * num
@dsl.component
def add(nums: List[int]) -> int:
return sum(nums)
@dsl.pipeline
def math_pipeline(threshold: int = 2) -> List[int]:
with dsl.ParallelFor([1, 2, 3]) as x:
with dsl.Condition(x >= threshold):
t = double(num=x)
with dsl.Condition(threshold == 2):
t = add(nums=dsl.Collected(t.output))
return dsl.Collected(t.output)
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=math_pipeline,
package_path=__file__.replace('.py', '.yaml'))


@ -0,0 +1,228 @@
# PIPELINE DEFINITION
# Name: math-pipeline
# Inputs:
# threshold: int [Default: 2.0]
# Outputs:
# Output: list
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-condition-3:
dag:
outputs:
parameters:
pipelinechannel--double-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double
tasks:
double:
cachingOptions:
enableCache: true
componentRef:
name: comp-double
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: double
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
pipelinechannel--threshold:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-Output:
parameterType: NUMBER_INTEGER
comp-condition-4:
dag:
outputs:
parameters:
pipelinechannel--add-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: add
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
inputs:
parameters:
nums:
componentInputParameter: pipelinechannel--for-loop-2-pipelinechannel--double-Output
taskInfo:
name: add
inputDefinitions:
parameters:
pipelinechannel--for-loop-2-pipelinechannel--double-Output:
parameterType: LIST
pipelinechannel--threshold:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--add-Output:
parameterType: NUMBER_INTEGER
comp-double:
executorLabel: exec-double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--double-Output:
valueFromParameter:
outputParameterKey: pipelinechannel--double-Output
producerSubtask: condition-3
tasks:
condition-3:
componentRef:
name: comp-condition-3
inputs:
parameters:
pipelinechannel--loop-item-param-1:
componentInputParameter: pipelinechannel--loop-item-param-1
pipelinechannel--threshold:
componentInputParameter: pipelinechannel--threshold
taskInfo:
name: condition-3
triggerPolicy:
condition: int(inputs.parameter_values['pipelinechannel--loop-item-param-1'])
>= int(inputs.parameter_values['pipelinechannel--threshold'])
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
pipelinechannel--threshold:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-Output:
parameterType: NUMBER_INTEGER
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(nums: List[int]) -> int:\n return sum(nums)\n\n"
image: python:3.7
exec-double:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
pipelineInfo:
name: math-pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: pipelinechannel--add-Output
producerSubtask: condition-4
tasks:
condition-4:
componentRef:
name: comp-condition-4
dependentTasks:
- for-loop-2
inputs:
parameters:
pipelinechannel--for-loop-2-pipelinechannel--double-Output:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-Output
producerTask: for-loop-2
pipelinechannel--threshold:
componentInputParameter: threshold
taskInfo:
name: condition-4
triggerPolicy:
condition: int(inputs.parameter_values['pipelinechannel--threshold']) ==
2
for-loop-2:
componentRef:
name: comp-for-loop-2
inputs:
parameters:
pipelinechannel--threshold:
componentInputParameter: threshold
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-2
inputDefinitions:
parameters:
threshold:
defaultValue: 2.0
isOptional: true
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.11

View File

@ -0,0 +1,50 @@
from typing import List

from kfp import compiler
from kfp import dsl


@dsl.component
def double(num: int) -> int:
    return 2 * num


@dsl.component
def simple_add(nums: List[int]) -> int:
    return sum(nums)


@dsl.component
def nested_add(nums: List[List[int]]) -> int:
    import itertools
    return sum(itertools.chain(*nums))


@dsl.component
def add_two_numbers(x: List[int], y: List[int]) -> int:
    return sum(x) + sum(y)
@dsl.pipeline
def math_pipeline() -> int:
    with dsl.ParallelFor([1, 2, 3]) as a:
        t1 = double(num=a)
        with dsl.ParallelFor([4, 5, 6]) as b:
            t2 = double(num=b)
        simple_add(nums=dsl.Collected(t2.output))
    nested_add(nums=dsl.Collected(t2.output))
    with dsl.ParallelFor([0, 0, 0]) as _:
        t3 = simple_add(nums=dsl.Collected(t1.output))
        t4 = nested_add(nums=dsl.Collected(t2.output))
    return add_two_numbers(
        x=dsl.Collected(t3.output), y=dsl.Collected(t4.output)).output


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=math_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
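A note on the shapes at play here: each dsl.ParallelFor between a producer and the consumer of dsl.Collected adds one list level to the fanned-in value, which is why both simple_add (List[int]) and nested_add (List[List[int]]) are exercised above. A plain-Python sketch of those shapes, illustrative only and not using the SDK:

import itertools

t1 = [2 * a for a in [1, 2, 3]]           # Collected(t1.output) one loop up -> List[int]
t2 = [[2 * b for b in [4, 5, 6]]          # Collected(t2.output) two loops up
      for _ in [1, 2, 3]]                 # -> List[List[int]]

assert sum(t1) == 12                      # simple_add over the flat fan-in
assert sum(itertools.chain(*t2)) == 90    # nested_add over the nested fan-in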

View File

@ -0,0 +1,480 @@
# PIPELINE DEFINITION
# Name: math-pipeline
# Outputs:
# Output: int
components:
comp-add-two-numbers:
executorLabel: exec-add-two-numbers
inputDefinitions:
parameters:
x:
parameterType: LIST
y:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double:
executorLabel: exec-double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double-2:
executorLabel: exec-double-2
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--double-2-Output:
valueFromParameter:
outputParameterKey: pipelinechannel--double-2-Output
producerSubtask: for-loop-4
pipelinechannel--double-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double
tasks:
double:
cachingOptions:
enableCache: true
componentRef:
name: comp-double
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: double
for-loop-4:
componentRef:
name: comp-for-loop-4
parameterIterator:
itemInput: pipelinechannel--loop-item-param-3
items:
raw: '[4, 5, 6]'
taskInfo:
name: for-loop-4
simple-add:
cachingOptions:
enableCache: true
componentRef:
name: comp-simple-add
dependentTasks:
- for-loop-4
inputs:
parameters:
nums:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-2-Output
producerTask: for-loop-4
taskInfo:
name: simple-add
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-2-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--double-Output:
parameterType: NUMBER_INTEGER
comp-for-loop-4:
dag:
outputs:
parameters:
pipelinechannel--double-2-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double-2
tasks:
double-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-double-2
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-3
taskInfo:
name: double-2
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-3:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-2-Output:
parameterType: NUMBER_INTEGER
comp-for-loop-6:
dag:
outputs:
parameters:
pipelinechannel--nested-add-2-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: nested-add-2
pipelinechannel--simple-add-2-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: simple-add-2
tasks:
nested-add-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-nested-add-2
inputs:
parameters:
nums:
componentInputParameter: pipelinechannel--for-loop-2-pipelinechannel--double-2-Output
taskInfo:
name: nested-add-2
simple-add-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-simple-add-2
inputs:
parameters:
nums:
componentInputParameter: pipelinechannel--for-loop-2-pipelinechannel--double-Output
taskInfo:
name: simple-add-2
inputDefinitions:
parameters:
pipelinechannel--for-loop-2-pipelinechannel--double-2-Output:
parameterType: LIST
pipelinechannel--for-loop-2-pipelinechannel--double-Output:
parameterType: LIST
pipelinechannel--loop-item-param-5:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--nested-add-2-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--simple-add-2-Output:
parameterType: NUMBER_INTEGER
comp-nested-add:
executorLabel: exec-nested-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-nested-add-2:
executorLabel: exec-nested-add-2
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-simple-add:
executorLabel: exec-simple-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-simple-add-2:
executorLabel: exec-simple-add-2
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
deploymentSpec:
executors:
exec-add-two-numbers:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add_two_numbers
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add_two_numbers(x: List[int], y: List[int]) -> int:\n return\
\ sum(x) + sum(y)\n\n"
image: python:3.7
exec-double:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
exec-double-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
exec-nested-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- nested_add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef nested_add(nums: List[List[int]]) -> int:\n import itertools\n\
\ return sum(itertools.chain(*nums))\n\n"
image: python:3.7
exec-nested-add-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- nested_add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef nested_add(nums: List[List[int]]) -> int:\n import itertools\n\
\ return sum(itertools.chain(*nums))\n\n"
image: python:3.7
exec-simple-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- simple_add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef simple_add(nums: List[int]) -> int:\n return sum(nums)\n\n"
image: python:3.7
exec-simple-add-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- simple_add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef simple_add(nums: List[int]) -> int:\n return sum(nums)\n\n"
image: python:3.7
pipelineInfo:
name: math-pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: add-two-numbers
tasks:
add-two-numbers:
cachingOptions:
enableCache: true
componentRef:
name: comp-add-two-numbers
dependentTasks:
- for-loop-6
inputs:
parameters:
x:
taskOutputParameter:
outputParameterKey: pipelinechannel--simple-add-2-Output
producerTask: for-loop-6
y:
taskOutputParameter:
outputParameterKey: pipelinechannel--nested-add-2-Output
producerTask: for-loop-6
taskInfo:
name: add-two-numbers
for-loop-2:
componentRef:
name: comp-for-loop-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-2
for-loop-6:
componentRef:
name: comp-for-loop-6
dependentTasks:
- for-loop-2
inputs:
parameters:
pipelinechannel--for-loop-2-pipelinechannel--double-2-Output:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-2-Output
producerTask: for-loop-2
pipelinechannel--for-loop-2-pipelinechannel--double-Output:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-Output
producerTask: for-loop-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-5
items:
raw: '[0, 0, 0]'
taskInfo:
name: for-loop-6
nested-add:
cachingOptions:
enableCache: true
componentRef:
name: comp-nested-add
dependentTasks:
- for-loop-2
inputs:
parameters:
nums:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-2-Output
producerTask: for-loop-2
taskInfo:
name: nested-add
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.11

View File

@ -0,0 +1,37 @@
from typing import List

from kfp import compiler
from kfp import dsl


@dsl.component
def double(num: int) -> int:
    return 2 * num


@dsl.component
def add(nums: List[List[int]]) -> int:
    import itertools
    return sum(itertools.chain(*nums))


@dsl.component
def add_two_nums(x: int, y: int) -> int:
    return x + y
@dsl.pipeline
def math_pipeline() -> List[int]:
    with dsl.ParallelFor([1, 2, 3]) as x:
        with dsl.ParallelFor([1, 2, 3]) as y:
            t1 = double(num=x)
            t2 = double(num=y)
            t3 = add_two_nums(x=t1.output, y=t2.output)
    t4 = add(nums=dsl.Collected(t3.output))
    return dsl.Collected(t3.output)


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=math_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
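Since t3 runs once per (x, y) pair, collecting it from the pipeline root crosses two dsl.ParallelFor levels and therefore yields List[List[int]], matching add's signature. A plain-Python sketch of that fan-in shape (illustrative only, no KFP involved):

import itertools

t3 = [[2 * x + 2 * y for y in [1, 2, 3]] for x in [1, 2, 3]]
assert t3 == [[4, 6, 8], [6, 8, 10], [8, 10, 12]]
assert sum(itertools.chain(*t3)) == 72   # what add() computes from the fan-in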

View File

@ -0,0 +1,286 @@
# PIPELINE DEFINITION
# Name: math-pipeline
# Outputs:
# Output: list
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-add-two-nums:
executorLabel: exec-add-two-nums
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
y:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double:
executorLabel: exec-double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double-2:
executorLabel: exec-double-2
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--add-two-nums-Output:
valueFromParameter:
outputParameterKey: pipelinechannel--add-two-nums-Output
producerSubtask: for-loop-4
tasks:
for-loop-4:
componentRef:
name: comp-for-loop-4
inputs:
parameters:
pipelinechannel--loop-item-param-1:
componentInputParameter: pipelinechannel--loop-item-param-1
parameterIterator:
itemInput: pipelinechannel--loop-item-param-3
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-4
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--add-two-nums-Output:
parameterType: NUMBER_INTEGER
comp-for-loop-4:
dag:
outputs:
parameters:
pipelinechannel--add-two-nums-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: add-two-nums
tasks:
add-two-nums:
cachingOptions:
enableCache: true
componentRef:
name: comp-add-two-nums
dependentTasks:
- double
- double-2
inputs:
parameters:
x:
taskOutputParameter:
outputParameterKey: Output
producerTask: double
y:
taskOutputParameter:
outputParameterKey: Output
producerTask: double-2
taskInfo:
name: add-two-nums
double:
cachingOptions:
enableCache: true
componentRef:
name: comp-double
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: double
double-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-double-2
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-3
taskInfo:
name: double-2
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
pipelinechannel--loop-item-param-3:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--add-two-nums-Output:
parameterType: NUMBER_INTEGER
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(nums: List[int]) -> int:\n import itertools\n return\
\ sum(itertools.chain(*nums))\n\n"
image: python:3.7
exec-add-two-nums:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add_two_nums
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add_two_nums(x: int, y: int) -> int:\n return x + y\n\n"
image: python:3.7
exec-double:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
exec-double-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
pipelineInfo:
name: math-pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: pipelinechannel--add-two-nums-Output
producerSubtask: for-loop-2
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
dependentTasks:
- for-loop-2
inputs:
parameters:
nums:
taskOutputParameter:
outputParameterKey: pipelinechannel--add-two-nums-Output
producerTask: for-loop-2
taskInfo:
name: add
for-loop-2:
componentRef:
name: comp-for-loop-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-2
outputDefinitions:
parameters:
Output:
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.11

View File

@ -0,0 +1,61 @@
from typing import List, NamedTuple

from kfp import compiler
from kfp import dsl


@dsl.component
def double(num: int) -> int:
    return 2 * num


@dsl.component
def add(nums: List[List[int]]) -> int:
    import itertools
    return sum(itertools.chain(*nums))


@dsl.pipeline
def double_pipeline(num: int) -> int:
    return double(num=num).output


@dsl.component
def echo_and_return(string: str) -> str:
    print(string)
    return string


@dsl.component
def join_and_print(strings: List[str]):
    print(''.join(strings))
@dsl.pipeline
def add_pipeline(
    nums: List[List[int]]
) -> NamedTuple('Outputs', [('out1', int), ('out2', List[str])]):
    with dsl.ParallelFor(['m', 'a', 't', 'h']) as char:
        id_task = echo_and_return(string=char)
    Outputs = NamedTuple('Outputs', [('out1', int), ('out2', List[str])])
    return Outputs(
        out1=add(nums=nums).output, out2=dsl.Collected(id_task.output))


@dsl.pipeline
def math_pipeline() -> int:
    with dsl.ParallelFor([1, 2, 3]) as x:
        with dsl.ParallelFor([1, 2, 3]) as y:
            t1 = double_pipeline(num=y)
    t2 = add_pipeline(nums=dsl.Collected(t1.output))
    join_and_print(strings=t2.outputs['out2'])
    return t2.outputs['out1']


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=math_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
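This case also shows fan-in composing across sub-pipelines: double_pipeline's output is collected through two loops into add_pipeline, which in turn fans in one string per iteration of its own loop via the NamedTuple output. A plain-Python sketch of the data flow (illustrative only, no KFP involved):

import itertools

t1 = [[2 * y for y in [1, 2, 3]] for _ in [1, 2, 3]]  # Collected(t1.output)
out1 = sum(itertools.chain(*t1))                       # add(nums=...) inside add_pipeline
out2 = list('math')                                    # Collected(id_task.output)

assert out1 == 36
assert ''.join(out2) == 'math'                         # what join_and_print prints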

View File

@ -0,0 +1,359 @@
# PIPELINE DEFINITION
# Name: math-pipeline
# Outputs:
# Output: int
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-add-pipeline:
dag:
outputs:
parameters:
out1:
valueFromParameter:
outputParameterKey: Output
producerSubtask: add
out2:
valueFromParameter:
outputParameterKey: pipelinechannel--echo-and-return-Output
producerSubtask: for-loop-2
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
inputs:
parameters:
nums:
componentInputParameter: nums
taskInfo:
name: add
for-loop-2:
componentRef:
name: comp-for-loop-2-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '["m", "a", "t", "h"]'
taskInfo:
name: for-loop-2
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
out1:
parameterType: NUMBER_INTEGER
out2:
parameterType: LIST
comp-double:
executorLabel: exec-double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double-pipeline:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double
tasks:
double:
cachingOptions:
enableCache: true
componentRef:
name: comp-double
inputs:
parameters:
num:
componentInputParameter: num
taskInfo:
name: double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-echo-and-return:
executorLabel: exec-echo-and-return
inputDefinitions:
parameters:
string:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--double-pipeline-Output:
valueFromParameter:
outputParameterKey: pipelinechannel--double-pipeline-Output
producerSubtask: for-loop-4
tasks:
for-loop-4:
componentRef:
name: comp-for-loop-4
parameterIterator:
itemInput: pipelinechannel--loop-item-param-3
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-4
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-pipeline-Output:
parameterType: NUMBER_INTEGER
comp-for-loop-2-2:
dag:
outputs:
parameters:
pipelinechannel--echo-and-return-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: echo-and-return
tasks:
echo-and-return:
cachingOptions:
enableCache: true
componentRef:
name: comp-echo-and-return
inputs:
parameters:
string:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: echo-and-return
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: STRING
outputDefinitions:
parameters:
pipelinechannel--echo-and-return-Output:
parameterType: STRING
comp-for-loop-4:
dag:
outputs:
parameters:
pipelinechannel--double-pipeline-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double-pipeline
tasks:
double-pipeline:
cachingOptions:
enableCache: true
componentRef:
name: comp-double-pipeline
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-3
taskInfo:
name: double-pipeline
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-3:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-pipeline-Output:
parameterType: NUMBER_INTEGER
comp-join-and-print:
executorLabel: exec-join-and-print
inputDefinitions:
parameters:
strings:
parameterType: LIST
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(nums: List[List[int]]) -> int:\n import itertools\n \
\ return sum(itertools.chain(*nums))\n\n"
image: python:3.7
exec-double:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
exec-echo-and-return:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- echo_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef echo_and_return(string: str) -> str:\n print(string)\n \
\ return string\n\n"
image: python:3.7
exec-join-and-print:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- join_and_print
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef join_and_print(strings: List[str]):\n print(''.join(strings))\n\
\n"
image: python:3.7
pipelineInfo:
name: math-pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: out1
producerSubtask: add-pipeline
tasks:
add-pipeline:
cachingOptions:
enableCache: true
componentRef:
name: comp-add-pipeline
dependentTasks:
- for-loop-2
inputs:
parameters:
nums:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-pipeline-Output
producerTask: for-loop-2
taskInfo:
name: add-pipeline
for-loop-2:
componentRef:
name: comp-for-loop-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-2
join-and-print:
cachingOptions:
enableCache: true
componentRef:
name: comp-join-and-print
dependentTasks:
- add-pipeline
inputs:
parameters:
strings:
taskOutputParameter:
outputParameterKey: out2
producerTask: add-pipeline
taskInfo:
name: join-and-print
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.11

View File

@ -0,0 +1,28 @@
from typing import List

from kfp import compiler
from kfp import dsl


@dsl.component
def double(num: int) -> int:
    return 2 * num


@dsl.component
def add(nums: List[int]) -> int:
    return sum(nums)
@dsl.pipeline
def math_pipeline() -> List[int]:
    with dsl.ParallelFor([1, 2, 3]) as x:
        t = double(num=x)
    add(nums=dsl.Collected(t.output))
    return dsl.Collected(t.output)


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=math_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
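This is the minimal case: one dsl.ParallelFor level, so dsl.Collected(t.output) is a flat List[int] that can serve both as a downstream input and as the pipeline return value. A plain-Python sketch (illustrative only, no KFP involved):

collected = [2 * x for x in [1, 2, 3]]   # ~ dsl.Collected(t.output)
assert collected == [2, 4, 6]
assert sum(collected) == 12              # what add() returns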

View File

@ -0,0 +1,148 @@
# PIPELINE DEFINITION
# Name: math-pipeline
# Outputs:
# Output: list
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
nums:
parameterType: LIST
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-double:
executorLabel: exec-double
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-for-loop-2:
dag:
outputs:
parameters:
pipelinechannel--double-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: double
tasks:
double:
cachingOptions:
enableCache: true
componentRef:
name: comp-double
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--loop-item-param-1
taskInfo:
name: double
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-1:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--double-Output:
parameterType: NUMBER_INTEGER
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(nums: List[int]) -> int:\n return sum(nums)\n\n"
image: python:3.7
exec-double:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- double
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.11'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef double(num: int) -> int:\n return 2 * num\n\n"
image: python:3.7
pipelineInfo:
name: math-pipeline
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: pipelinechannel--double-Output
producerSubtask: for-loop-2
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
dependentTasks:
- for-loop-2
inputs:
parameters:
nums:
taskOutputParameter:
outputParameterKey: pipelinechannel--double-Output
producerTask: for-loop-2
taskInfo:
name: add
for-loop-2:
componentRef:
name: comp-for-loop-2
parameterIterator:
itemInput: pipelinechannel--loop-item-param-1
items:
raw: '[1, 2, 3]'
taskInfo:
name: for-loop-2
outputDefinitions:
parameters:
Output:
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.11

View File

@ -144,6 +144,18 @@ pipelines:
- module: components_with_optional_artifacts
name: pipeline
execute: false
- module: parallelfor_fan_in/fan_in_complex
name: math_pipeline
execute: false
- module: parallelfor_fan_in/pipeline_producer_consumer
name: math_pipeline
execute: false
- module: parallelfor_fan_in/simple_with_parameters
name: math_pipeline
execute: false
- module: parallelfor_fan_in/conditional_producer_and_consumers
name: math_pipeline
execute: false
components:
test_data_dir: sdk/python/test_data/components
read: true