class TaskState(enum.Enum):
    """State of a PipelineTask with respect to execution."""
    # Initial state assigned in PipelineTask.__init__.
    FUTURE = 'FUTURE'
    # State assigned once the task has been executed locally.
    FINAL = 'FINAL'


def block_if_final(custom_message: Optional[str] = None):
    """Creates a decorator that blocks a PipelineTask method on FINAL tasks.

    The decorated method runs normally while the task's ``state`` is
    ``TaskState.FUTURE``. Once the state is ``TaskState.FINAL`` the call is
    rejected instead of being forwarded to the method.

    Args:
        custom_message: Error message used when the call is rejected. If
            omitted (or empty), a default message naming the blocked method
            is used.

    Returns:
        A decorator to apply to PipelineTask methods. The wrapped method
        raises ``RuntimeError`` when the task is FINAL and ``ValueError``
        when the task's state is not a known ``TaskState`` member.
    """

    def actual_decorator(method):
        method_name = method.__name__

        @functools.wraps(method)
        def wrapper(self: 'PipelineTask', *args, **kwargs):
            # Common case first: the task has not been executed yet.
            if self.state == TaskState.FUTURE:
                return method(self, *args, **kwargs)

            if self.state == TaskState.FINAL:
                # RuntimeError (rather than a bare Exception) lets callers
                # catch this narrowly; it is still an Exception subclass, so
                # existing broad handlers keep working.
                raise RuntimeError(
                    custom_message or
                    f"Task configuration methods are not supported for local execution. Got call to '.{method_name}()'."
                )

            raise ValueError(
                f'Got unknown {TaskState.__name__}: {self.state}.')

        return wrapper

    return actual_decorator
def _execute_locally(self) -> None:
    """Runs this task through the local task dispatcher.

    The dispatcher's return value becomes the task's outputs, after which
    the task is marked as FINAL.
    """
    # Dispatch first so the state only becomes FINAL once the dispatcher
    # call has returned.
    local_outputs = task_dispatcher.run_single_component(
        pipeline_spec=self.pipeline_spec,
        arguments=self.args,
    )
    self._outputs = local_outputs
    self.state = TaskState.FINAL
@@ -173,9 +226,9 @@ class PipelineTask: @property def inputs( self - ) -> List[Union[type_utils.PARAMETER_TYPES, - pipeline_channel.PipelineChannel]]: - """The list of actual inputs passed to the task.""" + ) -> Dict[str, Union[type_utils.PARAMETER_TYPES, + pipeline_channel.PipelineChannel]]: + """The inputs passed to the task.""" return self._inputs @property @@ -208,6 +261,8 @@ class PipelineTask: return self._outputs @property + @block_if_final( + 'Task has no dependent tasks since it is executed independently.') def dependent_tasks(self) -> List[str]: """A list of the dependent task names.""" return self._task_spec.dependent_tasks @@ -236,6 +291,7 @@ class PipelineTask: ] return container_spec + @block_if_final() def set_caching_options(self, enable_caching: bool) -> 'PipelineTask': """Sets caching options for the task. @@ -280,6 +336,7 @@ class PipelineTask: return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + @block_if_final() def set_cpu_request(self, cpu: str) -> 'PipelineTask': """Sets CPU request (minimum) for the task. @@ -304,6 +361,7 @@ class PipelineTask: return self + @block_if_final() def set_cpu_limit(self, cpu: str) -> 'PipelineTask': """Sets CPU limit (maximum) for the task. @@ -328,6 +386,7 @@ class PipelineTask: return self + @block_if_final() def set_accelerator_limit(self, limit: int) -> 'PipelineTask': """Sets accelerator limit (maximum) for the task. Only applies if accelerator type is also set via .set_accelerator_type(). @@ -353,6 +412,7 @@ class PipelineTask: return self + @block_if_final() def set_gpu_limit(self, gpu: str) -> 'PipelineTask': """Sets GPU limit (maximum) for the task. Only applies if accelerator type is also set via .add_accelerator_type(). @@ -422,6 +482,7 @@ class PipelineTask: return memory + @block_if_final() def set_memory_request(self, memory: str) -> 'PipelineTask': """Sets memory request (minimum) for the task. 
@@ -445,6 +506,7 @@ class PipelineTask: return self + @block_if_final() def set_memory_limit(self, memory: str) -> 'PipelineTask': """Sets memory limit (maximum) for the task. @@ -468,6 +530,7 @@ class PipelineTask: return self + @block_if_final() def set_retry(self, num_retries: int, backoff_duration: Optional[str] = None, @@ -492,6 +555,7 @@ class PipelineTask: ) return self + @block_if_final() def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask': """Sets accelerator type to use when executing this task. @@ -506,6 +570,7 @@ class PipelineTask: category=DeprecationWarning) return self.set_accelerator_type(accelerator) + @block_if_final() def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': """Sets accelerator type to use when executing this task. @@ -527,6 +592,7 @@ class PipelineTask: return self + @block_if_final() def set_display_name(self, name: str) -> 'PipelineTask': """Sets display name for the task. @@ -539,6 +605,7 @@ class PipelineTask: self._task_spec.display_name = name return self + @block_if_final() def set_env_variable(self, name: str, value: str) -> 'PipelineTask': """Sets environment variable for the task. @@ -557,6 +624,7 @@ class PipelineTask: self.container_spec.env = {name: value} return self + @block_if_final() def after(self, *tasks) -> 'PipelineTask': """Specifies an explicit dependency on other tasks by requiring this task be executed after other tasks finish completion. @@ -580,6 +648,7 @@ class PipelineTask: self._task_spec.dependent_tasks.append(task.name) return self + @block_if_final() def ignore_upstream_failure(self) -> 'PipelineTask': """If called, the pipeline task will run when any specified upstream tasks complete, even if unsuccessful. 
diff --git a/sdk/python/kfp/dsl/pipeline_task_test.py b/sdk/python/kfp/dsl/pipeline_task_test.py index 6e7443fc1a..c5bedc3811 100644 --- a/sdk/python/kfp/dsl/pipeline_task_test.py +++ b/sdk/python/kfp/dsl/pipeline_task_test.py @@ -377,5 +377,134 @@ class TestPlatformSpecificFunctionality(unittest.TestCase): t.platform_spec +class TestTaskInFinalState(unittest.TestCase): + """Tests PipelineTask in the state FINAL. + + Many properties and methods will be blocked. + + Also tests that the .output and .outputs behavior behaves as expected when the outputs are values, not placeholders, as will be the case when PipelineTask is in the state FINAL. + """ + + def test_output_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + task._outputs = {'Output': 1} + self.assertEqual(task.output, 1) + self.assertEqual(task.outputs['Output'], 1) + + def test_outputs_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + task._outputs = { + 'int_output': + 1, + 'str_output': + 'foo', + 'dataset_output': + dsl.Dataset( + name='dataset_output', + uri='foo/bar/dataset_output', + metadata={'key': 'value'}) + } + self.assertEqual(task.outputs['int_output'], 1) + self.assertEqual(task.outputs['str_output'], 'foo') + assert_artifacts_equal( + self, + task.outputs['dataset_output'], + dsl.Dataset( + name='dataset_output', + uri='foo/bar/dataset_output', + metadata={'key': 'value'}), + ) + + def test_platform_spec_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + with self.assertRaisesRegex( + Exception, + r'Platform-specific features are 
not supported for local execution\.' + ): + task.platform_spec + + def test_name_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + self.assertEqual(task.name, 'component1') + + def test_inputs_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + self.assertEqual(task.inputs, {'input1': 'value'}) + + def test_dependent_tasks_property(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + with self.assertRaisesRegex( + Exception, + r'Task has no dependent tasks since it is executed independently\.' + ): + task.dependent_tasks + + def test_sampling_of_task_configuration_methods(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.from_yaml_documents( + V2_YAML), + args={'input1': 'value'}, + ) + task.state = pipeline_task.TaskState.FINAL + with self.assertRaisesRegex( + Exception, + r"Task configuration methods are not supported for local execution\. Got call to '\.set_caching_options\(\)'\." + ): + task.set_caching_options(enable_caching=True) + with self.assertRaisesRegex( + Exception, + r"Task configuration methods are not supported for local execution\. Got call to '\.set_env_variable\(\)'\." + ): + task.set_env_variable(name='foo', value='BAR') + with self.assertRaisesRegex( + Exception, + r"Task configuration methods are not supported for local execution\. Got call to '\.ignore_upstream_failure\(\)'\." 
def run_single_component(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Runs a single component from its compiled PipelineSpec.

    This is currently a skeleton: nothing is executed and both parameters
    are ignored.

    Args:
        pipeline_spec: The PipelineSpec of the component to run.
        arguments: The runtime arguments.

    Returns:
        A dictionary holding the component's outputs. Always a new empty
        dictionary until local execution is implemented.
    """
    # TODO: implement and return outputs
    return {}