feat(sdk): support dsl.If, dsl.Elif, and dsl.Else (#9894)

* support if/elif/else

* deprecate dsl.Condition

* alter rebase

* update release notes

* address review feedback

* change BinaryOperation to ConditionOperation
This commit is contained in:
Connor McCarthy 2023-09-11 13:19:35 -07:00 committed by GitHub
parent 1791818323
commit c6b236d1a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2183 additions and 65 deletions

View File

@ -2,6 +2,7 @@
## Features
* Add support for `dsl.If`, `dsl.Elif`, and `dsl.Else` control flow context managers; deprecate `dsl.Condition` in favor of `dsl.If` [\#9894](https://github.com/kubeflow/pipelines/pull/9894)
## Breaking changes

View File

@ -40,6 +40,7 @@ from kfp.dsl import Output
from kfp.dsl import OutputPath
from kfp.dsl import pipeline_task
from kfp.dsl import PipelineTaskFinalStatus
from kfp.dsl import tasks_group
from kfp.dsl import yaml_component
from kfp.dsl.types import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
@ -4161,5 +4162,330 @@ class ExtractInputOutputDescription(unittest.TestCase):
'Component output artifact.')
@dsl.component
def flip_coin() -> str:
import random
return 'heads' if random.randint(0, 1) == 0 else 'tails'
@dsl.component
def print_and_return(text: str) -> str:
print(text)
return text
@dsl.component
def flip_three_sided_coin() -> str:
import random
val = random.randint(0, 2)
if val == 0:
return 'heads'
elif val == 1:
return 'tails'
else:
return 'draw'
@dsl.component
def int_zero_through_three() -> int:
import random
return random.randint(0, 3)
class TestConditionLogic(unittest.TestCase):
def test_if(self):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'"
)
def test_if_else(self):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Else():
print_and_return(text='Got tails!')
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2']
.trigger_policy.condition,
"!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads')"
)
def test_if_elif_else(self):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_three_sided_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Elif(flip_coin_task.output == 'tails'):
print_and_return(text='Got tails!')
with dsl.Else():
print_and_return(text='Draw!')
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2']
.trigger_policy.condition,
"!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3']
.trigger_policy.condition,
"!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails')"
)
def test_if_multiple_elif_else(self):
@dsl.pipeline
def int_to_string():
int_task = int_zero_through_three()
with dsl.If(int_task.output == 0):
print_and_return(text='Got zero!')
with dsl.Elif(int_task.output == 1):
print_and_return(text='Got one!')
with dsl.Elif(int_task.output == 2):
print_and_return(text='Got two!')
with dsl.Else():
print_and_return(text='Got three!')
self.assertEqual(
int_to_string.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0"
)
self.assertEqual(
int_to_string.pipeline_spec.root.dag.tasks['condition-2']
.trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1"
)
self.assertEqual(
int_to_string.pipeline_spec.root.dag.tasks['condition-3']
.trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2"
)
self.assertEqual(
int_to_string.pipeline_spec.root.dag.tasks['condition-4']
.trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)"
)
def test_nested_if_elif_else_with_pipeline_param(self):
@dsl.pipeline
def flip_coin_pipeline(confirm: bool):
int_task = int_zero_through_three()
heads_task = flip_coin()
with dsl.If(heads_task.output == 'heads'):
with dsl.If(int_task.output == 0):
print_and_return(text='Got zero!')
with dsl.Elif(int_task.output == 1):
task = print_and_return(text='Got one!')
with dsl.If(confirm == True):
print_and_return(text='Confirmed: definitely got one.')
with dsl.Elif(int_task.output == 2):
print_and_return(text='Got two!')
with dsl.Else():
print_and_return(text='Got three!')
# top level conditions
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'"
)
# second level nested conditions
self.assertEqual(
flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag
.tasks['condition-2'].trigger_policy.condition,
"int(inputs.parameter_values[\'pipelinechannel--int-zero-through-three-Output\']) == 0"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag
.tasks['condition-3'].trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag
.tasks['condition-5'].trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag
.tasks['condition-6'].trigger_policy.condition,
"!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)"
)
# third level nested conditions
self.assertEqual(
flip_coin_pipeline.pipeline_spec.components['comp-condition-3'].dag
.tasks['condition-4'].trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--confirm'] == true")
def test_multiple_ifs_permitted(self):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.If(flip_coin_task.output == 'tails'):
print_and_return(text='Got tails!')
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'tails'"
)
def test_multiple_else_not_permitted(self):
with self.assertRaisesRegex(
tasks_group.InvalidControlFlowException,
r'Cannot use dsl\.Else following another dsl\.Else\. dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.'
):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Else():
print_and_return(text='Got tails!')
with dsl.Else():
print_and_return(text='Got tails!')
def test_else_no_if_not_supported(self):
with self.assertRaisesRegex(
tasks_group.InvalidControlFlowException,
r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.'
):
@dsl.pipeline
def flip_coin_pipeline():
with dsl.Else():
print_and_return(text='Got unknown')
def test_elif_no_if_not_supported(self):
with self.assertRaisesRegex(
tasks_group.InvalidControlFlowException,
r'dsl\.Elif can only be used following an upstream dsl\.If or dsl\.Elif\.'
):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.Elif(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
def test_boolean_condition_has_helpful_error(self):
with self.assertRaisesRegex(
ValueError,
r'Got constant boolean True as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.'
):
@dsl.pipeline
def my_pipeline():
with dsl.Condition('foo' == 'foo'):
print_and_return(text='I will always run.')
def test_boolean_elif_has_helpful_error(self):
with self.assertRaisesRegex(
ValueError,
r'Got constant boolean False as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.'
):
@dsl.pipeline
def my_pipeline(text: str):
with dsl.If(text == 'foo'):
print_and_return(text='I will always run.')
with dsl.Elif('foo' == 'bar'):
print_and_return(text='I will never run.')
def test_tasks_instantiated_between_if_else_and_elif_permitted(self):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads on coin one!')
flip_coin_task_2 = flip_coin()
with dsl.Elif(flip_coin_task_2.output == 'tails'):
print_and_return(text='Got heads on coin two!')
flip_coin_task_3 = flip_coin()
with dsl.Else():
print_and_return(
text=f'Coin three result: {flip_coin_task_3.output}')
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1']
.trigger_policy.condition,
"inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2']
.trigger_policy.condition,
"!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails'"
)
self.assertEqual(
flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3']
.trigger_policy.condition,
"!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails')"
)
def test_other_control_flow_instantiated_between_if_else_not_permitted(
self):
with self.assertRaisesRegex(
tasks_group.InvalidControlFlowException,
'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.'
):
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.ParallelFor(['foo', 'bar']) as item:
print_and_return(text=item)
with dsl.Else():
print_and_return(text='Got tails!')
if __name__ == '__main__':
unittest.main()

View File

@ -119,7 +119,18 @@ def get_parent_groups(
return (tasks_to_groups, groups_to_groups)
# TODO: do we really need this?
def get_channels_from_condition(
operations: List[pipeline_channel.ConditionOperation],
collected_channels: list,
) -> None:
"""Appends to collected_channels each pipeline channels used in each
operand of each operation in operations."""
for operation in operations:
for operand in [operation.left_operand, operation.right_operand]:
if isinstance(operand, pipeline_channel.PipelineChannel):
collected_channels.append(operand)
def get_condition_channels_for_tasks(
root_group: tasks_group.TasksGroup,
) -> Mapping[str, Set[pipeline_channel.PipelineChannel]]:
@ -139,16 +150,13 @@ def get_condition_channels_for_tasks(
current_conditions_channels,
):
new_current_conditions_channels = current_conditions_channels
if isinstance(group, tasks_group.Condition):
if isinstance(group, tasks_group._ConditionBase):
new_current_conditions_channels = list(current_conditions_channels)
if isinstance(group.condition.left_operand,
pipeline_channel.PipelineChannel):
new_current_conditions_channels.append(
group.condition.left_operand)
if isinstance(group.condition.right_operand,
pipeline_channel.PipelineChannel):
new_current_conditions_channels.append(
group.condition.right_operand)
get_channels_from_condition(
group.conditions,
new_current_conditions_channels,
)
for task in group.tasks:
for channel in new_current_conditions_channels:
conditions[task.name].add(channel)
@ -661,8 +669,9 @@ def get_dependencies(
dependent_group = group_name_to_group.get(
uncommon_upstream_groups[0], None)
if isinstance(dependent_group,
(tasks_group.Condition, tasks_group.ExitHandler)):
if isinstance(
dependent_group,
(tasks_group._ConditionBase, tasks_group.ExitHandler)):
raise InvalidTopologyException(
f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} A downstream task cannot depend on an upstream task within a dsl.{dependent_group.__class__.__name__} context unless the downstream is within that context too. Found task {task.name} which depends on upstream task {upstream_task.name} within an uncommon dsl.{dependent_group.__class__.__name__} context.'
)

View File

@ -709,22 +709,38 @@ def _update_task_spec_for_loop_group(
input_name=pipeline_task_spec.parameter_iterator.item_input)
def _resolve_condition_operands(
left_operand: Union[str, pipeline_channel.PipelineChannel],
right_operand: Union[str, pipeline_channel.PipelineChannel],
) -> Tuple[str, str]:
"""Resolves values and PipelineChannels for condition operands.
def _binary_operations_to_cel_conjunctive(
operations: List[pipeline_channel.ConditionOperation]) -> str:
"""Converts a list of ConditionOperation to a CEL string with placeholders.
Each ConditionOperation will be joined the others via the conjunctive (&&).
Args:
left_operand: The left operand of a condition expression.
right_operand: The right operand of a condition expression.
operations: The binary operations to convert to convert and join.
Returns:
A tuple of the resolved operands values:
(left_operand_value, right_operand_value).
The binary operations as a CEL string.
"""
operands = [
_single_binary_operation_to_cel_condition(operation=bin_op)
for bin_op in operations
]
return ' && '.join(operands)
# Pre-scan the operand to get the type of constant value if there's any.
def _single_binary_operation_to_cel_condition(
operation: pipeline_channel.ConditionOperation) -> str:
"""Converts a ConditionOperation to a CEL string with placeholders.
Args:
operation: The binary operation to convert to a string.
Returns:
The binary operation as a CEL string.
"""
left_operand = operation.left_operand
right_operand = operation.right_operand
# cannot make comparisons involving particular types
for value_or_reference in [left_operand, right_operand]:
if isinstance(value_or_reference, pipeline_channel.PipelineChannel):
parameter_type = type_utils.get_parameter_type(
@ -738,8 +754,10 @@ def _resolve_condition_operands(
input_name = compiler_utils.additional_input_name_for_pipeline_channel(
value_or_reference)
raise ValueError(
f'Conditional requires scalar parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.'
f'Conditional requires primitive parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.'
)
# ensure the types compared are the same or compatible
parameter_types = set()
for value_or_reference in [left_operand, right_operand]:
if isinstance(value_or_reference, pipeline_channel.PipelineChannel):
@ -822,11 +840,16 @@ def _resolve_condition_operands(
operand_values.append(operand_value)
return tuple(operand_values)
left_operand_value, right_operand_value = tuple(operand_values)
condition_string = (
f'{left_operand_value} {operation.operator} {right_operand_value}')
return f'!({condition_string})' if operation.negate else condition_string
def _update_task_spec_for_condition_group(
group: tasks_group.Condition,
group: tasks_group._ConditionBase,
pipeline_task_spec: pipeline_spec_pb2.PipelineTaskSpec,
) -> None:
"""Updates PipelineTaskSpec for condition group.
@ -835,15 +858,9 @@ def _update_task_spec_for_condition_group(
group: The condition group to update task spec for.
pipeline_task_spec: The pipeline task spec to update in place.
"""
left_operand_value, right_operand_value = _resolve_condition_operands(
group.condition.left_operand, group.condition.right_operand)
condition_string = (
f'{left_operand_value} {group.condition.operator} {right_operand_value}'
)
condition = _binary_operations_to_cel_conjunctive(group.conditions)
pipeline_task_spec.trigger_policy.CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(
condition=condition_string))
pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(condition=condition))
def build_task_spec_for_exit_task(
@ -954,7 +971,7 @@ def build_task_spec_for_group(
group=group,
pipeline_task_spec=pipeline_task_spec,
)
elif isinstance(group, tasks_group.Condition):
elif isinstance(group, tasks_group._ConditionBase):
_update_task_spec_for_condition_group(
group=group,
pipeline_task_spec=pipeline_task_spec,
@ -1236,17 +1253,14 @@ def build_spec_by_group(
_build_dag_outputs(subgroup_component_spec,
subgroup_output_channels)
elif isinstance(subgroup, tasks_group.Condition):
elif isinstance(subgroup, tasks_group._ConditionBase):
# "Punch the hole", adding inputs needed by its subgroups or
# tasks.
condition_subgroup_channels = list(subgroup_input_channels)
for operand in [
subgroup.condition.left_operand,
subgroup.condition.right_operand,
]:
if isinstance(operand, pipeline_channel.PipelineChannel):
condition_subgroup_channels.append(operand)
compiler_utils.get_channels_from_condition(
subgroup.conditions, condition_subgroup_channels)
subgroup_component_spec = build_component_spec_for_group(
input_pipeline_channels=condition_subgroup_channels,

View File

@ -237,7 +237,10 @@ if os.environ.get('_KFP_RUNTIME', 'false') != 'true':
from kfp.dsl.placeholders import IfPresentPlaceholder
from kfp.dsl.structures import ContainerSpec
from kfp.dsl.tasks_group import Condition
from kfp.dsl.tasks_group import Elif
from kfp.dsl.tasks_group import Else
from kfp.dsl.tasks_group import ExitHandler
from kfp.dsl.tasks_group import If
from kfp.dsl.tasks_group import ParallelFor
__all__.extend([
'component',
@ -246,6 +249,9 @@ if os.environ.get('_KFP_RUNTIME', 'false') != 'true':
'importer',
'ContainerSpec',
'Condition',
'If',
'Elif',
'Else',
'ExitHandler',
'ParallelFor',
'Collected',

View File

@ -24,17 +24,20 @@ from kfp.dsl.types import type_utils
@dataclasses.dataclass
class ConditionOperator:
"""Represents a condition expression to be used in dsl.Condition().
class ConditionOperation:
"""Represents a condition expression to be used in condition control flow
group.
Attributes:
operator: The operator of the condition.
left_operand: The left operand.
right_operand: The right operand.
negate: Whether to negate the result of the binary operation.
"""
operator: str
left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]
right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]
negate: bool = False
# The string template used to generate the placeholder of a PipelineChannel.
@ -149,22 +152,22 @@ class PipelineChannel(abc.ABC):
return hash(self.pattern)
def __eq__(self, other):
return ConditionOperator('==', self, other)
return ConditionOperation('==', self, other)
def __ne__(self, other):
return ConditionOperator('!=', self, other)
return ConditionOperation('!=', self, other)
def __lt__(self, other):
return ConditionOperator('<', self, other)
return ConditionOperation('<', self, other)
def __le__(self, other):
return ConditionOperator('<=', self, other)
return ConditionOperation('<=', self, other)
def __gt__(self, other):
return ConditionOperator('>', self, other)
return ConditionOperation('>', self, other)
def __ge__(self, other):
return ConditionOperator('>=', self, other)
return ConditionOperation('>=', self, other)
class PipelineParameterChannel(PipelineChannel):

View File

@ -189,6 +189,12 @@ class Pipeline:
"""Removes the current TasksGroup from the stack."""
del self.groups[-1]
def get_last_tasks_group(self) -> Optional['tasks_group.TasksGroup']:
"""Gets the last TasksGroup added to the pipeline at the current level
of the pipeline definition."""
groups = self.groups[-1].groups
return groups[-1] if groups else None
def remove_task_from_groups(self, task: pipeline_task.PipelineTask):
"""Removes a task from the pipeline.

View File

@ -13,8 +13,10 @@
# limitations under the License.
"""Definition for TasksGroup."""
import copy
import enum
from typing import Optional, Union
from typing import List, Optional, Union
import warnings
from kfp.dsl import for_loop
from kfp.dsl import pipeline_channel
@ -52,7 +54,7 @@ class TasksGroup:
group_type: TasksGroupType,
name: Optional[str] = None,
is_root: bool = False,
):
) -> None:
"""Create a new instance of TasksGroup.
Args:
@ -117,7 +119,7 @@ class ExitHandler(TasksGroup):
self,
exit_task: pipeline_task.PipelineTask,
name: Optional[str] = None,
):
) -> None:
"""Initializes a Condition task group."""
super().__init__(
group_type=TasksGroupType.EXIT_HANDLER,
@ -138,9 +140,31 @@ class ExitHandler(TasksGroup):
self.exit_task = exit_task
class Condition(TasksGroup):
"""A class for creating conditional control flow within a pipeline
definition.
class _ConditionBase(TasksGroup):
"""Parent class for condition control flow context managers (Condition, If,
Elif, Else).
Args:
condition: A list of binary operations to be combined via conjunction.
name: The name of the condition group.
"""
def __init__(
self,
conditions: List[pipeline_channel.ConditionOperation],
name: Optional[str] = None,
) -> None:
super().__init__(
group_type=TasksGroupType.CONDITION,
name=name,
is_root=False,
)
self.conditions: List[pipeline_channel.ConditionOperation] = conditions
class If(_ConditionBase):
"""A class for creating a conditional control flow "if" block within a
pipeline.
Args:
condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.
@ -150,22 +174,151 @@ class Condition(TasksGroup):
::
task1 = my_component1(...)
with Condition(task1.output=='pizza', 'pizza-condition'):
with dsl.If(task1.output=='pizza', 'pizza-condition'):
task2 = my_component2(...)
"""
def __init__(
self,
condition: pipeline_channel.ConditionOperator,
condition,
name: Optional[str] = None,
):
"""Initializes a conditional task group."""
) -> None:
super().__init__(
group_type=TasksGroupType.CONDITION,
conditions=[condition],
name=name,
is_root=False,
)
self.condition = condition
if isinstance(condition, bool):
raise ValueError(
f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.'
)
copied_condition = copy.copy(condition)
copied_condition.negate = True
self._negated_upstream_conditions = [copied_condition]
class Condition(If):
"""Deprecated.
Use dsl.If instead.
"""
def __enter__(self):
super().__enter__()
warnings.warn(
'dsl.Condition is deprecated. Please use dsl.If instead.',
category=DeprecationWarning,
stacklevel=2)
return self
class Elif(_ConditionBase):
"""A class for creating a conditional control flow "else if" block within a
pipeline. Can be used following an upstream dsl.If or dsl.Elif.
Args:
condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.
name: The name of the condition group.
Example:
::
task1 = my_component1(...)
task2 = my_component2(...)
with dsl.If(task1.output=='pizza', 'pizza-condition'):
task3 = my_component3(...)
with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
task4 = my_component4(...)
"""
def __init__(
self,
condition,
name: Optional[str] = None,
) -> None:
prev_cond = pipeline_context.Pipeline.get_default_pipeline(
).get_last_tasks_group()
if not isinstance(prev_cond, (Condition, If, Elif)):
# prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python
raise InvalidControlFlowException(
'dsl.Elif can only be used following an upstream dsl.If or dsl.Elif.'
)
if isinstance(condition, bool):
raise ValueError(
f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.'
)
copied_condition = copy.copy(condition)
copied_condition.negate = True
self._negated_upstream_conditions = _shallow_copy_list_of_binary_operations(
prev_cond._negated_upstream_conditions) + [copied_condition]
conditions = _shallow_copy_list_of_binary_operations(
prev_cond._negated_upstream_conditions)
conditions.append(condition)
super().__init__(
conditions=conditions,
name=name,
)
class Else(_ConditionBase):
"""A class for creating a conditional control flow "else" block within a
pipeline. Can be used following an upstream dsl.If or dsl.Elif.
Args:
name: The name of the condition group.
Example:
::
task1 = my_component1(...)
task2 = my_component2(...)
with dsl.If(task1.output=='pizza', 'pizza-condition'):
task3 = my_component3(...)
with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
task4 = my_component4(...)
with dsl.Else():
my_component5(...)
"""
def __init__(
self,
name: Optional[str] = None,
) -> None:
prev_cond = pipeline_context.Pipeline.get_default_pipeline(
).get_last_tasks_group()
if isinstance(prev_cond, Else):
# prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python
raise InvalidControlFlowException(
'Cannot use dsl.Else following another dsl.Else. dsl.Else can only be used following an upstream dsl.If or dsl.Elif.'
)
if not isinstance(prev_cond, (Condition, If, Elif)):
# prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python
raise InvalidControlFlowException(
'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.'
)
super().__init__(
conditions=prev_cond._negated_upstream_conditions,
name=name,
)
class InvalidControlFlowException(Exception):
pass
def _shallow_copy_list_of_binary_operations(
operations: List[pipeline_channel.ConditionOperation]
) -> List[pipeline_channel.ConditionOperation]:
# shallow copy is sufficient to allow us to invert the negate flag of a ConditionOperation without affecting copies. deep copy not needed and would result in many copies of the full pipeline since PipelineChannels hold references to the pipeline.
return [copy.copy(operation) for operation in operations]
class ParallelFor(TasksGroup):
@ -198,7 +351,7 @@ class ParallelFor(TasksGroup):
items: Union[for_loop.ItemList, pipeline_channel.PipelineChannel],
name: Optional[str] = None,
parallelism: Optional[int] = None,
):
) -> None:
"""Initializes a for loop task group."""
parallelism = parallelism or 0
if parallelism < 0:

View File

@ -12,13 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from absl.testing import parameterized
import unittest
from kfp import dsl
from kfp.dsl import for_loop
from kfp.dsl import pipeline_context
from kfp.dsl import tasks_group
class ParallelForTest(parameterized.TestCase):
class ParallelForTest(unittest.TestCase):
def test_basic(self):
loop_items = ['pizza', 'hotdog', 'pasta']
@ -58,3 +60,21 @@ class ParallelForTest(parameterized.TestCase):
'ParallelFor parallelism must be >= 0.'):
with pipeline_context.Pipeline('pipeline') as p:
tasks_group.ParallelFor(items=loop_items, parallelism=-1)
class TestConditionDeprecated(unittest.TestCase):
def test(self):
@dsl.component
def foo() -> str:
return 'foo'
@dsl.pipeline
def my_pipeline(string: str):
with self.assertWarnsRegex(
DeprecationWarning,
'dsl\.Condition is deprecated\. Please use dsl\.If instead\.'
):
with dsl.Condition(string == 'text'):
foo()

View File

@ -0,0 +1,51 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import compiler
from kfp import dsl
@dsl.component
def flip_three_sided_die() -> str:
import random
val = random.randint(0, 2)
if val == 0:
return 'heads'
elif val == 1:
return 'tails'
else:
return 'draw'
@dsl.component
def print_and_return(text: str) -> str:
print(text)
return text
@dsl.pipeline
def roll_die_pipeline():
flip_coin_task = flip_three_sided_die()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Elif(flip_coin_task.output == 'tails'):
print_and_return(text='Got tails!')
with dsl.Else():
print_and_return(text='Draw!')
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=roll_die_pipeline,
package_path=__file__.replace('.py', '.yaml'))

View File

@ -0,0 +1,280 @@
# PIPELINE DEFINITION
# Name: roll-die-pipeline
components:
comp-condition-1:
dag:
tasks:
print-and-return:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return
inputs:
parameters:
text:
runtimeValue:
constant: Got heads!
taskInfo:
name: print-and-return
inputDefinitions:
parameters:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-condition-2:
dag:
tasks:
print-and-return-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-2
inputs:
parameters:
text:
runtimeValue:
constant: Got tails!
taskInfo:
name: print-and-return-2
inputDefinitions:
parameters:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-condition-3:
dag:
tasks:
print-and-return-3:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-3
inputs:
parameters:
text:
runtimeValue:
constant: Draw!
taskInfo:
name: print-and-return-3
inputDefinitions:
parameters:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-flip-three-sided-die:
executorLabel: exec-flip-three-sided-die
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return:
executorLabel: exec-print-and-return
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-2:
executorLabel: exec-print-and-return-2
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-3:
executorLabel: exec-print-and-return-3
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-flip-three-sided-die:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- flip_three_sided_die
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef flip_three_sided_die() -> str:\n import random\n val =\
\ random.randint(0, 2)\n\n if val == 0:\n return 'heads'\n \
\ elif val == 1:\n return 'tails'\n else:\n return 'draw'\n\
\n"
image: python:3.7
exec-print-and-return:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-3:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
pipelineInfo:
name: roll-die-pipeline
root:
dag:
tasks:
condition-1:
componentRef:
name: comp-condition-1
dependentTasks:
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-three-sided-die
taskInfo:
name: condition-1
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--flip-three-sided-die-Output']
== 'heads'
condition-2:
componentRef:
name: comp-condition-2
dependentTasks:
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-three-sided-die
taskInfo:
name: condition-2
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''heads'') && inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''tails'''
condition-3:
componentRef:
name: comp-condition-3
dependentTasks:
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-three-sided-die
taskInfo:
name: condition-3
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''tails'')'
flip-three-sided-die:
cachingOptions:
enableCache: true
componentRef:
name: comp-flip-three-sided-die
taskInfo:
name: flip-three-sided-die
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.3

View File

@ -0,0 +1,86 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from kfp import compiler
from kfp import dsl
@dsl.component
def int_0_to_9999() -> int:
import random
return random.randint(0, 9999)
@dsl.component
def is_even_or_odd(num: int) -> str:
return 'odd' if num % 2 else 'even'
@dsl.component
def print_and_return(text: str) -> str:
print(text)
return text
@dsl.component
def print_strings(strings: List[str]):
print(strings)
@dsl.pipeline
def lucky_number_pipeline(add_drumroll: bool = True,
repeat_if_lucky_number: bool = True,
trials: List[int] = [1, 2, 3]):
with dsl.ParallelFor(trials) as trial:
int_task = int_0_to_9999().set_caching_options(False)
with dsl.If(add_drumroll == True):
with dsl.If(trial == 3):
print_and_return(text='Adding drumroll on last trial!')
with dsl.If(int_task.output < 5000):
even_or_odd_task = is_even_or_odd(num=int_task.output)
with dsl.If(even_or_odd_task.output == 'even'):
print_and_return(text='Got a low even number!')
with dsl.Else():
print_and_return(text='Got a low odd number!')
with dsl.Elif(int_task.output > 5000):
even_or_odd_task = is_even_or_odd(num=int_task.output)
with dsl.If(even_or_odd_task.output == 'even'):
print_and_return(text='Got a high even number!')
with dsl.Else():
print_and_return(text='Got a high odd number!')
with dsl.Else():
print_and_return(
text='Announcing: Got the lucky number 5000! A one in 10,000 chance.'
)
with dsl.If(repeat_if_lucky_number == True):
with dsl.ParallelFor([1, 2]) as _:
print_and_return(
text='Announcing again: Got the lucky number 5000! A one in 10,000 chance.'
)
print_strings(strings=dsl.Collected(even_or_odd_task.output))
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=lucky_number_pipeline,
package_path=__file__.replace('.py', '.yaml'))

View File

@ -0,0 +1,910 @@
# PIPELINE DEFINITION
# Name: lucky-number-pipeline
# Inputs:
# add_drumroll: bool [Default: True]
# repeat_if_lucky_number: bool [Default: True]
# trials: list [Default: [1.0, 2.0, 3.0]]
components:
comp-condition-10:
dag:
tasks:
condition-11:
componentRef:
name: comp-condition-11
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--repeat_if_lucky_number:
componentInputParameter: pipelinechannel--repeat_if_lucky_number
taskInfo:
name: condition-11
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--repeat_if_lucky_number']
== true
print-and-return-6:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-6
inputs:
parameters:
text:
runtimeValue:
constant: 'Announcing: Got the lucky number 5000! A one in 10,000
chance.'
taskInfo:
name: print-and-return-6
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--repeat_if_lucky_number:
parameterType: BOOLEAN
comp-condition-11:
dag:
tasks:
for-loop-13:
componentRef:
name: comp-for-loop-13
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--repeat_if_lucky_number:
componentInputParameter: pipelinechannel--repeat_if_lucky_number
parameterIterator:
itemInput: pipelinechannel--loop-item-param-12
items:
raw: '[1, 2]'
taskInfo:
name: for-loop-13
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--repeat_if_lucky_number:
parameterType: BOOLEAN
comp-condition-2:
dag:
tasks:
condition-3:
componentRef:
name: comp-condition-3
inputs:
parameters:
pipelinechannel--add_drumroll:
componentInputParameter: pipelinechannel--add_drumroll
pipelinechannel--trials-loop-item:
componentInputParameter: pipelinechannel--trials-loop-item
taskInfo:
name: condition-3
triggerPolicy:
condition: int(inputs.parameter_values['pipelinechannel--trials-loop-item'])
== 3
inputDefinitions:
parameters:
pipelinechannel--add_drumroll:
parameterType: BOOLEAN
pipelinechannel--trials-loop-item:
parameterType: NUMBER_INTEGER
comp-condition-3:
dag:
tasks:
print-and-return:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return
inputs:
parameters:
text:
runtimeValue:
constant: Adding drumroll on last trial!
taskInfo:
name: print-and-return
inputDefinitions:
parameters:
pipelinechannel--add_drumroll:
parameterType: BOOLEAN
pipelinechannel--trials-loop-item:
parameterType: NUMBER_INTEGER
comp-condition-4:
dag:
tasks:
condition-5:
componentRef:
name: comp-condition-5
dependentTasks:
- is-even-or-odd
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--is-even-or-odd-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: is-even-or-odd
taskInfo:
name: condition-5
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-Output']
== 'even'
condition-6:
componentRef:
name: comp-condition-6
dependentTasks:
- is-even-or-odd
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--is-even-or-odd-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: is-even-or-odd
taskInfo:
name: condition-6
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-Output'']
== ''even'')'
is-even-or-odd:
cachingOptions:
enableCache: true
componentRef:
name: comp-is-even-or-odd
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
taskInfo:
name: is-even-or-odd
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
comp-condition-5:
dag:
tasks:
print-and-return-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-2
inputs:
parameters:
text:
runtimeValue:
constant: Got a low even number!
taskInfo:
name: print-and-return-2
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--is-even-or-odd-Output:
parameterType: STRING
comp-condition-6:
dag:
tasks:
print-and-return-3:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-3
inputs:
parameters:
text:
runtimeValue:
constant: Got a low odd number!
taskInfo:
name: print-and-return-3
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--is-even-or-odd-Output:
parameterType: STRING
comp-condition-7:
dag:
outputs:
parameters:
pipelinechannel--is-even-or-odd-2-Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: is-even-or-odd-2
tasks:
condition-8:
componentRef:
name: comp-condition-8
dependentTasks:
- is-even-or-odd-2
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--is-even-or-odd-2-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: is-even-or-odd-2
taskInfo:
name: condition-8
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-2-Output']
== 'even'
condition-9:
componentRef:
name: comp-condition-9
dependentTasks:
- is-even-or-odd-2
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
pipelinechannel--is-even-or-odd-2-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: is-even-or-odd-2
taskInfo:
name: condition-9
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-2-Output'']
== ''even'')'
is-even-or-odd-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-is-even-or-odd-2
inputs:
parameters:
num:
componentInputParameter: pipelinechannel--int-0-to-9999-Output
taskInfo:
name: is-even-or-odd-2
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--is-even-or-odd-2-Output:
parameterType: NUMBER_INTEGER
comp-condition-8:
dag:
tasks:
print-and-return-4:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-4
inputs:
parameters:
text:
runtimeValue:
constant: Got a high even number!
taskInfo:
name: print-and-return-4
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--is-even-or-odd-2-Output:
parameterType: STRING
comp-condition-9:
dag:
tasks:
print-and-return-5:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-5
inputs:
parameters:
text:
runtimeValue:
constant: Got a high odd number!
taskInfo:
name: print-and-return-5
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--is-even-or-odd-2-Output:
parameterType: STRING
comp-for-loop-1:
dag:
outputs:
parameters:
pipelinechannel--is-even-or-odd-2-Output:
valueFromParameter:
outputParameterKey: pipelinechannel--is-even-or-odd-2-Output
producerSubtask: condition-7
tasks:
condition-10:
componentRef:
name: comp-condition-10
dependentTasks:
- int-0-to-9999
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: int-0-to-9999
pipelinechannel--repeat_if_lucky_number:
componentInputParameter: pipelinechannel--repeat_if_lucky_number
taskInfo:
name: condition-10
triggerPolicy:
condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output''])
< 5000) && !(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output''])
> 5000)'
condition-2:
componentRef:
name: comp-condition-2
inputs:
parameters:
pipelinechannel--add_drumroll:
componentInputParameter: pipelinechannel--add_drumroll
pipelinechannel--trials-loop-item:
componentInputParameter: pipelinechannel--trials-loop-item
taskInfo:
name: condition-2
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--add_drumroll'] ==
true
condition-4:
componentRef:
name: comp-condition-4
dependentTasks:
- int-0-to-9999
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: int-0-to-9999
taskInfo:
name: condition-4
triggerPolicy:
condition: int(inputs.parameter_values['pipelinechannel--int-0-to-9999-Output'])
< 5000
condition-7:
componentRef:
name: comp-condition-7
dependentTasks:
- int-0-to-9999
inputs:
parameters:
pipelinechannel--int-0-to-9999-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: int-0-to-9999
taskInfo:
name: condition-7
triggerPolicy:
condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output''])
< 5000) && int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output''])
> 5000'
int-0-to-9999:
cachingOptions: {}
componentRef:
name: comp-int-0-to-9999
taskInfo:
name: int-0-to-9999
inputDefinitions:
parameters:
pipelinechannel--add_drumroll:
parameterType: BOOLEAN
pipelinechannel--repeat_if_lucky_number:
parameterType: BOOLEAN
pipelinechannel--trials:
parameterType: LIST
pipelinechannel--trials-loop-item:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
pipelinechannel--is-even-or-odd-2-Output:
parameterType: NUMBER_INTEGER
comp-for-loop-13:
dag:
tasks:
print-and-return-7:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-7
inputs:
parameters:
text:
runtimeValue:
constant: 'Announcing again: Got the lucky number 5000! A one in
10,000 chance.'
taskInfo:
name: print-and-return-7
inputDefinitions:
parameters:
pipelinechannel--int-0-to-9999-Output:
parameterType: NUMBER_INTEGER
pipelinechannel--loop-item-param-12:
parameterType: NUMBER_INTEGER
pipelinechannel--repeat_if_lucky_number:
parameterType: BOOLEAN
comp-int-0-to-9999:
executorLabel: exec-int-0-to-9999
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_INTEGER
comp-is-even-or-odd:
executorLabel: exec-is-even-or-odd
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-is-even-or-odd-2:
executorLabel: exec-is-even-or-odd-2
inputDefinitions:
parameters:
num:
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return:
executorLabel: exec-print-and-return
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-2:
executorLabel: exec-print-and-return-2
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-3:
executorLabel: exec-print-and-return-3
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-4:
executorLabel: exec-print-and-return-4
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-5:
executorLabel: exec-print-and-return-5
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-6:
executorLabel: exec-print-and-return-6
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-7:
executorLabel: exec-print-and-return-7
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-strings:
executorLabel: exec-print-strings
inputDefinitions:
parameters:
strings:
parameterType: LIST
deploymentSpec:
executors:
exec-int-0-to-9999:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- int_0_to_9999
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef int_0_to_9999() -> int:\n import random\n return random.randint(0,\
\ 9999)\n\n"
image: python:3.7
exec-is-even-or-odd:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- is_even_or_odd
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\
\ else 'even'\n\n"
image: python:3.7
exec-is-even-or-odd-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- is_even_or_odd
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\
\ else 'even'\n\n"
image: python:3.7
exec-print-and-return:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-3:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-4:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-5:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-6:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-7:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-strings:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_strings
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_strings(strings: List[str]):\n print(strings)\n\n"
image: python:3.7
pipelineInfo:
name: lucky-number-pipeline
root:
dag:
tasks:
for-loop-1:
componentRef:
name: comp-for-loop-1
inputs:
parameters:
pipelinechannel--add_drumroll:
componentInputParameter: add_drumroll
pipelinechannel--repeat_if_lucky_number:
componentInputParameter: repeat_if_lucky_number
pipelinechannel--trials:
componentInputParameter: trials
parameterIterator:
itemInput: pipelinechannel--trials-loop-item
items:
inputParameter: pipelinechannel--trials
taskInfo:
name: for-loop-1
print-strings:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-strings
dependentTasks:
- for-loop-1
inputs:
parameters:
strings:
taskOutputParameter:
outputParameterKey: pipelinechannel--is-even-or-odd-2-Output
producerTask: for-loop-1
taskInfo:
name: print-strings
inputDefinitions:
parameters:
add_drumroll:
defaultValue: true
isOptional: true
parameterType: BOOLEAN
repeat_if_lucky_number:
defaultValue: true
isOptional: true
parameterType: BOOLEAN
trials:
defaultValue:
- 1.0
- 2.0
- 3.0
isOptional: true
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.3

View File

@ -0,0 +1,42 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import compiler
from kfp import dsl
@dsl.component
def flip_coin() -> str:
import random
return 'heads' if random.randint(0, 1) == 0 else 'tails'
@dsl.component
def print_and_return(text: str) -> str:
print(text)
return text
@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Else():
print_and_return(text='Got tails!')
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=flip_coin_pipeline,
package_path=__file__.replace('.py', '.yaml'))

View File

@ -0,0 +1,202 @@
# PIPELINE DEFINITION
# Name: flip-coin-pipeline
components:
comp-condition-1:
dag:
tasks:
print-and-return:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return
inputs:
parameters:
text:
runtimeValue:
constant: Got heads!
taskInfo:
name: print-and-return
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
comp-condition-2:
dag:
tasks:
print-and-return-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-and-return-2
inputs:
parameters:
text:
runtimeValue:
constant: Got tails!
taskInfo:
name: print-and-return-2
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
parameterType: STRING
comp-flip-coin:
executorLabel: exec-flip-coin
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return:
executorLabel: exec-print-and-return
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-and-return-2:
executorLabel: exec-print-and-return-2
inputDefinitions:
parameters:
text:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-flip-coin:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- flip_coin
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef flip_coin() -> str:\n import random\n return 'heads' if\
\ random.randint(0, 1) == 0 else 'tails'\n\n"
image: python:3.7
exec-print-and-return:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
exec-print-and-return-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_and_return
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\
\ text\n\n"
image: python:3.7
pipelineInfo:
name: flip-coin-pipeline
root:
dag:
tasks:
condition-1:
componentRef:
name: comp-condition-1
dependentTasks:
- flip-coin
inputs:
parameters:
pipelinechannel--flip-coin-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
taskInfo:
name: condition-1
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--flip-coin-Output']
== 'heads'
condition-2:
componentRef:
name: comp-condition-2
dependentTasks:
- flip-coin
inputs:
parameters:
pipelinechannel--flip-coin-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
taskInfo:
name: condition-2
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''heads'')'
flip-coin:
cachingOptions:
enableCache: true
componentRef:
name: comp-flip-coin
taskInfo:
name: flip-coin
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.2

View File

@ -168,6 +168,15 @@ pipelines:
- module: pipeline_with_metadata_fields
name: dataset_concatenator
execute: false
- module: if_else
name: flip_coin_pipeline
execute: false
- module: if_elif_else
name: roll_die_pipeline
execute: false
- module: if_elif_else_complex
name: lucky_number_pipeline
execute: false
components:
test_data_dir: sdk/python/test_data/components
read: true