feat(sdk): Support using pipeline in exit handlers (#8220)

* Support using pipeline in exit handlers

* release note

* remove dead code

* fix component spec merging flakiness

* address comments in PR#8209

* address review comments

* fix bad merge in release note
Chen Sun 2022-08-31 18:45:54 -07:00 committed by GitHub
parent 5d0687122a
commit d88358201d
9 changed files with 431 additions and 87 deletions

sdk/RELEASE.md

@@ -4,7 +4,7 @@
* Support parallelism setting in ParallelFor [\#8146](https://github.com/kubeflow/pipelines/pull/8146)
* Support for Python v3.10 [\#8186](https://github.com/kubeflow/pipelines/pull/8186)
* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204), [\#8209](https://github.com/kubeflow/pipelines/pull/8209)
* Extend upper bound for Kubernetes to <24 in KFP SDK [\#8173](https://github.com/kubeflow/pipelines/pull/8173)
* Support using pipeline in exit handlers [\#8220](https://github.com/kubeflow/pipelines/pull/8220)
## Breaking Changes
@@ -18,6 +18,8 @@ Technically no breaking changes but compilation error could be exposed in a diff
## Bug Fixes and Other Changes
* Extend upper bound for Kubernetes to <24 in KFP SDK [\#8173](https://github.com/kubeflow/pipelines/pull/8173)
## Documentation Updates
# 2.0.0-beta.3
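
Before the code: with this change, the task handed to `dsl.ExitHandler` may itself be a pipeline rather than a single container component. A minimal sketch of the new capability (the component and pipeline names here are hypothetical; the real end-to-end sample added by this PR appears further below):

```python
from kfp import dsl


@dsl.component
def cleanup(message: str):
    # placeholder work; any component would do
    print(message)


@dsl.pipeline(name='exit-pipeline')
def exit_pipeline():
    # an inner pipeline, now usable as an exit task
    cleanup(message='tearing down')


@dsl.pipeline(name='main-pipeline')
def main_pipeline():
    exit_task = exit_pipeline()
    with dsl.ExitHandler(exit_task):
        cleanup(message='main work')
```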

sdk/python/kfp/compiler/read_write_test_config.py

@@ -51,6 +51,7 @@ CONFIG = {
'pipeline_in_pipeline_complex',
'pipeline_with_outputs',
'pipeline_in_pipeline_loaded_from_yaml',
'pipeline_as_exit_task',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
'config': {

sdk/python/kfp/compiler/pipeline_spec_builder.py

@@ -1131,9 +1131,6 @@ def build_spec_by_group(
]
is_parent_component_root = (group_component_spec == pipeline_spec.root)
# Track if component spec is added from merging pipeline spec.
component_spec_added = False
if isinstance(subgroup, pipeline_task.PipelineTask):
subgroup_task_spec = build_task_spec_for_task(
@@ -1165,13 +1162,12 @@ def build_spec_by_group(
deployment_config.executors[executor_label].importer.CopyFrom(
subgroup_importer_spec)
elif subgroup.pipeline_spec is not None:
merge_deployment_spec_and_component_spec(
sub_pipeline_spec = merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=subgroup.pipeline_spec,
sub_pipeline_component_name=subgroup_component_name,
)
component_spec_added = True
subgroup_component_spec = sub_pipeline_spec.root
else:
raise RuntimeError
elif isinstance(subgroup, tasks_group.ParallelFor):
@@ -1270,16 +1266,15 @@ def build_spec_by_group(
subgroup_task_spec.dependent_tasks.extend(
[utils.sanitize_task_name(dep) for dep in group_dependencies])
# Add component spec if not already added from merging pipeline spec.
if not component_spec_added:
subgroup_component_name = utils.make_name_unique_by_adding_index(
name=subgroup_component_name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')
# Add component spec
subgroup_component_name = utils.make_name_unique_by_adding_index(
name=subgroup_component_name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')
subgroup_task_spec.component_ref.name = subgroup_component_name
pipeline_spec.components[subgroup_component_name].CopyFrom(
subgroup_component_spec)
subgroup_task_spec.component_ref.name = subgroup_component_name
pipeline_spec.components[subgroup_component_name].CopyFrom(
subgroup_component_spec)
# Add task spec
group_component_spec.dag.tasks[subgroup.name].CopyFrom(
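
The hunk above makes component-spec registration unconditional: every subgroup's component spec now gets a collision-free name before being copied into `pipeline_spec.components`. A sketch of the dedup helper's contract (a hypothetical reimplementation for illustration, not the actual `kfp` utils code):

```python
import itertools


def make_name_unique_by_adding_index(name: str, collection: list,
                                     delimiter: str) -> str:
    """Returns `name`, or `name<delimiter>N` for the first free N >= 2."""
    unique_name = name
    if unique_name in collection:
        for i in itertools.count(2):
            unique_name = f'{name}{delimiter}{i}'
            if unique_name not in collection:
                break
    return unique_name


# Matches the names seen in the compiled YAML below: the second print-op
# component becomes comp-print-op-2.
assert make_name_unique_by_adding_index(
    'comp-print-op', ['comp-print-op'], '-') == 'comp-print-op-2'
```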
@@ -1306,6 +1301,13 @@ def build_exit_handler_groups_recursively(
return
for group in parent_group.groups:
if isinstance(group, tasks_group.ExitHandler):
# remove this if block to support nested exit handlers
if not parent_group.is_root:
raise ValueError(
f'{tasks_group.ExitHandler.__name__} can only be used within the outermost scope of a pipeline function definition. Using an {tasks_group.ExitHandler.__name__} within {group_type_to_dsl_class[parent_group.group_type].__name__} {parent_group.name} is not allowed.'
)
exit_task = group.exit_task
exit_task_name = utils.sanitize_task_name(exit_task.name)
exit_handler_group_task_name = utils.sanitize_task_name(group.name)
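
The relocated guard above rejects nested exit handlers before any specs are built. A hedged illustration of what it rules out (hypothetical pipeline; compiling it would raise the ValueError):

```python
from kfp import dsl


@dsl.component
def print_op(message: str):
    print(message)


@dsl.pipeline(name='bad-nesting')
def bad_pipeline(notify: str = 'yes'):
    exit_task = print_op(message='bye')
    with dsl.Condition(notify == 'yes'):
        # ExitHandler sits under a Condition rather than at the pipeline
        # function's outermost scope, so compilation fails with the
        # ValueError raised above.
        with dsl.ExitHandler(exit_task):
            print_op(message='hi')
```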
@@ -1319,33 +1321,45 @@ def build_exit_handler_groups_recursively(
exit_task_component_spec = build_component_spec_for_exit_task(
task=exit_task)
exit_task_container_spec = build_container_spec_for_task(
task=exit_task)
# remove this if block to support nested exit handlers
if not parent_group.is_root:
raise ValueError(
f'{tasks_group.ExitHandler.__name__} can only be used within the outermost scope of a pipeline function definition. Using an {tasks_group.ExitHandler.__name__} within {group_type_to_dsl_class[parent_group.group_type].__name__} {parent_group.name} is not allowed.'
)
parent_dag = pipeline_spec.root.dag if parent_group.is_root else pipeline_spec.components[
utils.sanitize_component_name(parent_group.name)].dag
parent_dag.tasks[exit_task_name].CopyFrom(exit_task_task_spec)
# Add exit task component spec if it does not exist.
component_name = exit_task_task_spec.component_ref.name
if component_name not in pipeline_spec.components:
pipeline_spec.components[component_name].CopyFrom(
exit_task_component_spec)
# Add exit task container spec if it does not exist.
executor_label = exit_task_component_spec.executor_label
if executor_label not in deployment_config.executors:
# Add exit task container spec if applicable.
if exit_task.container_spec is not None:
exit_task_container_spec = build_container_spec_for_task(
task=exit_task)
executor_label = utils.make_name_unique_by_adding_index(
name=exit_task_component_spec.executor_label,
collection=list(deployment_config.executors.keys()),
delimiter='-')
exit_task_component_spec.executor_label = executor_label
deployment_config.executors[executor_label].container.CopyFrom(
exit_task_container_spec)
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))
elif exit_task.pipeline_spec is not None:
exit_task_pipeline_spec = merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=exit_task.pipeline_spec,
)
exit_task_component_spec = exit_task_pipeline_spec.root
else:
raise RuntimeError(
f'Exit task {exit_task_name} is missing both container spec and pipeline spec.'
)
# Add exit task component spec.
component_name = utils.make_name_unique_by_adding_index(
name=exit_task_task_spec.component_ref.name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')
exit_task_task_spec.component_ref.name = component_name
pipeline_spec.components[component_name].CopyFrom(
exit_task_component_spec)
# Add exit task task spec.
parent_dag = pipeline_spec.root.dag
parent_dag.tasks[exit_task_name].CopyFrom(exit_task_task_spec)
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))
build_exit_handler_groups_recursively(
parent_group=group,
pipeline_spec=pipeline_spec,
@@ -1356,8 +1370,7 @@ def merge_deployment_spec_and_component_spec(
main_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
main_deployment_config: pipeline_spec_pb2.PipelineDeploymentConfig,
sub_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_component_name: str,
) -> None:
) -> pipeline_spec_pb2.PipelineSpec:
"""Merges deployment spec and component spec from a sub pipeline spec into
the main spec.
@@ -1371,8 +1384,9 @@ def merge_deployment_spec_and_component_spec(
sub_pipeline_spec: The pipeline spec of an inner pipeline whose
deployment specs and component specs need to be copied into the main
specs.
sub_pipeline_component_name: The name of sub pipeline's root component
spec.
Returns:
The possibly modified copy of pipeline spec.
"""
# Make a copy of the sub_pipeline_spec so that the "template" remains
# unchanged and works even the pipeline is reused multiple times.
@@ -1385,8 +1399,8 @@ def merge_deployment_spec_and_component_spec(
_merge_component_spec(
main_pipeline_spec=main_pipeline_spec,
sub_pipeline_spec=sub_pipeline_spec_copy,
sub_pipeline_component_name=sub_pipeline_component_name,
)
return sub_pipeline_spec_copy
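
The signature change is the crux: `sub_pipeline_component_name` is gone, and callers now receive the merged copy and decide where its root component spec goes. A minimal sketch of the new call contract (empty specs, purely illustrative; import paths mirror the unit test below):

```python
from kfp.compiler import pipeline_spec_builder
from kfp.pipeline_spec import pipeline_spec_pb2

main_spec = pipeline_spec_pb2.PipelineSpec()
main_config = pipeline_spec_pb2.PipelineDeploymentConfig()
sub_spec = pipeline_spec_pb2.PipelineSpec()

merged_copy = pipeline_spec_builder.merge_deployment_spec_and_component_spec(
    main_pipeline_spec=main_spec,
    main_deployment_config=main_config,
    sub_pipeline_spec=sub_spec,
)
# sub_spec itself is left untouched (the function copies it first); the
# caller reads the merged root from the returned copy, e.g. as the
# component spec of a sub-pipeline task or of a pipeline exit task.
root_component_spec = merged_copy.root
```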
def _merge_deployment_spec(
@@ -1422,12 +1436,12 @@ def _merge_deployment_spec(
for executor_label, executor_spec in sub_deployment_config.executors.items(
):
if executor_label in main_deployment_config.executors:
old_executor_label = executor_label
executor_label = utils.make_name_unique_by_adding_index(
name=executor_label,
collection=list(main_deployment_config.executors.keys()),
delimiter='-')
old_executor_label = executor_label
executor_label = utils.make_name_unique_by_adding_index(
name=executor_label,
collection=list(main_deployment_config.executors.keys()),
delimiter='-')
if executor_label != old_executor_label:
_rename_executor_labels(
pipeline_spec=sub_pipeline_spec,
old_executor_label=old_executor_label,
@@ -1439,7 +1453,6 @@ def _merge_deployment_spec(
def _merge_component_spec(
main_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_component_name: str,
) -> None:
"""Merges component spec from a sub pipeline spec into the main config.
@@ -1451,8 +1464,6 @@ def _merge_component_spec(
main_pipeline_spec: The main pipeline spec to merge into.
sub_pipeline_spec: The pipeline spec of an inner pipeline whose
component specs need to be merged into the global config.
sub_pipeline_component_name: The name of sub pipeline's root component
spec.
"""
def _rename_component_refs(
@@ -1462,30 +1473,38 @@ def _merge_component_spec(
) -> None:
"""Renames the old component_ref to the new one in task spec."""
for _, component_spec in pipeline_spec.components.items():
if component_spec.dag:
for _, task_spec in component_spec.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref
if not component_spec.dag:
continue
for _, task_spec in component_spec.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref
for _, task_spec in pipeline_spec.root.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref
# Do all the renaming in place, then do the actual merge of component specs
# in a second pass. This ensures all component specs are in their final
# state at the time of merging.
old_name_to_new_name = {}
for component_name, component_spec in sub_pipeline_spec.components.items():
if component_name in main_pipeline_spec.components:
old_component_name = component_name
component_name = utils.make_name_unique_by_adding_index(
name=component_name,
collection=list(main_pipeline_spec.components.keys()),
delimiter='-')
old_component_name = component_name
new_component_name = utils.make_name_unique_by_adding_index(
name=component_name,
collection=list(main_pipeline_spec.components.keys()),
delimiter='-')
old_name_to_new_name[old_component_name] = new_component_name
if new_component_name != old_component_name:
_rename_component_refs(
pipeline_spec=sub_pipeline_spec,
old_component_ref=old_component_name,
new_component_ref=component_name)
main_pipeline_spec.components[component_name].CopyFrom(component_spec)
new_component_ref=new_component_name)
main_pipeline_spec.components[sub_pipeline_component_name].CopyFrom(
sub_pipeline_spec.root)
for old_component_name, component_spec in sub_pipeline_spec.components.items(
):
main_pipeline_spec.components[
old_name_to_new_name[old_component_name]].CopyFrom(component_spec)
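
A dict-based analogue of the two-pass merge above may make the ordering concrete (toy data; `_unique` is a stand-in for `utils.make_name_unique_by_adding_index`): pass one fixes every colliding name while the sub spec is still self-consistent, pass two copies specs under their final names.

```python
def _unique(name, taken, delimiter='-'):
    # stand-in for utils.make_name_unique_by_adding_index
    candidate, i = name, 2
    while candidate in taken:
        candidate = f'{name}{delimiter}{i}'
        i += 1
    return candidate


main_components = {'comp-1': 'main spec for comp-1'}
sub_components = {'comp-1': 'sub spec for comp-1',
                  'comp-2': 'sub spec for comp-2'}

# Pass 1: decide all renames first (the real code also rewrites the
# component_refs inside the sub spec's DAGs at this point).
old_to_new = {old: _unique(old, main_components) for old in sub_components}

# Pass 2: merge using only final names, so no later rename can invalidate
# an already-copied spec -- the flakiness the commit message mentions.
for old, new in old_to_new.items():
    main_components[new] = sub_components[old]

assert sorted(main_components) == ['comp-1', 'comp-1-2', 'comp-2']
```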
def create_pipeline_spec(

sdk/python/kfp/compiler/pipeline_spec_builder_test.py

@@ -226,30 +226,28 @@ class PipelineSpecBuilderTest(parameterized.TestCase):
pipeline_spec_pb2.ComponentSpec(executor_label='exec-1-2'))
expected_main_pipeline_spec.components['comp-2'].CopyFrom(
pipeline_spec_pb2.ComponentSpec(executor_label='exec-2'))
expected_main_pipeline_spec.components['inner-pipeline'].CopyFrom(
pipeline_spec_pb2.ComponentSpec(dag=pipeline_spec_pb2.DagSpec()))
expected_main_pipeline_spec.components['inner-pipeline'].dag.tasks[
'task-1'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(
name='comp-1-2')))
expected_main_pipeline_spec.components['inner-pipeline'].dag.tasks[
'task-2'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(
name='comp-2')))
pipeline_spec_builder.merge_deployment_spec_and_component_spec(
expected_sub_pipeline_spec_root = pipeline_spec_pb2.ComponentSpec(
dag=pipeline_spec_pb2.DagSpec())
expected_sub_pipeline_spec_root.dag.tasks['task-1'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(name='comp-1-2')))
expected_sub_pipeline_spec_root.dag.tasks['task-2'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(name='comp-2')))
sub_pipeline_spec_copy = pipeline_spec_builder.merge_deployment_spec_and_component_spec(
main_pipeline_spec=main_pipeline_spec,
main_deployment_config=main_deployment_config,
sub_pipeline_spec=sub_pipeline_spec,
sub_pipeline_component_name='inner-pipeline',
)
self.assertEqual(sub_pipeline_spec, expected_sub_pipeline_spec)
self.assertEqual(main_pipeline_spec, expected_main_pipeline_spec)
self.assertEqual(main_deployment_config,
expected_main_deployment_config)
self.assertEqual(sub_pipeline_spec_copy.root,
expected_sub_pipeline_spec_root)
def pipeline_spec_from_file(filepath: str) -> str:

sdk/python/kfp/compiler/test_data/pipelines/pipeline_as_exit_task.py

@@ -0,0 +1,61 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline using ExitHandler with PipelineTaskFinalStatus."""
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import PipelineTaskFinalStatus
@component
def print_op(message: str):
    """Prints a message."""
    print(message)


@component
def fail_op(message: str):
    """Fails."""
    import sys
    print(message)
    sys.exit(1)


@component
def get_run_state(status: dict) -> str:
    print('Pipeline status: ', status)
    return status['state']


@dsl.pipeline(name='conditional-notification')
def exit_op(status: PipelineTaskFinalStatus):
    """Checks pipeline run status."""
    with dsl.Condition(get_run_state(status=status).output == 'FAILED'):
        print_op(message='notify task failure.')


@dsl.pipeline(name='pipeline-with-task-final-status-conditional')
def my_pipeline(message: str = 'Hello World!'):
    exit_task = exit_op()
    with dsl.ExitHandler(exit_task, name='my-pipeline'):
        print_op(message=message)
        fail_op(message='Task failed.')


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))

sdk/python/kfp/compiler/test_data/pipelines/pipeline_as_exit_task.yaml

@@ -0,0 +1,259 @@
components:
comp-condition-1:
dag:
tasks:
print-op:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-op-2
inputs:
parameters:
message:
runtimeValue:
constant: notify task failure.
taskInfo:
name: print-op
inputDefinitions:
parameters:
pipelinechannel--get-run-state-Output:
parameterType: STRING
comp-conditional-notification:
dag:
tasks:
condition-1:
componentRef:
name: comp-condition-1
dependentTasks:
- get-run-state
inputs:
parameters:
pipelinechannel--get-run-state-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: get-run-state
taskInfo:
name: condition-1
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--get-run-state-Output']
== 'FAILED'
get-run-state:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-run-state
inputs:
parameters:
status:
componentInputParameter: status
taskInfo:
name: get-run-state
inputDefinitions:
parameters:
status:
parameterType: STRUCT
comp-exit-handler-1:
dag:
tasks:
fail-op:
cachingOptions:
enableCache: true
componentRef:
name: comp-fail-op
inputs:
parameters:
message:
runtimeValue:
constant: Task failed.
taskInfo:
name: fail-op
print-op:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-op
inputs:
parameters:
message:
componentInputParameter: pipelinechannel--message
taskInfo:
name: print-op
inputDefinitions:
parameters:
pipelinechannel--message:
parameterType: STRING
comp-fail-op:
executorLabel: exec-fail-op
inputDefinitions:
parameters:
message:
parameterType: STRING
comp-get-run-state:
executorLabel: exec-get-run-state
inputDefinitions:
parameters:
status:
parameterType: STRUCT
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-print-op:
executorLabel: exec-print-op
inputDefinitions:
parameters:
message:
parameterType: STRING
comp-print-op-2:
executorLabel: exec-print-op-2
inputDefinitions:
parameters:
message:
parameterType: STRING
deploymentSpec:
executors:
exec-fail-op:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- fail_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n\
\ print(message)\n sys.exit(1)\n\n"
image: python:3.7
exec-get-run-state:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- get_run_state
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef get_run_state(status: dict) -> str:\n print('Pipeline status:\
\ ', status)\n return status['state']\n\n"
image: python:3.7
exec-print-op:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n\
\ print(message)\n\n"
image: python:3.7
exec-print-op-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n\
\ print(message)\n\n"
image: python:3.7
pipelineInfo:
name: pipeline-with-task-final-status-conditional
root:
dag:
tasks:
conditional-notification:
cachingOptions:
enableCache: true
componentRef:
name: comp-conditional-notification
dependentTasks:
- exit-handler-1
inputs:
parameters:
status:
taskFinalStatus:
producerTask: exit-handler-1
taskInfo:
name: conditional-notification
triggerPolicy:
strategy: ALL_UPSTREAM_TASKS_COMPLETED
exit-handler-1:
componentRef:
name: comp-exit-handler-1
inputs:
parameters:
pipelinechannel--message:
componentInputParameter: message
taskInfo:
name: my-pipeline
inputDefinitions:
parameters:
message:
defaultValue: Hello World!
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.3

sdk/python/kfp/components/structures.py

@@ -403,11 +403,11 @@ class Implementation(base_model.BaseModel):
@classmethod
def from_pipeline_spec_dict(cls, pipeline_spec_dict: Dict[str, Any],
component_name: str) -> 'Implementation':
"""Creates an Implmentation object from a deployment spec message in
dict format (pipeline_spec.deploymentSpec).
"""Creates an Implementation object from a PipelineSpec message in dict
format.
Args:
deployment_spec_dict (Dict[str, Any]): PipelineDeploymentConfig message in dict format.
pipeline_spec_dict (Dict[str, Any]): PipelineSpec message in dict format.
component_name (str): The name of the component.
Returns:

sdk/python/kfp/components/types/type_utils.py

@@ -235,6 +235,10 @@ def verify_type_compatibility(
str(expected_type).lower() == 'artifact'):
return True
# Special handling for PipelineTaskFinalStatus, treat it as Dict type.
if is_task_final_status_type(given_type):
given_type = 'Dict'
# Normalize parameter type names.
if is_parameter_type(given_type):
given_type = get_parameter_type_name(given_type)
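
The normalization above is what lets an exit pipeline consume the injected status: `PipelineTaskFinalStatus` is checked as a `Dict`, so wiring it into a dict-typed parameter is compatible. A short recap mirroring the sample pipeline in this PR:

```python
from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus


@dsl.component
def get_run_state(status: dict) -> str:
    return status['state']


@dsl.pipeline(name='conditional-notification')
def exit_op(status: PipelineTaskFinalStatus):
    # `status` carries type PipelineTaskFinalStatus; the component expects
    # a dict. With the special handling above, verify_type_compatibility
    # treats the two as matching instead of failing compilation.
    get_run_state(status=status)
```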

sdk/python/kfp/components/yaml_component.py

@@ -28,7 +28,7 @@ class YamlComponent(components.BaseComponent):
**Note:** ``YamlComponent`` is not intended to be used to construct components directly. Use ``kfp.components.load_component_from_*()`` instead.
Artribute:
Attribute:
component_spec: Component definition.
component_yaml: The yaml string that this component is loaded from.
"""