feat(sdk): add local execution skeleton #localexecution (#10292)
parent 5c60d37616
commit 5cd708de37

@@ -14,6 +14,8 @@
"""Pipeline task class and operations."""

import copy
import enum
import functools
import inspect
import itertools
import re

@@ -26,12 +28,43 @@ from kfp.dsl import placeholders
from kfp.dsl import structures
from kfp.dsl import utils
from kfp.dsl.types import type_utils
from kfp.local import task_dispatcher
from kfp.pipeline_spec import pipeline_spec_pb2

TEMPORARILY_BLOCK_LOCAL_EXECUTION = True

_register_task_handler = lambda task: utils.maybe_rename_for_k8s(
    task.component_spec.name)


class TaskState(enum.Enum):
    FUTURE = 'FUTURE'
    FINAL = 'FINAL'


def block_if_final(custom_message: Optional[str] = None):

    def actual_decorator(method):
        method_name = method.__name__

        @functools.wraps(method)
        def wrapper(self: 'PipelineTask', *args, **kwargs):
            if self.state == TaskState.FINAL:
                raise Exception(
                    custom_message or
                    f"Task configuration methods are not supported for local execution. Got call to '.{method_name}()'."
                )
            elif self.state == TaskState.FUTURE:
                return method(self, *args, **kwargs)
            else:
                raise ValueError(
                    f'Got unknown {TaskState.__name__}: {self.state}.')

        return wrapper

    return actual_decorator


class PipelineTask:
    """Represents a pipeline task (instantiated component).
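
To illustrate how the TaskState/block_if_final guard added above is intended to behave, here is a minimal, hypothetical sketch. It only assumes that these definitions are importable via kfp.dsl.pipeline_task, as the tests later in this diff assume; the Demo class and its configure method are invented for illustration and are not part of the commit.

# Hypothetical illustration of the block_if_final guard; Demo is not part of the commit.
from kfp.dsl import pipeline_task


class Demo:

    def __init__(self):
        self.state = pipeline_task.TaskState.FUTURE

    @pipeline_task.block_if_final('Demo is FINAL; configuration calls are blocked.')
    def configure(self) -> 'Demo':
        return self


demo = Demo()
demo.configure()  # allowed: state is FUTURE, so the wrapped method runs
demo.state = pipeline_task.TaskState.FINAL
try:
    demo.configure()  # blocked: the wrapper raises with the custom message
except Exception as e:
    print(e)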

@@ -65,12 +98,12 @@
    def __init__(
        self,
        component_spec: structures.ComponentSpec,
        args: Mapping[str, Any],
        args: Dict[str, Any],
    ):
        """Initializes a PipelineTask instance."""
        # import within __init__ to avoid circular import
        from kfp.dsl.tasks_group import TasksGroup

        self.state = TaskState.FUTURE
        self.parent_task_group: Union[None, TasksGroup] = None
        args = args or {}

@@ -148,7 +181,27 @@
            if not isinstance(value, pipeline_channel.PipelineChannel)
        ])

        from kfp.dsl import pipeline_context

        # TODO: remove feature flag
        if not TEMPORARILY_BLOCK_LOCAL_EXECUTION and pipeline_context.Pipeline.get_default_pipeline(
        ) is None:
            self._execute_locally()

    def _execute_locally(self) -> None:
        """Execute the pipeline task locally.

        Set the task state to FINAL and update the outputs.
        """
        self._outputs = task_dispatcher.run_single_component(
            pipeline_spec=self.pipeline_spec,
            arguments=self.args,
        )
        self.state = TaskState.FINAL

    @property
    @block_if_final(
        'Platform-specific features are not supported for local execution.')
    def platform_spec(self) -> pipeline_spec_pb2.PlatformSpec:
        """PlatformSpec for all tasks in the pipeline as task.
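
For orientation, the check above means that, once the TEMPORARILY_BLOCK_LOCAL_EXECUTION flag is removed, instantiating a component outside of any pipeline context is what should trigger _execute_locally() and the dispatch to task_dispatcher.run_single_component(). Below is a hedged sketch of that intended flow; my_comp is a hypothetical component, and with the flag still set to True the local path is not actually taken yet.

from kfp import dsl


@dsl.component
def my_comp(input1: str) -> str:
    # Hypothetical single-step component used only to show the trigger condition.
    return input1


# Inside a @dsl.pipeline function, get_default_pipeline() is non-None, so the task
# stays in TaskState.FUTURE and is compiled as usual. Called at module scope like
# this (no default pipeline), the intent is that __init__ runs _execute_locally(),
# which would set task.state to TaskState.FINAL and fill task._outputs.
task = my_comp(input1='value')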

@@ -173,9 +226,9 @@
    @property
    def inputs(
        self
    ) -> List[Union[type_utils.PARAMETER_TYPES,
                    pipeline_channel.PipelineChannel]]:
        """The list of actual inputs passed to the task."""
    ) -> Dict[str, Union[type_utils.PARAMETER_TYPES,
                         pipeline_channel.PipelineChannel]]:
        """The inputs passed to the task."""
        return self._inputs

    @property

@@ -208,6 +261,8 @@
        return self._outputs

    @property
    @block_if_final(
        'Task has no dependent tasks since it is executed independently.')
    def dependent_tasks(self) -> List[str]:
        """A list of the dependent task names."""
        return self._task_spec.dependent_tasks

@@ -236,6 +291,7 @@
        ]
        return container_spec

    @block_if_final()
    def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
        """Sets caching options for the task.

@@ -280,6 +336,7 @@

        return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)

    @block_if_final()
    def set_cpu_request(self, cpu: str) -> 'PipelineTask':
        """Sets CPU request (minimum) for the task.

@@ -304,6 +361,7 @@

        return self

    @block_if_final()
    def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
        """Sets CPU limit (maximum) for the task.

@@ -328,6 +386,7 @@

        return self

    @block_if_final()
    def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
        """Sets accelerator limit (maximum) for the task. Only applies if
        accelerator type is also set via .set_accelerator_type().

@@ -353,6 +412,7 @@

        return self

    @block_if_final()
    def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
        """Sets GPU limit (maximum) for the task. Only applies if accelerator
        type is also set via .add_accelerator_type().

@@ -422,6 +482,7 @@

        return memory

    @block_if_final()
    def set_memory_request(self, memory: str) -> 'PipelineTask':
        """Sets memory request (minimum) for the task.

@@ -445,6 +506,7 @@

        return self

    @block_if_final()
    def set_memory_limit(self, memory: str) -> 'PipelineTask':
        """Sets memory limit (maximum) for the task.

@@ -468,6 +530,7 @@

        return self

    @block_if_final()
    def set_retry(self,
                  num_retries: int,
                  backoff_duration: Optional[str] = None,

@@ -492,6 +555,7 @@
        )
        return self

    @block_if_final()
    def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
        """Sets accelerator type to use when executing this task.

@@ -506,6 +570,7 @@
            category=DeprecationWarning)
        return self.set_accelerator_type(accelerator)

    @block_if_final()
    def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
        """Sets accelerator type to use when executing this task.

@@ -527,6 +592,7 @@

        return self

    @block_if_final()
    def set_display_name(self, name: str) -> 'PipelineTask':
        """Sets display name for the task.

@@ -539,6 +605,7 @@
        self._task_spec.display_name = name
        return self

    @block_if_final()
    def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
        """Sets environment variable for the task.

@@ -557,6 +624,7 @@
        self.container_spec.env = {name: value}
        return self

    @block_if_final()
    def after(self, *tasks) -> 'PipelineTask':
        """Specifies an explicit dependency on other tasks by requiring this
        task to be executed after the other tasks finish.

@@ -580,6 +648,7 @@
        self._task_spec.dependent_tasks.append(task.name)
        return self

    @block_if_final()
    def ignore_upstream_failure(self) -> 'PipelineTask':
        """If called, the pipeline task will run when any specified upstream
        tasks complete, even if unsuccessful.


@@ -377,5 +377,134 @@ class TestPlatformSpecificFunctionality(unittest.TestCase):
            t.platform_spec


class TestTaskInFinalState(unittest.TestCase):
    """Tests PipelineTask in the state FINAL.

    Many properties and methods will be blocked.

    Also tests that .output and .outputs behave as expected when the outputs
    are values, not placeholders, as is the case when PipelineTask is in the
    state FINAL.
    """

    def test_output_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        task._outputs = {'Output': 1}
        self.assertEqual(task.output, 1)
        self.assertEqual(task.outputs['Output'], 1)

    def test_outputs_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        task._outputs = {
            'int_output':
                1,
            'str_output':
                'foo',
            'dataset_output':
                dsl.Dataset(
                    name='dataset_output',
                    uri='foo/bar/dataset_output',
                    metadata={'key': 'value'})
        }
        self.assertEqual(task.outputs['int_output'], 1)
        self.assertEqual(task.outputs['str_output'], 'foo')
        assert_artifacts_equal(
            self,
            task.outputs['dataset_output'],
            dsl.Dataset(
                name='dataset_output',
                uri='foo/bar/dataset_output',
                metadata={'key': 'value'}),
        )

    def test_platform_spec_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                Exception,
                r'Platform-specific features are not supported for local execution\.'
        ):
            task.platform_spec

    def test_name_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        self.assertEqual(task.name, 'component1')

    def test_inputs_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        self.assertEqual(task.inputs, {'input1': 'value'})

    def test_dependent_tasks_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                Exception,
                r'Task has no dependent tasks since it is executed independently\.'
        ):
            task.dependent_tasks

    def test_sampling_of_task_configuration_methods(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                Exception,
                r"Task configuration methods are not supported for local execution\. Got call to '\.set_caching_options\(\)'\."
        ):
            task.set_caching_options(enable_caching=True)
        with self.assertRaisesRegex(
                Exception,
                r"Task configuration methods are not supported for local execution\. Got call to '\.set_env_variable\(\)'\."
        ):
            task.set_env_variable(name='foo', value='BAR')
        with self.assertRaisesRegex(
                Exception,
                r"Task configuration methods are not supported for local execution\. Got call to '\.ignore_upstream_failure\(\)'\."
        ):
            task.ignore_upstream_failure()


def assert_artifacts_equal(
    test_class: unittest.TestCase,
    a1: dsl.Artifact,
    a2: dsl.Artifact,
) -> None:
    test_class.assertEqual(a1.name, a2.name)
    test_class.assertEqual(a1.uri, a2.uri)
    test_class.assertEqual(a1.metadata, a2.metadata)
    test_class.assertEqual(a1.schema_title, a2.schema_title)
    test_class.assertEqual(a1.schema_version, a2.schema_version)
    test_class.assertIsInstance(a1, type(a2))


if __name__ == '__main__':
    unittest.main()


@@ -0,0 +1,34 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for dispatching a local task execution."""
from typing import Any, Dict

from kfp.pipeline_spec import pipeline_spec_pb2


def run_single_component(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Runs a single component from its compiled PipelineSpec.

    Args:
        pipeline_spec: The PipelineSpec of the component to run.
        arguments: The runtime arguments.

    Returns:
        A dictionary of the task outputs.
    """
    # TODO: implement and return outputs
    return {}
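
This new module (imported above as kfp.local.task_dispatcher) is what PipelineTask._execute_locally() calls. Below is a hedged sketch of the contract as it stands in this skeleton; the identity component is hypothetical, and the empty-dict result simply reflects the TODO above.

from kfp import dsl
from kfp.local import task_dispatcher


@dsl.component
def identity(input1: str) -> str:
    # Hypothetical component, used only to produce a compiled PipelineSpec.
    return input1


# _execute_locally() passes the task's compiled spec and its resolved arguments;
# here the dispatcher is called directly with the component's own pipeline_spec.
outputs = task_dispatcher.run_single_component(
    pipeline_spec=identity.pipeline_spec,
    arguments={'input1': 'value'},
)
assert outputs == {}  # skeleton behavior until the TODO is implemented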