diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 373d55be75..134a0e6813 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -26,7 +26,8 @@
 from ._k8s_helper import K8sHelper
 from ._op_to_template import _op_to_template
 from ._default_transformers import add_pod_env
-from ..dsl._metadata import ParameterMeta, _extract_pipeline_metadata
+from ..components._structures import InputSpec
+from ..dsl._metadata import _extract_pipeline_metadata
 from ..dsl._ops_group import OpsGroup
@@ -764,14 +765,15 @@ class Compiler(object):
     if params_list and pipeline_meta.inputs:
       raise ValueError('Either specify pipeline params in the pipeline function, or in "params_list", but not both.')
 
-    args_list = []
-    if pipeline_meta.inputs:
-      input_types = {
-        input.name : input.param_type for input in pipeline_meta.inputs }
-      for arg_name in argspec.args:
-        arg_type = input_types.get(arg_name, None)
-        args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type=arg_type))
+    args_list = []
+    for arg_name in argspec.args:
+      arg_type = None
+      for input in pipeline_meta.inputs or []:
+        if arg_name == input.name:
+          arg_type = input.type
+          break
+      args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type=arg_type))
 
     with dsl.Pipeline(pipeline_name) as dsl_pipeline:
       pipeline_func(*args_list)
 
@@ -780,23 +782,23 @@
     self._sanitize_and_inject_artifact(dsl_pipeline)
 
     # Fill in the default values.
+    args_list_with_defaults = []
     if pipeline_meta.inputs:
       args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
                                  for arg_name in argspec.args]
       if argspec.defaults:
         for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
           arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
-    else:
+    elif params_list:
       # Or, if args are provided by params_list, fill in pipeline_meta.
       for param in params_list:
         param.value = default_param_values[param.name]
 
       args_list_with_defaults = params_list
       pipeline_meta.inputs = [
-        ParameterMeta(
+        InputSpec(
             name=param.name,
-            description='',
-            param_type=param.param_type,
+            type=param.param_type,
             default=param.value) for param in params_list]
 
     op_transformers = [add_pod_env]
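
Note on the compiler.py change: the intermediate name-to-type dict is gone; each pipeline argument's type is now looked up directly in pipeline_meta.inputs, and the `or []` guard covers pipelines that declare no inputs. A minimal standalone sketch of that lookup, using the real kfp.components._structures.InputSpec but hypothetical input names:

    from kfp.components._structures import InputSpec

    # Hypothetical inputs, standing in for pipeline_meta.inputs.
    inputs = [InputSpec(name='rounds', type='Integer'),
              InputSpec(name='data-path', type='GCSPath')]

    def resolve_arg_type(arg_name, inputs):
        # Mirrors the new compiler loop: the first matching InputSpec wins,
        # and a missing name (or inputs=None) falls through to None.
        for input in inputs or []:
            if arg_name == input.name:
                return input.type
        return None

    assert resolve_arg_type('rounds', inputs) == 'Integer'
    assert resolve_arg_type('unknown', inputs) is None
    assert resolve_arg_type('rounds', None) is None
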
diff --git a/sdk/python/kfp/components/_dsl_bridge.py b/sdk/python/kfp/components/_dsl_bridge.py
index 92aed1735b..89a6d9a7b9 100644
--- a/sdk/python/kfp/components/_dsl_bridge.py
+++ b/sdk/python/kfp/components/_dsl_bridge.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 from collections import OrderedDict
 from typing import Mapping
 from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
 from ._components import _generate_input_file_name, _generate_output_file_name, _default_component_name
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 
 def create_container_op_from_task(task_spec: TaskSpec):
     argument_values = task_spec.arguments
@@ -145,16 +145,6 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
     output_name_to_kubernetes = generate_unique_name_conversion_table(output_names, _sanitize_python_function_name)
     output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}
 
-    # Construct the ComponentMeta
-    component_meta = ComponentMeta(name=component_spec.name, description=component_spec.description)
-    # Inputs
-    if component_spec.inputs is not None:
-        for input in component_spec.inputs:
-            component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=input.type, default=input.default))
-    if component_spec.outputs is not None:
-        for output in component_spec.outputs:
-            component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=output.type))
-
     task = dsl.ContainerOp(
         name=name,
         image=container_image,
@@ -164,6 +154,8 @@
         artifact_argument_paths=[dsl.InputArgumentPath(argument=artifact_arguments[input_name], input=input_name, path=path) for input_name, path in input_paths.items()],
     )
 
+    component_meta = copy.copy(component_spec)
+    component_meta.implementation = None
     task._set_metadata(component_meta)
 
     if env:
diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py
index aa618d030e..efbd7ef6bc 100644
--- a/sdk/python/kfp/dsl/_component.py
+++ b/sdk/python/kfp/dsl/_component.py
@@ -71,18 +71,18 @@ def component(func):
     if kfp.TYPE_CHECK:
       arg_index = 0
       for arg in args:
-        if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].param_type):
+        if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].type):
           raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + component_meta.inputs[arg_index].name +
-                                          ' to be type(' + str(component_meta.inputs[arg_index].param_type) +
+                                          ' to be type(' + str(component_meta.inputs[arg_index].type) +
                                           '), but the passed argument is type(' + str(arg.param_type) + ')')
         arg_index += 1
       if kargs is not None:
         for key in kargs:
           if isinstance(kargs[key], PipelineParam):
             for input_spec in component_meta.inputs:
-              if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.param_type):
+              if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.type):
                 raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + input_spec.name +
-                                                ' to be type(' + str(input_spec.param_type) +
+                                                ' to be type(' + str(input_spec.type) +
                                                 '), but the passed argument is type(' + str(kargs[key].param_type) + ')')
 
       container_op = func(*args, **kargs)
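
Note on _dsl_bridge.py: the ten lines that hand-built a ComponentMeta are replaced by a shallow copy of the already-resolved ComponentSpec with its implementation stripped, so the ContainerOp carries the component's real interface (names, types, descriptions, defaults) with no lossy translation. A sketch of the shallow-copy trick, with a made-up spec and image (only the shape matters here):

    import copy
    from kfp.components._structures import (
        ComponentSpec, InputSpec, ContainerSpec, ContainerImplementation)

    # Hypothetical component spec, standing in for the one _dsl_bridge resolves.
    spec = ComponentSpec(
        name='My op',
        inputs=[InputSpec(name='x', type='Integer')],
        implementation=ContainerImplementation(
            container=ContainerSpec(image='alpine')))

    # Same trick as _create_container_op_from_resolved_task: keep the
    # interface for type checking, drop the implementation payload.
    component_meta = copy.copy(spec)
    component_meta.implementation = None

    assert spec.implementation is not None       # the original is untouched
    assert component_meta.inputs is spec.inputs  # shallow: the lists are shared

The _component.py hunks are the mechanical fallout of the rename: every type check now reads input_spec.type where it used to read ParameterMeta.param_type.
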
diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py
index 59089bb3a0..e5245cb4b6 100644
--- a/sdk/python/kfp/dsl/_container_op.py
+++ b/sdk/python/kfp/dsl/_container_op.py
@@ -25,7 +25,7 @@ from kubernetes.client.models import (
 )
 
 from . import _pipeline_param
-from ._metadata import ComponentMeta
+from ..components._structures import ComponentSpec
 
 # generics
 T = TypeVar('T')
@@ -1148,10 +1148,10 @@ class ContainerOp(BaseOp):
        '''_set_metadata passes the containerop the metadata information
        and configures the right output
        Args:
-            metadata (ComponentMeta): component metadata
+            metadata (ComponentSpec): component metadata
        '''
-        if not isinstance(metadata, ComponentMeta):
-            raise ValueError('_set_medata is expecting ComponentMeta.')
+        if not isinstance(metadata, ComponentSpec):
+            raise ValueError('_set_metadata is expecting ComponentSpec.')
 
        self._metadata = metadata
@@ -1160,7 +1160,7 @@ class ContainerOp(BaseOp):
            output_type = self.outputs[output].param_type
            for output_meta in self._metadata.outputs:
                if output_meta.name == output:
-                    output_type = output_meta.param_type
+                    output_type = output_meta.type
            self.outputs[output].param_type = output_type
 
        self.output = None
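
Note on the output-type hunk at the bottom of _container_op.py: when the attached metadata declares outputs, the op's output types are overridden by the matching OutputSpec. The resolution rule, extracted into a tiny function (the names below are invented for illustration):

    from kfp.components._structures import OutputSpec

    # Hypothetical metadata outputs, standing in for self._metadata.outputs.
    metadata_outputs = [OutputSpec(name='model', type='Integer')]

    def resolve_output_type(output_name, declared_type=None):
        # Same rule as the hunk: a matching OutputSpec's .type wins,
        # otherwise the type already on the PipelineParam is kept.
        output_type = declared_type
        for output_meta in metadata_outputs:
            if output_meta.name == output_name:
                output_type = output_meta.type
        return output_type

    assert resolve_output_type('model') == 'Integer'
    assert resolve_output_type('logs', 'String') == 'String'
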
diff --git a/sdk/python/kfp/dsl/_metadata.py b/sdk/python/kfp/dsl/_metadata.py
index 2a0891416e..f2a7c028b9 100644
--- a/sdk/python/kfp/dsl/_metadata.py
+++ b/sdk/python/kfp/dsl/_metadata.py
@@ -12,102 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, List
-from abc import ABCMeta, abstractmethod
 import warnings
 from .types import BaseType, _check_valid_type_dict, _instance_to_dict
+from ..components._structures import ComponentSpec, InputSpec, OutputSpec
 
-class BaseMeta(object):
-  __metaclass__ = ABCMeta
-  def __init__(self):
-    pass
-
-  @abstractmethod
-  def to_dict(self):
-    pass
-
-  def serialize(self):
-    import yaml
-    return yaml.dump(self.to_dict())
-
-  def __eq__(self, other):
-    return self.__dict__ == other.__dict__
-
-
-class ParameterMeta(BaseMeta):
-  def __init__(self,
-      name: str,
-      description: str = '',
-      param_type = None,
-      default = None):
-    self.name = name
-    self.description = description
-    self.param_type = param_type
-    self.default = default
-
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.param_type:
-      result['type'] = self.param_type
-    if self.default:
-      result['default'] = self.default
-    return result
-
-
-class ComponentMeta(BaseMeta):
-  def __init__(
-      self,
-      name: str,
-      description: str = '',
-      inputs: List[ParameterMeta] = None,
-      outputs: List[ParameterMeta] = None
-  ):
-    self.name = name
-    self.description = description
-    self.inputs = [] if inputs is None else inputs
-    self.outputs = [] if outputs is None else outputs
-
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.inputs:
-      result['inputs'] = [input.to_dict() for input in self.inputs]
-    if self.outputs:
-      result['outputs'] = [output.to_dict() for output in self.outputs]
-    return result
-
-
-# Add a pipeline level metadata calss here.
-# If one day we combine the component and pipeline yaml, ComponentMeta and PipelineMeta will become one, too.
-class PipelineMeta(BaseMeta):
-  def __init__(
-      self,
-      name: str,
-      description: str = '',
-      inputs: List[ParameterMeta] = None
-  ):
-    self.name = name
-    self.description = description
-    self.inputs = [] if inputs is None else inputs
-
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.inputs:
-      result['inputs'] = [input.to_dict() for input in self.inputs]
-    return result
-
 def _annotation_to_typemeta(annotation):
   '''_annotation_to_type_meta converts an annotation to a type structure
   Args:
@@ -158,21 +67,21 @@ def _extract_component_metadata(func):
      arg_default = arg_default.value
    if arg in annotations:
      arg_type = _annotation_to_typemeta(annotations[arg])
-    inputs.append(ParameterMeta(name=arg, param_type=arg_type, default=arg_default))
+    inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))
  # Outputs
  outputs = []
  if 'return' in annotations:
    for output in annotations['return']:
      arg_type = _annotation_to_typemeta(annotations['return'][output])
-      outputs.append(ParameterMeta(name=output, param_type=arg_type))
+      outputs.append(OutputSpec(name=output, type=arg_type))
 
  #TODO: add descriptions to the metadata
  #docstring parser:
  # https://github.com/rr-/docstring_parser
  # https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py
 
-  # Construct the ComponentMeta
-  return ComponentMeta(
+  # Construct the ComponentSpec
+  return ComponentSpec(
      name=func.__name__,
      inputs=inputs if inputs else None,
      outputs=outputs if outputs else None,
@@ -197,12 +106,8 @@ def _extract_pipeline_metadata(func):
    for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
      arg_defaults[arg] = default
 
-  # Construct the PipelineMeta
-  pipeline_meta = PipelineMeta(
-    name=getattr(func, '_pipeline_name', func.__name__),
-    description=getattr(func, '_pipeline_description', func.__doc__)
-  )
  # Inputs
+  inputs = []
  for arg in args:
    arg_type = None
    arg_default = arg_defaults[arg] if arg in arg_defaults else None
@@ -220,10 +125,17 @@ def _extract_pipeline_metadata(func):
            # In case the property value for the schema validator is a string instead of a dict.
            schema_object = json.loads(schema_object)
          validate(instance=arg_default, schema=schema_object)
-    pipeline_meta.inputs.append(ParameterMeta(name=arg, param_type=arg_type, default=arg_default))
+    inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))
  #TODO: add descriptions to the metadata
  #docstring parser:
  # https://github.com/rr-/docstring_parser
  # https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py
+
+  # Construct the ComponentSpec
+  pipeline_meta = ComponentSpec(
+      name=getattr(func, '_pipeline_name', func.__name__),
+      description=getattr(func, '_pipeline_description', func.__doc__),
+      inputs=inputs if inputs else None,
+  )
 
  return pipeline_meta
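
Note on _metadata.py: the whole BaseMeta/ParameterMeta/ComponentMeta/PipelineMeta hierarchy existed mainly to provide to_dict()/serialize() and value equality, all of which ComponentSpec already inherits from the components ModelBase base class. A sketch of the replacement serialization, assuming ModelBase's dict conversion skips unset fields (the name and input below are made up):

    from kfp.components._structures import ComponentSpec, InputSpec

    spec = ComponentSpec(
        name='p1',
        description='description1',
        inputs=[InputSpec(name='a', type='Integer', default=12)])

    # ModelBase supplies the dict round-trip the removed classes hand-rolled;
    # expected shape, roughly: {'name': ..., 'description': ..., 'inputs': [...]}
    print(spec.to_dict())
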
diff --git a/sdk/python/kfp/dsl/_pipeline.py b/sdk/python/kfp/dsl/_pipeline.py
index 12216c2a8d..32a9d0cb1f 100644
--- a/sdk/python/kfp/dsl/_pipeline.py
+++ b/sdk/python/kfp/dsl/_pipeline.py
@@ -243,8 +243,6 @@ class Pipeline():
    Args:
      metadata (ComponentMeta): component metadata
    '''
-    if not isinstance(metadata, PipelineMeta): # noqa: F821 TODO
-      raise ValueError('_set_medata is expecting PipelineMeta.')
    self._metadata = metadata
diff --git a/sdk/python/tests/dsl/component_tests.py b/sdk/python/tests/dsl/component_tests.py
index 65cdba0545..3b2ee009c3 100644
--- a/sdk/python/tests/dsl/component_tests.py
+++ b/sdk/python/tests/dsl/component_tests.py
@@ -15,9 +15,9 @@
 import kfp
 import kfp.dsl as dsl
 from kfp.dsl import component, graph_component
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 from kfp.dsl.types import Integer, GCSPath, InconsistentTypeException
 from kfp.dsl import ContainerOp, Pipeline, PipelineParam
+from kfp.components._structures import ComponentSpec, InputSpec, OutputSpec
 import unittest
 
 class TestPythonComponent(unittest.TestCase):
@@ -35,11 +35,11 @@
 
    containerOp = componentA(1,2,c=3)
 
-    golden_meta = ComponentMeta(name='componentA')
-    golden_meta.inputs.append(ParameterMeta(name='a', param_type={'ArtifactA': {'file_type': 'csv'}}))
-    golden_meta.inputs.append(ParameterMeta(name='b', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
-    golden_meta.inputs.append(ParameterMeta(name='c', param_type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
-    golden_meta.outputs.append(ParameterMeta(name='model', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))
+    golden_meta = ComponentSpec(name='componentA', inputs=[], outputs=[])
+    golden_meta.inputs.append(InputSpec(name='a', type={'ArtifactA': {'file_type': 'csv'}}))
+    golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
+    golden_meta.inputs.append(InputSpec(name='c', type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
+    golden_meta.outputs.append(OutputSpec(name='model', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))
 
    self.assertEqual(containerOp._metadata, golden_meta)
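
Note on the tests: assertEqual(containerOp._metadata, golden_meta) only works because ComponentSpec compares by value, much as the removed BaseMeta.__eq__ did. A quick sanity check of that assumption (component and input names are invented):

    from kfp.components._structures import ComponentSpec, InputSpec

    a = ComponentSpec(name='componentA', inputs=[InputSpec(name='a', type='Integer')])
    b = ComponentSpec(name='componentA', inputs=[InputSpec(name='a', type='Integer')])

    assert a == b                                 # structural equality, not identity
    assert a != ComponentSpec(name='componentB')  # any differing field breaks it
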
diff --git a/sdk/python/tests/dsl/metadata_tests.py b/sdk/python/tests/dsl/metadata_tests.py
index 99bf74f687..ddf66dd428 100644
--- a/sdk/python/tests/dsl/metadata_tests.py
+++ b/sdk/python/tests/dsl/metadata_tests.py
@@ -12,43 +12,42 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
+from kfp.components._structures import ComponentSpec, InputSpec, OutputSpec
 import unittest
 
 class TestComponentMeta(unittest.TestCase):
 
  def test_to_dict(self):
-    component_meta = ComponentMeta(name='foobar',
+    component_meta = ComponentSpec(name='foobar',
                                   description='foobar example',
-                                   inputs=[ParameterMeta(name='input1',
+                                   inputs=[InputSpec(name='input1',
                                                         description='input1 desc',
-                                                         param_type={'GCSPath': {
+                                                         type={'GCSPath': {
                                                             'bucket_type': 'directory',
                                                             'file_type': 'csv'
                                                         }},
                                                         default='default1'
                                                         ),
-                                           ParameterMeta(name='input2',
+                                           InputSpec(name='input2',
                                                         description='input2 desc',
-                                                         param_type={'TFModel': {
+                                                         type={'TFModel': {
                                                             'input_data': 'tensor',
                                                             'version': '1.8.0'
                                                         }},
                                                         default='default2'
                                                         ),
-                                           ParameterMeta(name='input3',
+                                           InputSpec(name='input3',
                                                         description='input3 desc',
-                                                         param_type='Integer',
+                                                         type='Integer',
                                                         default='default3'
                                                         ),
                                           ],
-                                   outputs=[ParameterMeta(name='output1',
+                                   outputs=[OutputSpec(name='output1',
                                                          description='output1 desc',
-                                                          param_type={'Schema': {
+                                                          type={'Schema': {
                                                              'file_type': 'tsv'
                                                          }},
-                                                          default='default_output1'
                                                          )
                                            ]
                                   )
@@ -94,7 +93,6 @@ class TestComponentMeta(unittest.TestCase):
                        'file_type': 'tsv'
                    }
                },
-                'default': 'default_output1'
            }
        ]
    }
diff --git a/sdk/python/tests/dsl/pipeline_tests.py b/sdk/python/tests/dsl/pipeline_tests.py
index 3ffdea12e9..0f9ff08c81 100644
--- a/sdk/python/tests/dsl/pipeline_tests.py
+++ b/sdk/python/tests/dsl/pipeline_tests.py
@@ -14,8 +14,9 @@
 
 import kfp
 from kfp.dsl import Pipeline, PipelineParam, ContainerOp, pipeline
-from kfp.dsl._metadata import PipelineMeta, ParameterMeta, _extract_pipeline_metadata
+from kfp.dsl._metadata import _extract_pipeline_metadata
 from kfp.dsl.types import GCSPath, Integer
+from kfp.components._structures import ComponentSpec, InputSpec
 import unittest
 
@@ -69,9 +70,9 @@ class TestPipeline(unittest.TestCase):
    def my_pipeline1(a: {'Schema': {'file_type': 'csv'}}='good', b: Integer()=12):
      pass
 
-    golden_meta = PipelineMeta(name='p1', description='description1')
-    golden_meta.inputs.append(ParameterMeta(name='a', param_type={'Schema': {'file_type': 'csv'}}, default='good'))
-    golden_meta.inputs.append(ParameterMeta(name='b', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
+    golden_meta = ComponentSpec(name='p1', description='description1', inputs=[])
+    golden_meta.inputs.append(InputSpec(name='a', type={'Schema': {'file_type': 'csv'}}, default='good'))
+    golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
 
    pipeline_meta = _extract_pipeline_metadata(my_pipeline1)
    self.assertEqual(pipeline_meta, golden_meta)
\ No newline at end of file
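
End-to-end, the observable change is the return type of _extract_pipeline_metadata. A sketch mirroring the updated pipeline test (the pipeline name, description, and argument are made up):

    from kfp.dsl import pipeline
    from kfp.dsl._metadata import _extract_pipeline_metadata
    from kfp.components._structures import ComponentSpec

    @pipeline(name='p1', description='description1')
    def my_pipeline1(a: 'Integer' = 12):
        pass

    meta = _extract_pipeline_metadata(my_pipeline1)
    assert isinstance(meta, ComponentSpec)  # was a PipelineMeta before this change
    assert meta.inputs[0].name == 'a'
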