# Copyright 2021 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Definitions for component spec.""" import dataclasses import enum import json from typing import Any, Dict, Mapping, Optional, Sequence, Union from kfp.components import _components from kfp.components import structures import pydantic import yaml class InputSpec(pydantic.BaseModel): """Component input definitions. Attributes: type: The type of the input. default: Optional; the default value for the input. """ # TODO(ji-yaqi): Add logic to cast default value into the specified type. type: str default: Optional[Union[str, int, float, bool, dict, list]] = None class OutputSpec(pydantic.BaseModel): """Component output definitions. Attributes: type: The type of the output. """ type: Union[str, int, float, bool, dict, list] class BasePlaceholder(pydantic.BaseModel): """Base class for placeholders that could appear in container cmd and args. Attributes: name: Referencing an input or an output from the component. """ name: str class InputValuePlaceholder(BasePlaceholder): pass class InputPathPlaceholder(BasePlaceholder): pass class InputUriPlaceholder(BasePlaceholder): pass class OutputPathPlaceholder(BasePlaceholder): pass class OutputUriPlaceholder(BasePlaceholder): pass @dataclasses.dataclass class ResourceSpec: """The resource requirements of a container execution. Attributes: cpu_limit: Optional; the limit of the number of vCPU cores. memory_limit: Optional; the memory limit in GB. accelerator_type: Optional; the type of accelerators attached to the container. accelerator_count: Optional; the number of accelerators attached. """ cpu_limit: Optional[float] = None memory_limit: Optional[float] = None accelerator_type: Optional[str] = None accelerator_count: Optional[int] = None class ContainerSpec(pydantic.BaseModel): """Container implementation definition. Attributes: image: The container image. commands: Optional; the container entrypoint. arguments: Optional; the arguments to the container entrypoint. env: Optional; the environment variables to be passed to the container. resources: Optional; the specification on the resource requirements. """ image: str commands: Optional[Sequence[Union[str, BasePlaceholder]]] = None arguments: Optional[Sequence[Union[str, BasePlaceholder]]] = None env: Optional[Mapping[str, Union[str, BasePlaceholder]]] = None resources: Optional[ResourceSpec] = None @dataclasses.dataclass class ImporterSpec: """ImporterSpec definition. Attributes: artifact_uri: The URI of the artifact. type_schema: The type of the artifact. reimport: Whether or not import an artifact regardless it has been imported before. metadata: Optional; the properties of the artifact. """ artifact_uri: str type_schema: str reimport: bool metadata: Optional[Mapping[str, Any]] = None @dataclasses.dataclass class TaskSpec: """The spec of a pipeline task. Attributes: name: The name of the task. inputs: The sources of task inputs. Constant values or PipelineParams. dependent_tasks: The list of upstream tasks. enable_caching: Whether or not to enable caching for the task. component_ref: The name of a component spec this task is based on. trigger_condition: Optional; an expression which will be evaluated into a boolean value. True to trigger the task to run. trigger_strategy: Optional; when the task will be ready to be triggered. Valid values include: "TRIGGER_STRATEGY_UNSPECIFIED", "ALL_UPSTREAM_TASKS_SUCCEEDED", and "ALL_UPSTREAM_TASKS_COMPLETED". iterator_items: Optional; the items to iterate on. A constant value or a PipelineParam. iterator_item_input: Optional; the name of the input which has the item from the [items][] collection. """ name: str inputs: Mapping[str, Any] dependent_tasks: Sequence[str] enable_caching: bool component_ref: str trigger_condition: Optional[str] = None trigger_strategy: Optional[str] = None iterator_items: Optional[Any] = None iterator_item_input: Optional[str] = None @dataclasses.dataclass class DagSpec: """DAG(graph) implementation definition. Attributes: tasks: The tasks inside the DAG. outputs: Defines how the outputs of the dag are linked to the sub tasks. """ tasks: Mapping[str, TaskSpec] # TODO(chensun): revisit if we need a DagOutputsSpec class. outputs: Mapping[str, Any] class SchemaVersion(str, enum.Enum): V1 = 'v1' V2 = 'v2' class ComponentSpec(pydantic.BaseModel): """The definition of a component. Attributes: name: The name of the component. implementation: The implementation of the component. Either an executor (container, importer) or a DAG consists of other components. inputs: Optional; the input definitions of the component. outputs: Optional; the output definitions of the component. description: Optional; the description of the component. annotations: Optional; the annotations of the component as key-value pairs. labels: Optional; the labels of the component as key-value pairs. schema_version: Internal field for tracking component version. """ name: str description: Optional[str] = None implementation: Union[ContainerSpec, ImporterSpec, DagSpec] inputs: Optional[Dict[str, InputSpec]] = None outputs: Optional[Dict[str, OutputSpec]] = None description: Optional[str] = None annotations: Optional[Mapping[str, str]] = None labels: Optional[Mapping[str, str]] = None schema_version: SchemaVersion = SchemaVersion.V2 def _validate_placeholders( self, implementation: Union[ContainerSpec, ImporterSpec, DagSpec], ) -> Union[ContainerSpec, ImporterSpec, DagSpec]: """Validates placeholders reference existing input/output names. Args: implementation: The component implementation spec. Returns: The original component implementation spec if no validation error. Raises: ValueError: if any placeholder references a non-existing input or output. TypeError: if any argument is neither a str nor a placeholder instance. """ if not isinstance(implementation, ContainerSpec): return implementation input_names = [input_spec.name for input_spec in self.input_specs or []] output_names = [ output_spec.name for output_spec in self.output_specs or [] ] for arg in [ *(implementation.commands or []), *(implementation.arguments or []) ]: if isinstance(arg, (InputValuePlaceholder, InputPathPlaceholder, InputUriPlaceholder)): if arg.name not in input_names: raise ValueError( f'Argument "{arg}" references non-existing input.') elif isinstance(arg, (OutputPathPlaceholder, OutputUriPlaceholder)): if arg.name not in output_names: raise ValueError( f'Argument "{arg}" references non-existing output.') # TODO(chensun): revisit if we need to support IfPlaceholder, # ConcatPlaceholder, etc. in the new format. elif not isinstance(arg, str): raise TypeError(f'Unexpected argument "{arg}".') return implementation @classmethod def from_v1_component_spec( cls, v1_component_spec: structures.ComponentSpec) -> 'ComponentSpec': """Converts V1 ComponentSpec to V2 ComponentSpec. Args: v1_component_spec: The V1 ComponentSpec. Returns: Component spec in the form of V2 ComponentSpec. Raises: ValueError: If implementation is not found. TypeError: if any argument is neither a str nor Dict. """ component_dict = v1_component_spec.to_dict() if component_dict.get('implementation') is None: raise ValueError('Implementation field not found') if 'container' not in component_dict.get('implementation'): raise NotImplementedError def _transform_arg( arg: Union[str, Dict[str, str]]) -> Union[str, BasePlaceholder]: if isinstance(arg, str): return arg elif 'inputValue' in arg: return InputValuePlaceholder(name=arg['inputValue']) elif 'inputPath' in arg: return InputPathPlaceholder(name=arg['inputPath']) elif 'inputUri' in arg: return InputUriPlaceholder(name=arg['inputUri']) elif 'outputPath' in arg: return OutputPathPlaceholder(name=arg['outputPath']) elif 'outputUri' in arg: return OutputUriPlaceholder(name=arg['outputUri']) else: raise ValueError( f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".' ) implementation = component_dict['implementation']['container'] implementation['commands'] = [ _transform_arg(command) for command in implementation.pop('command', []) ] implementation['arguments'] = [ _transform_arg(command) for command in implementation.pop('args', []) ] implementation['env'] = { key: _transform_arg(command) for key, command in implementation.pop('env', {}).items() } container_spec = ContainerSpec.parse_obj(implementation) return ComponentSpec( name=component_dict.get('name', 'name'), description=component_dict.get('description'), implementation=container_spec, inputs={ spec['name']: InputSpec( type=spec.get('type', 'Artifact'), default=spec.get('default', None)) for spec in component_dict.get('inputs', []) }, outputs={ spec['name']: OutputSpec(type=spec.get('type', 'String')) for spec in component_dict.get('outputs', []) }, schema_version=SchemaVersion.V1) def to_v1_component_spec(self) -> structures.ComponentSpec: """Converts to v1 ComponentSpec. Returns: Component spec in the form of V1 ComponentSpec. Needed until downstream accept new ComponentSpec. """ if isinstance(self.implementation, DagSpec): raise NotImplementedError def _transform_arg(arg: Union[str, BasePlaceholder]) -> Any: if isinstance(arg, str): return arg elif isinstance(arg, InputValuePlaceholder): return structures.InputValuePlaceholder(arg.name) elif isinstance(arg, InputPathPlaceholder): return structures.InputPathPlaceholder(arg.name) elif isinstance(arg, InputUriPlaceholder): return structures.InputUriPlaceholder(arg.name) elif isinstance(arg, OutputPathPlaceholder): return structures.OutputPathPlaceholder(arg.name) elif isinstance(arg, OutputUriPlaceholder): return structures.OutputUriPlaceholder(arg.name) else: # TODO(chensun): transform additional placeholders: if, concat, etc.? raise ValueError( f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".' ) return structures.ComponentSpec( name=self.name, inputs=[ structures.InputSpec( name=name, type=input_spec.type, default=input_spec.default, ) for name, input_spec in self.inputs.items() ], outputs=[ structures.OutputSpec( name=name, type=output_spec.type, ) for name, output_spec in self.outputs.items() ], implementation=structures.ContainerImplementation( container=structures.ContainerSpec( image=self.implementation.image, command=[ _transform_arg(cmd) for cmd in self.implementation.commands or [] ], args=[ _transform_arg(arg) for arg in self.implementation.arguments or [] ], env={ name: _transform_arg(value) for name, value in self.implementation.env or {} }, )), ) @classmethod def load_from_component_yaml(cls, component_yaml: str) -> 'ComponentSpec': """Loads V1 or V2 component yaml into ComponentSpec. Args: component_yaml: the component yaml in string format. Returns: Component spec in the form of V2 ComponentSpec. """ json_component = yaml.safe_load(component_yaml) if 'schema_version' in json_component and json_component[ 'schema_version'] == SchemaVersion.V2: return ComponentSpec.parse_obj(json_component) v1_component = _components._load_component_spec_from_component_text( component_yaml) return cls.from_v1_component_spec(v1_component) def save_to_component_yaml(self, output_file: str) -> None: """Saves ComponentSpec into yaml file. Args: output_file: File path to store the component yaml. """ with open(output_file, 'a') as output_file: json_component = self.json(exclude_none=True) yaml_file = yaml.safe_dump(json.loads(json_component)) output_file.write(yaml_file)