# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import itertools
import re
import textwrap
from typing import Callable, Dict, List, Mapping, Optional, TypeVar
import warnings

import docstring_parser

from kfp import components as v1_components
from kfp.components import _components, _data_passing, structures, type_annotation_utils
from kfp.v2.components.types import artifact_types, type_annotations

_DEFAULT_BASE_IMAGE = 'python:3.7'


def _python_function_name_to_component_name(name):
    """Derives a human-readable component name from a function name.

    Underscores become spaces, runs of spaces are collapsed, surrounding
    spaces are stripped and the first character is upper-cased.

    Args:
        name: The Python function name.

    Returns:
        The derived name. If nothing is left after stripping (e.g. the
        function is called `_`), |name| is returned unchanged instead of
        failing with an IndexError.
    """
    name_with_spaces = re.sub(' +', ' ', name.replace('_', ' ')).strip(' ')
    if not name_with_spaces:
        return name
    return name_with_spaces[0].upper() + name_with_spaces[1:]


def _get_packages_to_install_command(
        package_list: Optional[List[str]] = None) -> List[str]:
    """Builds the command prefix that pip-installs |package_list|.

    Args:
        package_list: The packages to install. Each entry is converted with
            `repr(str(package))` before being placed on the pip command line.

    Returns:
        An empty list when |package_list| is empty or None. Otherwise a
        `['sh', '-c', <script>]` list whose script runs ensurepip and
        `pip install` (each retried with `--user` on failure) and then
        executes `"$0" "$@"`.
    """
    result = []
    if package_list:
        install_pip_command = 'python3 -m ensurepip'
        # Adjacent literals are used instead of a backslash continuation
        # inside the string so that source indentation cannot leak into the
        # generated command.
        install_packages_command = (
            'PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet '
            '--no-warn-script-location {}').format(' '.join(
                [repr(str(package)) for package in package_list]))
        result = [
            'sh', '-c',
            '({install_pip} || {install_pip} --user) &&'
            ' ({install_packages} || {install_packages} --user) && "$0" "$@"'
            .format(
                install_pip=install_pip_command,
                install_packages=install_packages_command)
        ]
    return result


def _get_default_kfp_package_path() -> str:
    """Returns a `kfp==<version>` requirement for the imported kfp package."""
    import kfp
    return 'kfp=={}'.format(kfp.__version__)


def _get_function_source_definition(func: Callable) -> str:
    """Returns the dedented source of |func| starting at its `def` line.

    Args:
        func: The function whose source should be extracted.

    Returns:
        The source code of |func| with common leading indentation removed
        and any lines preceding the `def` line (e.g. decorators) dropped.

    Raises:
        ValueError: If no line starting with `def` is found after dedenting.
    """
    func_code = inspect.getsource(func)

    # Function might be defined in some indented scope (e.g. in another
    # function). We need to handle this and properly dedent the function source
    # code
    func_code = textwrap.dedent(func_code)
    func_code_lines = func_code.split('\n')

    # Removing possible decorators (can be multiline) until the function
    # definition is found.
    # dropwhile() returns a lazy iterator, which is always truthy, so it is
    # materialized into a list to make the emptiness check below meaningful.
    func_code_lines = list(
        itertools.dropwhile(lambda x: not x.startswith('def'),
                            func_code_lines))

    if not func_code_lines:
        raise ValueError(
            'Failed to dedent and clean up the source of function "{}". '
            'It is probably not properly indented.'.format(func.__name__))

    return '\n'.join(func_code_lines)


def _annotation_to_type_struct(annotation):
    """Converts a Python annotation into a component type struct.

    Args:
        annotation: A parameter or return annotation.

    Returns:
        None when |annotation| is falsy or `inspect.Parameter.empty`; the
        dict itself when the annotation is a dict or converts to one through
        `to_dict()`; otherwise the canonical type name reported by
        `_data_passing.get_canonical_type_name_for_type`, falling back to the
        plain type name string.
    """
    if not annotation or annotation == inspect.Parameter.empty:
        return None
    if hasattr(annotation, 'to_dict'):
        annotation = annotation.to_dict()
    if isinstance(annotation, dict):
        return annotation
    if isinstance(annotation, type):
        type_struct = _data_passing.get_canonical_type_name_for_type(annotation)
        if type_struct:
            return type_struct
        type_name = str(annotation.__name__)
    elif hasattr(annotation, '__forward_arg__'):
        # Handling typing.ForwardRef('Type_name') (the name was _ForwardRef
        # in python 3.5-3.6)
        type_name = str(annotation.__forward_arg__)
    else:
        type_name = str(annotation)

    # It's also possible to get the converter by type name
    type_struct = _data_passing.get_canonical_type_name_for_type(type_name)
    if type_struct:
        return type_struct
    return type_name


def _maybe_make_unique(name: str, names: List[str]):
    """Returns |name|, or a numbered variant of it that is not in |names|.

    Args:
        name: The preferred name.
        names: The names that are already taken; only membership tests are
            performed on it.

    Returns:
        |name| if it is not taken, otherwise the first free candidate among
        `<name>_2` ... `<name>_99`.

    Raises:
        RuntimeError: If every candidate is already taken.
    """
    if name not in names:
        return name

    for i in range(2, 100):
        unique_name = '{}_{}'.format(name, i)
        if unique_name not in names:
            return unique_name

    raise RuntimeError('Too many arguments with the name {}'.format(name))


# TODO(KFPv2): Replace with v2 ComponentSpec.
def _func_to_component_spec(
        func: Callable,
        base_image: Optional[str] = None,
        packages_to_install: Optional[List[str]] = None,
        install_kfp_package: bool = True,
        kfp_package_path: Optional[str] = None) -> structures.ComponentSpec:
    """Builds a container-based ComponentSpec that executes |func|.

    Args:
        func: The Python function to wrap.
        base_image: The container image to run in. Must not conflict with a
            `_component_base_image` attribute set on |func|. When neither is
            given, `_DEFAULT_BASE_IMAGE` is used.
        packages_to_install: Packages to pip-install before running |func|.
            The list is copied and never modified.
        install_kfp_package: Whether to add a KFP package to the packages to
            install.
        kfp_package_path: The KFP package requirement to install. Defaults to
            the result of `_get_default_kfp_package_path()`.

    Returns:
        The component spec: the interface extracted from |func| plus a
        container implementation that writes the function source to a
        temporary file and runs `kfp.v2.components.executor_main` on it.

    Raises:
        ValueError: If |base_image| differs from the base image recorded on
            |func|.
    """
    decorator_base_image = getattr(func, '_component_base_image', None)
    if decorator_base_image is not None:
        if base_image is not None and decorator_base_image != base_image:
            raise ValueError(
                'base_image ({}) conflicts with the decorator-specified base image metadata ({})'
                .format(base_image, decorator_base_image))
        else:
            base_image = decorator_base_image
    else:
        if base_image is None:
            base_image = _DEFAULT_BASE_IMAGE
            # Only the default is checked for being a callable (image
            # builder); an explicitly passed |base_image| is used as is.
            if isinstance(base_image, Callable):
                base_image = base_image()

    imports_source = [
        "from kfp.v2.dsl import *",
        "from typing import *",
    ]

    func_source = _get_function_source_definition(func)

    source = textwrap.dedent("""
        {imports_source}

        {func_source}\n""").format(
        imports_source='\n'.join(imports_source), func_source=func_source)

    # Copy before appending so that the caller's list is never mutated;
    # otherwise reusing one list for several components would accumulate
    # duplicate KFP entries.
    packages_to_install = list(packages_to_install or [])
    if install_kfp_package:
        if kfp_package_path is None:
            kfp_package_path = _get_default_kfp_package_path()
        packages_to_install.append(kfp_package_path)

    packages_to_install_command = _get_packages_to_install_command(
        package_list=packages_to_install)

    from kfp.components._structures import ExecutorInputPlaceholder
    component_spec = extract_component_interface(func)

    component_spec.implementation = structures.ContainerImplementation(
        container=structures.ContainerSpec(
            image=base_image,
            command=packages_to_install_command + [
                'sh',
                '-ec',
                textwrap.dedent('''\
                    program_path=$(mktemp -d)
                    printf "%s" "$0" > "$program_path/ephemeral_component.py"
                    python3 -m kfp.v2.components.executor_main \
                        --component_module_path \
                        "$program_path/ephemeral_component.py" \
                        "$@"
                '''),
                source,
            ],
            args=[
                "--executor_input",
                ExecutorInputPlaceholder(),
                "--function_to_execute",
                func.__name__,
            ]))
    return component_spec


def extract_component_interface(func: Callable) -> structures.ComponentSpec:
    """Extracts the component interface from the signature of |func|.

    Every parameter becomes an input, except parameters annotated as an
    output artifact or output path, which become outputs. The return
    annotation adds further outputs: one per field for a NamedTuple, one per
    key for the deprecated dict syntax, or a single output named `Output`
    otherwise. Parameter descriptions are taken from the parsed docstring.

    Args:
        func: The Python function to inspect.

    Returns:
        A ComponentSpec with name, description, inputs and outputs set.
        `inputs`/`outputs` are None when there are none.

    Raises:
        ValueError: If an artifact annotation wraps a type that is not a
            subclass of Artifact, if an artifact parameter has a default
            value, or if a path parameter has a default other than None for
            an input path.
        TypeError: If the v1 `InputPath`/`OutputPath` annotations are used.
    """
    single_output_name_const = 'Output'

    signature = inspect.signature(func)
    parameters = list(signature.parameters.values())

    parsed_docstring = docstring_parser.parse(inspect.getdoc(func))
    doc_dict = {p.arg_name: p.description for p in parsed_docstring.params}

    inputs = []
    outputs = []

    input_names = set()
    output_names = set()
    for parameter in parameters:
        parameter_type = type_annotation_utils.maybe_strip_optional_from_annotation(
            parameter.annotation)
        passing_style = None
        io_name = parameter.name

        if type_annotations.is_artifact_annotation(parameter_type):
            # passing_style is either type_annotations.InputAnnotation or
            # type_annotations.OutputAnnotation.
            passing_style = type_annotations.get_io_artifact_annotation(
                parameter_type)

            # parameter_type is type_annotations.Artifact or one of its subclasses.
            parameter_type = type_annotations.get_io_artifact_class(
                parameter_type)
            if not issubclass(parameter_type, artifact_types.Artifact):
                raise ValueError(
                    'Input[T] and Output[T] are only supported when T is a '
                    'subclass of Artifact. Found `{} with type {}`'.format(
                        io_name, parameter_type))

            if parameter.default is not inspect.Parameter.empty:
                raise ValueError(
                    'Default values for Input/Output artifacts are not supported.'
                )
        elif isinstance(parameter_type,
                        (v1_components.InputPath, v1_components.OutputPath)):
            raise TypeError(
                'In v2 components, please import the Python function'
                ' annotations `InputPath` and `OutputPath` from'
                ' package `kfp.v2.dsl` instead of `kfp.dsl`.')
        elif isinstance(
                parameter_type,
            (type_annotations.InputPath, type_annotations.OutputPath)):
            passing_style = type(parameter_type)
            parameter_type = parameter_type.type
            if parameter.default is not inspect.Parameter.empty and not (
                    passing_style == type_annotations.InputPath and
                    parameter.default is None):
                raise ValueError(
                    'Path inputs only support default values of None. Default values for outputs are not supported.'
                )

        type_struct = _annotation_to_type_struct(parameter_type)

        if passing_style in [
                type_annotations.OutputAnnotation, type_annotations.OutputPath
        ]:
            io_name = _maybe_make_unique(io_name, output_names)
            output_names.add(io_name)
            output_spec = structures.OutputSpec(
                name=io_name,
                type=type_struct,
                description=doc_dict.get(parameter.name))
            output_spec._passing_style = passing_style
            output_spec._parameter_name = parameter.name
            outputs.append(output_spec)
        else:
            io_name = _maybe_make_unique(io_name, input_names)
            input_names.add(io_name)
            input_spec = structures.InputSpec(
                name=io_name,
                type=type_struct,
                description=doc_dict.get(parameter.name))

            if parameter.default is not inspect.Parameter.empty:
                input_spec.optional = True
                if parameter.default is not None:
                    outer_type_name = list(type_struct.keys())[0] if isinstance(
                        type_struct, dict) else type_struct
                    # Best effort: a default that cannot be serialized only
                    # produces a warning and leaves the default unset.
                    try:
                        input_spec.default = _data_passing.serialize_value(
                            parameter.default, outer_type_name)
                    except Exception as ex:
                        warnings.warn(
                            'Could not serialize the default value of the parameter "{}". {}'
                            .format(parameter.name, ex))
            input_spec._passing_style = passing_style
            input_spec._parameter_name = parameter.name
            inputs.append(input_spec)

    # Analyzing the return type annotations.
    return_ann = signature.return_annotation
    if hasattr(return_ann, '_fields'):  # NamedTuple
        # Getting field type annotations.
        # __annotations__ does not exist in python 3.5 and earlier
        # _field_types does not exist in python 3.9 and later
        field_annotations = getattr(return_ann, '__annotations__',
                                    None) or getattr(
                                        return_ann, '_field_types', None)
        for field_name in return_ann._fields:
            type_struct = None
            if field_annotations:
                type_struct = _annotation_to_type_struct(
                    field_annotations.get(field_name, None))

            output_name = _maybe_make_unique(field_name, output_names)
            output_names.add(output_name)
            output_spec = structures.OutputSpec(
                name=output_name,
                type=type_struct,
            )
            output_spec._passing_style = None
            output_spec._return_tuple_field_name = field_name
            outputs.append(output_spec)
    # Deprecated dict-based way of declaring multiple outputs. Was only used
    # by the @component decorator
    elif isinstance(return_ann, dict):
        # The trailing spaces keep the three sentences from running together
        # in the emitted message.
        warnings.warn(
            "The ability to specify multiple outputs using the dict syntax has been deprecated. "
            "It will be removed soon after release 0.1.32. "
            "Please use typing.NamedTuple to declare multiple outputs.")
        for output_name, output_type_annotation in return_ann.items():
            output_type_struct = _annotation_to_type_struct(
                output_type_annotation)
            output_spec = structures.OutputSpec(
                name=output_name,
                type=output_type_struct,
            )
            outputs.append(output_spec)
    elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty:
        # Fixes exotic, but possible collision:
        # `def func(output_path: OutputPath()) -> str: ...`
        output_name = _maybe_make_unique(single_output_name_const,
                                         output_names)
        output_names.add(output_name)
        type_struct = _annotation_to_type_struct(signature.return_annotation)
        output_spec = structures.OutputSpec(
            name=output_name,
            type=type_struct,
        )
        output_spec._passing_style = None
        outputs.append(output_spec)

    # Component name and description are derived from the function's name and
    # docstring.
    # The name can be overridden by setting the func.__name__ attribute (or
    # the legacy func._component_human_name attribute).
    # The description can be overridden by setting the func.__doc__ attribute
    # (or the legacy func._component_description attribute).
    component_name = getattr(
        func, '_component_human_name',
        None) or _python_function_name_to_component_name(func.__name__)
    description = getattr(func, '_component_description',
                          None) or parsed_docstring.short_description
    if description:
        description = description.strip()

    component_spec = structures.ComponentSpec(
        name=component_name,
        description=description,
        inputs=inputs if inputs else None,
        outputs=outputs if outputs else None,
    )
    return component_spec


def create_component_from_func(func: Callable,
                               base_image: Optional[str] = None,
                               packages_to_install: Optional[List[str]] = None,
                               output_component_file: Optional[str] = None,
                               install_kfp_package: bool = True,
                               kfp_package_path: Optional[str] = None):
    """Converts a Python function to a v2 lightweight component.

    A lightweight component is a self-contained Python function that includes
    all necessary imports and dependencies.

    Args:
        func: The python function to create a component from. The function
            should have type annotations for all its arguments, indicating how
            it is intended to be used (e.g. as an input/output Artifact object,
            a plain parameter, or a path to a file).
        base_image: The image to use when executing |func|. It should
            contain a default Python interpreter that is compatible with KFP.
        packages_to_install: A list of optional packages to install before
            executing |func|.
        output_component_file: If set, the generated component spec is saved
            to this file path.
        install_kfp_package: Specifies if we should add a KFP Python package to
            |packages_to_install|. Lightweight Python functions always require
            an installation of KFP in |base_image| to work. If you specify
            a |base_image| that already contains KFP, you can set this to False.
        kfp_package_path: Specifies the location from which to install KFP. By
            default, this will try to install from PyPi using the same version
            as that used when this component was created. KFP developers can
            choose to override this to point to a Github pull request or
            other pip-compatible location when testing changes to lightweight
            Python functions.

    Returns:
        A component task factory that can be used in pipeline definitions.
    """
    component_spec = _func_to_component_spec(
        func=func,
        base_image=base_image,
        packages_to_install=packages_to_install,
        install_kfp_package=install_kfp_package,
        kfp_package_path=kfp_package_path)
    if output_component_file:
        component_spec.save(output_component_file)

    # TODO(KFPv2): Replace with v2 BaseComponent.
    return _components._create_task_factory_from_component_spec(component_spec)