# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Definition for TasksGroup."""

import copy
import enum
from typing import List, Optional, Union
import warnings

from kfp.dsl import for_loop
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_context
from kfp.dsl import pipeline_task


def _require_default_pipeline() -> 'pipeline_context.Pipeline':
    """Returns the default pipeline, failing loudly when there is none.

    Returns:
        The object returned by ``Pipeline.get_default_pipeline()``.

    Raises:
        ValueError: If no default pipeline is currently defined.
    """
    pipeline = pipeline_context.Pipeline.get_default_pipeline()
    if not pipeline:
        raise ValueError('Default pipeline not defined.')
    return pipeline


def _negated_copy_of_condition(condition):
    """Validates a user-supplied condition and returns its negated copy.

    Args:
        condition: The condition passed to dsl.If or dsl.Elif.

    Returns:
        A shallow copy of ``condition`` with its ``negate`` flag set to True.
        The original ``condition`` object is left untouched.

    Raises:
        ValueError: If ``condition`` is a plain Python bool.
    """
    if isinstance(condition, bool):
        raise ValueError(
            f'Got constant boolean {condition} as a condition. This is likely '
            'because the provided condition evaluated immediately. At least one '
            'of the operands must be an output from an upstream task or a '
            'pipeline parameter.')
    negated_condition = copy.copy(condition)
    negated_condition.negate = True
    return negated_condition


class TasksGroupType(str, enum.Enum):
    """Types of TasksGroup."""
    PIPELINE = 'pipeline'
    CONDITION = 'condition'
    CONDITION_BRANCHES = 'condition-branches'
    FOR_LOOP = 'for-loop'
    EXIT_HANDLER = 'exit-handler'


class TasksGroup:
    """Represents a logical group of tasks and groups of TasksGroups.

    This class is the base class for groups of tasks, such as tasks sharing
    an exit handler, a condition branch, or a loop. This class is not
    supposed to be used by pipeline authors. It is useful for implementing a
    compiler.

    Attributes:
        group_type: The type of the TasksGroup.
        tasks: A list of all PipelineTasks in this group.
        groups: A list of TasksGroups in this group.
        display_name: The optional user given name of the group.
        dependencies: A list of tasks or groups this group depends on.
        is_root: If TasksGroup is root group.
    """

    def __init__(
        self,
        group_type: TasksGroupType,
        name: Optional[str] = None,
        is_root: bool = False,
    ) -> None:
        """Create a new instance of TasksGroup.

        Args:
            group_type: The type of the group.
            name: The name of the group. Used as display name in UI.
            is_root: Whether this group is the root group.
        """
        self.group_type = group_type
        self.tasks = []
        self.groups = []
        self.display_name = name
        self.dependencies = []
        self.is_root = is_root
        # backref to parent, set when the pipeline is called in pipeline_context
        self.parent_task_group: Optional[TasksGroup] = None

    def __enter__(self):
        """Names this group and pushes it onto the default pipeline.

        Raises:
            ValueError: If no default pipeline is defined.
        """
        pipeline = _require_default_pipeline()
        self._make_name_unique()
        pipeline.push_tasks_group(self)
        return self

    def __exit__(self, *unused_args):
        """Pops the current tasks group off the default pipeline."""
        pipeline_context.Pipeline.get_default_pipeline().pop_tasks_group()

    def _make_name_unique(self):
        """Generates a unique TasksGroup name in the pipeline.

        The name is ``<group type value>-<next group id>`` with underscores
        replaced by dashes.

        Raises:
            ValueError: If no default pipeline is defined.
        """
        group_id = _require_default_pipeline().get_next_group_id()
        self.name = f'{self.group_type.value}-{group_id}'.replace('_', '-')

    def remove_task_recursive(self, task: pipeline_task.PipelineTask):
        """Removes a task from this group and from all nested groups.

        Args:
            task: The task to remove.
        """
        if self.tasks and task in self.tasks:
            self.tasks.remove(task)
        for group in self.groups or []:
            group.remove_task_recursive(task)


class ExitHandler(TasksGroup):
    """A class for setting an exit handler task that is invoked upon exiting a
    group of other tasks.

    Args:
        exit_task: The task that is invoked after exiting a group of other tasks.
        name: The name of the exit handler group.

    Example:
      ::

        exit_task = ExitComponent(...)
        with ExitHandler(exit_task):
            task1 = my_component1(...)
            task2 = my_component2(...)
    """

    def __init__(
        self,
        exit_task: pipeline_task.PipelineTask,
        name: Optional[str] = None,
    ) -> None:
        """Initializes an exit handler task group.

        Raises:
            ValueError: If ``exit_task`` depends on other tasks, or if no
                default pipeline is defined.
        """
        super().__init__(
            group_type=TasksGroupType.EXIT_HANDLER,
            name=name,
            is_root=False,
        )

        if exit_task.dependent_tasks:
            raise ValueError('exit_task cannot depend on any other tasks.')

        # Remove exit_task from any group it currently belongs to.
        _require_default_pipeline().remove_task_from_groups(exit_task)

        # Set is_exit_handler since the compiler might be using this attribute.
        exit_task.is_exit_handler = True

        self.exit_task = exit_task


class ConditionBranches(TasksGroup):
    """Wrapper group that holds the branches of an if/elif/else chain."""

    # Class-level default; the first get_oneof_id() call on an instance
    # rebinds it as an instance attribute, so counts are per instance.
    _oneof_id = 0

    def __init__(self) -> None:
        super().__init__(
            group_type=TasksGroupType.CONDITION_BRANCHES,
            name=None,
            is_root=False,
        )

    def get_oneof_id(self) -> int:
        """Incrementor for uniquely identifying a OneOf for the parent
        ConditionBranches group.

        This is analogous to incrementing a unique identifier for tasks
        groups belonging to a pipeline.
        """
        self._oneof_id += 1
        return self._oneof_id


class _ConditionBase(TasksGroup):
    """Parent class for condition control flow context managers (Condition, If,
    Elif, Else).

    Args:
        conditions: A list of binary operations to be combined via conjunction.
        name: The name of the condition group.
    """

    def __init__(
        self,
        conditions: List[pipeline_channel.ConditionOperation],
        name: Optional[str] = None,
    ) -> None:
        super().__init__(
            group_type=TasksGroupType.CONDITION,
            name=name,
            is_root=False,
        )
        self.conditions: List[pipeline_channel.ConditionOperation] = conditions


class If(_ConditionBase):
    """A class for creating a conditional control flow "if" block within a
    pipeline.

    Args:
        condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.
        name: The name of the condition group.

    Example:
      ::

        task1 = my_component1(...)
        with dsl.If(task1.output=='pizza', 'pizza-condition'):
            task2 = my_component2(...)
    """

    def __init__(
        self,
        condition,
        name: Optional[str] = None,
    ) -> None:
        super().__init__(
            conditions=[condition],
            name=name,
        )
        # Downstream Elif/Else branches read this list, which holds the
        # negation of this branch's condition.
        self._negated_upstream_conditions = [
            _negated_copy_of_condition(condition)
        ]


class Condition(If):
    """Deprecated.

    Use dsl.If instead.
    """

    def __enter__(self):
        super().__enter__()
        warnings.warn(
            'dsl.Condition is deprecated. Please use dsl.If instead.',
            category=DeprecationWarning,
            stacklevel=2)
        return self


class Elif(_ConditionBase):
    """A class for creating a conditional control flow "else if" block within a
    pipeline. Can be used following an upstream dsl.If or dsl.Elif.

    Args:
        condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.
        name: The name of the condition group.

    Example:
      ::

        task1 = my_component1(...)
        task2 = my_component2(...)
        with dsl.If(task1.output=='pizza', 'pizza-condition'):
            task3 = my_component3(...)

        with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
            task4 = my_component4(...)
    """

    def __init__(
        self,
        condition,
        name: Optional[str] = None,
    ) -> None:
        prev_cond = _require_default_pipeline().get_last_tasks_group()
        if not isinstance(prev_cond, (Condition, If, Elif)):
            # prefer pushing toward dsl.If rather than dsl.Condition for
            # syntactic consistency with the if-elif-else keywords in Python
            raise InvalidControlFlowException(
                'dsl.Elif can only be used following an upstream dsl.If or dsl.Elif.'
            )

        negated_condition = _negated_copy_of_condition(condition)

        # What a later Elif/Else must see: every upstream negation plus the
        # negation of this branch's own condition.
        self._negated_upstream_conditions = (
            _shallow_copy_list_of_binary_operations(
                prev_cond._negated_upstream_conditions) + [negated_condition])

        # What this branch itself requires: every upstream negation plus
        # this branch's own (non-negated) condition.
        conditions = _shallow_copy_list_of_binary_operations(
            prev_cond._negated_upstream_conditions)
        conditions.append(condition)

        super().__init__(
            conditions=conditions,
            name=name,
        )

    def __enter__(self):
        """Enters the branch, creating the ConditionBranches wrapper if needed.

        Raises:
            ValueError: If no default pipeline is defined.
        """
        pipeline = _require_default_pipeline()

        maybe_make_and_insert_conditional_branches_group(pipeline)

        self._make_name_unique()
        pipeline.push_tasks_group(self)
        return self


class Else(_ConditionBase):
    """A class for creating a conditional control flow "else" block within a
    pipeline. Can be used following an upstream dsl.If or dsl.Elif.

    Args:
        name: The name of the condition group.

    Example:
      ::

        task1 = my_component1(...)
        task2 = my_component2(...)
        with dsl.If(task1.output=='pizza', 'pizza-condition'):
            task3 = my_component3(...)

        with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
            task4 = my_component4(...)

        with dsl.Else():
            my_component5(...)
    """

    def __init__(
        self,
        name: Optional[str] = None,
    ) -> None:
        prev_cond = _require_default_pipeline().get_last_tasks_group()

        # A ConditionBranches group here means this Else directly follows
        # another Else in user code, since the earlier Else was wrapped in a
        # ConditionBranches group.
        if isinstance(prev_cond, ConditionBranches):
            raise InvalidControlFlowException(
                'Cannot use dsl.Else following another dsl.Else. dsl.Else can '
                'only be used following an upstream dsl.If or dsl.Elif.')
        if not isinstance(prev_cond, (Condition, If, Elif)):
            # prefer pushing toward dsl.If rather than dsl.Condition for
            # syntactic consistency with the if-elif-else keywords in Python
            raise InvalidControlFlowException(
                'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.'
            )

        # The else branch runs when every upstream condition is negated.
        super().__init__(
            conditions=prev_cond._negated_upstream_conditions,
            name=name,
        )

    def __enter__(self):
        """Enters the branch, creating the ConditionBranches wrapper if needed.

        Raises:
            ValueError: If no default pipeline is defined.
        """
        pipeline = _require_default_pipeline()

        maybe_make_and_insert_conditional_branches_group(pipeline)

        self._make_name_unique()
        pipeline.push_tasks_group(self)
        return self

    def __exit__(self, *unused_args):
        pipeline = pipeline_context.Pipeline.get_default_pipeline()
        pipeline.pop_tasks_group()

        # since this is an else, also pop off the parent dag for conditional
        # branches. this parent TasksGroup is not a context manager, so we
        # simulate its __exit__ call with this
        pipeline.pop_tasks_group()


def maybe_make_and_insert_conditional_branches_group(
        pipeline: 'pipeline_context.Pipeline') -> None:
    """Wraps the last pushed If/Elif in a ConditionBranches group if needed.

    If the pipeline's last tasks group is an Elif, the wrapper is taken to
    already exist and nothing is done. Otherwise a new ConditionBranches
    group is created, the last pushed group is moved inside it, and the
    wrapper is left as the pipeline's current group.

    Args:
        pipeline: The pipeline whose group stack is being manipulated.
    """
    already_has_pipeline_wrapper = isinstance(
        pipeline.get_last_tasks_group(),
        Elif,
    )
    if already_has_pipeline_wrapper:
        return

    condition_wrapper_group = ConditionBranches()
    condition_wrapper_group._make_name_unique()

    # swap outer and inner group ids so that numbering stays sequentially
    # consistent with how such hypothetical code would be authored
    def swap_group_ids(parent: TasksGroup, cond: TasksGroup):
        # rsplit on the last dash only: the name prefix may itself contain
        # dashes (e.g. 'condition-branches').
        parent_name, parent_id = parent.name.rsplit('-', 1)
        cond_name, cond_id = cond.name.rsplit('-', 1)
        cond.name = f'{cond_name}-{parent_id}'
        parent.name = f'{parent_name}-{cond_id}'

    # replace last pushed group (If or Elif) with condition group
    last_pushed_group = pipeline.groups[-1].groups.pop()
    swap_group_ids(condition_wrapper_group, last_pushed_group)
    pipeline.push_tasks_group(condition_wrapper_group)

    # then repush (__enter__) and pop (__exit__) the last pushed group
    # before the wrapper to emulate re-entering and exiting its context
    pipeline.push_tasks_group(last_pushed_group)
    pipeline.pop_tasks_group()


class InvalidControlFlowException(Exception):
    """Raised when dsl.Elif or dsl.Else is used in an invalid position."""


def _shallow_copy_list_of_binary_operations(
    operations: List[pipeline_channel.ConditionOperation]
) -> List[pipeline_channel.ConditionOperation]:
    """Returns a new list holding a shallow copy of each operation.

    Shallow copy is sufficient to allow us to invert the negate flag of a
    ConditionOperation without affecting copies. Deep copy is not needed and
    would result in many copies of the full pipeline since PipelineChannels
    hold references to the pipeline.
    """
    return [copy.copy(operation) for operation in operations]


class ParallelFor(TasksGroup):
    """A class for creating parallelized for loop control flow over a static
    set of items within a pipeline definition.

    Args:
        items: The items to loop over. It can be either a constant Python list or a list output from an upstream task.
        name: The name of the for loop group.
        parallelism: The maximum number of concurrent iterations that can be scheduled for execution. A value of 0 represents unconstrained parallelism (default is unconstrained).

    Example:
      ::

        with dsl.ParallelFor(
          items=[{'a': 1, 'b': 10}, {'a': 2, 'b': 20}],
          parallelism=1
        ) as item:
            task1 = my_component(..., number=item.a)
            task2 = my_component(..., number=item.b)

    In the example, the group of tasks containing ``task1`` and ``task2`` would
    be executed twice, once with case ``args=[{'a': 1, 'b': 10}]`` and once with
    case ``args=[{'a': 2, 'b': 20}]``. The ``parallelism=1`` setting causes only
    1 execution to be scheduled at a time.
    """

    def __init__(
        self,
        items: Union[for_loop.ItemList, pipeline_channel.PipelineChannel],
        name: Optional[str] = None,
        parallelism: Optional[int] = None,
    ) -> None:
        """Initializes a for loop task group.

        Raises:
            ValueError: If ``parallelism`` is negative, or if ``items`` is a
                raw item list and no default pipeline is defined.
        """
        # None (the default) is treated as 0.
        parallelism = parallelism or 0
        if parallelism < 0:
            raise ValueError(
                f'ParallelFor parallelism must be >= 0. Got: {parallelism}.')

        super().__init__(
            group_type=TasksGroupType.FOR_LOOP,
            name=name,
            is_root=False,
        )

        if isinstance(items, pipeline_channel.PipelineParameterChannel):
            self.loop_argument = for_loop.LoopParameterArgument.from_pipeline_channel(
                items)
            self.items_is_pipeline_channel = True
        elif isinstance(items, pipeline_channel.PipelineArtifactChannel):
            self.loop_argument = for_loop.LoopArtifactArgument.from_pipeline_channel(
                items)
            self.items_is_pipeline_channel = True
        else:
            # Raw items take their name code from the pipeline's group id
            # counter, so a default pipeline is required on this path only.
            self.loop_argument = for_loop.LoopParameterArgument.from_raw_items(
                raw_items=items,
                name_code=_require_default_pipeline().get_next_group_id(),
            )
            self.items_is_pipeline_channel = False
            # TODO: support artifact constants here.

        self.parallelism_limit = parallelism

    def __enter__(
        self
    ) -> Union[for_loop.LoopParameterArgument, for_loop.LoopArtifactArgument]:
        """Enters the group and returns the loop argument, not the group."""
        super().__enter__()
        return self.loop_argument