chore(sdk): refactor python component executor (#9990)
This commit is contained in:
parent
f8f01bcd08
commit
d001b8055f
|
|
@ -14,69 +14,76 @@
|
|||
import inspect
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
|
||||
from kfp import dsl
|
||||
from kfp.dsl import task_final_status
|
||||
from kfp.dsl.types import artifact_types
|
||||
from kfp.dsl.types import type_annotations
|
||||
|
||||
|
||||
class Executor():
|
||||
"""Executor executes v2-based Python function components."""
|
||||
class Executor:
|
||||
"""Executor executes Python function components."""
|
||||
|
||||
def __init__(
|
||||
self, executor_input: Dict,
|
||||
self,
|
||||
executor_input: Dict,
|
||||
function_to_execute: Union[Callable,
|
||||
'python_component.PythonComponent']):
|
||||
'python_component.PythonComponent'],
|
||||
):
|
||||
|
||||
if hasattr(function_to_execute, 'python_func'):
|
||||
self._func = function_to_execute.python_func
|
||||
self.func = function_to_execute.python_func
|
||||
else:
|
||||
self._func = function_to_execute
|
||||
self.func = function_to_execute
|
||||
|
||||
self._input = executor_input
|
||||
self._input_artifacts: Dict[str,
|
||||
Union[artifact_types.Artifact,
|
||||
List[artifact_types.Artifact]]] = {}
|
||||
self._output_artifacts: Dict[str, artifact_types.Artifact] = {}
|
||||
self.executor_input = executor_input
|
||||
self.input_artifacts: Dict[str, Union[dsl.Artifact,
|
||||
List[dsl.Artifact]]] = {}
|
||||
self.output_artifacts: Dict[str, dsl.Artifact] = {}
|
||||
self.assign_input_and_output_artifacts()
|
||||
|
||||
for name, artifacts in self._input.get('inputs',
|
||||
{}).get('artifacts', {}).items():
|
||||
self.return_annotation = inspect.signature(self.func).return_annotation
|
||||
self.excutor_output = {}
|
||||
|
||||
def assign_input_and_output_artifacts(self) -> None:
|
||||
for name, artifacts in self.executor_input.get('inputs',
|
||||
{}).get('artifacts',
|
||||
{}).items():
|
||||
list_of_artifact_proto_structs = artifacts.get('artifacts')
|
||||
if list_of_artifact_proto_structs:
|
||||
annotation = self._func.__annotations__[name]
|
||||
annotation = self.func.__annotations__[name]
|
||||
# InputPath has no attribute __origin__ and also should be handled as a single artifact
|
||||
if type_annotations.is_Input_Output_artifact_annotation(
|
||||
annotation) and type_annotations.is_list_of_artifacts(
|
||||
annotation.__origin__):
|
||||
self._input_artifacts[name] = [
|
||||
self.input_artifacts[name] = [
|
||||
self.make_artifact(
|
||||
msg,
|
||||
name,
|
||||
self._func,
|
||||
self.func,
|
||||
) for msg in list_of_artifact_proto_structs
|
||||
]
|
||||
else:
|
||||
self._input_artifacts[name] = self.make_artifact(
|
||||
self.input_artifacts[name] = self.make_artifact(
|
||||
list_of_artifact_proto_structs[0],
|
||||
name,
|
||||
self._func,
|
||||
self.func,
|
||||
)
|
||||
|
||||
for name, artifacts in self._input.get('outputs',
|
||||
{}).get('artifacts', {}).items():
|
||||
for name, artifacts in self.executor_input.get('outputs',
|
||||
{}).get('artifacts',
|
||||
{}).items():
|
||||
list_of_artifact_proto_structs = artifacts.get('artifacts')
|
||||
if list_of_artifact_proto_structs:
|
||||
output_artifact = self.make_artifact(
|
||||
list_of_artifact_proto_structs[0],
|
||||
name,
|
||||
self._func,
|
||||
self.func,
|
||||
)
|
||||
self._output_artifacts[name] = output_artifact
|
||||
self.makedirs_recursively(output_artifact.path)
|
||||
|
||||
self._return_annotation = inspect.signature(
|
||||
self._func).return_annotation
|
||||
self._executor_output = {}
|
||||
self.output_artifacts[name] = output_artifact
|
||||
makedirs_recursively(output_artifact.path)
|
||||
|
||||
def make_artifact(
|
||||
self,
|
||||
|
|
@ -99,56 +106,51 @@ class Executor():
|
|||
return create_artifact_instance(
|
||||
runtime_artifact, artifact_cls=artifact_cls)
|
||||
|
||||
def makedirs_recursively(self, path: str) -> None:
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
def get_input_artifact(self, name: str) -> Optional[dsl.Artifact]:
|
||||
return self.input_artifacts.get(name)
|
||||
|
||||
def _get_input_artifact(self, name: str):
|
||||
return self._input_artifacts.get(name)
|
||||
def get_output_artifact(self, name: str) -> Optional[dsl.Artifact]:
|
||||
return self.output_artifacts.get(name)
|
||||
|
||||
def _get_output_artifact(self, name: str):
|
||||
return self._output_artifacts.get(name)
|
||||
|
||||
def _get_input_parameter_value(self, parameter_name: str):
|
||||
parameter_values = self._input.get('inputs',
|
||||
{}).get('parameterValues', None)
|
||||
def get_input_parameter_value(self, parameter_name: str) -> Optional[str]:
|
||||
parameter_values = self.executor_input.get('inputs', {}).get(
|
||||
'parameterValues', None)
|
||||
|
||||
if parameter_values is not None:
|
||||
return parameter_values.get(parameter_name, None)
|
||||
|
||||
return None
|
||||
|
||||
def _get_output_parameter_path(self, parameter_name: str):
|
||||
parameter = self._input.get('outputs',
|
||||
{}).get('parameters',
|
||||
{}).get(parameter_name, None)
|
||||
def get_output_parameter_path(self, parameter_name: str) -> Optional[str]:
|
||||
parameter = self.executor_input.get('outputs', {}).get(
|
||||
'parameters', {}).get(parameter_name, None)
|
||||
if parameter is None:
|
||||
return None
|
||||
|
||||
import os
|
||||
path = parameter.get('outputFile', None)
|
||||
if path:
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
return path
|
||||
|
||||
def _get_output_artifact_path(self, artifact_name: str):
|
||||
output_artifact = self._output_artifacts.get(artifact_name)
|
||||
def get_output_artifact_path(self, artifact_name: str) -> str:
|
||||
output_artifact = self.output_artifacts.get(artifact_name)
|
||||
if not output_artifact:
|
||||
raise ValueError(
|
||||
f'Failed to get output artifact path for artifact name {artifact_name}'
|
||||
)
|
||||
return output_artifact.path
|
||||
|
||||
def _get_input_artifact_path(self, artifact_name: str):
|
||||
input_artifact = self._input_artifacts.get(artifact_name)
|
||||
def get_input_artifact_path(self, artifact_name: str) -> str:
|
||||
input_artifact = self.input_artifacts.get(artifact_name)
|
||||
if not input_artifact:
|
||||
raise ValueError(
|
||||
f'Failed to get input artifact path for artifact name {artifact_name}'
|
||||
)
|
||||
return input_artifact.path
|
||||
|
||||
def _write_output_parameter_value(self, name: str,
|
||||
value: Union[str, int, float, bool, dict,
|
||||
list, Dict, List]):
|
||||
def write_output_parameter_value(
|
||||
self, name: str, value: Union[str, int, float, bool, dict, list,
|
||||
Dict, List]) -> None:
|
||||
if isinstance(value, (float, int)):
|
||||
output = str(value)
|
||||
elif isinstance(value, str):
|
||||
|
|
@ -161,66 +163,19 @@ class Executor():
|
|||
f'Unable to serialize unknown type `{value}` for parameter input with value `{type(value)}`'
|
||||
)
|
||||
|
||||
if not self._executor_output.get('parameterValues'):
|
||||
self._executor_output['parameterValues'] = {}
|
||||
if not self.excutor_output.get('parameterValues'):
|
||||
self.excutor_output['parameterValues'] = {}
|
||||
|
||||
self._executor_output['parameterValues'][name] = value
|
||||
self.excutor_output['parameterValues'][name] = value
|
||||
|
||||
def _write_output_artifact_payload(self, name: str, value: Any):
|
||||
path = self._get_output_artifact_path(name)
|
||||
def write_output_artifact_payload(self, name: str, value: Any) -> None:
|
||||
path = self.get_output_artifact_path(name)
|
||||
with open(path, 'w') as f:
|
||||
f.write(str(value))
|
||||
|
||||
# TODO: extract to a util
|
||||
@classmethod
|
||||
def _get_short_type_name(cls, type_name: str) -> str:
|
||||
"""Extracts the short form type name.
|
||||
|
||||
This method is used for looking up serializer for a given type.
|
||||
|
||||
For example:
|
||||
typing.List -> List
|
||||
typing.List[int] -> List
|
||||
typing.Dict[str, str] -> Dict
|
||||
List -> List
|
||||
str -> str
|
||||
|
||||
Args:
|
||||
type_name: The original type name.
|
||||
|
||||
Returns:
|
||||
The short form type name or the original name if pattern doesn't match.
|
||||
"""
|
||||
import re
|
||||
match = re.match('(typing\.)?(?P<type>\w+)(?:\[.+\])?', type_name)
|
||||
return match.group('type') if match else type_name
|
||||
|
||||
# TODO: merge with type_utils.is_parameter_type
|
||||
@classmethod
|
||||
def _is_parameter(cls, annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return annotation in [str, int, float, bool, dict, list]
|
||||
|
||||
# Annotation could be, for instance `typing.Dict[str, str]`, etc.
|
||||
return cls._get_short_type_name(str(annotation)) in ['Dict', 'List']
|
||||
|
||||
@classmethod
|
||||
def _is_artifact(cls, annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return type_annotations.is_artifact_class(annotation)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def _is_named_tuple(cls, annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return issubclass(annotation, tuple) and hasattr(
|
||||
annotation, '_fields') and hasattr(annotation,
|
||||
'__annotations__')
|
||||
return False
|
||||
|
||||
def _handle_single_return_value(self, output_name: str,
|
||||
annotation_type: Any, return_value: Any):
|
||||
if self._is_parameter(annotation_type):
|
||||
def handle_single_return_value(self, output_name: str, annotation_type: Any,
|
||||
return_value: Any) -> None:
|
||||
if is_parameter(annotation_type):
|
||||
origin_type = getattr(annotation_type, '__origin__',
|
||||
None) or annotation_type
|
||||
# relax float-typed return to allow both int and float.
|
||||
|
|
@ -231,19 +186,19 @@ class Executor():
|
|||
accepted_types = origin_type
|
||||
if not isinstance(return_value, accepted_types):
|
||||
raise ValueError(
|
||||
f'Function `{self._func.__name__}` returned value of type {type(return_value)}; want type {origin_type}'
|
||||
f'Function `{self.func.__name__}` returned value of type {type(return_value)}; want type {origin_type}'
|
||||
)
|
||||
self._write_output_parameter_value(output_name, return_value)
|
||||
elif self._is_artifact(annotation_type):
|
||||
self._write_output_artifact_payload(output_name, return_value)
|
||||
self.write_output_parameter_value(output_name, return_value)
|
||||
elif is_artifact(annotation_type):
|
||||
self.write_output_artifact_payload(output_name, return_value)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f'Unknown return type: {annotation_type}. Must be one of the supported data types: https://www.kubeflow.org/docs/components/pipelines/v2/data-types/'
|
||||
)
|
||||
|
||||
def _write_executor_output(self,
|
||||
func_output: Optional[Any] = None
|
||||
) -> Optional[str]:
|
||||
def write_executor_output(self,
|
||||
func_output: Optional[Any] = None
|
||||
) -> Optional[str]:
|
||||
"""Writes executor output containing the Python function output. The
|
||||
executor output file will not be written if this code is executed from
|
||||
a non-chief node in a mirrored execution strategy.
|
||||
|
|
@ -254,10 +209,10 @@ class Executor():
|
|||
Returns:
|
||||
Optional[str]: Returns the location of the executor_output file as a string if the file is written. Else, None.
|
||||
"""
|
||||
if self._output_artifacts:
|
||||
self._executor_output['artifacts'] = {}
|
||||
if self.output_artifacts:
|
||||
self.excutor_output['artifacts'] = {}
|
||||
|
||||
for name, artifact in self._output_artifacts.items():
|
||||
for name, artifact in self.output_artifacts.items():
|
||||
runtime_artifact = {
|
||||
'name': artifact.name,
|
||||
'uri': artifact.uri,
|
||||
|
|
@ -265,32 +220,32 @@ class Executor():
|
|||
}
|
||||
artifacts_list = {'artifacts': [runtime_artifact]}
|
||||
|
||||
self._executor_output['artifacts'][name] = artifacts_list
|
||||
self.excutor_output['artifacts'][name] = artifacts_list
|
||||
|
||||
if func_output is not None:
|
||||
if self._is_parameter(self._return_annotation) or self._is_artifact(
|
||||
self._return_annotation):
|
||||
if is_parameter(self.return_annotation) or is_artifact(
|
||||
self.return_annotation):
|
||||
# Note: single output is named `Output` in component.yaml.
|
||||
self._handle_single_return_value('Output',
|
||||
self._return_annotation,
|
||||
func_output)
|
||||
elif self._is_named_tuple(self._return_annotation):
|
||||
if len(self._return_annotation._fields) != len(func_output):
|
||||
self.handle_single_return_value('Output',
|
||||
self.return_annotation,
|
||||
func_output)
|
||||
elif is_named_tuple(self.return_annotation):
|
||||
if len(self.return_annotation._fields) != len(func_output):
|
||||
raise RuntimeError(
|
||||
f'Expected {len(self._return_annotation._fields)} return values from function `{self._func.__name__}`, got {len(func_output)}'
|
||||
f'Expected {len(self.return_annotation._fields)} return values from function `{self.func.__name__}`, got {len(func_output)}'
|
||||
)
|
||||
for i in range(len(self._return_annotation._fields)):
|
||||
field = self._return_annotation._fields[i]
|
||||
field_type = self._return_annotation.__annotations__[field]
|
||||
for i in range(len(self.return_annotation._fields)):
|
||||
field = self.return_annotation._fields[i]
|
||||
field_type = self.return_annotation.__annotations__[field]
|
||||
if type(func_output) == tuple:
|
||||
field_value = func_output[i]
|
||||
else:
|
||||
field_value = getattr(func_output, field)
|
||||
self._handle_single_return_value(field, field_type,
|
||||
field_value)
|
||||
self.handle_single_return_value(field, field_type,
|
||||
field_value)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f'Unknown return type: {self._return_annotation}. Must be one of `str`, `int`, `float`, a subclass of `Artifact`, or a NamedTuple collection of these types.'
|
||||
f'Unknown return type: {self.return_annotation}. Must be one of `str`, `int`, `float`, a subclass of `Artifact`, or a NamedTuple collection of these types.'
|
||||
)
|
||||
|
||||
# This check is to ensure only one worker (in a mirrored, distributed training/compute strategy) attempts to write to the same executor output file at the same time using gcsfuse, which enforces immutability of files.
|
||||
|
|
@ -304,10 +259,10 @@ class Executor():
|
|||
write_file = cluster_spec['task']['type'] in CHIEF_NODE_LABELS
|
||||
|
||||
if write_file:
|
||||
executor_output_path = self._input['outputs']['outputFile']
|
||||
executor_output_path = self.executor_input['outputs']['outputFile']
|
||||
os.makedirs(os.path.dirname(executor_output_path), exist_ok=True)
|
||||
with open(executor_output_path, 'w') as f:
|
||||
f.write(json.dumps(self._executor_output))
|
||||
f.write(json.dumps(self.excutor_output))
|
||||
return executor_output_path
|
||||
|
||||
return None
|
||||
|
|
@ -320,7 +275,7 @@ class Executor():
|
|||
Returns:
|
||||
Optional[str]: Returns the location of the executor_output file as a string if the file is written. Else, None.
|
||||
"""
|
||||
annotations = inspect.getfullargspec(self._func).annotations
|
||||
annotations = inspect.getfullargspec(self.func).annotations
|
||||
|
||||
# Function arguments.
|
||||
func_kwargs = {}
|
||||
|
|
@ -335,7 +290,7 @@ class Executor():
|
|||
v = type_annotations.maybe_strip_optional_from_annotation(v)
|
||||
|
||||
if v == task_final_status.PipelineTaskFinalStatus:
|
||||
value = self._get_input_parameter_value(k)
|
||||
value = self.get_input_parameter_value(k)
|
||||
func_kwargs[k] = task_final_status.PipelineTaskFinalStatus(
|
||||
state=value.get('state'),
|
||||
pipeline_job_resource_name=value.get(
|
||||
|
|
@ -345,33 +300,33 @@ class Executor():
|
|||
error_message=value.get('error').get('message', None),
|
||||
)
|
||||
|
||||
elif self._is_parameter(v):
|
||||
value = self._get_input_parameter_value(k)
|
||||
elif is_parameter(v):
|
||||
value = self.get_input_parameter_value(k)
|
||||
if value is not None:
|
||||
func_kwargs[k] = value
|
||||
|
||||
elif type_annotations.is_Input_Output_artifact_annotation(v):
|
||||
if type_annotations.is_input_artifact(v):
|
||||
func_kwargs[k] = self._get_input_artifact(k)
|
||||
func_kwargs[k] = self.get_input_artifact(k)
|
||||
if type_annotations.is_output_artifact(v):
|
||||
func_kwargs[k] = self._get_output_artifact(k)
|
||||
func_kwargs[k] = self.get_output_artifact(k)
|
||||
|
||||
elif isinstance(v, type_annotations.OutputPath):
|
||||
if self._is_parameter(v.type):
|
||||
func_kwargs[k] = self._get_output_parameter_path(k)
|
||||
if is_parameter(v.type):
|
||||
func_kwargs[k] = self.get_output_parameter_path(k)
|
||||
else:
|
||||
func_kwargs[k] = self._get_output_artifact_path(k)
|
||||
func_kwargs[k] = self.get_output_artifact_path(k)
|
||||
|
||||
elif isinstance(v, type_annotations.InputPath):
|
||||
func_kwargs[k] = self._get_input_artifact_path(k)
|
||||
func_kwargs[k] = self.get_input_artifact_path(k)
|
||||
|
||||
result = self._func(**func_kwargs)
|
||||
return self._write_executor_output(result)
|
||||
result = self.func(**func_kwargs)
|
||||
return self.write_executor_output(result)
|
||||
|
||||
|
||||
def create_artifact_instance(
|
||||
runtime_artifact: Dict,
|
||||
artifact_cls=artifact_types.Artifact,
|
||||
artifact_cls=dsl.Artifact,
|
||||
) -> type:
|
||||
"""Creates an artifact class instances from a runtime artifact
|
||||
dictionary."""
|
||||
|
|
@ -388,3 +343,51 @@ def create_artifact_instance(
|
|||
name=runtime_artifact.get('name', ''),
|
||||
metadata=runtime_artifact.get('metadata', {}),
|
||||
)
|
||||
|
||||
|
||||
def get_short_type_name(type_name: str) -> str:
|
||||
"""Extracts the short form type name.
|
||||
|
||||
This method is used for looking up serializer for a given type.
|
||||
|
||||
For example:
|
||||
typing.List -> List
|
||||
typing.List[int] -> List
|
||||
typing.Dict[str, str] -> Dict
|
||||
List -> List
|
||||
str -> str
|
||||
|
||||
Args:
|
||||
type_name: The original type name.
|
||||
|
||||
Returns:
|
||||
The short form type name or the original name if pattern doesn't match.
|
||||
"""
|
||||
match = re.match('(typing\.)?(?P<type>\w+)(?:\[.+\])?', type_name)
|
||||
return match['type'] if match else type_name
|
||||
|
||||
|
||||
# TODO: merge with type_utils.is_parameter_type
|
||||
def is_parameter(annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return annotation in [str, int, float, bool, dict, list]
|
||||
|
||||
# Annotation could be, for instance `typing.Dict[str, str]`, etc.
|
||||
return get_short_type_name(str(annotation)) in ['Dict', 'List']
|
||||
|
||||
|
||||
def is_artifact(annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return type_annotations.is_artifact_class(annotation)
|
||||
return False
|
||||
|
||||
|
||||
def is_named_tuple(annotation: Any) -> bool:
|
||||
if type(annotation) == type:
|
||||
return issubclass(annotation, tuple) and hasattr(
|
||||
annotation, '_fields') and hasattr(annotation, '__annotations__')
|
||||
return False
|
||||
|
||||
|
||||
def makedirs_recursively(path: str) -> None:
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
|
|
|
|||
Loading…
Reference in New Issue