SDK - Refactoring - Replaced the *Meta classes with the *Spec classes (#1944)

* SDK - Refactoring - Replaced the ParameterMeta class with InputSpec and OutputSpec

* SDK - Refactoring - Replaced the internal PipelineMeta class with ComponentSpec

* SDK - Refactoring - Replaced the internal ComponentMeta class with ComponentSpec

* SDK - Refactoring - Replaced the *Meta classes with the *Spec classes

Replaced the ComponentMeta class with ComponentSpec
Replaced the PipelineMeta class with ComponentSpec
Replaced the ParameterMeta class with InputSpec and OutputSpec

* Removed empty fields
Alexey Volkov authored on 2019-09-16 18:41:12 -07:00 (committed by GitHub)
parent 2ca7d0ac31
commit 0e2bf15dbc
9 changed files with 61 additions and 158 deletions
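Before the per-file diffs, a minimal before/after sketch of the rename (hedged: 'my-op', 'x', and 'out' are made-up names; the constructor fields follow the diffs below, with param_type becoming type):

    # Before this commit: DSL-private metadata classes from kfp.dsl._metadata
    #   meta = ComponentMeta(name='my-op')
    #   meta.inputs.append(ParameterMeta(name='x', param_type='Integer', default=1))

    # After this commit: the shared component structures are used directly
    from kfp.components._structures import ComponentSpec, InputSpec, OutputSpec

    spec = ComponentSpec(
        name='my-op',
        inputs=[InputSpec(name='x', type='Integer', default=1)],
        outputs=[OutputSpec(name='out', type='Integer')])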

View File

@@ -26,7 +26,8 @@ from ._k8s_helper import K8sHelper
 from ._op_to_template import _op_to_template
 from ._default_transformers import add_pod_env
-from ..dsl._metadata import ParameterMeta, _extract_pipeline_metadata
+from ..components._structures import InputSpec
+from ..dsl._metadata import _extract_pipeline_metadata
 from ..dsl._ops_group import OpsGroup
@@ -764,14 +765,15 @@ class Compiler(object):
     if params_list and pipeline_meta.inputs:
       raise ValueError('Either specify pipeline params in the pipeline function, or in "params_list", but not both.')
-    args_list = []
-    if pipeline_meta.inputs:
-      input_types = {
-        input.name : input.param_type for input in pipeline_meta.inputs }
-      for arg_name in argspec.args:
-        arg_type = input_types.get(arg_name, None)
-        args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type=arg_type))
+    args_list = []
+    for arg_name in argspec.args:
+      arg_type = None
+      for input in pipeline_meta.inputs or []:
+        if arg_name == input.name:
+          arg_type = input.type
+          break
+      args_list.append(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name), param_type=arg_type))
     with dsl.Pipeline(pipeline_name) as dsl_pipeline:
       pipeline_func(*args_list)
@@ -780,23 +782,23 @@ class Compiler(object):
     self._sanitize_and_inject_artifact(dsl_pipeline)
     # Fill in the default values.
     args_list_with_defaults = []
     if pipeline_meta.inputs:
       args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
                                  for arg_name in argspec.args]
       if argspec.defaults:
         for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
           arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
-    else:
+    elif params_list:
       # Or, if args are provided by params_list, fill in pipeline_meta.
       for param in params_list:
         param.value = default_param_values[param.name]
       args_list_with_defaults = params_list
       pipeline_meta.inputs = [
-        ParameterMeta(
+        InputSpec(
             name=param.name,
             description='',
-            param_type=param.param_type,
+            type=param.param_type,
             default=param.value) for param in params_list]
     op_transformers = [add_pod_env]
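The middle hunk above swaps the dict-based type lookup for a direct scan of pipeline_meta.inputs; the `or []` guard means every function argument still gets a PipelineParam even when the pipeline declares no inputs. A self-contained sketch of that lookup (FakeInputSpec and resolve_arg_types are hypothetical stand-ins; K8sHelper name sanitization is omitted):

    from dataclasses import dataclass
    from typing import Dict, List, Optional

    @dataclass
    class FakeInputSpec:  # stand-in for kfp.components._structures.InputSpec
        name: str
        type: Optional[str] = None

    def resolve_arg_types(arg_names: List[str],
                          inputs: Optional[List[FakeInputSpec]]) -> Dict[str, Optional[str]]:
        resolved = {}
        for arg_name in arg_names:
            arg_type = None
            for inp in inputs or []:  # 'or []' covers inputs=None
                if arg_name == inp.name:
                    arg_type = inp.type
                    break
            resolved[arg_name] = arg_type
        return resolved

    print(resolve_arg_types(['a', 'b'], [FakeInputSpec('a', 'Integer')]))
    # {'a': 'Integer', 'b': None}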

View File

@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import copy
 from collections import OrderedDict
 from typing import Mapping
 from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
 from ._components import _generate_input_file_name, _generate_output_file_name, _default_component_name
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 def create_container_op_from_task(task_spec: TaskSpec):
     argument_values = task_spec.arguments
@@ -145,16 +145,6 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
     output_name_to_kubernetes = generate_unique_name_conversion_table(output_names, _sanitize_python_function_name)
     output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}
-    # Construct the ComponentMeta
-    component_meta = ComponentMeta(name=component_spec.name, description=component_spec.description)
-    # Inputs
-    if component_spec.inputs is not None:
-        for input in component_spec.inputs:
-            component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=input.type, default=input.default))
-    if component_spec.outputs is not None:
-        for output in component_spec.outputs:
-            component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=output.type))
     task = dsl.ContainerOp(
         name=name,
         image=container_image,
@@ -164,6 +154,8 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
         artifact_argument_paths=[dsl.InputArgumentPath(argument=artifact_arguments[input_name], input=input_name, path=path) for input_name, path in input_paths.items()],
     )
+    component_meta = copy.copy(component_spec)
+    component_meta.implementation = None
     task._set_metadata(component_meta)
     if env:
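Instead of rebuilding metadata field by field, the bridge now reuses the loader's ComponentSpec wholesale. The two added lines, annotated (the comments are editorial, not from the commit):

    import copy

    # component_spec is the ComponentSpec the component loader resolved for this task.
    component_meta = copy.copy(component_spec)  # shallow copy suffices: only a
    component_meta.implementation = None        # top-level field is overwritten
    task._set_metadata(component_meta)

Blanking implementation keeps container details out of the per-op metadata while the inputs and outputs (and their types) ride along unchanged.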

View File

@@ -71,18 +71,18 @@ def component(func):
     if kfp.TYPE_CHECK:
       arg_index = 0
       for arg in args:
-        if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].param_type):
+        if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].type):
           raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + component_meta.inputs[arg_index].name +
-            ' to be type(' + str(component_meta.inputs[arg_index].param_type) +
+            ' to be type(' + str(component_meta.inputs[arg_index].type) +
             '), but the passed argument is type(' + str(arg.param_type) + ')')
         arg_index += 1
       if kargs is not None:
         for key in kargs:
           if isinstance(kargs[key], PipelineParam):
             for input_spec in component_meta.inputs:
-              if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.param_type):
+              if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.type):
                 raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + input_spec.name +
-                  ' to be type(' + str(input_spec.param_type) +
+                  ' to be type(' + str(input_spec.type) +
                   '), but the passed argument is type(' + str(kargs[key].param_type) + ')')
     container_op = func(*args, **kargs)
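Both type-check branches now read .type from the spec objects rather than the old param_type. A standalone sketch of the keyword-argument branch (StubInputSpec, check_types, and validate_kwarg are stubs; the real check lives in kfp.dsl.types.check_types):

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class StubInputSpec:  # stand-in for kfp.components._structures.InputSpec
        name: str
        type: Optional[str] = None

    def check_types(passed, expected):  # stub for kfp.dsl.types.check_types
        return expected is None or passed == expected

    def validate_kwarg(name, passed_type, inputs, component_name):
        for input_spec in inputs:
            if input_spec.name == name and not check_types(passed_type, input_spec.type):
                raise TypeError(
                    'Component "%s" is expecting %s to be type(%s), but the '
                    'passed argument is type(%s)'
                    % (component_name, name, input_spec.type, passed_type))

    validate_kwarg('x', 'Integer', [StubInputSpec('x', 'Integer')], 'componentA')  # passes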

View File

@@ -25,7 +25,7 @@ from kubernetes.client.models import (
 )
 from . import _pipeline_param
-from ._metadata import ComponentMeta
+from ..components._structures import ComponentSpec
 # generics
 T = TypeVar('T')
@@ -1148,10 +1148,10 @@ class ContainerOp(BaseOp):
     '''_set_metadata passes the containerop the metadata information
     and configures the right output
     Args:
-      metadata (ComponentMeta): component metadata
+      metadata (ComponentSpec): component metadata
     '''
-    if not isinstance(metadata, ComponentMeta):
-      raise ValueError('_set_medata is expecting ComponentMeta.')
+    if not isinstance(metadata, ComponentSpec):
+      raise ValueError('_set_metadata is expecting ComponentSpec.')
     self._metadata = metadata
@@ -1160,7 +1160,7 @@ class ContainerOp(BaseOp):
       output_type = self.outputs[output].param_type
       for output_meta in self._metadata.outputs:
         if output_meta.name == output:
-          output_type = output_meta.param_type
+          output_type = output_meta.type
       self.outputs[output].param_type = output_type
     self.output = None
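The last hunk is the same field rename inside the output-typing loop: when metadata is attached, each output PipelineParam inherits the declared type whose name matches. A toy standalone version (Param and OutSpec are stand-ins for PipelineParam and OutputSpec):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Param:      # stand-in for dsl.PipelineParam
        param_type: Optional[str] = None

    @dataclass
    class OutSpec:    # stand-in for OutputSpec
        name: str
        type: Optional[str] = None

    outputs = {'model': Param()}              # op outputs, initially untyped
    declared = [OutSpec('model', 'Integer')]  # metadata.outputs after the rename

    for name, param in outputs.items():
        for output_meta in declared:
            if output_meta.name == name:
                param.param_type = output_meta.type  # reads .type, not .param_type
    print(outputs['model'].param_type)  # Integer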

View File

@@ -12,102 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from typing import Dict, List
-from abc import ABCMeta, abstractmethod
 import warnings
 from .types import BaseType, _check_valid_type_dict, _instance_to_dict
+from ..components._structures import ComponentSpec, InputSpec, OutputSpec
-class BaseMeta(object):
-  __metaclass__ = ABCMeta
-  def __init__(self):
-    pass
-  @abstractmethod
-  def to_dict(self):
-    pass
-  def serialize(self):
-    import yaml
-    return yaml.dump(self.to_dict())
-  def __eq__(self, other):
-    return self.__dict__ == other.__dict__
-class ParameterMeta(BaseMeta):
-  def __init__(self,
-               name: str,
-               description: str = '',
-               param_type = None,
-               default = None):
-    self.name = name
-    self.description = description
-    self.param_type = param_type
-    self.default = default
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.param_type:
-      result['type'] = self.param_type
-    if self.default:
-      result['default'] = self.default
-    return result
-class ComponentMeta(BaseMeta):
-  def __init__(
-      self,
-      name: str,
-      description: str = '',
-      inputs: List[ParameterMeta] = None,
-      outputs: List[ParameterMeta] = None
-  ):
-    self.name = name
-    self.description = description
-    self.inputs = [] if inputs is None else inputs
-    self.outputs = [] if outputs is None else outputs
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.inputs:
-      result['inputs'] = [input.to_dict() for input in self.inputs]
-    if self.outputs:
-      result['outputs'] = [output.to_dict() for output in self.outputs]
-    return result
-# Add a pipeline level metadata calss here.
-# If one day we combine the component and pipeline yaml, ComponentMeta and PipelineMeta will become one, too.
-class PipelineMeta(BaseMeta):
-  def __init__(
-      self,
-      name: str,
-      description: str = '',
-      inputs: List[ParameterMeta] = None
-  ):
-    self.name = name
-    self.description = description
-    self.inputs = [] if inputs is None else inputs
-  def to_dict(self):
-    result = {}
-    if self.name:
-      result['name'] = self.name
-    if self.description:
-      result['description'] = self.description
-    if self.inputs:
-      result['inputs'] = [input.to_dict() for input in self.inputs]
-    return result
 def _annotation_to_typemeta(annotation):
   '''_annotation_to_type_meta converts an annotation to a type structure
   Args:
@@ -158,21 +67,21 @@ def _extract_component_metadata(func):
       arg_default = arg_default.value
     if arg in annotations:
       arg_type = _annotation_to_typemeta(annotations[arg])
-    inputs.append(ParameterMeta(name=arg, param_type=arg_type, default=arg_default))
+    inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))
   # Outputs
   outputs = []
   if 'return' in annotations:
     for output in annotations['return']:
       arg_type = _annotation_to_typemeta(annotations['return'][output])
-      outputs.append(ParameterMeta(name=output, param_type=arg_type))
+      outputs.append(OutputSpec(name=output, type=arg_type))
   #TODO: add descriptions to the metadata
   #docstring parser:
   # https://github.com/rr-/docstring_parser
   # https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py
-  # Construct the ComponentMeta
-  return ComponentMeta(
+  # Construct the ComponentSpec
+  return ComponentSpec(
     name=func.__name__,
     inputs=inputs if inputs else None,
     outputs=outputs if outputs else None,
@@ -197,12 +106,8 @@ def _extract_pipeline_metadata(func):
     for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
       arg_defaults[arg] = default
-  # Construct the PipelineMeta
-  pipeline_meta = PipelineMeta(
-    name=getattr(func, '_pipeline_name', func.__name__),
-    description=getattr(func, '_pipeline_description', func.__doc__)
-  )
-  # Inputs
+  inputs = []
   for arg in args:
     arg_type = None
     arg_default = arg_defaults[arg] if arg in arg_defaults else None
@@ -220,10 +125,17 @@ def _extract_pipeline_metadata(func):
         # In case the property value for the schema validator is a string instead of a dict.
         schema_object = json.loads(schema_object)
       validate(instance=arg_default, schema=schema_object)
-    pipeline_meta.inputs.append(ParameterMeta(name=arg, param_type=arg_type, default=arg_default))
+    inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))
   #TODO: add descriptions to the metadata
   #docstring parser:
   # https://github.com/rr-/docstring_parser
   # https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py
+  # Construct the ComponentSpec
+  pipeline_meta = ComponentSpec(
+    name=getattr(func, '_pipeline_name', func.__name__),
+    description=getattr(func, '_pipeline_description', func.__doc__),
+    inputs=inputs if inputs else None,
+  )
   return pipeline_meta
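Net effect for callers: _extract_pipeline_metadata keeps its signature but now returns a ComponentSpec built at the end of the function. A hedged usage sketch (assumes the SDK at this revision; my_pipeline is a made-up function, mirroring the tests further down):

    from kfp.dsl._metadata import _extract_pipeline_metadata
    from kfp.components._structures import ComponentSpec

    def my_pipeline(a: 'Integer' = 1, b: 'String' = 'hello'):
        pass

    spec = _extract_pipeline_metadata(my_pipeline)
    assert isinstance(spec, ComponentSpec)
    print(spec.name)                       # my_pipeline (no _pipeline_name set)
    print([i.name for i in spec.inputs])   # ['a', 'b']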

View File

@@ -243,8 +243,6 @@ class Pipeline():
     Args:
       metadata (ComponentMeta): component metadata
     '''
-    if not isinstance(metadata, PipelineMeta): # noqa: F821 TODO
-      raise ValueError('_set_medata is expecting PipelineMeta.')
     self._metadata = metadata

View File

@@ -15,9 +15,9 @@
 import kfp
 import kfp.dsl as dsl
 from kfp.dsl import component, graph_component
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 from kfp.dsl.types import Integer, GCSPath, InconsistentTypeException
 from kfp.dsl import ContainerOp, Pipeline, PipelineParam
+from kfp.components._structures import ComponentSpec, InputSpec, OutputSpec
 import unittest
 class TestPythonComponent(unittest.TestCase):
@@ -35,11 +35,11 @@ class TestPythonComponent(unittest.TestCase):
     containerOp = componentA(1,2,c=3)
-    golden_meta = ComponentMeta(name='componentA')
-    golden_meta.inputs.append(ParameterMeta(name='a', param_type={'ArtifactA': {'file_type': 'csv'}}))
-    golden_meta.inputs.append(ParameterMeta(name='b', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
-    golden_meta.inputs.append(ParameterMeta(name='c', param_type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
-    golden_meta.outputs.append(ParameterMeta(name='model', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))
+    golden_meta = ComponentSpec(name='componentA', inputs=[], outputs=[])
+    golden_meta.inputs.append(InputSpec(name='a', type={'ArtifactA': {'file_type': 'csv'}}))
+    golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
+    golden_meta.inputs.append(InputSpec(name='c', type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
+    golden_meta.outputs.append(OutputSpec(name='model', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))
     self.assertEqual(containerOp._metadata, golden_meta)

View File

@@ -12,43 +12,42 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta
+from kfp.components._structures import ComponentSpec, InputSpec, OutputSpec
 import unittest
 class TestComponentMeta(unittest.TestCase):
   def test_to_dict(self):
-    component_meta = ComponentMeta(name='foobar',
+    component_meta = ComponentSpec(name='foobar',
                                    description='foobar example',
-                                   inputs=[ParameterMeta(name='input1',
+                                   inputs=[InputSpec(name='input1',
                                                          description='input1 desc',
-                                                         param_type={'GCSPath': {
+                                                         type={'GCSPath': {
                                                              'bucket_type': 'directory',
                                                              'file_type': 'csv'
                                                          }},
                                                          default='default1'
                                                          ),
-                                           ParameterMeta(name='input2',
+                                           InputSpec(name='input2',
                                                          description='input2 desc',
-                                                         param_type={'TFModel': {
+                                                         type={'TFModel': {
                                                              'input_data': 'tensor',
                                                              'version': '1.8.0'
                                                          }},
                                                          default='default2'
                                                          ),
-                                           ParameterMeta(name='input3',
+                                           InputSpec(name='input3',
                                                          description='input3 desc',
-                                                         param_type='Integer',
+                                                         type='Integer',
                                                          default='default3'
                                                          ),
                                            ],
-                                   outputs=[ParameterMeta(name='output1',
+                                   outputs=[OutputSpec(name='output1',
                                                           description='output1 desc',
-                                                          param_type={'Schema': {
+                                                          type={'Schema': {
                                                               'file_type': 'tsv'
                                                           }},
-                                                          default='default_output1'
                                                           )
                                             ]
                                    )
@@ -94,7 +93,6 @@ class TestComponentMeta(unittest.TestCase):
                         'file_type': 'tsv'
                     }
                 },
-                'default': 'default_output1'
             }
         ]
     }

View File

@@ -14,8 +14,9 @@
 import kfp
 from kfp.dsl import Pipeline, PipelineParam, ContainerOp, pipeline
-from kfp.dsl._metadata import PipelineMeta, ParameterMeta, _extract_pipeline_metadata
+from kfp.dsl._metadata import _extract_pipeline_metadata
 from kfp.dsl.types import GCSPath, Integer
+from kfp.components._structures import ComponentSpec, InputSpec
 import unittest
@@ -69,9 +70,9 @@ class TestPipeline(unittest.TestCase):
     def my_pipeline1(a: {'Schema': {'file_type': 'csv'}}='good', b: Integer()=12):
       pass
-    golden_meta = PipelineMeta(name='p1', description='description1')
-    golden_meta.inputs.append(ParameterMeta(name='a', param_type={'Schema': {'file_type': 'csv'}}, default='good'))
-    golden_meta.inputs.append(ParameterMeta(name='b', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
+    golden_meta = ComponentSpec(name='p1', description='description1', inputs=[])
+    golden_meta.inputs.append(InputSpec(name='a', type={'Schema': {'file_type': 'csv'}}, default='good'))
+    golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
     pipeline_meta = _extract_pipeline_metadata(my_pipeline1)
     self.assertEqual(pipeline_meta, golden_meta)